123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- """
- 寓信ETL
- 财务权责明细表
- 按月、权责、费用科目、应收、应付、实收、实付、待收、待付划分账单明细
- """
- import pandas as pd
- from sqlalchemy import create_engine, text
- from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
- from datetime import datetime, timedelta
- from tqdm import tqdm
- import json
- from dateutil.relativedelta import relativedelta
- import calendar
- from urllib.parse import quote_plus
- from utils import load_config, truncate_target_db, update_column_comment
- import json
# Debug switch: when True, etl() processes at most 200 source rows and prints
# the head of every transformed batch.
debug = False

# Extra SQL appended to the WHERE clause of the source queries. It is empty in
# both normal and debug mode; kept as a hook for narrowing debug runs.
debug_condition = ''
# Target-table schema: column name -> [SQLAlchemy column type, column comment].
# load() passes the types to DataFrame.to_sql(dtype=...), transform() uses the
# key order as the output column order, and etl() hands the whole mapping to
# update_column_comment().
cols = {
    'contract_id': [NVARCHAR(32), '租赁合同ID'],     # lease contract ID (source biz_id)
    'fee_subject_id': [NVARCHAR(32), '费用类型ID'],  # fee type ID
    'fee_direction': [INTEGER, '费用方向'],          # fee direction; 1 -> receivable rows (应收/实收/待收)
    # Accounting month of the split amount. The comment used to read
    # '账单明细ID' ("bill detail ID"), a copy-paste error.
    # NOTE(review): transform() writes date values here; `Date` is already
    # imported and may have been the intended column type.
    'themonth': [NVARCHAR(32), '月份'],
    'kind': [NVARCHAR(16), '类型'],                  # row kind: 应收/实收/待收 or 应付/实付/待付
    'money': [DECIMAL(14, 2), '金额'],               # amount
}
def query_total(conn) -> int:
    """Count the source rows that extract() pages through.

    Uses the same filter as extract(): non-deleted, valid bill details with
    ``biz_type = 2``, followed by the module-level ``debug_condition`` suffix.

    :param conn: open SQLAlchemy connection to the source database.
    :return: the number of matching rows.
    """
    sql = f"""
        select
            count(1)
        from yuxin_finance.fin_finance_bill_detail
        where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
    """
    result = conn.execute(text(sql))
    return result.scalar()
def extract(conn, batch_size, i) -> pd.DataFrame:
    """Read one page of bill-detail rows from the source table.

    :param conn: open SQLAlchemy connection to the source database.
    :param batch_size: maximum number of rows to return (SQL ``LIMIT``).
    :param i: zero-based row offset of the page (SQL ``OFFSET``).
    :return: DataFrame with columns biz_id, fee_subject_id, fee_direction,
        begin_time, end_time, original_money, occurred_money.
    """
    # LIMIT/OFFSET paging is only stable under an ORDER BY. Without one, MySQL
    # may return rows in a different order on each query, so consecutive pages
    # can overlap or skip rows. Sorting by every selected column yields a
    # deterministic sequence of values; rows that tie on all of them are
    # identical in the output, so any tie order is harmless.
    # NOTE(review): ordering by the table's primary key would be cheaper; the
    # key column is not visible from this script.
    query = """
        select biz_id, fee_subject_id, fee_direction, begin_time, end_time, original_money, occurred_money
        from yuxin_finance.fin_finance_bill_detail
        where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
        order by biz_id, fee_subject_id, fee_direction, begin_time, end_time, original_money, occurred_money
        limit {batch_size} offset {offset}
    """.format(
        batch_size=int(batch_size),  # int() keeps the paging values numeric in the SQL text
        offset=int(i),
        debug_condition=debug_condition,
    )
    return pd.read_sql(query, con=conn)
