"""
Yuxin (寓信) ETL: finance accrual-basis detail table.

Splits bill details by month (accrual basis) and fee subject into
receivable (应收), payable (应付), received (实收), paid (实付),
outstanding receivable (待收), and outstanding payable (待付) amounts.
"""
import calendar
from urllib.parse import quote_plus

import pandas as pd
from dateutil.relativedelta import relativedelta
from sqlalchemy import create_engine, text
from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
from tqdm import tqdm

from utils import load_config, truncate_target_db, update_column_comment

debug = False

# Extra SQL filter appended to the WHERE clause while debugging
# (e.g. "and biz_id = '...'"); an empty string means no extra filter.
debug_condition = ''

# Target column name -> [SQLAlchemy type, column comment written to the DB]
cols = {
    'contract_id': [NVARCHAR(32), '租赁合同ID'],     # lease contract ID
    'fee_subject_id': [NVARCHAR(32), '费用类型ID'],  # fee subject (type) ID
    'fee_direction': [INTEGER, '费用方向'],          # fee direction (1 = receivable, otherwise payable)
    'themonth': [Date, '月份'],                      # first day of the accrual month
    'kind': [NVARCHAR(16), '类型'],                  # amount kind, see transform()
    'money': [DECIMAL(14, 2), '金额'],               # amount
}


def query_total(conn) -> int:
    """Count the valid, non-deleted bill-detail rows with biz_type=2."""
    query = """
        select count(1)
        from yuxin_finance.fin_finance_bill_detail
        where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
    """.format(debug_condition=debug_condition)
    return conn.execute(text(query)).scalar()


def extract(conn, batch_size, offset) -> pd.DataFrame:
    """Extract one batch of bill-detail rows from yuxin_finance.fin_finance_bill_detail."""
    # NOTE: LIMIT/OFFSET paging is only stable with an ORDER BY on a unique key;
    # without one, MySQL may skip or repeat rows between batches.
    query = """
        select biz_id, fee_subject_id, fee_direction, begin_time, end_time,
               original_money, occurred_money
        from yuxin_finance.fin_finance_bill_detail
        where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
        limit {batch_size} offset {offset}
    """.format(batch_size=batch_size, offset=offset, debug_condition=debug_condition)
    return pd.read_sql(text(query), con=conn)


def transform(conn, data) -> pd.DataFrame:
    """
    Spread each bill detail over the calendar months it covers and emit one row
    per month and amount kind.

    Amounts are prorated by day; the last month absorbs the rounding remainder
    so the monthly parts add up exactly to the bill's total.
    """
    df_data = []

    def append_row(row, month, kind, money):
        df_data.append({
            'contract_id': row['biz_id'],
            'fee_subject_id': row['fee_subject_id'],
            'fee_direction': row['fee_direction'],
            'themonth': month,
            'kind': kind,
            'money': money,
        })

    for _, row in data.iterrows():
        # Drop any time-of-day part so day counts are whole calendar days
        begin_date = pd.Timestamp(row['begin_time']).normalize()
        end_date = pd.Timestamp(row['end_time']).normalize()
        if end_date < begin_date:
            # Tolerate reversed ranges instead of silently dropping the bill
            begin_date, end_date = end_date, begin_date

        # Inclusive number of days covered by the bill (always >= 1)
        num_days = (end_date - begin_date).days + 1
        original_money_per_day = row['original_money'] / num_days
        occurred_money_per_day = row['occurred_money'] / num_days
        original_money_remainder = row['original_money']
        occurred_money_remainder = row['occurred_money']

        # Walk month by month from the first day of the begin month, so the end
        # month is still reached when end_date.day < begin_date.day
        current_month = begin_date.replace(day=1)
        while current_month <= end_date:
            ym = (current_month.year, current_month.month)
            is_first_month = ym == (begin_date.year, begin_date.month)
            is_last_month = ym == (end_date.year, end_date.month)
            first_day = begin_date.day if is_first_month else 1
            last_day = (end_date.day if is_last_month
                        else calendar.monthrange(current_month.year, current_month.month)[1])
            num_days_month = last_day - first_day + 1

            if is_last_month:
                # Keep the remainder in the last month
                original_money = round(original_money_remainder, 2)
                occurred_money = round(occurred_money_remainder, 2)
            else:
                # Round before subtracting so the rounded parts sum to the total
                original_money = round(original_money_per_day * num_days_month, 2)
                occurred_money = round(occurred_money_per_day * num_days_month, 2)
                original_money_remainder -= original_money
                occurred_money_remainder -= occurred_money

            month = current_month.date()
            if row['fee_direction'] == 1:
                # Receivable side: receivable / received / outstanding receivable
                append_row(row, month, '应收', original_money)
                append_row(row, month, '实收', occurred_money)
                append_row(row, month, '待收', original_money - occurred_money)
            else:
                # Payable side: payable / paid / outstanding payable
                append_row(row, month, '应付', original_money)
                append_row(row, month, '实付', occurred_money)
                append_row(row, month, '待付', original_money - occurred_money)

            # Advance to the first day of the next month
            current_month += relativedelta(months=1)

    return pd.DataFrame(df_data, columns=list(cols))


def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append the transformed rows to the target MySQL table."""
    # Column types from `cols`, used if to_sql has to create the table
    dtypes = {key: value[0] for key, value in cols.items()}
    df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=dtypes)


def etl():
    config = load_config()
    target_db = 'bi_bill_detail_month_'  # target table name
    connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'.format(
        username=config['mysql']['username'],
        pwd=quote_plus(config['mysql']['password']),
        host=config['mysql']['host'],
        port=config['mysql']['port'],
        db=config['mysql']['db'],
    )
    engine = create_engine(connection_string)

    with engine.begin() as conn:
        # Total number of source rows
        total = query_total(conn)
        # Rows per extract/transform/load batch
        batch_size = 100
        truncate_target_db(conn, target_db)
        if debug:
            total = 200
            batch_size = 100
        print('total', total)

        # Write the data to the target table in batches
        for offset in tqdm(range(0, total, batch_size)):
            data = extract(conn, batch_size, offset)
            data = transform(conn, data)
            if debug:
                print(data.head())
            load(conn, data, target_db)

        update_column_comment(conn, target_db, cols)


if __name__ == '__main__':
    etl()
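

# --- Illustrative sketch: hypothetical helper, not part of the original ETL ---
# The proration rule in transform() can be checked in isolation with a small
# pure function. This is a minimal sketch under stated assumptions: the name
# split_amount_by_month is hypothetical and etl() never calls it. It also
# assumes Decimal amounts rounded to cents with ROUND_HALF_EVEN, which is what
# Python's round() does for Decimal. An amount covering [begin, end] (inclusive)
# is spread evenly per day. Every month except the last gets its rounded
# day-share, and the last month takes the remainder, so the parts sum exactly
# to the input. Months are stepped as (year, month) integers, so only the
# standard library is needed.
import calendar
from datetime import date
from decimal import Decimal, ROUND_HALF_EVEN


def split_amount_by_month(amount, begin, end):
    """Return [(first_day_of_month, amount_for_month), ...] covering [begin, end]."""
    amount = Decimal(str(amount))
    if end < begin:
        begin, end = end, begin  # tolerate reversed ranges
    per_day = amount / ((end - begin).days + 1)
    remainder = amount
    parts = []
    year, month = begin.year, begin.month
    while (year, month) <= (end.year, end.month):
        is_first = (year, month) == (begin.year, begin.month)
        is_last = (year, month) == (end.year, end.month)
        first_day = begin.day if is_first else 1
        last_day = end.day if is_last else calendar.monthrange(year, month)[1]
        if is_last:
            share = remainder  # last month absorbs the rounding drift
        else:
            share = (per_day * (last_day - first_day + 1)).quantize(
                Decimal('0.01'), rounding=ROUND_HALF_EVEN)
            remainder -= share
        parts.append((date(year, month, 1), share))
        # Step to the next calendar month
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return parts

# Example: 100 over 2023-01-15..2023-03-10 (55 days) splits into 30.91 (Jan),
# 50.91 (Feb) and the 18.18 remainder (Mar). The end month is included even
# though its day (10) is smaller than the begin day (15).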