etl_bill_detail.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. """
  2. 寓信账单明细ETL
  3. 综合财务报表
  4. """
  5. import pandas as pd
  6. from sqlalchemy import create_engine, text
  7. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  8. from datetime import datetime, timedelta
  9. from tqdm import tqdm
  10. import json
  11. from dateutil.relativedelta import relativedelta
  12. import calendar
  13. from urllib.parse import quote_plus
  14. from utils import load_config, truncate_target_db
  15. debug = False
  16. if debug:
  17. # debug_condition = "and bd.biz_id='1688796583287914497'"
  18. debug_condition = ''
  19. else:
  20. debug_condition = ''
  21. def query_total(conn) -> int:
  22. query = """
  23. select
  24. count(1)
  25. from yuxin_finance.fin_finance_bill_detail bd
  26. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  27. """.format(debug_condition=debug_condition)
  28. return conn.execute(text(query)).scalar()
  29. def extract(conn, batch_size, i) -> pd.DataFrame:
  30. """ This API extracts data from
  31. """
  32. query = """
  33. select
  34. bd.tenant_id, rc.id 'contract_id', dept.id 'dept_id', dept.name 'dept_name', rc.maintainer_id 'emp_id', emp.name 'emp_name',
  35. bd.bill_id, bd.id as 'bill_detail_id', bd.fee_subject_id, sd.label as 'fee_subject_label', sd.name as 'fee_subject_name',
  36. bd.fee_direction, bd.original_money, bd.occurred_money,
  37. bd.begin_time, bd.end_time, bd.is_occur, rc.cancel_info, bd.predict_time
  38. from yuxin_finance.fin_finance_bill_detail bd
  39. left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
  40. left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id
  41. left join yuxin_setting.setting_employee_dept ed on ed.emp_id=rc.maintainer_id and ed.is_delete=0
  42. left join yuxin_setting.setting_employee_info emp on emp.id=rc.maintainer_id and emp.is_delete=0
  43. left join yuxin_setting.setting_department dept on dept.id=ed.dept_id and dept.is_delete=0
  44. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  45. limit {batch_size} offset {offset}
  46. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  47. source_data = pd.read_sql(query, con=conn)
  48. return source_data
  49. def transform(data) -> pd.DataFrame:
  50. """ Transforms the dataset into desired structure and filters
  51. --- 维度:租户,合同,房源,维护人,所属部门,日,财务科目分类,财务科目,应收,应付,实收,实付
  52. --- 指标:金额(尾差保留在最后一日中)
  53. """
  54. # target columns
  55. columns = list(data.columns[:11])
  56. columns.extend(['day', 'kind', 'money',
  57. 'reject_time',
  58. 'reject_name',
  59. 'reject_payment_account',
  60. 'reject_payment_account_type',
  61. 'reject_reason',
  62. 'is_apportion',
  63. ])
  64. # target data
  65. df = pd.DataFrame(columns=columns)
  66. # Iterate over each row in the DataFrame
  67. for index, row in data.iterrows():
  68. begin_date = row['begin_time']
  69. end_date = row['end_time']
  70. # Calculate the number of days between the two dates
  71. num_days = (end_date - begin_date).days + 1
  72. num_months = (end_date.year - begin_date.year) * 12 + (end_date.month - begin_date.month) + 1
  73. # Calculate the amount per day
  74. if num_days == 0:
  75. num_days = 1
  76. elif num_days < 0:
  77. num_days = abs(num_days)
  78. money_per_day = row['original_money'] / num_days
  79. # 退租信息
  80. # {"reason": "", "rejectName": "专区客服1号", "rejectTime": "2022-11-18 15:02:03", "paymentAccount": null, "financeDetailInfo": "[{\"bizId\":\"1593499215339642882\",\"id\":\"1593499216711200770\",\"feeDirection\":2,\"feeSubjectId\":\"1593215386397958146\",\"feeSubjectName\":\"房屋押金\",\"amount\":6000,\"payDate\":1668700800000,\"beginDate\":1652889600000,\"endDate\":1668700800000,\"freeDay\":0,\"occurredMoney\":0,\"notOccurredMoney\":6000,\"comment\":\"\",\"status\":4,\"price\":6000}]", "paymentAccountType": null}
  81. # paymentAccountType
  82. # BANK_CARD(1, "银行卡"),
  83. # ALIPAY(2, "支付宝"),
  84. # WECHAT(3, "微信");
  85. reject_time = None
  86. reject_name = None
  87. reject_payment_account = None
  88. reject_payment_account_type = None
  89. reject_reason = None
  90. if row['cancel_info'] is not None:
  91. cancel_info = json.loads(row['cancel_info'])
  92. if cancel_info['reason'] is not None:
  93. reject_reason = cancel_info['reason']
  94. if cancel_info['rejectTime'] is not None:
  95. reject_time = datetime.strptime(cancel_info['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
  96. reject_name = cancel_info['rejectName']
  97. if 'paymentAccount' in cancel_info:
  98. reject_payment_account = cancel_info['paymentAccount']
  99. if 'paymentAccountType' in cancel_info:
  100. reject_payment_account_type = cancel_info['paymentAccountType']
  101. if reject_payment_account_type == 1:
  102. reject_payment_account_type = '银联'
  103. elif reject_payment_account_type == 2:
  104. reject_payment_account_type = '支付宝'
  105. elif reject_payment_account_type == 3:
  106. reject_payment_account_type = '微信'
  107. # 权责发生制
  108. # Loop from begin_date to end_date by day
  109. remainder = row['original_money']
  110. current_month = begin_date
  111. while current_month <= end_date:
  112. first_day = 1
  113. last_day = calendar.monthrange(current_month.year, current_month.month)[1]
  114. if current_month.month == begin_date.month:
  115. first_day = begin_date.day
  116. if current_month.month == end_date.month:
  117. last_day = end_date.day
  118. num_days_month = last_day - first_day + 1
  119. # print('current_month', current_month, first_day, last_day, num_days_month)
  120. # Copy the row and add extra columns
  121. new_row = row.copy()
  122. new_row['day'] = current_month.date()
  123. new_row['is_apportion'] = 1
  124. if current_month.month == end_date.month:
  125. # keep remainder in the last day
  126. new_row['money'] = remainder
  127. else:
  128. new_row['money'] = money_per_day * num_days_month
  129. remainder -= new_row['money']
  130. new_row['money'] = round(new_row['money'], 2)
  131. # print('money', new_row['money'], row['original_money'], money_per_day)
  132. new_row['reject_time'] = reject_time
  133. new_row['reject_name'] = reject_name
  134. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  135. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  136. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  137. not_occur_kind = None
  138. if row['fee_direction'] == 1:
  139. if row['is_occur'] == 1:
  140. new_row['kind'] = '实收'
  141. not_occur_kind = '应收'
  142. else:
  143. new_row['kind'] = '应收'
  144. else:
  145. if row['is_occur'] == 1:
  146. new_row['kind'] = '实付'
  147. not_occur_kind = '应付'
  148. else:
  149. new_row['kind'] = '应付'
  150. # Append the new row to the new DataFrame
  151. df.loc[len(df)] = new_row
  152. if not_occur_kind is not None:
  153. # append extra row if detail is occurred
  154. new_row = new_row.copy()
  155. new_row['kind'] = not_occur_kind
  156. df.loc[len(df)] = new_row
  157. # Increment the current date by one day
  158. # current_month += timedelta(days=1)
  159. current_month += relativedelta(months=1)
  160. # 收付实现制
  161. # 定金、押金
  162. if row['fee_subject_label'] == 'FEESUBJECT@DEPOSIT':
  163. # print('收付实现制', row['fee_subject_label'])
  164. new_row = row.copy()
  165. new_row['day'] = row['predict_time']
  166. new_row['is_apportion'] = 0
  167. if row['fee_direction'] == 1:
  168. if row['is_occur'] == 1:
  169. new_row['kind'] = '实收'
  170. else:
  171. new_row['kind'] = '应收'
  172. else:
  173. if row['is_occur'] == 1:
  174. new_row['kind'] = '实付'
  175. else:
  176. new_row['kind'] = '应付'
  177. new_row['money'] = row['original_money']
  178. new_row['reject_time'] = reject_time
  179. new_row['reject_name'] = reject_name
  180. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  181. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  182. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  183. df.loc[len(df)] = new_row
  184. return df
  185. def load(conn, df: pd.DataFrame, target_db) -> None:
  186. """ Loads data into a MySQL database
  187. """
  188. # Define the column types for the table
  189. dtype = {
  190. 'tenant_id': NVARCHAR(10),
  191. 'contract_id': NVARCHAR(64),
  192. 'dept_id': NVARCHAR(64),
  193. 'dept_name': NVARCHAR(50),
  194. 'emp_id': NVARCHAR(64),
  195. 'emp_name': NVARCHAR(50),
  196. 'bill_id': NVARCHAR(64),
  197. 'bill_detail_id': NVARCHAR(64),
  198. 'fee_subject_id': NVARCHAR(64),
  199. 'fee_subject_label': NVARCHAR(64),
  200. 'fee_subject_name': NVARCHAR(64),
  201. 'day': Date,
  202. 'kind': NVARCHAR(10),
  203. 'money': DECIMAL(14,2),
  204. 'reject_time': Date,
  205. 'reject_name': NVARCHAR(64),
  206. 'reject_payment_account': NVARCHAR(64),
  207. 'reject_payment_account_type': NVARCHAR(32),
  208. 'reject_reason': NVARCHAR(64),
  209. 'is_apportion': INTEGER,
  210. }
  211. # create target table with df.dtypes
  212. df.to_sql(target_db, con=conn, if_exists='append',
  213. index=False, dtype=dtype)
  214. pass
  215. def etl():
  216. config = load_config()
  217. target_db = 'bi_bill_detail_'
  218. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  219. .format(username=config['mysql']['username'],
  220. pwd=quote_plus(config['mysql']['password']),
  221. host=config['mysql']['host'],
  222. port=config['mysql']['port'],
  223. db=config['mysql']['db'])
  224. engine = create_engine(connection_string)
  225. with engine.begin() as conn:
  226. # Get the total number of rows
  227. total = query_total(conn)
  228. # Define the batch size
  229. batch_size = 100
  230. truncate_target_db(conn, target_db)
  231. if debug:
  232. total = 200
  233. batch_size = 100
  234. print('total', total)
  235. # Write the data to the table in batches
  236. for i in tqdm(range(0, total, batch_size)):
  237. data = extract(conn, batch_size, i)
  238. data = transform(data)
  239. if debug:
  240. print(data.head())
  241. load(conn, data, target_db)
# Run the pipeline as soon as the module is executed. There is no
# ``__main__`` guard, so importing this module also triggers the ETL.
etl()