123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- """
- 寓信账单明细ETL
- 综合财务报表
- """
- import pandas as pd
- from sqlalchemy import create_engine, text
- from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
- from datetime import datetime, timedelta
- from tqdm import tqdm
- import json
- from dateutil.relativedelta import relativedelta
- import calendar
- from urllib.parse import quote_plus
- from utils import load_config, truncate_target_db
# Set to True for a short run: etl() stops after the first 200 rows
# (two batches) and prints the head of every transformed batch.
debug = False
# Extra SQL appended to the bill-detail WHERE clause of query_total() and
# extract(); it must start with "and". It is empty whether or not debug is on.
# While debugging it can be pointed at a single contract, e.g.
# "and bd.biz_id='1688796583287914497'".
debug_condition = ''
def query_total(conn) -> int:
    """Return how many bill-detail rows the ETL will page through.

    The filter matches the one used by ``extract``: valid, not deleted,
    ``biz_type=2``, plus the module-level ``debug_condition`` fragment.

    Args:
        conn: SQLAlchemy connection the count statement is executed on.

    Returns:
        The scalar ``count(1)`` result.
    """
    count_sql = (
        "\n"
        "select\n"
        "count(1)\n"
        "from yuxin_finance.fin_finance_bill_detail bd\n"
        f"where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}\n"
    )
    statement = text(count_sql)
    return conn.execute(statement).scalar()
def extract(conn, batch_size, i) -> pd.DataFrame:
    """Fetch one batch of rental bill details together with their lookups.

    Bill details are filtered by ``is_valid=1``, ``is_delete=0``,
    ``biz_type=2`` and the module-level ``debug_condition``. They are then
    paged with LIMIT/OFFSET in ``bd.id`` order, so consecutive offsets never
    skip or repeat a detail.

    Paging happens in a derived table *before* the lookup joins. This means
    ``i`` and ``batch_size`` count the same rows that ``query_total``
    counts. A maintainer linked to several departments therefore yields
    several rows for one detail, all within the same batch.

    Args:
        conn: database connection accepted by ``pandas.read_sql``.
        batch_size: maximum number of bill details in this batch.
        i: offset, in bill details, of the first detail to return.

    Returns:
        DataFrame whose first 11 columns (``tenant_id`` ... ``fee_subject_name``)
        are the pass-through dimensions used by ``transform``. They are
        followed by the amount, date, status, cancel-info and predict-time
        fields.
    """
    # The ORDER BY gives LIMIT/OFFSET a deterministic order. Without it MySQL
    # may return rows in any order from one page to the next.
    query = """
        select
            bd.tenant_id, rc.id 'contract_id', dept.id 'dept_id', dept.name 'dept_name', rc.maintainer_id 'emp_id', emp.name 'emp_name',
            bd.bill_id, bd.id as 'bill_detail_id', bd.fee_subject_id, sd.label as 'fee_subject_label', sd.name as 'fee_subject_name',
            bd.fee_direction, bd.original_money, bd.occurred_money,
            bd.begin_time, bd.end_time, bd.is_occur, rc.cancel_info, bd.predict_time
        from (
            select bd.*
            from yuxin_finance.fin_finance_bill_detail bd
            where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
            order by bd.id
            limit {batch_size} offset {offset}
        ) bd
        left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
        left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id
        left join yuxin_setting.setting_employee_dept ed on ed.emp_id=rc.maintainer_id and ed.is_delete=0
        left join yuxin_setting.setting_employee_info emp on emp.id=rc.maintainer_id and emp.is_delete=0
        left join yuxin_setting.setting_department dept on dept.id=ed.dept_id and dept.is_delete=0
    """.format(batch_size=int(batch_size), offset=int(i), debug_condition=debug_condition)
    return pd.read_sql(query, con=conn)
def _bill_kinds(fee_direction, is_occur):
    """Return ``(kind, pending_kind)`` for a bill detail.

    ``fee_direction == 1`` is the income side (应收 receivable / 实收 received).
    Any other value is the expense side (应付 payable / 实付 paid).

    When ``is_occur == 1`` the detail is booked as actual. ``pending_kind`` then
    names the receivable/payable kind that is emitted alongside it. Otherwise
    ``pending_kind`` is None.
    """
    if fee_direction == 1:
        receivable, actual = '应收', '实收'
    else:
        receivable, actual = '应付', '实付'
    if is_occur == 1:
        return actual, receivable
    return receivable, None


