# etl_bill_detail.py
"""
Yuxin bill-detail ETL.

Consolidated financial statement.
"""
  5. import pandas as pd
  6. from sqlalchemy import create_engine, text
  7. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  8. from datetime import datetime, timedelta
  9. from tqdm import tqdm
  10. import json
  11. from dateutil.relativedelta import relativedelta
  12. import calendar
  13. from urllib.parse import quote_plus
  14. from utils import load_config, truncate_target_db
  15. debug = False
  16. if debug:
  17. debug_condition = "and bd.biz_id='1687400201327599617'" # P.53
  18. # debug_condition = ''
  19. else:
  20. debug_condition = ''
# Schema of the target table: maps each target column name to
# [SQLAlchemy column type, column comment].
# load() passes element 0 as the to_sql dtype; update_column_comment() writes
# element 1 into the table as the column comment, so the comment strings are
# runtime data and must not be edited casually.
cols = {
    # --- tenant / house dimensions ---
    'tenant_id': [NVARCHAR(10), '租户ID'],
    'dept_id': [NVARCHAR(64), '房源所属门店ID'],
    'dept_name': [NVARCHAR(50), '房源所属门店名称'],
    'house_address': [NVARCHAR(100), '房源地址'],
    'community_id': [NVARCHAR(64), '房源所属项目/小区ID'],
    'community_name': [NVARCHAR(50), '房源所属项目/小区'],
    'house_area': [DECIMAL(10, 2), '房源面积'],
    # --- contract dimensions ---
    'contract_dept_id': [NVARCHAR(64), '合同所属门店/部门ID'],
    'contract_dept_name': [NVARCHAR(50), '合同所属门店/部门'],
    'contract_id': [NVARCHAR(64), '合同ID'],
    'contract_no': [NVARCHAR(64), '合同编号'],
    'contract_medium': [NVARCHAR(50), '合同类型【1-纸质,2-电子】'],
    # --- renter dimensions ---
    'renter_name': [NVARCHAR(50), '租客姓名'],
    'renter_phone': [NVARCHAR(20), '租客手机号'],
    'renter_id_type': [NVARCHAR(50), '租客证件类型证件类型【1-身份证,2-护照,3-港澳通行证,4-台湾同胞证,5-军官证,6-驾驶证 7-社会信用代码'],
    'renter_id_number': [NVARCHAR(50), '租客证件号码'],
    # --- signing / payment terms ---
    'sign_type': [NVARCHAR(50), '成交方式【1-新签,2-续签,3-转租,4-换房】'],
    'contract_begin_date': [Date, '合同开始日期'],
    'contract_end_date': [Date, '合同结束日期'],
    'payment_method': [NVARCHAR(50), '付款方式【付款模式【1-提前付款天数,2-固定付款日期,3-提前1个月固定付款日期】'],
    'sign_date': [Date, '签约日期'],
    'signer_id': [NVARCHAR(64), '签约人ID'],
    'signer_name': [NVARCHAR(50), '签约人'],
    'pay_months': [INTEGER, '付几'],
    'deposit_months': [INTEGER, '押几'],
    # --- bill / fee subject ---
    'bill_id': [NVARCHAR(64), '账单ID'],
    'bill_detail_id': [NVARCHAR(64), '账单明细ID'],
    'fee_subject_id': [NVARCHAR(64), '费用科目ID'],
    'fee_subject_label': [NVARCHAR(64), '费用科目标签'],
    'fee_subject_name': [NVARCHAR(64), '费用科目名称'],
    # --- columns produced by transform() ---
    'day': [Date, '日期'],
    'kind': [NVARCHAR(10), '类型'],
    'money': [DECIMAL(14, 2), '金额'],
    'reject_time': [Date, '退租时间'],
    'reject_name': [NVARCHAR(64), '退租处理人'],
    'reject_payment_account': [NVARCHAR(64), '退款账号'],
    'reject_payment_account_type': [NVARCHAR(32), '退款途径'],
    'reject_reason': [NVARCHAR(64), '退租原因'],
    'is_apportion': [INTEGER, '是否分摊']
}
  62. def query_total(conn) -> int:
  63. query = """
  64. select
  65. count(1)
  66. from yuxin_finance.fin_finance_bill_detail bd
  67. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  68. """.format(debug_condition=debug_condition)
  69. return conn.execute(text(query)).scalar()
  70. def extract(conn, batch_size, i) -> pd.DataFrame:
  71. """ This API extracts data from
  72. """
  73. query = """
  74. select
  75. bd.tenant_id,
  76. hhb.dept_id 'dept_id', house_sd.name 'dept_name', hhr.address 'house_address',
  77. hhb.community_id 'community_id', hc.name 'community_name', hhr.house_area 'house_area',
  78. crc_sd.id 'contract_dept_id', crc_sd.name 'contract_dept_name',
  79. rc.id 'contract_id', rc.contract_no 'contract_no', rc.contract_medium 'contract_medium',
  80. cri.name 'renter_name', cri.phone 'renter_phone', cri.certification_type 'renter_id_type', cri.identity_card 'renter_id_number',
  81. rc.sign_type 'sign_type', rc.begin_time 'contract_begin_date', rc.end_time 'contract_end_date', rc.pay_pattern 'payment_method',
  82. rc.sign_time 'sign_date', rc.sign_emp_id 'signer_id', sign_emp.name 'signer_name', rc.periodMonth 'pay_months', rc.depositMonth 'deposit_months',
  83. bd.bill_id, bd.id as 'bill_detail_id', bd.fee_subject_id, sd.label as 'fee_subject_label', sd.name as 'fee_subject_name',
  84. 0 'splitter',
  85. bd.fee_direction, bd.original_money, bd.occurred_money,
  86. bd.begin_time, bd.end_time, bd.is_occur, rc.cancel_info, bd.predict_time
  87. from yuxin_finance.fin_finance_bill_detail bd
  88. left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
  89. left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id
  90. left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=rc.house_id
  91. left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=rc.house_id
  92. left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
  93. left join yuxin_house.hse_community hc on hc.id=hhb.community_id and hc.is_delete=0
  94. left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=rc.maintainer_id and crc_ed.is_delete=0
  95. left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
  96. left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=bd.biz_id
  97. left join yuxin_setting.setting_employee_info sign_emp on sign_emp.id=rc.sign_emp_id and sign_emp.is_delete=0
  98. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  99. limit {batch_size} offset {offset}
  100. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  101. source_data = pd.read_sql(query, con=conn)
  102. return source_data
  103. def transform(data) -> pd.DataFrame:
  104. """ Transforms the dataset into desired structure and filters
  105. --- 维度:租户,合同,房源,维护人,所属部门,日,财务科目分类,财务科目,应收,应付,实收,实付
  106. --- 指标:金额(尾差保留在最后一日中)
  107. """
  108. # target columns
  109. last_splitter_index = list(data.columns).index('splitter')
  110. columns = list(data.columns[:last_splitter_index])
  111. columns.extend(['day', 'kind', 'money',
  112. 'reject_time',
  113. 'reject_name',
  114. 'reject_payment_account',
  115. 'reject_payment_account_type',
  116. 'reject_reason',
  117. 'is_apportion',
  118. ])
  119. # target data
  120. df = pd.DataFrame(columns=columns)
  121. # Iterate over each row in the DataFrame
  122. for index, row in data.iterrows():
  123. begin_date = row['begin_time']
  124. end_date = row['end_time']
  125. # Calculate the number of days between the two dates
  126. num_days = (end_date - begin_date).days + 1
  127. num_months = (end_date.year - begin_date.year) * 12 + (end_date.month - begin_date.month) + 1
  128. # Calculate the amount per day
  129. if num_days == 0:
  130. num_days = 1
  131. elif num_days < 0:
  132. num_days = abs(num_days)
  133. money_per_day = row['original_money'] / num_days
  134. # 退租信息
  135. # {"reason": "", "rejectName": "专区客服1号", "rejectTime": "2022-11-18 15:02:03", "paymentAccount": null, "financeDetailInfo": "[{\"bizId\":\"1593499215339642882\",\"id\":\"1593499216711200770\",\"feeDirection\":2,\"feeSubjectId\":\"1593215386397958146\",\"feeSubjectName\":\"房屋押金\",\"amount\":6000,\"payDate\":1668700800000,\"beginDate\":1652889600000,\"endDate\":1668700800000,\"freeDay\":0,\"occurredMoney\":0,\"notOccurredMoney\":6000,\"comment\":\"\",\"status\":4,\"price\":6000}]", "paymentAccountType": null}
  136. # paymentAccountType
  137. # BANK_CARD(1, "银行卡"),
  138. # ALIPAY(2, "支付宝"),
  139. # WECHAT(3, "微信");
  140. reject_time = None
  141. reject_name = None
  142. reject_payment_account = None
  143. reject_payment_account_type = None
  144. reject_reason = None
  145. if row['cancel_info'] is not None:
  146. cancel_info = json.loads(row['cancel_info'])
  147. if cancel_info['reason'] is not None:
  148. reject_reason = cancel_info['reason']
  149. if cancel_info['rejectTime'] is not None:
  150. reject_time = datetime.strptime(cancel_info['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
  151. reject_name = cancel_info['rejectName']
  152. if 'paymentAccount' in cancel_info:
  153. reject_payment_account = cancel_info['paymentAccount']
  154. if 'paymentAccountType' in cancel_info:
  155. reject_payment_account_type = cancel_info['paymentAccountType']
  156. if reject_payment_account_type == 1:
  157. reject_payment_account_type = '银联'
  158. elif reject_payment_account_type == 2:
  159. reject_payment_account_type = '支付宝'
  160. elif reject_payment_account_type == 3:
  161. reject_payment_account_type = '微信'
  162. # 权责发生制
  163. # Loop from begin_date to end_date by day
  164. remainder = row['original_money']
  165. current_month = begin_date
  166. while current_month <= end_date:
  167. first_day = 1
  168. last_day = calendar.monthrange(current_month.year, current_month.month)[1]
  169. if current_month.month == begin_date.month:
  170. first_day = begin_date.day
  171. if current_month.month == end_date.month:
  172. last_day = end_date.day
  173. num_days_month = last_day - first_day + 1
  174. # print('current_month', current_month, first_day, last_day, num_days_month)
  175. # Copy the row and add extra columns
  176. new_row = row.copy()
  177. new_row['day'] = current_month.date()
  178. new_row['is_apportion'] = 1
  179. if current_month.month == end_date.month:
  180. # keep remainder in the last day
  181. new_row['money'] = remainder
  182. else:
  183. new_row['money'] = money_per_day * num_days_month
  184. remainder -= new_row['money']
  185. new_row['money'] = round(new_row['money'], 2)
  186. # print('money', new_row['money'], row['original_money'], money_per_day)
  187. new_row['reject_time'] = reject_time
  188. new_row['reject_name'] = reject_name
  189. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  190. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  191. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  192. not_occur_kind = None
  193. if row['fee_direction'] == 1:
  194. if row['is_occur'] == 1:
  195. new_row['kind'] = '实收'
  196. not_occur_kind = '应收'
  197. else:
  198. new_row['kind'] = '应收'
  199. else:
  200. if row['is_occur'] == 1:
  201. new_row['kind'] = '实付'
  202. not_occur_kind = '应付'
  203. else:
  204. new_row['kind'] = '应付'
  205. # Append the new row to the new DataFrame
  206. df.loc[len(df)] = new_row
  207. if not_occur_kind is not None:
  208. # append extra row if detail is occurred
  209. new_row = new_row.copy()
  210. new_row['kind'] = not_occur_kind
  211. df.loc[len(df)] = new_row
  212. # Increment the current date by one day
  213. # current_month += timedelta(days=1)
  214. current_month += relativedelta(months=1)
  215. # 收付实现制
  216. # 定金、押金
  217. if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
  218. # print('收付实现制', row['fee_subject_label'])
  219. new_row = row.copy()
  220. new_row['day'] = row['predict_time']
  221. new_row['is_apportion'] = 0
  222. if row['fee_direction'] == 1:
  223. if row['is_occur'] == 1:
  224. new_row['kind'] = '实收'
  225. else:
  226. new_row['kind'] = '应收'
  227. else:
  228. if row['is_occur'] == 1:
  229. new_row['kind'] = '实付'
  230. else:
  231. new_row['kind'] = '应付'
  232. new_row['money'] = row['original_money']
  233. new_row['reject_time'] = reject_time
  234. new_row['reject_name'] = reject_name
  235. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  236. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  237. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  238. df.loc[len(df)] = new_row
  239. return df
  240. def load(conn, df: pd.DataFrame, target_db) -> None:
  241. """ Loads data into a MySQL database
  242. """
  243. # Define the column types for the table
  244. dtypes = {key: value[0] for key, value in cols.items()}
  245. # create target table with df.dtypes
  246. df.to_sql(target_db, con=conn, if_exists='append',
  247. index=False, dtype=dtypes)
  248. # add columns' comment into table
  249. pass
  250. def update_column_comment(conn, target_db):
  251. for key, value in cols.items():
  252. dtype = value[0]
  253. if dtype == Date:
  254. dtype = 'date'
  255. elif dtype == INTEGER:
  256. dtype = 'INT'
  257. conn.execute(text('alter table {target_db} modify column `{col}` {dtype} comment \'{comment}\''
  258. .format(target_db=target_db, col=key, dtype=dtype, comment=value[1])))
  259. def etl():
  260. config = load_config()
  261. target_db = 'bi_finance_statement_'
  262. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  263. .format(username=config['mysql']['username'],
  264. pwd=quote_plus(config['mysql']['password']),
  265. host=config['mysql']['host'],
  266. port=config['mysql']['port'],
  267. db=config['mysql']['db'])
  268. engine = create_engine(connection_string)
  269. with engine.begin() as conn:
  270. # Get the total number of rows
  271. total = query_total(conn)
  272. # Define the batch size
  273. batch_size = 100
  274. truncate_target_db(conn, target_db)
  275. if debug:
  276. total = 200
  277. batch_size = 100
  278. print('total', total)
  279. # Write the data to the table in batches
  280. for i in tqdm(range(0, total, batch_size)):
  281. data = extract(conn, batch_size, i)
  282. data = transform(data)
  283. if debug:
  284. print(data.head())
  285. load(conn, data, target_db)
  286. update_column_comment(conn, target_db)
# Module-level call: the ETL runs whenever this file is executed or imported.
etl()