etl_finance_bill_detail_month.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
"""
Yuxin (寓信) ETL: monthly finance accrual detail table.

Reads bill details from ``yuxin_finance.fin_finance_bill_detail`` and splits them
by month, accrual, and fee subject into receivable / payable, received / paid and
still-to-receive / still-to-pay amounts, written to table ``bi_bill_detail_month_``.
"""
  6. import pandas as pd
  7. from sqlalchemy import create_engine, text
  8. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  9. from datetime import datetime, timedelta
  10. from tqdm import tqdm
  11. import json
  12. from dateutil.relativedelta import relativedelta
  13. import calendar
  14. from urllib.parse import quote_plus
  15. from utils import load_config, truncate_target_db, update_column_comment
  16. import json
  17. debug = False
  18. if debug:
  19. debug_condition = ''
  20. pass
  21. else:
  22. debug_condition = ''
  23. pass
  24. cols = {
  25. 'contract_id': [NVARCHAR(32), '租赁合同ID'],
  26. 'fee_subject_id': [NVARCHAR(32), '费用类型ID'],
  27. 'fee_direction': [INTEGER, '费用方向'],
  28. 'themonth': [NVARCHAR(32), '账单明细ID'],
  29. 'kind': [NVARCHAR(16), '类型'],
  30. 'money': [DECIMAL(14, 2), '金额'],
  31. }
  32. def query_total(conn) -> int:
  33. query = """
  34. select
  35. count(1)
  36. from yuxin_finance.fin_finance_bill_detail
  37. where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
  38. """.format(debug_condition=debug_condition)
  39. return conn.execute(text(query)).scalar()
  40. def extract(conn, batch_size, i) -> pd.DataFrame:
  41. """ This API extracts data from
  42. """
  43. query = """
  44. select biz_id, fee_subject_id, fee_direction, begin_time, end_time, original_money, occurred_money
  45. from yuxin_finance.fin_finance_bill_detail
  46. where is_delete=0 and is_valid=1 and biz_type=2 {debug_condition}
  47. limit {batch_size} offset {offset}
  48. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  49. source_data = pd.read_sql(query, con=conn)
  50. return source_data
  51. def transform(conn, data) -> pd.DataFrame:
  52. """ Transforms the dataset into desired structure and filters
  53. """
  54. df_data = []
  55. for index, row in data.iterrows():
  56. begin_date = row['begin_time']
  57. end_date = row['end_time']
  58. # Calculate the number of days between the two dates
  59. num_days = (end_date - begin_date).days + 1
  60. num_months = (end_date.year - begin_date.year) * 12 + (end_date.month - begin_date.month) + 1
  61. # Calculate the amount per day
  62. if num_days == 0:
  63. num_days = 1
  64. elif num_days < 0:
  65. num_days = abs(num_days)
  66. original_money_per_day = row['original_money'] / num_days
  67. occurred_money_per_day = row['occurred_money'] / num_days
  68. original_money_remainder = row['original_money']
  69. occurred_money_remainder = row['occurred_money']
  70. current_month = begin_date
  71. while current_month <= end_date:
  72. first_day = 1
  73. last_day = calendar.monthrange(current_month.year, current_month.month)[1]
  74. if current_month.month == begin_date.month and current_month.year == begin_date.year:
  75. first_day = begin_date.day
  76. if current_month.month == end_date.month and current_month.year == end_date.year:
  77. last_day = end_date.day
  78. num_days_month = last_day - first_day + 1
  79. if current_month.month == end_date.month and current_month.year == end_date.year:
  80. # keep remainder in the last day
  81. original_money = original_money_remainder
  82. occurred_money = occurred_money_remainder
  83. else:
  84. original_money = original_money_per_day * num_days_month
  85. original_money_remainder -= original_money
  86. occurred_money = occurred_money_per_day * num_days_month
  87. occurred_money_remainder -= occurred_money
  88. original_money = round(original_money, 2)
  89. occurred_money = round(occurred_money, 2)
  90. def append_row(_kind, _money):
  91. new_row = {
  92. 'contract_id': row['biz_id'],
  93. 'fee_subject_id': row['fee_subject_id'],
  94. 'fee_direction': row['fee_direction'],
  95. 'themonth': current_month.date(),
  96. 'kind': _kind,
  97. 'money': _money,
  98. }
  99. df_data.append(new_row)
  100. if row['fee_direction'] == 1:
  101. append_row('应收', original_money)
  102. append_row('实收', occurred_money)
  103. append_row('待收', original_money-occurred_money)
  104. else:
  105. append_row('应付', original_money)
  106. append_row('实付', occurred_money)
  107. append_row('待付', original_money-occurred_money)
  108. # Increment the current date by one month
  109. current_month += relativedelta(months=1)
  110. pass
  111. columns = [k for k, v in cols.items()]
  112. df = pd.DataFrame(df_data, columns=columns)
  113. return df
  114. def load(conn, df: pd.DataFrame, target_db) -> None:
  115. """ Loads data into a MySQL database
  116. """
  117. # Define the column types for the table
  118. dtypes = {key: value[0] for key, value in cols.items()}
  119. # create target table with df.dtypes
  120. df.to_sql(target_db, con=conn, if_exists='append',
  121. index=False, dtype=dtypes)
  122. def etl():
  123. config = load_config()
  124. target_db = 'bi_bill_detail_month_'
  125. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  126. .format(username=config['mysql']['username'],
  127. pwd=quote_plus(config['mysql']['password']),
  128. host=config['mysql']['host'],
  129. port=config['mysql']['port'],
  130. db=config['mysql']['db'])
  131. engine = create_engine(connection_string)
  132. with engine.begin() as conn:
  133. # Get the total number of rows
  134. total = query_total(conn)
  135. # Define the batch size
  136. batch_size = 100
  137. truncate_target_db(conn, target_db)
  138. if debug:
  139. total = 200
  140. batch_size = 100
  141. print('total', total)
  142. # Write the data to the table in batches
  143. for i in tqdm(range(0, total, batch_size)):
  144. data = extract(conn, batch_size, i)
  145. data = transform(conn, data)
  146. if debug:
  147. print(data.head())
  148. load(conn, data, target_db)
  149. update_column_comment(conn, target_db, cols)
  150. etl()