123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- """
- 寓信账单明细ETL
- 综合财务报表
- """
- import pandas as pd
- from sqlalchemy import create_engine, text
- from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
- from datetime import datetime, timedelta
- from tqdm import tqdm
- import json
- from dateutil.relativedelta import relativedelta
- import calendar
- from urllib.parse import quote_plus
- from utils import load_config, truncate_target_db
# Debug switch. When True, every source query is narrowed to a single
# contract (bd.biz_id 1687400201327599617, tagged "P.53" in the original
# notes) so the pipeline can be traced on a tiny sample.
debug = False
debug_condition = "and bd.biz_id='1687400201327599617'" if debug else ''
# Target-table schema: column name -> [SQLAlchemy type, column comment].
# The type is used by load() (to_sql dtype) and update_column_comment()
# (ALTER TABLE ... MODIFY COLUMN); the comment text is written into MySQL.
# Comments must not contain ":name"-style sequences, because they pass
# through sqlalchemy.text() in update_column_comment().
cols = {
    # House / project dimensions
    'tenant_id': [NVARCHAR(10), '租户ID'],
    'dept_id': [NVARCHAR(64), '房源所属门店ID'],
    'dept_name': [NVARCHAR(50), '房源所属门店名称'],
    'house_address': [NVARCHAR(100), '房源地址'],
    'community_id': [NVARCHAR(64), '房源所属项目/小区ID'],
    'community_name': [NVARCHAR(50), '房源所属项目/小区'],
    'house_area': [DECIMAL(10, 2), '房源面积'],
    # Contract dimensions
    'contract_dept_id': [NVARCHAR(64), '合同所属门店/部门ID'],
    'contract_dept_name': [NVARCHAR(50), '合同所属门店/部门'],
    'contract_id': [NVARCHAR(64), '合同ID'],
    'contract_no': [NVARCHAR(64), '合同编号'],
    # Paper vs. electronic contract; labelled distinctly from contract_type below.
    'contract_medium': [NVARCHAR(50), '合同介质【1-纸质,2-电子】'],
    'renter_name': [NVARCHAR(50), '租客姓名'],
    'renter_phone': [NVARCHAR(20), '租客手机号'],
    'renter_id_type': [NVARCHAR(50), '租客证件类型【1-身份证,2-护照,3-港澳通行证,4-台湾同胞证,5-军官证,6-驾驶证,7-社会信用代码】'],
    'renter_id_number': [NVARCHAR(50), '租客证件号码'],
    'sign_type': [NVARCHAR(50), '成交方式【1-新签,2-续签,3-转租,4-换房,5-补签】'],
    'contract_begin_date': [Date, '合同开始日期'],
    'contract_end_date': [Date, '合同结束日期'],
    'payment_method': [NVARCHAR(50), '付款方式【1-提前付款天数,2-固定付款日期,3-提前1个月固定付款日期】'],
    'sign_date': [Date, '签约日期'],
    'signer_id': [NVARCHAR(64), '签约人ID'],
    'signer_name': [NVARCHAR(50), '签约人'],
    'pay_months': [INTEGER, '付几'],
    'deposit_months': [INTEGER, '押几'],
    # Bill / fee-subject dimensions
    'bill_id': [NVARCHAR(64), '账单ID'],
    'bill_detail_id': [NVARCHAR(64), '账单明细ID'],
    'fee_subject_id': [NVARCHAR(64), '费用科目ID'],
    'fee_subject_label': [NVARCHAR(64), '费用科目标签'],
    'fee_subject_name': [NVARCHAR(64), '费用科目名称'],
    'contract_type': [INTEGER, '合同类型 1个人、2企业、4协议'],
    # Columns produced by transform()
    'day': [Date, '日期'],
    'kind': [NVARCHAR(10), '类型'],
    'money': [DECIMAL(14, 2), '金额'],
    # Refund / cancellation details parsed from the contract's cancel_info
    'reject_time': [Date, '退租时间'],
    'reject_name': [NVARCHAR(64), '退租处理人'],
    'reject_payment_account': [NVARCHAR(64), '退款账号'],
    'reject_payment_account_type': [NVARCHAR(32), '退款途径'],
    # NOTE(review): 64 chars may be short for free-text reasons — confirm.
    'reject_reason': [NVARCHAR(64), '退租原因'],
    'is_apportion': [INTEGER, '是否分摊']
}
def query_total(conn) -> int:
    """Count the bill-detail rows the ETL will page through.

    Counts rows of ``yuxin_finance.fin_finance_bill_detail`` with
    ``is_valid=1``, ``is_delete=0`` and ``biz_type=2``, further narrowed by the
    module-level ``debug_condition``.

    Args:
        conn: SQLAlchemy connection to the source database.

    Returns:
        The scalar row count.
    """
    count_sql = f"""
select
count(1)
from yuxin_finance.fin_finance_bill_detail bd
where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
"""
    result = conn.execute(text(count_sql))
    return result.scalar()
