"""
Yuxin bill-detail ETL feeding the consolidated financial report.

Pipeline: page through the valid renter-contract bill details, spread each
detail's amount over the calendar months it covers, and append the result to
the BI target table.
"""
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
from datetime import datetime, timedelta
from tqdm import tqdm
import json
from dateutil.relativedelta import relativedelta
import calendar
from urllib.parse import quote_plus
from utils import load_config, truncate_target_db

debug = False

if debug:
    # debug_condition = "and bd.biz_id='1688796583287914497'"
    debug_condition = ''
else:
    debug_condition = ''

# Display labels written to `reject_payment_account_type`, keyed by the
# numeric code found in the contract's cancel_info JSON.
# NOTE(review): the upstream enum reportedly names code 1 BANK_CARD; the label
# emitted here is kept unchanged so existing report consumers are unaffected.
_PAYMENT_ACCOUNT_TYPE_LABELS = {1: '银联', 2: '支付宝', 3: '微信'}


def query_total(conn) -> int:
    """
    Return the number of bill-detail rows that match the ETL filter
    (is_valid=1, is_delete=0, biz_type=2, plus the optional debug condition).
    """
    query = """
        select count(1)
        from yuxin_finance.fin_finance_bill_detail bd
        where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2
        {debug_condition}
    """.format(debug_condition=debug_condition)
    return conn.execute(text(query)).scalar()


def extract(conn, batch_size, i) -> pd.DataFrame:
    """
    Extract one page of bill details joined with fee subject, renter
    contract, maintainer and department information.

    :param conn: open SQLAlchemy connection
    :param batch_size: maximum number of rows to fetch
    :param i: row offset of the page
    :return: DataFrame whose first 11 columns are the report dimensions
    """
    # The ORDER BY makes LIMIT/OFFSET paging stable: without it the server may
    # return rows in a different order per query, so pages could overlap or
    # skip rows.
    query = """
        select bd.tenant_id,
               rc.id 'contract_id',
               dept.id 'dept_id',
               dept.name 'dept_name',
               rc.maintainer_id 'emp_id',
               emp.name 'emp_name',
               bd.bill_id,
               bd.id as 'bill_detail_id',
               bd.fee_subject_id,
               sd.label as 'fee_subject_label',
               sd.name as 'fee_subject_name',
               bd.fee_direction,
               bd.original_money,
               bd.occurred_money,
               bd.begin_time,
               bd.end_time,
               bd.is_occur,
               rc.cancel_info,
               bd.predict_time
        from yuxin_finance.fin_finance_bill_detail bd
        left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
        left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id
        left join yuxin_setting.setting_employee_dept ed on ed.emp_id=rc.maintainer_id and ed.is_delete=0
        left join yuxin_setting.setting_employee_info emp on emp.id=rc.maintainer_id and emp.is_delete=0
        left join yuxin_setting.setting_department dept on dept.id=ed.dept_id and dept.is_delete=0
        where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2
        {debug_condition}
        order by bd.id, ed.dept_id
        limit {batch_size} offset {offset}
    """.format(batch_size=int(batch_size), offset=int(i), debug_condition=debug_condition)
    source_data = pd.read_sql(query, con=conn)
    return source_data


def _reject_columns(raw_cancel_info) -> dict:
    """
    Build the five `reject_*` output columns from a contract's cancel_info.

    cancel_info is a JSON object such as
    {"reason": "", "rejectName": "...", "rejectTime": "2022-11-18 15:02:03",
     "paymentAccount": null, "paymentAccountType": null, ...}.
    Missing, NaN or empty values yield the "no cancellation" defaults.
    """
    reject_time = None
    reject_name = None
    payment_account = None
    payment_account_type = None
    reason = None

    is_missing = (
        raw_cancel_info is None
        or (isinstance(raw_cancel_info, float) and pd.isna(raw_cancel_info))
        or raw_cancel_info == ''
    )
    cancel_info = None if is_missing else json.loads(raw_cancel_info)

    if isinstance(cancel_info, dict):
        reason = cancel_info.get('reason')
        raw_time = cancel_info.get('rejectTime')
        if raw_time is not None:
            reject_time = datetime.strptime(raw_time, '%Y-%m-%d %H:%M:%S').date()
        reject_name = cancel_info.get('rejectName')
        payment_account = cancel_info.get('paymentAccount')
        payment_account_type = cancel_info.get('paymentAccountType')
        if isinstance(payment_account_type, (int, float)):
            # Unknown codes are passed through untouched.
            payment_account_type = _PAYMENT_ACCOUNT_TYPE_LABELS.get(
                payment_account_type, payment_account_type)

    return {
        'reject_time': reject_time,
        'reject_name': reject_name,
        'reject_payment_account': payment_account if payment_account is not None else '',
        'reject_payment_account_type': payment_account_type if payment_account_type is not None else '',
        'reject_reason': reason if reason is not None else '',
    }


def _resolve_kinds(fee_direction, is_occur) -> tuple:
    """
    Return (kind, accrued_kind) for a bill detail.

    fee_direction 1 is the receivable side, anything else the payable side.
    A settled detail (is_occur == 1) is reported as received/paid and
    additionally under its receivable/payable kind; an unsettled detail has
    no second kind (None).
    """
    if fee_direction == 1:
        return ('实收', '应收') if is_occur == 1 else ('应收', None)
    return ('实付', '应付') if is_occur == 1 else ('应付', None)


