etl_bill_detail.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. """
  2. 寓信账单明细ETL
  3. 综合财务报表
  4. """
  5. import pandas as pd
  6. from sqlalchemy import create_engine, text
  7. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  8. from datetime import datetime, timedelta
  9. from tqdm import tqdm
  10. import json
  11. from dateutil.relativedelta import relativedelta
  12. import calendar
  13. from urllib.parse import quote_plus
  14. from utils import load_config, truncate_target_db
  15. debug = False
  16. if debug:
  17. debug_condition = "and bd.biz_id='1687400201327599617'" # P.53
  18. # debug_condition = ''
  19. else:
  20. debug_condition = ''
  21. cols = {
  22. 'tenant_id': [NVARCHAR(10), '租户ID'],
  23. 'dept_id': [NVARCHAR(64), '房源所属门店ID'],
  24. 'dept_name': [NVARCHAR(50), '房源所属门店名称'],
  25. 'house_address': [NVARCHAR(100), '房源地址'],
  26. 'community_id': [NVARCHAR(64), '房源所属项目/小区ID'],
  27. 'community_name': [NVARCHAR(50), '房源所属项目/小区'],
  28. 'house_area': [DECIMAL(10, 2), '房源面积'],
  29. 'contract_dept_id': [NVARCHAR(64), '合同所属门店/部门ID'],
  30. 'contract_dept_name': [NVARCHAR(50), '合同所属门店/部门'],
  31. 'contract_id': [NVARCHAR(64), '合同ID'],
  32. 'contract_no': [NVARCHAR(64), '合同编号'],
  33. 'contract_medium': [NVARCHAR(50), '合同类型【1-纸质,2-电子】'],
  34. 'renter_name': [NVARCHAR(50), '租客姓名'],
  35. 'renter_phone': [NVARCHAR(20), '租客手机号'],
  36. 'renter_id_type': [NVARCHAR(50), '租客证件类型证件类型【1-身份证,2-护照,3-港澳通行证,4-台湾同胞证,5-军官证,6-驾驶证 7-社会信用代码'],
  37. 'renter_id_number': [NVARCHAR(50), '租客证件号码'],
  38. 'sign_type': [NVARCHAR(50), '成交方式【1-新签,2-续签,3-转租,4-换房,5-补签】'],
  39. 'contract_begin_date': [Date, '合同开始日期'],
  40. 'contract_end_date': [Date, '合同结束日期'],
  41. 'payment_method': [NVARCHAR(50), '付款方式【付款模式【1-提前付款天数,2-固定付款日期,3-提前1个月固定付款日期】'],
  42. 'sign_date': [Date, '签约日期'],
  43. 'signer_id': [NVARCHAR(64), '签约人ID'],
  44. 'signer_name': [NVARCHAR(50), '签约人'],
  45. 'pay_months': [INTEGER, '付几'],
  46. 'deposit_months': [INTEGER, '押几'],
  47. 'bill_id': [NVARCHAR(64), '账单ID'],
  48. 'bill_detail_id': [NVARCHAR(64), '账单明细ID'],
  49. 'fee_subject_id': [NVARCHAR(64), '费用科目ID'],
  50. 'fee_subject_label': [NVARCHAR(64), '费用科目标签'],
  51. 'fee_subject_name': [NVARCHAR(64), '费用科目名称'],
  52. 'contract_type': [INTEGER, '合同类型 1个人、2企业、4协议'],
  53. 'day': [Date, '日期'],
  54. 'kind': [NVARCHAR(10), '类型'],
  55. 'money': [DECIMAL(14, 2), '金额'],
  56. 'reject_time': [Date, '退租时间'],
  57. 'reject_name': [NVARCHAR(64), '退租处理人'],
  58. 'reject_payment_account': [NVARCHAR(64), '退款账号'],
  59. 'reject_payment_account_type': [NVARCHAR(32), '退款途径'],
  60. 'reject_reason': [NVARCHAR(64), '退租原因'],
  61. 'is_apportion': [INTEGER, '是否分摊']
  62. }
  63. def query_total(conn) -> int:
  64. query = """
  65. select
  66. count(1)
  67. from yuxin_finance.fin_finance_bill_detail bd
  68. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  69. """.format(debug_condition=debug_condition)
  70. return conn.execute(text(query)).scalar()
  71. def extract(conn, batch_size, i) -> pd.DataFrame:
  72. """ This API extracts data from
  73. """
  74. query = """
  75. select
  76. bd.tenant_id,
  77. hhb.dept_id 'dept_id', house_sd.name 'dept_name', hhr.address 'house_address',
  78. hhb.community_id 'community_id', hc.name 'community_name', hhr.house_area 'house_area',
  79. crc_sd.id 'contract_dept_id', crc_sd.name 'contract_dept_name',
  80. rc.id 'contract_id', rc.contract_no 'contract_no', rc.contract_medium 'contract_medium',
  81. cri.name 'renter_name', cri.phone 'renter_phone', cri.certification_type 'renter_id_type', cri.identity_card 'renter_id_number',
  82. rc.sign_type 'sign_type', rc.begin_time 'contract_begin_date', rc.end_time 'contract_end_date', rc.pay_pattern 'payment_method',
  83. rc.sign_time 'sign_date', rc.sign_emp_id 'signer_id', sign_emp.name 'signer_name', rc.periodMonth 'pay_months', rc.depositMonth 'deposit_months',
  84. bd.bill_id, bd.id as 'bill_detail_id', bd.fee_subject_id, sd.label as 'fee_subject_label', sd.name as 'fee_subject_name',
  85. rc.`type` 'contract_type',
  86. 0 'splitter',
  87. bd.fee_direction, bd.original_money, bd.occurred_money,
  88. bd.begin_time, bd.end_time, bd.is_occur, rc.cancel_info, bd.predict_time
  89. from yuxin_finance.fin_finance_bill_detail bd
  90. left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
  91. left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id
  92. left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=rc.house_id
  93. left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=rc.house_id
  94. left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
  95. left join yuxin_house.hse_community hc on hc.id=hhb.community_id and hc.is_delete=0
  96. left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=rc.maintainer_id and crc_ed.is_delete=0
  97. left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
  98. left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=bd.biz_id
  99. left join yuxin_setting.setting_employee_info sign_emp on sign_emp.id=rc.sign_emp_id and sign_emp.is_delete=0
  100. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  101. limit {batch_size} offset {offset}
  102. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  103. source_data = pd.read_sql(query, con=conn)
  104. return source_data
  105. def transform(data) -> pd.DataFrame:
  106. """ Transforms the dataset into desired structure and filters
  107. --- 维度:租户,合同,房源,维护人,所属部门,日,财务科目分类,财务科目,应收,应付,实收,实付
  108. --- 指标:金额(尾差保留在最后一日中)
  109. """
  110. # target columns
  111. last_splitter_index = list(data.columns).index('splitter')
  112. columns = list(data.columns[:last_splitter_index])
  113. columns.extend(['day', 'kind', 'money',
  114. 'reject_time',
  115. 'reject_name',
  116. 'reject_payment_account',
  117. 'reject_payment_account_type',
  118. 'reject_reason',
  119. 'is_apportion',
  120. ])
  121. # target data
  122. df = pd.DataFrame(columns=columns)
  123. # Iterate over each row in the DataFrame
  124. for index, row in data.iterrows():
  125. begin_date = row['begin_time']
  126. end_date = row['end_time']
  127. # Calculate the number of days between the two dates
  128. num_days = (end_date - begin_date).days + 1
  129. num_months = (end_date.year - begin_date.year) * 12 + (end_date.month - begin_date.month) + 1
  130. # Calculate the amount per day
  131. if num_days == 0:
  132. num_days = 1
  133. elif num_days < 0:
  134. num_days = abs(num_days)
  135. money_per_day = row['original_money'] / num_days
  136. # 退租信息
  137. # {"reason": "", "rejectName": "专区客服1号", "rejectTime": "2022-11-18 15:02:03", "paymentAccount": null, "financeDetailInfo": "[{\"bizId\":\"1593499215339642882\",\"id\":\"1593499216711200770\",\"feeDirection\":2,\"feeSubjectId\":\"1593215386397958146\",\"feeSubjectName\":\"房屋押金\",\"amount\":6000,\"payDate\":1668700800000,\"beginDate\":1652889600000,\"endDate\":1668700800000,\"freeDay\":0,\"occurredMoney\":0,\"notOccurredMoney\":6000,\"comment\":\"\",\"status\":4,\"price\":6000}]", "paymentAccountType": null}
  138. # paymentAccountType
  139. # BANK_CARD(1, "银行卡"),
  140. # ALIPAY(2, "支付宝"),
  141. # WECHAT(3, "微信");
  142. reject_time = None
  143. reject_name = None
  144. reject_payment_account = None
  145. reject_payment_account_type = None
  146. reject_reason = None
  147. if row['cancel_info'] is not None:
  148. cancel_info = json.loads(row['cancel_info'])
  149. if cancel_info['reason'] is not None:
  150. reject_reason = cancel_info['reason']
  151. if cancel_info['rejectTime'] is not None:
  152. reject_time = datetime.strptime(cancel_info['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
  153. reject_name = cancel_info['rejectName']
  154. if 'paymentAccount' in cancel_info:
  155. reject_payment_account = cancel_info['paymentAccount']
  156. if 'paymentAccountType' in cancel_info:
  157. reject_payment_account_type = cancel_info['paymentAccountType']
  158. if reject_payment_account_type == 1:
  159. reject_payment_account_type = '银联'
  160. elif reject_payment_account_type == 2:
  161. reject_payment_account_type = '支付宝'
  162. elif reject_payment_account_type == 3:
  163. reject_payment_account_type = '微信'
  164. # 权责发生制
  165. # Loop from begin_date to end_date by day
  166. remainder = row['original_money']
  167. current_month = begin_date
  168. while current_month <= end_date:
  169. first_day = 1
  170. last_day = calendar.monthrange(current_month.year, current_month.month)[1]
  171. if current_month.month == begin_date.month:
  172. first_day = begin_date.day
  173. if current_month.month == end_date.month:
  174. last_day = end_date.day
  175. num_days_month = last_day - first_day + 1
  176. # print('current_month', current_month, first_day, last_day, num_days_month)
  177. # Copy the row and add extra columns
  178. new_row = row.copy()
  179. new_row['day'] = current_month.date()
  180. new_row['is_apportion'] = 1
  181. if current_month.month == end_date.month:
  182. # keep remainder in the last day
  183. new_row['money'] = remainder
  184. else:
  185. new_row['money'] = money_per_day * num_days_month
  186. remainder -= new_row['money']
  187. new_row['money'] = round(new_row['money'], 2)
  188. # print('money', new_row['money'], row['original_money'], money_per_day)
  189. new_row['reject_time'] = reject_time
  190. new_row['reject_name'] = reject_name
  191. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  192. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  193. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  194. not_occur_kind = None
  195. if row['fee_direction'] == 1:
  196. if row['is_occur'] == 1:
  197. new_row['kind'] = '实收'
  198. not_occur_kind = '应收'
  199. else:
  200. new_row['kind'] = '应收'
  201. else:
  202. if row['is_occur'] == 1:
  203. new_row['kind'] = '实付'
  204. not_occur_kind = '应付'
  205. else:
  206. new_row['kind'] = '应付'
  207. # Append the new row to the new DataFrame
  208. df.loc[len(df)] = new_row
  209. if not_occur_kind is not None:
  210. # append extra row if detail is occurred
  211. new_row = new_row.copy()
  212. new_row['kind'] = not_occur_kind
  213. df.loc[len(df)] = new_row
  214. # Increment the current date by one day
  215. # current_month += timedelta(days=1)
  216. current_month += relativedelta(months=1)
  217. # 收付实现制
  218. # 定金、押金
  219. if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
  220. # print('收付实现制', row['fee_subject_label'])
  221. new_row = row.copy()
  222. new_row['day'] = row['predict_time']
  223. new_row['is_apportion'] = 0
  224. if row['fee_direction'] == 1:
  225. if row['is_occur'] == 1:
  226. new_row['kind'] = '实收'
  227. else:
  228. new_row['kind'] = '应收'
  229. else:
  230. if row['is_occur'] == 1:
  231. new_row['kind'] = '实付'
  232. else:
  233. new_row['kind'] = '应付'
  234. new_row['money'] = row['original_money']
  235. new_row['reject_time'] = reject_time
  236. new_row['reject_name'] = reject_name
  237. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  238. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  239. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  240. df.loc[len(df)] = new_row
  241. return df
  242. def load(conn, df: pd.DataFrame, target_db) -> None:
  243. """ Loads data into a MySQL database
  244. """
  245. # Define the column types for the table
  246. dtypes = {key: value[0] for key, value in cols.items()}
  247. # create target table with df.dtypes
  248. df.to_sql(target_db, con=conn, if_exists='append',
  249. index=False, dtype=dtypes)
  250. # add columns' comment into table
  251. pass
  252. def update_column_comment(conn, target_db):
  253. for key, value in cols.items():
  254. dtype = value[0]
  255. if dtype == Date:
  256. dtype = 'date'
  257. elif dtype == INTEGER:
  258. dtype = 'INT'
  259. conn.execute(text('alter table {target_db} modify column `{col}` {dtype} comment \'{comment}\''
  260. .format(target_db=target_db, col=key, dtype=dtype, comment=value[1])))
  261. def etl():
  262. config = load_config()
  263. target_db = 'bi_finance_statement_'
  264. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  265. .format(username=config['mysql']['username'],
  266. pwd=quote_plus(config['mysql']['password']),
  267. host=config['mysql']['host'],
  268. port=config['mysql']['port'],
  269. db=config['mysql']['db'])
  270. engine = create_engine(connection_string)
  271. with engine.begin() as conn:
  272. # Get the total number of rows
  273. total = query_total(conn)
  274. # Define the batch size
  275. batch_size = 100
  276. truncate_target_db(conn, target_db)
  277. if debug:
  278. total = 200
  279. batch_size = 100
  280. print('total', total)
  281. # Write the data to the table in batches
  282. for i in tqdm(range(0, total, batch_size)):
  283. data = extract(conn, batch_size, i)
  284. data = transform(data)
  285. if debug:
  286. print(data.head())
  287. load(conn, data, target_db)
  288. update_column_comment(conn, target_db)
# Module-level entry point: the ETL runs as soon as this file is executed
# (and also whenever the module is imported).
etl()