|
@@ -0,0 +1,184 @@
|
|
|
+"""
|
|
|
+寓信ETL
|
|
|
+财务权责明细表
|
|
|
+按月、权责、费用科目、应收、应付、实收、实付、待收、待付划分账单明细
|
|
|
+"""
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+from sqlalchemy import create_engine, text
|
|
|
+from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from tqdm import tqdm
|
|
|
+import json
|
|
|
+from dateutil.relativedelta import relativedelta
|
|
|
+import calendar
|
|
|
+from urllib.parse import quote_plus
|
|
|
+from utils import load_config, truncate_target_db, update_column_comment
|
|
|
+import json
|
|
|
+
|
|
|
+
|
|
|
# Development switch: when True, etl() caps the number of rows it processes
# and prints a preview of every transformed batch.
debug = False

# Extra SQL fragment appended to the WHERE clause of the source queries.
# It is empty in both normal and debug mode; put a predicate here
# (starting with " and ...") to narrow a debug run.
debug_condition = ''
|
|
|
+
|
|
|
# Layout of the target table: column name -> [SQLAlchemy column type, column comment].
# - The key order is the column order of the DataFrame built by transform().
# - The first element of each value is passed to DataFrame.to_sql as the dtype.
# - The whole mapping is handed to update_column_comment(); presumably the
#   second element becomes the column comment -- verify against utils.
cols = {
    'contract_id': [NVARCHAR(32), '租赁合同ID'],
    'fee_subject_id': [NVARCHAR(32), '费用类型ID'],
    'fee_direction': [INTEGER, '费用方向'],
    # Holds the per-month date emitted by transform(), not a bill detail ID,
    # so the comment says "month".
    # NOTE(review): the value is a date but is stored as text; `Date` is
    # imported and unused -- confirm whether this column should be a Date.
    'themonth': [NVARCHAR(32), '月份'],
    'kind': [NVARCHAR(16), '类型'],
    'money': [DECIMAL(14, 2), '金额'],
}
|
|
|
+
|
|
|
+
|
|
|
def query_total(conn) -> int:
    """Return the number of source rows selected by the ETL filter.

    Args:
        conn: SQLAlchemy connection used to run the count query.

    Returns:
        The ``count(1)`` of rows in ``yuxin_finance.fin_finance_bill_detail``
        matching the same WHERE clause that ``extract`` uses (including the
        module-level ``debug_condition`` fragment).
    """
    count_sql = f"""
        select
            count(1)
        from yuxin_finance.fin_finance_bill_detail
        where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
    """
    result = conn.execute(text(count_sql))
    return result.scalar()
|
|
|
+
|
|
|
+
|
|
|
def extract(conn, batch_size, i) -> pd.DataFrame:
    """Read one page of bill-detail rows from the source table.

    Args:
        conn: Database connection handed to ``pandas.read_sql``.
        batch_size: Maximum number of rows in the page (SQL ``limit``).
        i: Number of rows to skip before the page starts (SQL ``offset``).

    Returns:
        A DataFrame with the columns ``biz_id``, ``fee_subject_id``,
        ``fee_direction``, ``begin_time``, ``end_time``, ``original_money``
        and ``occurred_money``.
    """
    # NOTE(review): the query pages with limit/offset but has no ORDER BY, so
    # the database does not guarantee a stable row order between pages.
    # Consider ordering by a unique key -- confirm which column is suitable.
    page_sql = f"""
        select biz_id, fee_subject_id, fee_direction, begin_time, end_time, original_money, occurred_money
        from yuxin_finance.fin_finance_bill_detail
        where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
        limit {batch_size} offset {i}
    """
    return pd.read_sql(page_sql, con=conn)
|
|
|
+
|
|
|
+
|
|
|
def _next_month(moment):
    """Return ``moment`` moved one calendar month forward.

    The day of month is clamped to the length of the target month
    (Jan 31 -> Feb 28/29); the time of day is left untouched.
    """
    year = moment.year + moment.month // 12
    month = moment.month % 12 + 1
    day = min(moment.day, calendar.monthrange(year, month)[1])
    return moment.replace(year=year, month=month, day=day)


