123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- """
- 寓信ETL
- 合同业务明细表 - bill_flow和bill_detail关联表
- """
- import pandas as pd
- from sqlalchemy import create_engine, text
- from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
- from datetime import datetime, timedelta
- from tqdm import tqdm
- import json
- from dateutil.relativedelta import relativedelta
- import calendar
- from urllib.parse import quote_plus
- from utils import load_config, truncate_target_db, update_column_comment
- import json
# Debug switch: when True, etl() caps the extraction at two batches and
# prints each transformed batch head.
debug = False

# Extra SQL predicate appended to the WHERE clause of the source queries.
# Empty in both debug and production runs today; kept as a hook for
# narrowing the extracted rows while debugging.  (The original code set
# the same value in both branches of an `if debug:` — collapsed here.)
debug_condition = ''

# Target-table schema: column name -> [SQLAlchemy type, column comment].
# Dict order defines the target column order; the comments are pushed to
# the database by update_column_comment() at the end of etl().
cols = {
    'flow_id': [NVARCHAR(32), '流水ID'],
    'money': [DECIMAL(14, 2), '金额'],
    'is_exempt': [INTEGER, '是否免除'],
    'bill_detail_id': [NVARCHAR(32), '账单明细ID'],
}
def query_total(conn) -> int:
    """Count the valid (is_delete=0, is_valid=1) rows in the source flow table.

    Used by etl() to size the batched extraction loop.

    :param conn: open SQLAlchemy connection to the source database
    :return: total number of rows that extract() will page through
    """
    sql = """
        select
            count(1)
        from yuxin_finance.fin_finance_bill_flow
        where is_delete=0 and is_valid=1 {cond}
    """.format(cond=debug_condition)
    return conn.execute(text(sql)).scalar()
def extract(conn, batch_size, i) -> pd.DataFrame:
    """Fetch one batch of source rows: the flow id plus its JSON detail blob.

    :param conn: open SQLAlchemy connection to the source database
    :param batch_size: number of rows per batch (SQL LIMIT)
    :param i: row offset of this batch (SQL OFFSET)
    :return: DataFrame with columns 'flow_id' and 'bill_detail_info'
    """
    sql = """
        select
            id 'flow_id', bill_detail_info
        from yuxin_finance.fin_finance_bill_flow
        where is_delete=0 and is_valid=1 {cond}
        limit {limit} offset {offset}
    """.format(limit=batch_size, offset=i, cond=debug_condition)
    return pd.read_sql(sql, con=conn)
def transform(conn, data) -> pd.DataFrame:
    """Explode each flow's JSON ``bill_detail_info`` into one row per detail.

    :param conn: unused; kept so every ETL step shares the same signature
    :param data: DataFrame with columns 'flow_id' and 'bill_detail_info',
        where 'bill_detail_info' is a JSON-encoded list of detail dicts
        (keys: money, isExempt, billDetailId)
    :return: DataFrame with columns flow_id / money / is_exempt /
        bill_detail_id, one row per detail entry of every flow
    """
    rows = []
    # Iterate only the two columns we need instead of row-wise iterrows().
    for flow_id, detail_json in zip(data['flow_id'], data['bill_detail_info']):
        for detail in json.loads(detail_json):
            rows.append({
                'flow_id': flow_id,
                'money': detail['money'],
                'is_exempt': detail['isExempt'],
                'bill_detail_id': detail['billDetailId'],
            })
    # Pin the column order to the target schema, even when rows is empty.
    return pd.DataFrame(rows, columns=list(cols))
def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append the transformed rows to the target MySQL table.

    The table is created on the first write using the SQLAlchemy column
    types declared in the module-level `cols` mapping.

    :param conn: open SQLAlchemy connection to the target database
    :param df: transformed batch produced by transform()
    :param target_db: name of the target table
    """
    column_types = {name: spec[0] for name, spec in cols.items()}
    df.to_sql(target_db, con=conn, index=False,
              if_exists='append', dtype=column_types)
def etl():
    """Run the full pipeline: truncate the target table, then copy the
    bill-flow/bill-detail association rows over in fixed-size batches,
    finishing by writing the column comments onto the target table.
    """
    config = load_config('production')
    target_db = 'bi_bill_flow_detail_'

    mysql_cfg = config['mysql']
    # Password is percent-encoded so special characters survive the URL.
    url = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
        .format(username=mysql_cfg['username'],
                pwd=quote_plus(mysql_cfg['password']),
                host=mysql_cfg['host'],
                port=mysql_cfg['port'],
                db=mysql_cfg['db'])
    engine = create_engine(url)

    # engine.begin() commits the whole run as one transaction on success.
    with engine.begin() as conn:
        total = query_total(conn)
        batch_size = 100
        truncate_target_db(conn, target_db)
        if debug:
            # Shrink the run to two small batches while debugging.
            total = 200
            batch_size = 100
        print('total', total)
        # `offset` feeds straight into the SQL LIMIT/OFFSET paging.
        for offset in tqdm(range(0, total, batch_size)):
            batch = extract(conn, batch_size, offset)
            batch = transform(conn, batch)
            if debug:
                print(batch.head())
            load(conn, batch, target_db)
        update_column_comment(conn, target_db, cols)
if __name__ == '__main__':
    # Guard the entry point so importing this module does not kick off
    # the full production ETL as a side effect.
    etl()
|