etl_finance_statement.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
"""
Yuxin bill-detail ETL.
Consolidated finance statement.
"""
  5. import pandas as pd
  6. from sqlalchemy import create_engine, text
  7. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  8. from datetime import datetime, timedelta
  9. from tqdm import tqdm
  10. import json
  11. from dateutil.relativedelta import relativedelta
  12. import calendar
  13. from urllib.parse import quote_plus
  14. from utils import load_config, truncate_target_db, find_renewal_contract
  15. debug = False
  16. if debug:
  17. # debug_condition = "and bd.biz_id='1687400201327599617'" # P.53
  18. debug_condition = "and bd.biz_id='1731870365341253635'" # 调试押金权责
  19. # debug_condition = ''
  20. else:
  21. debug_condition = ''
# Target-table schema: column name -> [SQLAlchemy column type, column comment].
# The type (element 0) is passed to DataFrame.to_sql(dtype=...) in load();
# the comment (element 1) is written to the table by update_column_comment().
cols = {
    'tenant_id': [NVARCHAR(10), '租户ID'],
    'dept_id': [NVARCHAR(64), '房源所属门店ID'],
    'dept_name': [NVARCHAR(50), '房源所属门店名称'],
    'house_address': [NVARCHAR(100), '房源地址'],
    'community_id': [NVARCHAR(64), '房源所属项目/小区ID'],
    'community_name': [NVARCHAR(50), '房源所属项目/小区'],
    'house_area': [DECIMAL(10, 2), '房源面积'],
    'contract_dept_id': [NVARCHAR(64), '合同所属门店/部门ID'],
    'contract_dept_name': [NVARCHAR(50), '合同所属门店/部门'],
    'contract_id': [NVARCHAR(64), '合同ID'],
    'contract_no': [NVARCHAR(64), '合同编号'],
    'contract_medium': [NVARCHAR(50), '合同类型【1-纸质,2-电子】'],
    'renter_name': [NVARCHAR(50), '租客姓名'],
    'renter_phone': [NVARCHAR(20), '租客手机号'],
    'renter_id_type': [NVARCHAR(50), '租客证件类型证件类型【1-身份证,2-护照,3-港澳通行证,4-台湾同胞证,5-军官证,6-驾驶证 7-社会信用代码'],
    'renter_id_number': [NVARCHAR(50), '租客证件号码'],
    'sign_type': [NVARCHAR(50), '成交方式【1-新签,2-续签,3-转租,4-换房,5-补签】'],
    'contract_begin_date': [Date, '合同开始日期'],
    'contract_end_date': [Date, '合同结束日期'],
    'payment_method': [NVARCHAR(50), '付款方式【付款模式【1-提前付款天数,2-固定付款日期,3-提前1个月固定付款日期】'],
    'sign_date': [Date, '签约日期'],
    'signer_id': [NVARCHAR(64), '签约人ID'],
    'signer_name': [NVARCHAR(50), '签约人'],
    'pay_months': [INTEGER, '付几'],
    'deposit_months': [INTEGER, '押几'],
    'bill_id': [NVARCHAR(64), '账单ID'],
    'bill_detail_id': [NVARCHAR(64), '账单明细ID'],
    'fee_subject_id': [NVARCHAR(64), '费用科目ID'],
    'fee_subject_label': [NVARCHAR(64), '费用科目标签'],
    'fee_subject_name': [NVARCHAR(64), '费用科目名称'],
    'contract_type': [INTEGER, '合同类型 1个人、2企业、4协议'],
    'day': [Date, '日期'],
    'kind': [NVARCHAR(10), '类型'],
    'money': [DECIMAL(14, 2), '金额'],
    'reject_time': [Date, '退租时间'],
    'reject_name': [NVARCHAR(64), '退租处理人'],
    'reject_payment_account': [NVARCHAR(64), '退款账号'],
    'reject_payment_account_type': [NVARCHAR(32), '退款途径'],
    'reject_reason': [NVARCHAR(64), '退租原因'],
    'is_apportion': [INTEGER, '是否分摊']
}
  64. def query_total(conn) -> int:
  65. query = """
  66. select
  67. count(1)
  68. from yuxin_finance.fin_finance_bill_detail bd
  69. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  70. """.format(debug_condition=debug_condition)
  71. return conn.execute(text(query)).scalar()
  72. def extract(conn, batch_size, i) -> pd.DataFrame:
  73. """ This API extracts data from
  74. """
  75. query = """
  76. select
  77. bd.tenant_id,
  78. hhb.dept_id 'dept_id', house_sd.name 'dept_name', hhr.address 'house_address',
  79. hhb.community_id 'community_id', hc.name 'community_name', hhr.house_area 'house_area',
  80. crc_sd.id 'contract_dept_id', crc_sd.name 'contract_dept_name',
  81. rc.id 'contract_id', rc.contract_no 'contract_no', rc.contract_medium 'contract_medium',
  82. cri.name 'renter_name', cri.phone 'renter_phone', cri.certification_type 'renter_id_type', cri.identity_card 'renter_id_number',
  83. rc.sign_type 'sign_type', rc.begin_time 'contract_begin_date', rc.end_time 'contract_end_date', rc.pay_pattern 'payment_method',
  84. rc.sign_time 'sign_date', rc.sign_emp_id 'signer_id', sign_emp.name 'signer_name', rc.periodMonth 'pay_months', rc.depositMonth 'deposit_months',
  85. bd.bill_id, bd.id as 'bill_detail_id', bd.fee_subject_id, sd.label as 'fee_subject_label', sd.name as 'fee_subject_name',
  86. rc.`type` 'contract_type',
  87. 0 'splitter',
  88. rc.quite_date,
  89. bd.fee_direction, bd.original_money, bd.occurred_money,
  90. bd.begin_time, bd.end_time, bd.is_occur, rc.cancel_info, bd.predict_time
  91. from yuxin_finance.fin_finance_bill_detail bd
  92. left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
  93. left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id and rc.is_delete=0
  94. left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=rc.house_id
  95. left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=rc.house_id
  96. left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
  97. left join yuxin_house.hse_community hc on hc.id=hhb.community_id and hc.is_delete=0
  98. left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=rc.maintainer_id and crc_ed.is_delete=0
  99. left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
  100. left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=bd.biz_id
  101. left join yuxin_setting.setting_employee_info sign_emp on sign_emp.id=rc.sign_emp_id and sign_emp.is_delete=0
  102. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  103. limit {batch_size} offset {offset}
  104. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  105. source_data = pd.read_sql(query, con=conn)
  106. return source_data
  107. def transform(conn, data) -> pd.DataFrame:
  108. """ Transforms the dataset into desired structure and filters
  109. --- 维度:租户,合同,房源,维护人,所属部门,日,财务科目分类,财务科目,应收,应付,实收,实付
  110. --- 指标:金额(尾差保留在最后一日中)
  111. """
  112. # target columns
  113. last_splitter_index = list(data.columns).index('splitter')
  114. columns = list(data.columns[:last_splitter_index])
  115. columns.extend(['day', 'kind', 'money',
  116. 'reject_time',
  117. 'reject_name',
  118. 'reject_payment_account',
  119. 'reject_payment_account_type',
  120. 'reject_reason',
  121. 'is_apportion',
  122. ])
  123. # target data
  124. df = pd.DataFrame(columns=columns)
  125. # Iterate over each row in the DataFrame
  126. for index, row in data.iterrows():
  127. # 查找续租合同
  128. renewed_end_time = find_renewal_contract(conn, row['contract_id'])
  129. begin_date = row['begin_time']
  130. if renewed_end_time is None:
  131. end_date = row['quite_date'] if row['quite_date'] is not None and not pd.isnull(
  132. row['quite_date']) else row['end_time']
  133. else:
  134. end_date = renewed_end_time
  135. # end_date = renewed_end_time
  136. # Calculate the number of days between the two dates
  137. num_days = (end_date - begin_date).days + 1
  138. num_months = (end_date.year - begin_date.year) * 12 + (end_date.month - begin_date.month) + 1
  139. # Calculate the amount per day
  140. if num_days == 0:
  141. num_days = 1
  142. elif num_days < 0:
  143. num_days = abs(num_days)
  144. money_per_day = row['original_money'] / num_days
  145. # 退租信息
  146. # {"reason": "", "rejectName": "专区客服1号", "rejectTime": "2022-11-18 15:02:03", "paymentAccount": null, "financeDetailInfo": "[{\"bizId\":\"1593499215339642882\",\"id\":\"1593499216711200770\",\"feeDirection\":2,\"feeSubjectId\":\"1593215386397958146\",\"feeSubjectName\":\"房屋押金\",\"amount\":6000,\"payDate\":1668700800000,\"beginDate\":1652889600000,\"endDate\":1668700800000,\"freeDay\":0,\"occurredMoney\":0,\"notOccurredMoney\":6000,\"comment\":\"\",\"status\":4,\"price\":6000}]", "paymentAccountType": null}
  147. # paymentAccountType
  148. # BANK_CARD(1, "银行卡"),
  149. # ALIPAY(2, "支付宝"),
  150. # WECHAT(3, "微信");
  151. reject_time = None
  152. reject_name = None
  153. reject_payment_account = None
  154. reject_payment_account_type = None
  155. reject_reason = None
  156. if row['cancel_info'] is not None:
  157. cancel_info = json.loads(row['cancel_info'])
  158. if cancel_info['reason'] is not None:
  159. reject_reason = cancel_info['reason']
  160. if cancel_info['rejectTime'] is not None:
  161. reject_time = datetime.strptime(cancel_info['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
  162. reject_name = cancel_info['rejectName']
  163. if 'paymentAccount' in cancel_info:
  164. reject_payment_account = cancel_info['paymentAccount']
  165. if 'paymentAccountType' in cancel_info:
  166. reject_payment_account_type = cancel_info['paymentAccountType']
  167. if reject_payment_account_type == 1:
  168. reject_payment_account_type = '银联'
  169. elif reject_payment_account_type == 2:
  170. reject_payment_account_type = '支付宝'
  171. elif reject_payment_account_type == 3:
  172. reject_payment_account_type = '微信'
  173. # 权责发生制
  174. # Loop from begin_date to end_date by day
  175. remainder = row['original_money']
  176. current_month = begin_date
  177. while current_month <= end_date:
  178. first_day = 1
  179. last_day = calendar.monthrange(current_month.year, current_month.month)[1]
  180. if current_month.month == begin_date.month and current_month.year == begin_date.year:
  181. first_day = begin_date.day
  182. if current_month.month == end_date.month and current_month.year == end_date.year:
  183. last_day = end_date.day
  184. num_days_month = last_day - first_day + 1
  185. # print('current_month', current_month, first_day, last_day, num_days_month)
  186. # Copy the row and add extra columns
  187. new_row = row.copy()
  188. new_row['day'] = current_month.date()
  189. new_row['is_apportion'] = 1
  190. if current_month.month == end_date.month and current_month.year == end_date.year:
  191. # keep remainder in the last day
  192. new_row['money'] = remainder
  193. else:
  194. new_row['money'] = money_per_day * num_days_month
  195. remainder -= new_row['money']
  196. new_row['money'] = round(new_row['money'], 2)
  197. # print('money', new_row['money'], row['original_money'], money_per_day)
  198. new_row['reject_time'] = reject_time
  199. new_row['reject_name'] = reject_name
  200. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  201. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  202. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  203. not_occur_kind = None
  204. if row['fee_direction'] == 1:
  205. if row['is_occur'] == 1:
  206. new_row['kind'] = '实收'
  207. not_occur_kind = '应收'
  208. else:
  209. new_row['kind'] = '应收'
  210. else:
  211. if row['is_occur'] == 1:
  212. new_row['kind'] = '实付'
  213. not_occur_kind = '应付'
  214. else:
  215. new_row['kind'] = '应付'
  216. # Append the new row to the new DataFrame
  217. df.loc[len(df)] = new_row
  218. if not_occur_kind is not None:
  219. # append extra row if detail is occurred
  220. new_row = new_row.copy()
  221. new_row['kind'] = not_occur_kind
  222. df.loc[len(df)] = new_row
  223. # Increment the current date by one day
  224. # current_month += timedelta(days=1)
  225. current_month += relativedelta(months=1)
  226. # 收付实现制
  227. # 定金、押金
  228. if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
  229. # print('收付实现制', row['fee_subject_label'])
  230. new_row = row.copy()
  231. new_row['day'] = row['predict_time']
  232. new_row['is_apportion'] = 0
  233. if row['fee_direction'] == 1:
  234. if row['is_occur'] == 1:
  235. new_row['kind'] = '实收'
  236. else:
  237. new_row['kind'] = '应收'
  238. else:
  239. if row['is_occur'] == 1:
  240. new_row['kind'] = '实付'
  241. else:
  242. new_row['kind'] = '应付'
  243. new_row['money'] = row['original_money']
  244. new_row['reject_time'] = reject_time
  245. new_row['reject_name'] = reject_name
  246. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  247. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  248. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  249. df.loc[len(df)] = new_row
  250. return df
  251. def load(conn, df: pd.DataFrame, target_db) -> None:
  252. """ Loads data into a MySQL database
  253. """
  254. # Define the column types for the table
  255. dtypes = {key: value[0] for key, value in cols.items()}
  256. # create target table with df.dtypes
  257. df.to_sql(target_db, con=conn, if_exists='append',
  258. index=False, dtype=dtypes)
  259. # add columns' comment into table
  260. pass
  261. def update_column_comment(conn, target_db):
  262. for key, value in cols.items():
  263. dtype = value[0]
  264. if dtype == Date:
  265. dtype = 'date'
  266. elif dtype == INTEGER:
  267. dtype = 'INT'
  268. conn.execute(text('alter table {target_db} modify column `{col}` {dtype} comment \'{comment}\''
  269. .format(target_db=target_db, col=key, dtype=dtype, comment=value[1])))
  270. def etl():
  271. config = load_config()
  272. target_db = 'bi_finance_statement_'
  273. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  274. .format(username=config['mysql']['username'],
  275. pwd=quote_plus(config['mysql']['password']),
  276. host=config['mysql']['host'],
  277. port=config['mysql']['port'],
  278. db=config['mysql']['db'])
  279. engine = create_engine(connection_string)
  280. with engine.begin() as conn:
  281. # Get the total number of rows
  282. total = query_total(conn)
  283. # Define the batch size
  284. batch_size = 100
  285. truncate_target_db(conn, target_db)
  286. if debug:
  287. total = 200
  288. batch_size = 100
  289. print('total', total)
  290. # Write the data to the table in batches
  291. for i in tqdm(range(0, total, batch_size)):
  292. data = extract(conn, batch_size, i)
  293. data = transform(conn, data)
  294. if debug:
  295. print(data.head())
  296. load(conn, data, target_db)
  297. update_column_comment(conn, target_db)
# Runs the pipeline as soon as this module is loaded; there is no
# ``__main__`` guard, so importing the file triggers the ETL as well.
etl()