def _monthly_portions(begin_date, end_date, total_money):
    """
    Yield one (day, money) pair per calendar month covered by the period.

    Each month gets the daily rate times the days it covers, rounded to two
    decimals; the last month receives whatever is left so the portions always
    sum to `total_money`. Nothing is yielded unless begin_date <= end_date.
    """
    if not (begin_date <= end_date):
        return

    num_days = max((end_date - begin_date).days + 1, 1)
    num_months = (end_date.year - begin_date.year) * 12 + (end_date.month - begin_date.month) + 1
    money_per_day = total_money / num_days

    remainder = total_money
    cursor = begin_date
    # Iterate by month count instead of comparing the cursor with end_date:
    # a cursor that keeps begin_date's day-of-month can overshoot end_date
    # inside the final month and would drop that month entirely. Position in
    # the sequence (not `.month` equality) identifies the first/last month, so
    # periods longer than a year are handled correctly.
    for month_index in range(num_months):
        is_last = month_index == num_months - 1
        first_day = begin_date.day if month_index == 0 else 1
        if is_last:
            last_day = end_date.day
        else:
            last_day = calendar.monthrange(cursor.year, cursor.month)[1]

        if is_last:
            money = round(remainder, 2)
        else:
            money = round(money_per_day * (last_day - first_day + 1), 2)
            # Subtract the rounded amount so the rounding tail lands in the
            # last month.
            remainder -= money

        # Keep the reported day inside the billing period.
        yield min(cursor, end_date).date(), money
        cursor += relativedelta(months=1)


def transform(data) -> pd.DataFrame:
    """
    Reshape extracted bill details into report rows.

    Dimensions: tenant, contract, maintainer, department, day, fee subject
    and kind (receivable / payable / received / paid).
    Metric: money, with the rounding remainder kept in the last entry.

    Accrual basis: every detail is apportioned over the calendar months
    between begin_time and end_time (is_apportion=1).
    Cash basis: deposit subjects additionally get one un-apportioned row
    dated predict_time (is_apportion=0).

    :param data: frame produced by `extract`; its first 11 columns are copied
        to the output unchanged
    :return: frame with those 11 columns plus day, kind, money, the reject_*
        columns and is_apportion
    """
    source_columns = list(data.columns[:11])
    columns = source_columns + [
        'day',
        'kind',
        'money',
        'reject_time',
        'reject_name',
        'reject_payment_account',
        'reject_payment_account_type',
        'reject_reason',
        'is_apportion',
    ]

    # Collect plain dicts and build the frame once; appending to a DataFrame
    # row by row copies the whole frame on every insert.
    records = []
    for _, row in data.iterrows():
        base = {name: row[name] for name in source_columns}
        base.update(_reject_columns(row['cancel_info']))
        kind, accrued_kind = _resolve_kinds(row['fee_direction'], row['is_occur'])

        # Accrual basis.
        portions = _monthly_portions(row['begin_time'], row['end_time'], row['original_money'])
        for day, money in portions:
            record = dict(base, day=day, kind=kind, money=money, is_apportion=1)
            records.append(record)
            if accrued_kind is not None:
                # A settled detail is also reported as receivable/payable.
                records.append(dict(record, kind=accrued_kind))

        # Cash basis: deposits.
        if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
            records.append(dict(
                base,
                day=row['predict_time'],
                kind=kind,
                money=row['original_money'],
                is_apportion=0,
            ))

    return pd.DataFrame(records, columns=columns)


def load(conn, df: pd.DataFrame, target_db) -> None:
    """
    Append the transformed rows to the target MySQL table.

    :param conn: open SQLAlchemy connection
    :param df: frame produced by `transform`
    :param target_db: name of the target table
    """
    # Explicit column types, used when the table has to be created.
    dtype = {
        'tenant_id': NVARCHAR(10),
        'contract_id': NVARCHAR(64),
        'dept_id': NVARCHAR(64),
        'dept_name': NVARCHAR(50),
        'emp_id': NVARCHAR(64),
        'emp_name': NVARCHAR(50),
        'bill_id': NVARCHAR(64),
        'bill_detail_id': NVARCHAR(64),
        'fee_subject_id': NVARCHAR(64),
        'fee_subject_label': NVARCHAR(64),
        'fee_subject_name': NVARCHAR(64),
        'day': Date,
        'kind': NVARCHAR(10),
        'money': DECIMAL(14, 2),
        'reject_time': Date,
        'reject_name': NVARCHAR(64),
        'reject_payment_account': NVARCHAR(64),
        'reject_payment_account_type': NVARCHAR(32),
        'reject_reason': NVARCHAR(64),
        'is_apportion': INTEGER,
    }
    df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=dtype)


def etl():
    """
    Run the full job: truncate the target table, then extract, transform and
    load the bill details batch by batch inside one `engine.begin()` block.
    """
    config = load_config()
    target_db = 'bi_bill_detail_'
    connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
        .format(username=config['mysql']['username'],
                pwd=quote_plus(config['mysql']['password']),
                host=config['mysql']['host'],
                port=config['mysql']['port'],
                db=config['mysql']['db'])
    engine = create_engine(connection_string)
    with engine.begin() as conn:
        # NOTE(review): `total` counts bill-detail rows only, while `extract`
        # joins employee/department tables. If a maintainer can have several
        # active department rows the joined result is longer than `total` and
        # the tail would not be paged through -- confirm the cardinality.
        total = query_total(conn)
        batch_size = 100
        truncate_target_db(conn, target_db)
        if debug:
            total = 200
            batch_size = 100
        print('total', total)
        # Write the data to the table in batches.
        for i in tqdm(range(0, total, batch_size)):
            data = extract(conn, batch_size, i)
            data = transform(data)
            if debug:
                print(data.head())
            load(conn, data, target_db)


# The job runs on import/execution, as before.
etl()