etl_bill_detail.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
"""
Yuxin (寓信) bill detail ETL.
Consolidated financial report (综合财务报表).
"""
  5. import pandas as pd
  6. from sqlalchemy import create_engine, text
  7. from sqlalchemy.types import NVARCHAR, Date, DECIMAL
  8. from datetime import datetime, timedelta
  9. from tqdm import tqdm
  10. import json
  11. from dateutil.relativedelta import relativedelta
  12. import calendar
  13. from urllib.parse import quote_plus
  14. from utils import load_config, truncate_target_db
  15. debug = False
  16. if debug:
  17. debug_condition = "and bd.biz_id='1593145414334246914'"
  18. else:
  19. debug_condition = ''
  20. def query_total(conn) -> int:
  21. query = """
  22. select
  23. count(1)
  24. from yuxin_finance.fin_finance_bill_detail bd
  25. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  26. """.format(debug_condition=debug_condition)
  27. return conn.execute(text(query)).scalar()
  28. def extract(conn, batch_size, i) -> pd.DataFrame:
  29. """ This API extracts data from
  30. """
  31. query = """
  32. select
  33. bd.tenant_id, rc.id 'contract_id', dept.id 'dept_id', dept.name 'dept_name', rc.maintainer_id 'emp_id', emp.name 'emp_name',
  34. bd.bill_id, bd.id as 'bill_detail_id', bd.fee_subject_id, sd.label as 'fee_subject_label', sd.name as 'fee_subject_name',
  35. bd.fee_direction, bd.original_money, bd.occurred_money,
  36. bd.begin_time, bd.end_time, bd.is_occur, rc.cancel_info
  37. from yuxin_finance.fin_finance_bill_detail bd
  38. left join yuxin_setting.setting_dictionary sd on sd.id=bd.fee_subject_id
  39. left join yuxin_contract.cont_renter_contract rc on rc.id=bd.biz_id
  40. left join yuxin_setting.setting_employee_dept ed on ed.emp_id=rc.maintainer_id and ed.is_delete=0
  41. left join yuxin_setting.setting_employee_info emp on emp.id=rc.maintainer_id and emp.is_delete=0
  42. left join yuxin_setting.setting_department dept on dept.id=ed.dept_id and dept.is_delete=0
  43. where bd.is_valid=1 and bd.is_delete=0 and bd.biz_type=2 {debug_condition}
  44. limit {batch_size} offset {offset}
  45. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  46. source_data = pd.read_sql(query, con=conn)
  47. return source_data
  48. def transform(data) -> pd.DataFrame:
  49. """ Transforms the dataset into desired structure and filters
  50. --- 维度:租户,合同,房源,维护人,所属部门,日,财务科目分类,财务科目,应收,应付,实收,实付
  51. --- 指标:金额(尾差保留在最后一日中)
  52. """
  53. # target columns
  54. columns = list(data.columns[:11])
  55. columns.extend(['day', 'kind', 'money',
  56. 'reject_time',
  57. 'reject_name',
  58. 'reject_payment_account',
  59. 'reject_payment_account_type',
  60. 'reject_reason'
  61. ])
  62. # target data
  63. df = pd.DataFrame(columns=columns)
  64. # Iterate over each row in the DataFrame
  65. for index, row in data.iterrows():
  66. begin_date = row['begin_time']
  67. end_date = row['end_time']
  68. # Calculate the number of days between the two dates
  69. num_days = (end_date - begin_date).days + 1
  70. num_months = (end_date.year - begin_date.year) * 12 + (end_date.month - begin_date.month) + 1
  71. # Calculate the amount per day
  72. if num_days == 0:
  73. num_days = 1
  74. elif num_days < 0:
  75. num_days = abs(num_days)
  76. money_per_day = row['original_money'] / num_days
  77. # 退租信息
  78. # {"reason": "", "rejectName": "专区客服1号", "rejectTime": "2022-11-18 15:02:03", "paymentAccount": null, "financeDetailInfo": "[{\"bizId\":\"1593499215339642882\",\"id\":\"1593499216711200770\",\"feeDirection\":2,\"feeSubjectId\":\"1593215386397958146\",\"feeSubjectName\":\"房屋押金\",\"amount\":6000,\"payDate\":1668700800000,\"beginDate\":1652889600000,\"endDate\":1668700800000,\"freeDay\":0,\"occurredMoney\":0,\"notOccurredMoney\":6000,\"comment\":\"\",\"status\":4,\"price\":6000}]", "paymentAccountType": null}
  79. # paymentAccountType
  80. # BANK_CARD(1, "银行卡"),
  81. # ALIPAY(2, "支付宝"),
  82. # WECHAT(3, "微信");
  83. reject_time = None
  84. reject_name = None
  85. reject_payment_account = None
  86. reject_payment_account_type = None
  87. reject_reason = None
  88. if row['cancel_info'] is not None:
  89. cancel_info = json.loads(row['cancel_info'])
  90. if cancel_info['reason'] is not None:
  91. reject_reason = cancel_info['reason']
  92. if cancel_info['rejectTime'] is not None:
  93. reject_time = datetime.strptime(cancel_info['rejectTime'], '%Y-%m-%d %H:%M:%S').date()
  94. reject_name = cancel_info['rejectName']
  95. if 'paymentAccount' in cancel_info:
  96. reject_payment_account = cancel_info['paymentAccount']
  97. if 'paymentAccountType' in cancel_info:
  98. reject_payment_account_type = cancel_info['paymentAccountType']
  99. if reject_payment_account_type == 1:
  100. reject_payment_account_type = '银联'
  101. elif reject_payment_account_type == 2:
  102. reject_payment_account_type = '支付宝'
  103. elif reject_payment_account_type == 3:
  104. reject_payment_account_type = '微信'
  105. # Loop from begin_date to end_date by day
  106. remainder = row['original_money']
  107. current_month = begin_date
  108. while current_month <= end_date:
  109. first_day = 1
  110. last_day = calendar.monthrange(current_month.year, current_month.month)[1]
  111. if current_month.month == begin_date.month:
  112. first_day = begin_date.day
  113. if current_month.month == end_date.month:
  114. last_day = end_date.day
  115. num_days_month = last_day - first_day + 1
  116. # print('current_month', current_month, first_day, last_day, num_days_month)
  117. # Copy the row and add extra columns
  118. new_row = row.copy()
  119. new_row['day'] = current_month.date()
  120. if current_month.month == end_date.month:
  121. # keep remainder in the last day
  122. new_row['money'] = remainder
  123. else:
  124. new_row['money'] = money_per_day * num_days_month
  125. remainder -= new_row['money']
  126. new_row['money'] = round(new_row['money'], 2)
  127. # print('money', new_row['money'], row['original_money'], money_per_day)
  128. new_row['reject_time'] = reject_time
  129. new_row['reject_name'] = reject_name
  130. new_row['reject_payment_account'] = reject_payment_account if reject_payment_account is not None else ''
  131. new_row['reject_payment_account_type'] = reject_payment_account_type if reject_payment_account_type is not None else ''
  132. new_row['reject_reason'] = reject_reason if reject_reason is not None else ''
  133. not_occur_kind = None
  134. if row['fee_direction'] == 1:
  135. if row['is_occur'] == 1:
  136. new_row['kind'] = '实收'
  137. not_occur_kind = '应收'
  138. else:
  139. new_row['kind'] = '应收'
  140. else:
  141. if row['is_occur'] == 1:
  142. new_row['kind'] = '实付'
  143. not_occur_kind = '应付'
  144. else:
  145. new_row['kind'] = '应付'
  146. # Append the new row to the new DataFrame
  147. df.loc[len(df)] = new_row
  148. if not_occur_kind is not None:
  149. # append extra row if detail is occurred
  150. new_row = new_row.copy()
  151. new_row['kind'] = not_occur_kind
  152. df.loc[len(df)] = new_row
  153. # Increment the current date by one day
  154. # current_month += timedelta(days=1)
  155. current_month += relativedelta(months=1)
  156. return df
  157. def load(conn, df: pd.DataFrame, target_db) -> None:
  158. """ Loads data into a MySQL database
  159. """
  160. # Define the column types for the table
  161. dtype = {
  162. 'tenant_id': NVARCHAR(10),
  163. 'contract_id': NVARCHAR(64),
  164. 'dept_id': NVARCHAR(64),
  165. 'dept_name': NVARCHAR(50),
  166. 'emp_id': NVARCHAR(64),
  167. 'emp_name': NVARCHAR(50),
  168. 'bill_id': NVARCHAR(64),
  169. 'bill_detail_id': NVARCHAR(64),
  170. 'fee_subject_id': NVARCHAR(64),
  171. 'fee_subject_label': NVARCHAR(64),
  172. 'fee_subject_name': NVARCHAR(64),
  173. 'day': Date,
  174. 'kind': NVARCHAR(10),
  175. 'money': DECIMAL(14,2),
  176. 'reject_time': Date,
  177. 'reject_name': NVARCHAR(64),
  178. 'reject_payment_account': NVARCHAR(64),
  179. 'reject_payment_account_type': NVARCHAR(32),
  180. 'reject_reason': NVARCHAR(64),
  181. }
  182. # create target table with df.dtypes
  183. df.to_sql(target_db, con=conn, if_exists='append',
  184. index=False, dtype=dtype)
  185. pass
  186. def etl():
  187. config = load_config('production')
  188. target_db = 'bi_bill_detail_'
  189. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  190. .format(username=config['mysql']['username'],
  191. pwd=quote_plus(config['mysql']['password']),
  192. host=config['mysql']['host'],
  193. port=config['mysql']['port'],
  194. db=config['mysql']['db'])
  195. engine = create_engine(connection_string)
  196. with engine.begin() as conn:
  197. # Get the total number of rows
  198. total = query_total(conn)
  199. # Define the batch size
  200. batch_size = 100
  201. truncate_target_db(conn, target_db)
  202. if debug:
  203. total = 1
  204. batch_size = 1
  205. # total = 200
  206. print('total', total)
  207. # Write the data to the table in batches
  208. for i in tqdm(range(0, total, batch_size)):
  209. data = extract(conn, batch_size, i)
  210. data = transform(data)
  211. if debug:
  212. print(data.head())
  213. load(conn, data, target_db)
# Entry point: runs the whole ETL whenever this module is executed -- or imported.
# NOTE(review): consider wrapping in ``if __name__ == '__main__':`` so importing the
# module (e.g. to reuse transform) does not trigger a run against the production config.
etl()