def extract(conn, batch_size, i) -> pd.DataFrame:
    """Fetch one page of bill details together with their contract/house context.

    Selects rows of ``yuxin_finance.fin_finance_bill_detail`` with
    ``is_valid=1``, ``is_delete=0`` and ``biz_type=2`` (plus the module-level
    ``debug_condition``). It left-joins fee subject, renter contract, house
    room/base, department, community, maintainer department, renter and
    signer information.

    The literal ``splitter`` column separates two groups: the descriptive
    columns before it are copied by ``transform`` into its output, and the
    columns after it are only inputs to the amount calculation.

    Args:
        conn: SQLAlchemy connection to the source database.
        batch_size: page size (SQL ``LIMIT``).
        i: zero-based row offset of the page (SQL ``OFFSET``).

    Returns:
        The page as a DataFrame.
    """
    # LIMIT/OFFSET paging is only stable with a deterministic ORDER BY.
    # Without one, MySQL may return overlapping pages, so some rows get
    # duplicated and others skipped across batches.
    # NOTE(review): the crc_ed / cri joins may return several rows per bill
    # detail. query_total() counts bill details without these joins, so a
    # fan-out would make the last pages fall outside ``total`` — confirm the
    # joins are 1:1.
    query = """
select
    bd.tenant_id,
    hhb.dept_id 'dept_id', house_sd.name 'dept_name', hhr.address 'house_address',
    hhb.community_id 'community_id', hc.name 'community_name', hhr.house_area 'house_area',
    crc_sd.id 'contract_dept_id', crc_sd.name 'contract_dept_name',
    rc.id 'contract_id', rc.contract_no 'contract_no', rc.contract_medium 'contract_medium',
    cri.name 'renter_name', cri.phone 'renter_phone', cri.certification_type 'renter_id_type', cri.identity_card 'renter_id_number',
    rc.sign_type 'sign_type', rc.begin_time 'contract_begin_date', rc.end_time 'contract_end_date', rc.pay_pattern 'payment_method',
    rc.sign_time 'sign_date', rc.sign_emp_id 'signer_id', sign_emp.name 'signer_name', rc.periodMonth 'pay_months', rc.depositMonth 'deposit_months',
    bd.bill_id, bd.id as 'bill_detail_id', bd.fee_subject_id, sd.label as 'fee_subject_label', sd.name as 'fee_subject_name',
    rc.`type` 'contract_type',
    0 'splitter',
    bd.fee_direction, bd.original_money, bd.occurred_money,
    bd.begin_time, bd.end_time, bd.is_occur, rc.cancel_info, bd.predict_time
from yuxin_finance.fin_finance_bill_detail bd
left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id
left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=rc.house_id
left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=rc.house_id
left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
left join yuxin_house.hse_community hc on hc.id=hhb.community_id and hc.is_delete=0
left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=rc.maintainer_id and crc_ed.is_delete=0
left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=bd.biz_id
left join yuxin_setting.setting_employee_info sign_emp on sign_emp.id=rc.sign_emp_id and sign_emp.is_delete=0
where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
order by bd.id
limit {batch_size} offset {offset}
""".format(batch_size=int(batch_size), offset=int(i), debug_condition=debug_condition)
    source_data = pd.read_sql(query, con=conn)
    return source_data
# Display labels for cancel_info.paymentAccountType. The enum quoted in the
# original notes is BANK_CARD(1, "银行卡"), ALIPAY(2, "支付宝"), WECHAT(3, "微信").
# NOTE(review): code 1 has always been emitted as '银联' rather than '银行卡';
# kept unchanged so existing reports don't shift — confirm which is intended.
_PAYMENT_ACCOUNT_TYPE_LABELS = {1: '银联', 2: '支付宝', 3: '微信'}


