# etl_finance_bill_flow.py
  1. """
  2. 寓信ETL
  3. 合同业务明细表 - bill_flow和bill_detail关联表
  4. """
  5. import pandas as pd
  6. from sqlalchemy import create_engine, text
  7. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  8. from datetime import datetime, timedelta
  9. from tqdm import tqdm
  10. import json
  11. from dateutil.relativedelta import relativedelta
  12. import calendar
  13. from urllib.parse import quote_plus
  14. from utils import load_config, truncate_target_db, update_column_comment
  15. import json
  16. debug = False
  17. if debug:
  18. debug_condition = ''
  19. pass
  20. else:
  21. debug_condition = ''
  22. pass
  23. cols = {
  24. 'flow_id': [NVARCHAR(32), '流水ID'],
  25. 'money': [DECIMAL(14, 2), '金额'],
  26. 'is_exempt': [INTEGER, '是否免除'],
  27. 'bill_detail_id': [NVARCHAR(32), '账单明细ID'],
  28. }
  29. def query_total(conn) -> int:
  30. query = """
  31. select
  32. count(1)
  33. from yuxin_finance.fin_finance_bill_flow
  34. where is_delete=0 and is_valid=1 {debug_condition}
  35. """.format(debug_condition=debug_condition)
  36. return conn.execute(text(query)).scalar()
  37. def extract(conn, batch_size, i) -> pd.DataFrame:
  38. """ This API extracts data from
  39. """
  40. query = """
  41. select
  42. id 'flow_id', bill_detail_info
  43. from yuxin_finance.fin_finance_bill_flow
  44. where is_delete=0 and is_valid=1 {debug_condition}
  45. limit {batch_size} offset {offset}
  46. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  47. source_data = pd.read_sql(query, con=conn)
  48. return source_data
  49. def transform(conn, data) -> pd.DataFrame:
  50. """ Transforms the dataset into desired structure and filters
  51. """
  52. df_data = []
  53. for index, row in data.iterrows():
  54. # print(row['flow_id'], row['bill_detail_info'])
  55. details = json.loads(row['bill_detail_info'])
  56. for detail in details:
  57. df_data.append({
  58. 'flow_id': row['flow_id'],
  59. 'money': detail['money'],
  60. 'is_exempt': detail['isExempt'],
  61. 'bill_detail_id': detail['billDetailId'],
  62. })
  63. pass
  64. columns = [k for k, v in cols.items()]
  65. df = pd.DataFrame(df_data, columns=columns)
  66. return df
  67. def load(conn, df: pd.DataFrame, target_db) -> None:
  68. """ Loads data into a MySQL database
  69. """
  70. # Define the column types for the table
  71. dtypes = {key: value[0] for key, value in cols.items()}
  72. # create target table with df.dtypes
  73. df.to_sql(target_db, con=conn, if_exists='append',
  74. index=False, dtype=dtypes)
  75. def etl():
  76. config = load_config('production')
  77. target_db = 'bi_bill_flow_detail_'
  78. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  79. .format(username=config['mysql']['username'],
  80. pwd=quote_plus(config['mysql']['password']),
  81. host=config['mysql']['host'],
  82. port=config['mysql']['port'],
  83. db=config['mysql']['db'])
  84. engine = create_engine(connection_string)
  85. with engine.begin() as conn:
  86. # Get the total number of rows
  87. total = query_total(conn)
  88. # Define the batch size
  89. batch_size = 100
  90. truncate_target_db(conn, target_db)
  91. if debug:
  92. total = 200
  93. batch_size = 100
  94. print('total', total)
  95. # Write the data to the table in batches
  96. for i in tqdm(range(0, total, batch_size)):
  97. data = extract(conn, batch_size, i)
  98. data = transform(conn, data)
  99. if debug:
  100. print(data.head())
  101. load(conn, data, target_db)
  102. update_column_comment(conn, target_db, cols)
  103. etl()