""" 寓信ETL 合同业务明细表 - bill_flow和bill_detail关联表 """ import pandas as pd from sqlalchemy import create_engine, text from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER from datetime import datetime, timedelta from tqdm import tqdm import json from dateutil.relativedelta import relativedelta import calendar from urllib.parse import quote_plus from utils import load_config, truncate_target_db, update_column_comment import json debug = False if debug: debug_condition = '' pass else: debug_condition = '' pass cols = { 'flow_id': [NVARCHAR(32), '流水ID'], 'money': [DECIMAL(14, 2), '金额'], 'is_exempt': [INTEGER, '是否免除'], 'bill_detail_id': [NVARCHAR(32), '账单明细ID'], } def query_total(conn) -> int: query = """ select count(1) from yuxin_finance.fin_finance_bill_flow where is_delete=0 and is_valid=1 {debug_condition} """.format(debug_condition=debug_condition) return conn.execute(text(query)).scalar() def extract(conn, batch_size, i) -> pd.DataFrame: """ This API extracts data from """ query = """ select id 'flow_id', bill_detail_info from yuxin_finance.fin_finance_bill_flow where is_delete=0 and is_valid=1 {debug_condition} limit {batch_size} offset {offset} """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition) source_data = pd.read_sql(query, con=conn) return source_data def transform(conn, data) -> pd.DataFrame: """ Transforms the dataset into desired structure and filters """ df_data = [] for index, row in data.iterrows(): # print(row['flow_id'], row['bill_detail_info']) details = json.loads(row['bill_detail_info']) for detail in details: df_data.append({ 'flow_id': row['flow_id'], 'money': detail['money'], 'is_exempt': detail['isExempt'], 'bill_detail_id': detail['billDetailId'], }) pass columns = [k for k, v in cols.items()] df = pd.DataFrame(df_data, columns=columns) return df def load(conn, df: pd.DataFrame, target_db) -> None: """ Loads data into a MySQL database """ # Define the column types for the table dtypes = {key: value[0] for key, value in cols.items()} # create target table with df.dtypes df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=dtypes) def etl(): config = load_config('production') target_db = 'bi_bill_flow_detail_' connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\ .format(username=config['mysql']['username'], pwd=quote_plus(config['mysql']['password']), host=config['mysql']['host'], port=config['mysql']['port'], db=config['mysql']['db']) engine = create_engine(connection_string) with engine.begin() as conn: # Get the total number of rows total = query_total(conn) # Define the batch size batch_size = 100 truncate_target_db(conn, target_db) if debug: total = 200 batch_size = 100 print('total', total) # Write the data to the table in batches for i in tqdm(range(0, total, batch_size)): data = extract(conn, batch_size, i) data = transform(conn, data) if debug: print(data.head()) load(conn, data, target_db) update_column_comment(conn, target_db, cols) etl()