"""Yuxin bill-detail ETL feeding the consolidated financial statement table.

Reads renter-contract bill details from the source schemas in batches, expands
each bill detail into monthly accrual rows (plus one cash-basis row for
deposits) and appends the result to the target reporting table.
"""
import calendar
import json
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import pandas as pd
from dateutil.relativedelta import relativedelta
from sqlalchemy import create_engine, text
from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
from tqdm import tqdm

from utils import load_config, truncate_target_db, find_renewal_contract

debug = False
if debug:
    # Restrict the run to a single contract: debugging deposit accrual.
    debug_condition = "and bd.biz_id='1731870365341253635'"
else:
    debug_condition = ''

# Target table schema: column name -> [SQLAlchemy type, MySQL column comment].
# The comment strings are written to the database verbatim.
cols = {
    'tenant_id': [NVARCHAR(10), '租户ID'],
    'dept_id': [NVARCHAR(64), '房源所属门店ID'],
    'dept_name': [NVARCHAR(50), '房源所属门店名称'],
    'house_address': [NVARCHAR(100), '房源地址'],
    'community_id': [NVARCHAR(64), '房源所属项目/小区ID'],
    'community_name': [NVARCHAR(50), '房源所属项目/小区'],
    'house_area': [DECIMAL(10, 2), '房源面积'],
    'contract_dept_id': [NVARCHAR(64), '合同所属门店/部门ID'],
    'contract_dept_name': [NVARCHAR(50), '合同所属门店/部门'],
    'contract_id': [NVARCHAR(64), '合同ID'],
    'contract_no': [NVARCHAR(64), '合同编号'],
    'contract_medium': [NVARCHAR(50), '合同类型【1-纸质,2-电子】'],
    'renter_name': [NVARCHAR(50), '租客姓名'],
    'renter_phone': [NVARCHAR(20), '租客手机号'],
    'renter_id_type': [NVARCHAR(50), '租客证件类型证件类型【1-身份证,2-护照,3-港澳通行证,4-台湾同胞证,5-军官证,6-驾驶证 7-社会信用代码'],
    'renter_id_number': [NVARCHAR(50), '租客证件号码'],
    'sign_type': [NVARCHAR(50), '成交方式【1-新签,2-续签,3-转租,4-换房,5-补签】'],
    'contract_begin_date': [Date, '合同开始日期'],
    'contract_end_date': [Date, '合同结束日期'],
    'payment_method': [NVARCHAR(50), '付款方式【付款模式【1-提前付款天数,2-固定付款日期,3-提前1个月固定付款日期】'],
    'sign_date': [Date, '签约日期'],
    'signer_id': [NVARCHAR(64), '签约人ID'],
    'signer_name': [NVARCHAR(50), '签约人'],
    'pay_months': [INTEGER, '付几'],
    'deposit_months': [INTEGER, '押几'],
    'bill_id': [NVARCHAR(64), '账单ID'],
    'bill_detail_id': [NVARCHAR(64), '账单明细ID'],
    'fee_subject_id': [NVARCHAR(64), '费用科目ID'],
    'fee_subject_label': [NVARCHAR(64), '费用科目标签'],
    'fee_subject_name': [NVARCHAR(64), '费用科目名称'],
    'contract_type': [INTEGER, '合同类型 1个人、2企业、4协议'],
    'day': [Date, '日期'],
    'kind': [NVARCHAR(10), '类型'],
    'money': [DECIMAL(14, 2), '金额'],
    'reject_time': [Date, '退租时间'],
    'reject_name': [NVARCHAR(64), '退租处理人'],
    'reject_payment_account': [NVARCHAR(64), '退款账号'],
    'reject_payment_account_type': [NVARCHAR(32), '退款途径'],
    'reject_reason': [NVARCHAR(64), '退租原因'],
    'is_apportion': [INTEGER, '是否分摊'],
}

# Labels written for the numeric `paymentAccountType` found in cancel_info.
# NOTE(review): the source enum names 1 as BANK_CARD ("银行卡") while this
# script has always emitted '银联'; kept unchanged - confirm the intended label.
_PAYMENT_ACCOUNT_TYPE_LABELS = {1: '银联', 2: '支付宝', 3: '微信'}