def _parse_cancel_info(raw) -> dict:
    """Extract the five ``reject_*`` output fields from a contract's ``cancel_info`` JSON.

    Example payload: ``{"reason": "", "rejectName": "...", "rejectTime":
    "2022-11-18 15:02:03", "paymentAccount": null, "paymentAccountType": null, ...}``.

    ``reject_time`` (a ``date``) and ``reject_name`` stay ``None`` when
    unknown, and ``rejectName`` is read only when ``rejectTime`` is present.
    The account, account type and reason default to ''. A missing or
    non-JSON-object ``raw`` (None, NaN, empty string) yields the defaults.
    """
    reject_time = reject_name = None
    account = account_type = reason = None
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode('utf-8')
    if isinstance(raw, str) and raw.strip():
        info = json.loads(raw)
    elif isinstance(raw, dict):
        info = raw
    else:
        info = None
    if isinstance(info, dict):
        reason = info.get('reason')
        if info.get('rejectTime') is not None:
            reject_time = datetime.strptime(info['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
            reject_name = info.get('rejectName')
        account = info.get('paymentAccount')
        account_type = info.get('paymentAccountType')
        for code, label in _PAYMENT_ACCOUNT_TYPE_LABELS.items():
            if account_type == code:
                account_type = label
                break
    return {
        'reject_time': reject_time,
        'reject_name': reject_name,
        'reject_payment_account': '' if account is None else account,
        'reject_payment_account_type': '' if account_type is None else account_type,
        'reject_reason': '' if reason is None else reason,
    }


def _bill_kinds(fee_direction, is_occur):
    """Return ``(kind, extra_kind)`` for a bill detail.

    ``fee_direction == 1`` selects the receive kinds (实收/应收); any other
    value selects the pay kinds (实付/应付). An occurred detail
    (``is_occur == 1``) is labelled 实收/实付, and ``extra_kind`` is the
    matching 应收/应付 so the caller also emits the receivable/payable row.
    Otherwise ``extra_kind`` is None.
    """
    if fee_direction == 1:
        actual, expected = '实收', '应收'
    else:
        actual, expected = '实付', '应付'
    if is_occur == 1:
        return actual, expected
    return expected, None


def _as_date(value):
    """Return ``value`` as a ``datetime.date`` (time of day dropped), or None if missing."""
    if value is None or pd.isna(value):
        return None
    return pd.Timestamp(value).date()


def _month_segments(begin_date, end_date):
    """Yield ``(start, days, is_last)`` for each calendar month overlapping [begin_date, end_date].

    ``start`` is the first covered day in that month: ``begin_date`` in the
    first month, the 1st of the month afterwards. ``days`` is the number of
    covered days in the month. Months are compared as (year, month), so
    periods longer than a year are handled. Requires ``begin_date <= end_date``.
    """
    year, month = begin_date.year, begin_date.month
    final = (end_date.year, end_date.month)
    while (year, month) <= final:
        first_day = begin_date.day if (year, month) == (begin_date.year, begin_date.month) else 1
        is_last = (year, month) == final
        last_day = end_date.day if is_last else calendar.monthrange(year, month)[1]
        yield begin_date.replace(year=year, month=month, day=first_day), last_day - first_day + 1, is_last
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)


def transform(data) -> pd.DataFrame:
    """Expand bill-detail rows into per-month fact rows for the finance report.

    Dimensions: tenant, contract, house, maintainer, department, day,
    fee-subject category, fee subject, and kind (应收/应付/实收/实付 =
    receivable/payable/received/paid).

    Measure: amount. On the accrual basis, ``original_money`` is spread
    evenly per day over [begin_time, end_time] and summed per calendar month.
    Every month except the last is rounded to 2 decimals, and the rounding
    residual is carried into the last month, so the month amounts add up to
    ``original_money``.

    Output columns: every input column before the ``splitter`` marker, then
    ``day``, ``kind``, ``money``, the five ``reject_*`` refund fields and
    ``is_apportion``.

    For each input row:

    * One accrual row is emitted per month (``is_apportion`` = 1), dated on the
      first covered day of that month. If the detail has occurred, a copy
      labelled 应收/应付 follows each month row.
    * Deposits (``fee_subject_label == 'FEESUBJECT@DEPOSIT'``) additionally get
      one cash-basis row (``is_apportion`` = 0) dated ``predict_time``,
      carrying the full ``original_money``.
    * A row whose begin/end date is missing, or whose end precedes its begin,
      produces no accrual rows.
    """
    lead_columns = list(data.columns[:list(data.columns).index('splitter')])
    columns = lead_columns + [
        'day', 'kind', 'money',
        'reject_time',
        'reject_name',
        'reject_payment_account',
        'reject_payment_account_type',
        'reject_reason',
        'is_apportion',
    ]
    # Collect plain dicts and build the frame once; growing a DataFrame via
    # df.loc[len(df)] = ... is quadratic in the number of output rows.
    records = []
    for row in data.to_dict('records'):
        base = {col: row[col] for col in lead_columns}
        base.update(_parse_cancel_info(row['cancel_info']))
        kind, extra_kind = _bill_kinds(row['fee_direction'], row['is_occur'])

        # Accrual basis: apportion the amount by calendar month.
        begin_date = _as_date(row['begin_time'])
        end_date = _as_date(row['end_time'])
        if begin_date is not None and end_date is not None and begin_date <= end_date:
            money_per_day = row['original_money'] / ((end_date - begin_date).days + 1)
            remainder = row['original_money']
            for start, days, is_last in _month_segments(begin_date, end_date):
                if is_last:
                    # The last month takes whatever is left, including rounding residue.
                    money = round(remainder, 2)
                else:
                    money = round(money_per_day * days, 2)
                    remainder -= money
                record = dict(base, day=start, kind=kind, money=money, is_apportion=1)
                records.append(record)
                if extra_kind is not None:
                    records.append(dict(record, kind=extra_kind))

        # Cash basis: deposits also get a single un-apportioned row.
        if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
            records.append(dict(base, day=row['predict_time'], kind=kind,
                                money=row['original_money'], is_apportion=0))
    return pd.DataFrame(records, columns=columns)
def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append ``df`` to the MySQL table ``target_db``.

    The SQL type of each column is taken from the first element of its entry
    in the module-level ``cols`` mapping. The DataFrame index is not written.
    Column comments are applied separately by ``update_column_comment``.
    """
    column_types = {}
    for name, spec in cols.items():
        column_types[name] = spec[0]
    df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=column_types)
def update_column_comment(conn, target_db):
    """Set the MySQL column comment of every column listed in ``cols``.

    MySQL changes a column comment via ``ALTER TABLE ... MODIFY COLUMN``,
    which also restates the column type. Each SQLAlchemy type from ``cols``
    is therefore rendered into DDL: ``Date`` -> ``date``, ``INTEGER`` ->
    ``INT``, anything else via its ``str()`` form (e.g. ``NVARCHAR(10)``,
    ``DECIMAL(10, 2)``).

    Args:
        conn: SQLAlchemy connection to the target database.
        target_db: name of the table to alter.
    """
    for col, (dtype, comment) in cols.items():
        if dtype == Date:
            dtype = 'date'
        elif dtype == INTEGER:
            dtype = 'INT'
        # The comment is embedded in a single-quoted SQL literal. Escape
        # backslashes and double any quote so such characters cannot break the
        # statement. Assumes the default sql_mode (backslash escapes enabled).
        # NOTE: text() still treats ":name" as a bind parameter, so comments
        # must not contain that pattern.
        safe_comment = comment.replace('\\', '\\\\').replace("'", "''")
        conn.execute(text(
            "alter table {target_db} modify column `{col}` {dtype} comment '{comment}'"
            .format(target_db=target_db, col=col, dtype=dtype, comment=safe_comment)
        ))
def etl():
    """Rebuild the ``bi_finance_statement_`` table from the source bill details.

    Builds a pymysql connection URL from the ``mysql`` section of
    ``load_config()`` (the password is URL-quoted). Then, inside a single
    ``engine.begin()`` block, it:

    1. counts the source rows;
    2. empties the target table;
    3. extracts, transforms and loads the rows in pages of 100;
    4. applies the column comments.

    In debug mode the row count is forced to 200 and each transformed page's
    head is printed.
    """
    settings = load_config()['mysql']
    table = 'bi_finance_statement_'
    url = (
        f"mysql+pymysql://{settings['username']}:{quote_plus(settings['password'])}"
        f"@{settings['host']}:{settings['port']}/{settings['db']}?charset=utf8"
    )
    engine = create_engine(url)
    with engine.begin() as conn:
        total = query_total(conn)
        batch_size = 100
        truncate_target_db(conn, table)
        if debug:
            # Debug runs only process the first two pages.
            total, batch_size = 200, 100
        print('total', total)
        for offset in tqdm(range(0, total, batch_size)):
            frame = transform(extract(conn, batch_size, offset))
            if debug:
                print(frame.head())
            load(conn, frame, table)
        update_column_comment(conn, table)


# The pipeline runs as a side effect of executing/importing this module.
etl()
|