def _split_row_by_month(row) -> list:
    """Return the output records (dicts keyed like ``cols``) for one source row."""
    begin_date = row['begin_time']
    end_date = row['end_time']
    # Rows without a period cannot be allocated. Previously they fell through
    # the month loop and produced nothing; keep that outcome, but explicitly.
    if pd.isna(begin_date) or pd.isna(end_date):
        return []

    # Count days on calendar dates. The per-month counts below only look at
    # day-of-month, so the total must be computed the same way, regardless of
    # the times of day.
    num_days = (end_date.date() - begin_date.date()).days + 1
    if num_days <= 0:
        # end_time before begin_time: the month loop never ran for such rows,
        # so they produced no output. Keep that, explicitly.
        # NOTE(review): confirm whether reversed periods should be swapped
        # instead of dropped.
        return []

    original_per_day = row['original_money'] / num_days
    occurred_per_day = row['occurred_money'] / num_days
    original_left = row['original_money']
    occurred_left = row['occurred_money']

    if row['fee_direction'] == 1:
        kinds = ('应收', '实收', '待收')
    else:
        kinds = ('应付', '实付', '待付')

    first_month = (begin_date.year, begin_date.month)
    last_month = (end_date.year, end_date.month)
    records = []
    current_month = begin_date
    # Compare (year, month) rather than full timestamps. current_month keeps
    # begin_time's day and time, so `current_month <= end_date` was false in
    # the final month whenever that day/time came after end_time's. The final
    # month, which carries the remainder, was then silently dropped.
    while (current_month.year, current_month.month) <= last_month:
        month_key = (current_month.year, current_month.month)
        if month_key == last_month:
            # The final month absorbs the remainder so the months add up to the total.
            original_money = round(original_left, 2)
            occurred_money = round(occurred_left, 2)
        else:
            first_day = begin_date.day if month_key == first_month else 1
            last_day = calendar.monthrange(current_month.year, current_month.month)[1]
            days_in_month = last_day - first_day + 1
            original_money = round(original_per_day * days_in_month, 2)
            occurred_money = round(occurred_per_day * days_in_month, 2)
            # Subtract the *rounded* amounts so rounding differences end up in
            # the final month instead of being lost.
            original_left -= original_money
            occurred_left -= occurred_money

        amounts = (original_money, occurred_money, round(original_money - occurred_money, 2))
        for kind, money in zip(kinds, amounts):
            records.append({
                'contract_id': row['biz_id'],
                'fee_subject_id': row['fee_subject_id'],
                'fee_direction': row['fee_direction'],
                'themonth': current_month.date(),
                'kind': kind,
                'money': money,
            })
        # relativedelta clamps the day to the target month's length (Jan 31 -> Feb 28/29).
        current_month += relativedelta(months=1)
    return records


def transform(conn, data) -> pd.DataFrame:
    """Spread each bill-detail row across the calendar months it covers.

    ``original_money`` and ``occurred_money`` are allocated to months in
    proportion to the number of days of [begin_time, end_time] (inclusive,
    counted on calendar dates) that fall in each month. Non-final months get
    ``round(per_day * days, 2)``. The final month gets what is left, so the
    monthly amounts sum to the row total.

    For each month three records are emitted. When ``fee_direction == 1`` they
    are 应收/实收/待收; otherwise they are 应付/实付/待付. The three values are
    original, occurred, and original - occurred. ``themonth`` is the date of
    the month cursor: begin_time's date for the first month, then one month
    later each step.

    :param conn: unused; kept for interface compatibility with the other ETL steps.
    :param data: DataFrame as returned by extract().
    :return: DataFrame with the columns of ``cols``, in that order.
    """
    df_data = []
    for _, row in data.iterrows():
        df_data.extend(_split_row_by_month(row))
    return pd.DataFrame(df_data, columns=list(cols))
def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append the rows of ``df`` to the table named ``target_db``.

    The SQL column types come from the first element of each ``cols`` entry
    and are passed to ``DataFrame.to_sql`` as ``dtype``. The DataFrame index
    is not written.
    """
    column_types = {}
    for column, (sql_type, _comment) in cols.items():
        column_types[column] = sql_type
    df.to_sql(
        name=target_db,
        con=conn,
        if_exists='append',
        index=False,
        dtype=column_types,
    )
def etl():
    """Rebuild ``bi_bill_detail_month_`` from the source bill details.

    Builds a pymysql connection URL from ``load_config()['mysql']`` and opens
    one connection with ``engine.begin()``. It then counts the source rows,
    truncates the target table, and pages through the source (extract ->
    transform -> load) in batches of 100. Finally it writes the column
    comments from ``cols``. In debug mode only the first 200 rows are processed.
    """
    mysql = load_config()['mysql']
    target_db = 'bi_bill_detail_month_'
    connection_string = (
        f"mysql+pymysql://{mysql['username']}:{quote_plus(mysql['password'])}"
        f"@{mysql['host']}:{mysql['port']}/{mysql['db']}?charset=utf8"
    )
    engine = create_engine(connection_string)
    with engine.begin() as conn:
        total = query_total(conn)
        batch_size = 100
        truncate_target_db(conn, target_db)
        if debug:
            # Debug runs only cover the first two pages.
            total, batch_size = 200, 100
        print('total', total)
        for offset in tqdm(range(0, total, batch_size)):
            batch = transform(conn, extract(conn, batch_size, offset))
            if debug:
                print(batch.head())
            load(conn, batch, target_db)
        update_column_comment(conn, target_db, cols)
- etl()
|