def query_total(conn) -> int:
    """Return the number of valid, non-deleted bill details with biz_type=2.

    The count is used by ``etl`` to size the batch loop; the module-level
    ``debug_condition`` is appended to the WHERE clause.
    """
    query = """
    select count(1)
    from yuxin_finance.fin_finance_bill_detail bd
    where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
    """.format(debug_condition=debug_condition)
    return conn.execute(text(query)).scalar()


def extract(conn, batch_size, i) -> pd.DataFrame:
    """Read one batch of bill details joined with their descriptive data.

    Args:
        conn: SQLAlchemy connection used to run the query.
        batch_size: maximum number of rows to read (SQL ``LIMIT``).
        i: number of rows to skip (SQL ``OFFSET``).

    Returns:
        A DataFrame whose columns before the literal ``splitter`` column are
        copied to the target table as-is; the columns after it are working
        inputs consumed by ``transform``.
    """
    # `order by bd.id` makes LIMIT/OFFSET paging deterministic; without an
    # ORDER BY the database may return rows in a different order per query,
    # so batches could repeat or skip bill details.
    # NOTE(review): the left joins can yield several rows per bill detail
    # while `query_total` counts bill details only - confirm the joins are 1:1.
    query = """
    select bd.tenant_id,
           hhb.dept_id 'dept_id',
           house_sd.name 'dept_name',
           hhr.address 'house_address',
           hhb.community_id 'community_id',
           hc.name 'community_name',
           hhr.house_area 'house_area',
           crc_sd.id 'contract_dept_id',
           crc_sd.name 'contract_dept_name',
           rc.id 'contract_id',
           rc.contract_no 'contract_no',
           rc.contract_medium 'contract_medium',
           cri.name 'renter_name',
           cri.phone 'renter_phone',
           cri.certification_type 'renter_id_type',
           cri.identity_card 'renter_id_number',
           rc.sign_type 'sign_type',
           rc.begin_time 'contract_begin_date',
           rc.end_time 'contract_end_date',
           rc.pay_pattern 'payment_method',
           rc.sign_time 'sign_date',
           rc.sign_emp_id 'signer_id',
           sign_emp.name 'signer_name',
           rc.periodMonth 'pay_months',
           rc.depositMonth 'deposit_months',
           bd.bill_id,
           bd.id as 'bill_detail_id',
           bd.fee_subject_id,
           sd.label as 'fee_subject_label',
           sd.name as 'fee_subject_name',
           rc.`type` 'contract_type',
           0 'splitter',
           rc.quite_date,
           bd.fee_direction,
           bd.original_money,
           bd.occurred_money,
           bd.begin_time,
           bd.end_time,
           bd.is_occur,
           rc.cancel_info,
           bd.predict_time
    from yuxin_finance.fin_finance_bill_detail bd
    left join yuxin_setting.setting_dictionary sd
           on sd.id=bd.fee_subject_id
    left join yuxin_contract.cont_renter_contract rc
           on rc.id=bd.biz_id and rc.is_delete=0
    left join yuxin_house.hse_house_room hhr
           on hhr.is_delete=0 and hhr.id=rc.house_id
    left join yuxin_house.hse_house_base hhb
           on hhb.is_delete=0 and hhb.id=rc.house_id
    left join yuxin_setting.setting_department house_sd
           on house_sd.id=hhb.dept_id and house_sd.is_delete=0
    left join yuxin_house.hse_community hc
           on hc.id=hhb.community_id and hc.is_delete=0
    left join yuxin_setting.setting_employee_dept crc_ed
           on crc_ed.emp_id=rc.maintainer_id and crc_ed.is_delete=0
    left join yuxin_setting.setting_department crc_sd
           on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
    left join yuxin_contract.cont_renter_info cri
           on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=bd.biz_id
    left join yuxin_setting.setting_employee_info sign_emp
           on sign_emp.id=rc.sign_emp_id and sign_emp.is_delete=0
    where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
    order by bd.id
    limit {batch_size} offset {offset}
    """.format(batch_size=int(batch_size), offset=int(i),
               debug_condition=debug_condition)
    # text() keeps this consistent with query_total.
    return pd.read_sql(text(query), con=conn)