def _parse_cancel_info(raw):
    """Turn a contract's ``cancel_info`` JSON into the reject_* output columns.

    Example payload::

        {"reason": "", "rejectName": "...", "rejectTime": "2022-11-18 15:02:03",
         "paymentAccount": null, "paymentAccountType": null, ...}

    The input may be a JSON string/bytes or an already-decoded dict. NULL,
    blank or non-object input yields the defaults.

    Returns:
        A dict keyed by target column. ``reject_time`` is a ``date`` and is set
        only when ``rejectTime`` is present; ``reject_name`` is filled only in
        that case too. ``reject_payment_account``,
        ``reject_payment_account_type`` and ``reject_reason`` default to ``''``.
    """
    if isinstance(raw, (str, bytes)):
        info = json.loads(raw) if raw.strip() else None
    elif isinstance(raw, dict):
        info = raw
    else:
        info = None  # NULL from the database arrives as None or NaN
    if not isinstance(info, dict):
        info = {}

    reject_time = None
    reject_name = None
    if info.get('rejectTime') is not None:
        reject_time = datetime.strptime(info['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
        reject_name = info.get('rejectName')
    reason = info.get('reason')
    account = info.get('paymentAccount')
    account_type = info.get('paymentAccountType')
    # paymentAccountType enum: BANK_CARD(1, "银行卡"), ALIPAY(2, "支付宝"), WECHAT(3, "微信").
    # NOTE(review): 1 is written as '银联' (UnionPay), not the enum label
    # '银行卡' (bank card). Kept unchanged for report compatibility; confirm intent.
    if account_type == 1:
        account_type = '银联'
    elif account_type == 2:
        account_type = '支付宝'
    elif account_type == 3:
        account_type = '微信'
    return {
        'reject_time': reject_time,
        'reject_name': reject_name,
        'reject_payment_account': account if account is not None else '',
        'reject_payment_account_type': account_type if account_type is not None else '',
        'reject_reason': reason if reason is not None else '',
    }


def _month_slices(begin_date, end_date):
    """Yield ``(day, days_in_slice, is_last)`` for every calendar month of the range.

    Months are compared as (year, month), so ranges longer than a year are
    sliced correctly. The month containing ``end_date`` is always the last
    slice. ``days_in_slice`` counts the days of that month inside
    [begin_date, end_date].

    ``day`` is begin_date's day-of-month carried forward one month at a time
    and clamped to shorter months, like repeated ``relativedelta(months=1)``
    (Jan 31 -> Feb 28 -> Mar 28). It is capped at ``end_date`` and returned
    as a ``date``. Nothing is yielded unless ``begin_date <= end_date``.
    """
    if not (begin_date <= end_date):
        return
    begin_key = (begin_date.year, begin_date.month)
    end_key = (end_date.year, end_date.month)
    year, month, day_of_month = begin_date.year, begin_date.month, begin_date.day
    while (year, month) <= end_key:
        is_last = (year, month) == end_key
        first_day = begin_date.day if (year, month) == begin_key else 1
        last_day = end_date.day if is_last else calendar.monthrange(year, month)[1]
        stamp = begin_date.replace(year=year, month=month, day=day_of_month)
        yield min(stamp, end_date).date(), last_day - first_day + 1, is_last
        month += 1
        if month > 12:
            year, month = year + 1, 1
        day_of_month = min(day_of_month, calendar.monthrange(year, month)[1])


def transform(data) -> pd.DataFrame:
    """Expand bill details into monthly accrual rows and deposit cash rows.

    Output dimensions: tenant, contract, maintainer (emp), department, day,
    fee subject (id/label/name), and kind: 应收 receivable, 应付 payable,
    实收 received, 实付 paid. Measure: ``money``.

    Accrual basis (权责发生制): ``original_money`` is spread over the calendar
    months from ``begin_time`` to ``end_time`` in proportion to the days
    covered.
    - Each month's share is rounded to 2 decimals before being deducted.
    - The last month takes whatever remains, so the rounding difference lands
      there and the shares add up to the total.
    - These rows have ``is_apportion=1``.
    - A detail with ``is_occur == 1`` emits every month twice: once as
      actual (实收/实付) and once as receivable/payable (应收/应付).

    Cash basis (收付实现制): deposit details (label ``FEESUBJECT@DEPOSIT``)
    also get one un-apportioned row dated ``predict_time`` with
    ``is_apportion=0``.

    Every output row also carries the reject_* fields parsed from the
    contract's ``cancel_info``.

    Args:
        data: a batch from ``extract``. Its first 11 columns are copied through
            unchanged, so the column order matters.

    Returns:
        A new DataFrame with the 11 pass-through columns followed by ``day``,
        ``kind``, ``money``, the reject_* columns and ``is_apportion``.
    """
    columns = list(data.columns[:11])
    columns.extend(['day', 'kind', 'money',
                    'reject_time',
                    'reject_name',
                    'reject_payment_account',
                    'reject_payment_account_type',
                    'reject_reason',
                    'is_apportion',
                    ])
    # Collect plain dicts and build the frame once at the end. Growing a
    # DataFrame row by row with .loc is quadratic.
    records = []
    for row in data.to_dict('records'):
        begin_date = row['begin_time']
        end_date = row['end_time']
        kind, pending_kind = _bill_kinds(row['fee_direction'], row['is_occur'])
        reject = _parse_cancel_info(row['cancel_info'])

        # Inclusive day count used for the per-day rate.
        num_days = (end_date - begin_date).days + 1
        if num_days == 0:
            num_days = 1
        elif num_days < 0:
            num_days = abs(num_days)
        money_per_day = row['original_money'] / num_days

        remainder = row['original_money']
        for day, days_in_slice, is_last in _month_slices(begin_date, end_date):
            if is_last:
                money = round(remainder, 2)
            else:
                # Round before deducting so the tail absorbs the rounding error.
                money = round(money_per_day * days_in_slice, 2)
                remainder -= money
            record = dict(row, day=day, kind=kind, money=money, is_apportion=1, **reject)
            records.append(record)
            if pending_kind is not None:
                records.append(dict(record, kind=pending_kind))

        if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
            records.append(dict(row, day=row['predict_time'], kind=kind,
                                money=row['original_money'], is_apportion=0, **reject))
    # columns= keeps only the target columns and drops the raw helper fields.
    return pd.DataFrame(records, columns=columns)
def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append ``df`` to the table named ``target_db`` with explicit SQL types.

    Rows are appended (``if_exists='append'``) and the DataFrame index is not
    written. The type map pins the SQL type of every column ``transform``
    emits instead of letting pandas infer them.
    """
    # Text columns stored as NVARCHAR(64).
    column_types = {
        name: NVARCHAR(64)
        for name in (
            'contract_id', 'dept_id', 'emp_id', 'bill_id', 'bill_detail_id',
            'fee_subject_id', 'fee_subject_label', 'fee_subject_name',
            'reject_name', 'reject_payment_account', 'reject_reason',
        )
    }
    # Columns with other widths or non-text types.
    column_types.update(
        tenant_id=NVARCHAR(10),
        dept_name=NVARCHAR(50),
        emp_name=NVARCHAR(50),
        kind=NVARCHAR(10),
        reject_payment_account_type=NVARCHAR(32),
        money=DECIMAL(14, 2),
        day=Date,
        reject_time=Date,
        is_apportion=INTEGER,
    )
    df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=column_types)
def etl():
    """Rebuild the ``bi_bill_detail_`` table from the finance bill details.

    Steps:
    - Read MySQL settings from ``load_config()['mysql']``.
    - Count the source rows and empty the target via ``truncate_target_db``.
    - Run ``extract`` -> ``transform`` -> ``load`` in batches of
      ``batch_size`` (100), advancing the offset each time.

    Everything runs inside one ``engine.begin()`` block. SQLAlchemy commits
    it on normal exit and rolls it back if an exception escapes. Whether the
    truncate is undone too depends on the statement ``truncate_target_db``
    issues (TODO confirm).

    With ``debug`` enabled, ``total`` is forced to 200, so only two batches
    run, and the head of each transformed batch is printed.
    """
    config = load_config()
    mysql_cfg = config['mysql']
    target_db = 'bi_bill_detail_'
    batch_size = 100
    # Escape both credentials: an unescaped '@', ':' or '/' in the username
    # corrupts the URL exactly as it would in the password.
    connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'.format(
        username=quote_plus(mysql_cfg['username']),
        pwd=quote_plus(mysql_cfg['password']),
        host=mysql_cfg['host'],
        port=mysql_cfg['port'],
        db=mysql_cfg['db'],
    )
    engine = create_engine(connection_string)
    try:
        with engine.begin() as conn:
            total = query_total(conn)
            truncate_target_db(conn, target_db)
            if debug:
                total = 200
            print('total', total)
            for offset in tqdm(range(0, total, batch_size)):
                data = transform(extract(conn, batch_size, offset))
                if debug:
                    print(data.head())
                load(conn, data, target_db)
    finally:
        # Release pooled connections even when a batch fails.
        engine.dispose()
if __name__ == '__main__':
    # Run the ETL only when executed as a script, not when the module is imported.
    etl()
|