def transform(conn, data) -> pd.DataFrame:
    """Split every bill detail into per-month rows.

    Each source row is spread over the calendar months touched by
    ``begin_time`` .. ``end_time`` in proportion to the number of days of the
    period that fall into each month. Every month before the last one gets
    ``amount / total_days * days_in_that_month`` rounded to 2 places; the last
    month gets whatever is left, so the monthly parts always add up to the
    bill amount. For every month three rows are emitted: the original amount,
    the occurred amount and their difference.

    Args:
        conn: Unused; kept so the extract/transform/load steps share a
            calling convention.
        data: DataFrame with the columns ``biz_id``, ``fee_subject_id``,
            ``fee_direction``, ``begin_time``, ``end_time``,
            ``original_money`` and ``occurred_money``. ``begin_time`` and
            ``end_time`` must be datetime-like (``.year``, ``.month``,
            ``.day``, ``.date()`` and ``.replace()`` are used).

    Returns:
        A DataFrame whose columns are the keys of the module-level ``cols``
        mapping, with one row per (source row, month, kind).
    """
    records = []
    for _, row in data.iterrows():
        begin_date = row['begin_time']
        end_date = row['end_time']

        # Rows whose period is inverted or has a missing bound (NaT compares
        # False) produce no output.
        # NOTE(review): such bills are silently dropped -- confirm whether
        # they should be reported instead.
        if not begin_date <= end_date:
            continue

        # Inclusive number of days covered by the bill (always >= 1 here).
        num_days = (end_date - begin_date).days + 1
        original_per_day = row['original_money'] / num_days
        occurred_per_day = row['occurred_money'] / num_days

        # Amount not yet assigned to a month.
        original_left = row['original_money']
        occurred_left = row['occurred_money']

        if row['fee_direction'] == 1:
            kinds = ('应收', '实收', '待收')
        else:
            kinds = ('应付', '实付', '待付')

        first_month = (begin_date.year, begin_date.month)
        final_month = (end_date.year, end_date.month)

        current = begin_date
        # Compare (year, month) rather than full timestamps: ``current`` keeps
        # the begin day of month, so comparing it with ``end_date`` directly
        # would skip the final month whenever the begin day is later in the
        # month than the end day (e.g. Jan 15 .. Mar 10).
        while (current.year, current.month) <= final_month:
            this_month = (current.year, current.month)
            if this_month == final_month:
                # The last month absorbs the remainder, rounding included.
                original_money = round(original_left, 2)
                occurred_money = round(occurred_left, 2)
            else:
                first_day = begin_date.day if this_month == first_month else 1
                last_day = calendar.monthrange(current.year, current.month)[1]
                days_in_month = last_day - first_day + 1
                original_money = round(original_per_day * days_in_month, 2)
                occurred_money = round(occurred_per_day * days_in_month, 2)
                # Subtract the rounded values so the parts sum to the total.
                original_left -= original_money
                occurred_left -= occurred_money

            amounts = (
                original_money,
                occurred_money,
                original_money - occurred_money,
            )
            for kind, money in zip(kinds, amounts):
                records.append({
                    'contract_id': row['biz_id'],
                    'fee_subject_id': row['fee_subject_id'],
                    'fee_direction': row['fee_direction'],
                    'themonth': current.date(),
                    'kind': kind,
                    'money': money,
                })

            current = _next_month(current)

    return pd.DataFrame(records, columns=list(cols))
|
|
|
+
|
|
|
+
|
|
|
def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append a transformed batch to the target table.

    Args:
        conn: Database connection handed to ``DataFrame.to_sql``.
        df: Batch to write; its columns are expected to match ``cols``.
        target_db: Name of the destination table (despite the name, it is
            passed to ``to_sql`` as the table name).
    """
    # SQL column types come from the first element of each ``cols`` entry.
    column_types = {}
    for column_name, spec in cols.items():
        column_types[column_name] = spec[0]

    # if_exists='append' adds rows to an existing table instead of replacing it.
    df.to_sql(
        name=target_db,
        con=conn,
        if_exists='append',
        index=False,
        dtype=column_types,
    )
|
|
|
+
|
|
|
+
|
|
|
def etl():
    """Run the whole job: count, truncate, then extract/transform/load in pages.

    Connection settings are read from the ``mysql`` section of the
    configuration returned by ``load_config()``. All work happens on a single
    connection opened with ``engine.begin()``. The target table is passed to
    ``truncate_target_db`` before loading and to ``update_column_comment``
    after the last batch (presumably these empty the table and apply the
    comments from ``cols`` -- verify against utils).
    """
    config = load_config()
    mysql_conf = config['mysql']

    target_db = 'bi_bill_detail_month_'

    # Username and password are both URL-escaped; otherwise characters such
    # as '@', ':' or '/' in either value would corrupt the connection URL.
    connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'.format(
        username=quote_plus(str(mysql_conf['username'])),
        pwd=quote_plus(mysql_conf['password']),
        host=mysql_conf['host'],
        port=mysql_conf['port'],
        db=mysql_conf['db'],
    )

    engine = create_engine(connection_string)
    try:
        with engine.begin() as conn:
            # Total number of source rows, used to size the paging loop.
            total = query_total(conn)
            batch_size = 100

            truncate_target_db(conn, target_db)

            if debug:
                # Debug runs only process the first two pages.
                total = 200
                print('total', total)

            # Page through the source and write each transformed batch.
            for offset in tqdm(range(0, total, batch_size)):
                batch = extract(conn, batch_size, offset)
                batch = transform(conn, batch)

                if debug:
                    print(batch.head())
                load(conn, batch, target_db)

            update_column_comment(conn, target_db, cols)
    finally:
        # Release pooled connections even when the run fails.
        engine.dispose()
|
|
|
+
|
|
|
+
|
|
|
+etl()
|