def _parse_cancel_info(raw) -> dict:
    """Extract the move-out (reject) fields from a contract's cancel_info.

    ``raw`` is the JSON text stored on the contract (keys read here:
    ``reason``, ``rejectName``, ``rejectTime``, ``paymentAccount``,
    ``paymentAccountType``). Missing keys, NULL/NaN and blank values yield
    the defaults: ``None`` for time/name, ``''`` for the other fields.
    """
    fields = {
        'reject_time': None,
        'reject_name': None,
        'reject_payment_account': '',
        'reject_payment_account_type': '',
        'reject_reason': '',
    }
    if isinstance(raw, (str, bytes, bytearray)):
        parsed = json.loads(raw) if raw.strip() else None
    elif isinstance(raw, dict):
        # Some drivers decode JSON columns themselves.
        parsed = raw
    else:
        # None / NaN: the contract was never cancelled (or was not joined).
        parsed = None
    if not isinstance(parsed, dict):
        return fields

    if parsed.get('reason') is not None:
        fields['reject_reason'] = parsed['reason']
    if parsed.get('rejectTime') is not None:
        fields['reject_time'] = datetime.strptime(
            parsed['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
    fields['reject_name'] = parsed.get('rejectName')
    if parsed.get('paymentAccount') is not None:
        fields['reject_payment_account'] = parsed['paymentAccount']
    account_type = parsed.get('paymentAccountType')
    if account_type is not None:
        # Unknown codes are passed through unchanged.
        fields['reject_payment_account_type'] = \
            _PAYMENT_ACCOUNT_TYPE_LABELS.get(account_type, account_type)
    return fields


def _resolve_kinds(fee_direction, is_occur):
    """Return ``(kind, accrued_kind)`` for a bill detail.

    ``kind`` is the label of the primary row. ``accrued_kind`` is the label
    of the additional receivable/payable row emitted when the detail has
    already occurred, or ``None`` when no additional row is needed.
    """
    occurred = is_occur == 1
    if fee_direction == 1:
        return ('实收', '应收') if occurred else ('应收', None)
    return ('实付', '应付') if occurred else ('应付', None)


def _as_date(value):
    """Return the calendar date of a datetime-like value, or the value itself."""
    return value.date() if isinstance(value, datetime) else value


def transform(conn, data) -> pd.DataFrame:
    """Expand bill details into the rows of the financial statement table.

    Dimensions: tenant, contract, house, maintainer, department, day, fee
    subject category, fee subject, and the kind of amount (receivable,
    payable, received, paid). Measure: money, with the rounding tail
    difference kept in the last period.

    For every source row:
      * accrual basis - ``original_money`` is spread per calendar month over
        ``begin_time`` .. end date (renewal end, else move-out date, else the
        bill's ``end_time``), one row per month with ``is_apportion=1``; an
        occurred detail also gets a matching receivable/payable row;
      * cash basis - deposits (``FEESUBJECT@DEPOSIT``) additionally get a
        single row dated ``predict_time`` with ``is_apportion=0``.

    Args:
        conn: SQLAlchemy connection, passed to ``find_renewal_contract``.
        data: DataFrame produced by ``extract``.

    Returns:
        A DataFrame with the pass-through columns (those before ``splitter``)
        followed by the computed columns.
    """
    splitter_index = list(data.columns).index('splitter')
    base_columns = list(data.columns[:splitter_index])
    columns = base_columns + [
        'day', 'kind', 'money',
        'reject_time', 'reject_name', 'reject_payment_account',
        'reject_payment_account_type', 'reject_reason', 'is_apportion',
    ]

    # Rows are collected in a list and turned into a frame once; appending
    # with `df.loc[len(df)]` re-allocates the frame on every row.
    records = []
    # Several bill details share one contract: look each renewal up once.
    renewal_cache = {}

    for _, row in data.iterrows():
        contract_id = row['contract_id']
        if contract_id not in renewal_cache:
            renewal_cache[contract_id] = find_renewal_contract(conn, contract_id)
        renewed_end_time = renewal_cache[contract_id]

        begin_date = row['begin_time']
        if renewed_end_time is not None:
            end_date = renewed_end_time
        elif pd.isnull(row['quite_date']):
            end_date = row['end_time']
        else:
            end_date = row['quite_date']

        # Daily rate over the inclusive period; guard against a zero or
        # negative span so the division below is always defined.
        num_days = (end_date - begin_date).days + 1
        if num_days == 0:
            num_days = 1
        elif num_days < 0:
            num_days = abs(num_days)
        money_per_day = row['original_money'] / num_days

        kind, accrued_kind = _resolve_kinds(row['fee_direction'], row['is_occur'])

        common = {col: row[col] for col in base_columns}
        common.update(_parse_cancel_info(row['cancel_info']))

        # --- Accrual basis: one row per calendar month -------------------
        remainder = row['original_money']
        if begin_date <= end_date:
            begin_key = (begin_date.year, begin_date.month)
            end_key = (end_date.year, end_date.month)
            current = begin_date
            # Compare (year, month) rather than full dates: stepping a date by
            # whole months can overshoot end_date inside the final month
            # (e.g. 01-20 -> 03-20 with an end of 03-10), which used to drop
            # that month together with the remainder.
            while (current.year, current.month) <= end_key:
                current_key = (current.year, current.month)
                is_last = current_key == end_key
                first_day = begin_date.day if current_key == begin_key else 1
                if is_last:
                    last_day = end_date.day
                else:
                    last_day = calendar.monthrange(current.year, current.month)[1]
                num_days_month = last_day - first_day + 1

                if is_last:
                    # The last period absorbs the rounding tail difference.
                    money = round(remainder, 2)
                else:
                    money = round(money_per_day * num_days_month, 2)
                    # Subtract the rounded amount so all rows sum exactly to
                    # original_money.
                    remainder -= money

                # Never date a row after the end of the period.
                day = current if current <= end_date else end_date
                record = dict(common, day=_as_date(day), kind=kind,
                              money=money, is_apportion=1)
                records.append(record)
                if accrued_kind is not None:
                    # An occurred detail is also reported as receivable/payable.
                    records.append(dict(record, kind=accrued_kind))

                current += relativedelta(months=1)

        # --- Cash basis: deposits ----------------------------------------
        if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
            records.append(dict(common, day=row['predict_time'], kind=kind,
                                money=row['original_money'], is_apportion=0))

    return pd.DataFrame(records, columns=columns)


def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append ``df`` to the table ``target_db`` using the types from ``cols``.

    The table is created by ``to_sql`` on first use (``if_exists='append'``).
    """
    dtypes = {name: spec[0] for name, spec in cols.items()}
    df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=dtypes)


def update_column_comment(conn, target_db):
    """Attach the comments from ``cols`` to the columns of ``target_db``.

    Issues one ``ALTER TABLE ... MODIFY COLUMN`` per column, restating the
    column type because MySQL requires it when changing a comment.
    """
    for name, (dtype, comment) in cols.items():
        # Date / INTEGER are stored as classes, the others as instances whose
        # string form is already valid DDL (e.g. "NVARCHAR(64)").
        if dtype is Date:
            dtype = 'date'
        elif dtype is INTEGER:
            dtype = 'INT'
        # Double any single quote so the comment cannot break the statement.
        escaped_comment = comment.replace("'", "''")
        conn.execute(text(
            'alter table {target_db} modify column `{col}` {dtype} comment \'{comment}\''
            .format(target_db=target_db, col=name, dtype=dtype,
                    comment=escaped_comment)))


def etl():
    """Run the full extract-transform-load cycle inside one transaction."""
    config = load_config()
    target_db = 'bi_finance_statement_'
    connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
        .format(username=config['mysql']['username'],
                pwd=quote_plus(config['mysql']['password']),
                host=config['mysql']['host'],
                port=config['mysql']['port'],
                db=config['mysql']['db'])
    engine = create_engine(connection_string)

    with engine.begin() as conn:
        total = query_total(conn)
        batch_size = 100

        truncate_target_db(conn, target_db)

        if debug:
            # Keep debug runs short.
            total = 200
            batch_size = 100
        print('total', total)

        # Write the data to the table in batches.
        for i in tqdm(range(0, total, batch_size)):
            data = extract(conn, batch_size, i)
            data = transform(conn, data)
            if debug:
                print(data.head())
            load(conn, data, target_db)

        # Column comments only need to be applied once, after loading.
        update_column_comment(conn, target_db)


# NOTE(review): runs on import as well as when executed as a script; kept so
# existing callers behave the same. Consider an `if __name__ == '__main__':`
# guard once it is confirmed nothing relies on import-time execution.
etl()