etl_utility.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. """
  2. 寓信水电费收入ETL
  3. 水电费收入报表
  4. """
  5. import yaml
  6. import pandas as pd
  7. from sqlalchemy import create_engine, text
  8. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  9. from datetime import datetime, timedelta
  10. from tqdm import tqdm
  11. import json
  12. from dateutil.relativedelta import relativedelta
  13. import calendar
  14. import taosrest as taos
  15. from taosrest import connect, TaosRestConnection, TaosRestCursor
  16. from urllib.parse import quote_plus
  17. from utils import load_config, truncate_target_db, update_column_comment
  18. debug = False
  19. if debug:
  20. # debug_condition = " and crc.id='1634463949758156801'" # debug charge amount
  21. # debug_condition = " and crc.id='1635124251453575170'" # debug renew contract
  22. # debug_condition = " and crc.id='1649702247584620545'" # debug 期末读数
  23. debug_condition = " and crc.id='1674347182259716097'" # debug 退租日期
  24. else:
  25. debug_condition = ''
  26. cols = {
  27. 'tenant_id': [NVARCHAR(10), '租户ID'],
  28. 'theday': [Date, '日期'],
  29. 'house_dept_id': [NVARCHAR(32), '房源所属门店ID'],
  30. 'house_dept_name': [NVARCHAR(32), '房源所属门店名称'],
  31. 'address': [NVARCHAR(255), '房源地址'],
  32. 'contract_dept_id': [NVARCHAR(32), '合同所属门店/部门ID'],
  33. 'contract_dept_name': [NVARCHAR(32), '合同所属门店/部门'],
  34. 'contract_id': [NVARCHAR(36), '合同ID'],
  35. 'contract_maintainer': [NVARCHAR(32), '合同维护人'],
  36. 'renter_name': [NVARCHAR(50), '租客姓名'],
  37. 'contract_terminate_type': [INTEGER(), '退租类型'],
  38. 'begin_time': [Date, '合同开始日期'],
  39. 'end_time': [Date, '合同结束日期'],
  40. 'quit_time': [Date, '退租日期'],
  41. 'kind': [NVARCHAR(32), '类型'],
  42. 'begin_amount': [DECIMAL(10, 2), '期初读数'],
  43. 'end_amount': [DECIMAL(10, 2), '期末读数'],
  44. 'amount': [DECIMAL(10, 2), '使用量'],
  45. 'balance': [DECIMAL(10, 2), '期末余额'],
  46. 'cost': [DECIMAL(10, 2), '使用金额'],
  47. 'charge_amount': [DECIMAL(10, 2), '充值总金额'],
  48. 'clear_amount': [DECIMAL(10, 2), '清零总金额']
  49. }
  50. def query_total(conn) -> int:
  51. """ Queries the total number of records to be processed
  52. 合同状态不为无效4,签约类型不为续签2的合同
  53. """
  54. query = """
  55. select count(1) from yuxin_contract.cont_renter_contract crc
  56. where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
  57. """.format(debug_condition=debug_condition)
  58. return conn.execute(text(query)).scalar()
  59. def extract(conn, td_cursor, batch_size, i) -> pd.DataFrame:
  60. """ This API extracts data from
  61. """
  62. query = """
  63. select hhb.dept_id '房源所属门店ID', house_sd.name '房源所属门店名称', hhr.address '房源地址',
  64. crc_sd.id '合同所属门店/部门ID', crc_sd.name '合同所属门店/部门',
  65. crc.id '合同ID', crc_ei.name '合同维护人', cri.name '租客姓名', crc.terminate_type '退租类型',
  66. crc.begin_time, crc.end_time, crc.quite_date, crc.house_id, crc.tenant_id
  67. from yuxin_contract.cont_renter_contract crc
  68. left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=crc.house_id
  69. left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=crc.house_id
  70. left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
  71. left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=crc.maintainer_id and crc_ed.is_delete=0
  72. left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
  73. left join yuxin_setting.setting_employee_info crc_ei on crc_ei.is_delete=0 and crc_ei.id=crc.maintainer_id
  74. left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=crc.id
  75. where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
  76. limit {batch_size} offset {offset}
  77. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  78. source_data = pd.read_sql(query, con=conn)
  79. return source_data
  80. def find_renewal_contract(conn, contract_id):
  81. query = """
  82. select crc1.id, crc1.begin_time, crc1.end_time, crc1.quite_date,
  83. crc2.id, crc2.begin_time, crc2.end_time, crc2.quite_date,
  84. crc3.id, crc3.begin_time, crc3.end_time, crc3.quite_date
  85. from yuxin_contract.cont_renter_contract crc1
  86. left join yuxin_contract.cont_renter_contract crc2 on crc2.contract_pid=crc1.id and crc2.is_delete=0 and crc2.contract_status<>4 and crc2.sign_type=2
  87. left join yuxin_contract.cont_renter_contract crc3 on crc3.contract_pid=crc2.id and crc3.is_delete=0 and crc3.contract_status<>4 and crc3.sign_type=2
  88. where crc1.is_delete=0 and crc1.contract_status<>4 and crc1.sign_type=2
  89. and crc1.contract_pid='{contract_id}'
  90. """.format(contract_id=contract_id)
  91. result = conn.execute(text(query)).fetchone()
  92. if result is None:
  93. return None
  94. if result[11] is not None:
  95. return result[11]
  96. elif result[10] is not None:
  97. return result[10]
  98. elif result[7] is not None:
  99. return result[7]
  100. elif result[6] is not None:
  101. return result[6]
  102. elif result[3] is not None:
  103. return result[3]
  104. elif result[2] is not None:
  105. return result[2]
  106. return None
  107. def query_charge_data(conn, contract_id, room_id):
  108. query = """
  109. select DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d') as 'time', sum(total_price) as 'charge_amount', sum(clear_amount) as 'clear_amount', device_id
  110. from yuxin_finance.fin_finance_charge_record
  111. where status=1 and charge_status=2 and contract_id='{contract_id}' and room_id='{room_id}'
  112. group by DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d'), device_id
  113. """.format(contract_id=contract_id, room_id=room_id)
  114. data = pd.read_sql(query, con=conn)
  115. return data
  116. def query_meter_read_record(conn, td_cursor, contract_id, room_id, start_date, end_date):
  117. end_date_obj = end_date if isinstance(end_date, datetime) else datetime.strptime(end_date, '%Y-%m-%d')
  118. end_date_obj = end_date_obj + timedelta(days=1)
  119. start_date_obj = start_date if isinstance(start_date, datetime) else datetime.strptime(start_date, '%Y-%m-%d')
  120. start_date_obj = start_date_obj - timedelta(days=1)
  121. # 获取meter信息
  122. # 忽略合同ID,续约情况下合同ID会有多个
  123. query = """
  124. SELECT device_type, first(ts), first(amount), last(amount), sum(cost), last(`balance`), device_id
  125. FROM device.meter_read_record
  126. WHERE room_id='{room_id}' and ts >= '{start_date}' AND ts < '{end_date}' and pay_method=2
  127. PARTITION BY device_type, device_id
  128. INTERVAL(1d)
  129. """.format(contract_id=contract_id, room_id=room_id,
  130. start_date=start_date_obj.strftime('%Y-%m-%d'), end_date=end_date)
  131. td_cursor.execute(query)
  132. # get total rows
  133. if td_cursor.rowcount == 0:
  134. return None
  135. # get rows
  136. rows = td_cursor.fetchall()
  137. # 获取充值信息
  138. charge_data = query_charge_data(conn, contract_id, room_id)
  139. data = {}
  140. for idx, row in enumerate(rows):
  141. if idx == 0:
  142. prev_row = None
  143. else:
  144. prev_row = rows[idx - 1]
  145. day = row[1].strftime('%Y-%m-%d') # ts
  146. device_type = row[0] # 设备类型
  147. begin_amount = prev_row[3] if prev_row else row[2] # 期初读数, 取上个时间窗口的最后一个读数,否则取当前时间窗口的最早一个读数
  148. end_amount = row[3] # 期末读数
  149. amount = end_amount - begin_amount # 使用量
  150. cost = row[4] # 使用金额
  151. balance = row[5] # 期末余额
  152. device_id = row[6]
  153. charge_row = charge_data.loc[(charge_data['time'] == day) & (charge_data['device_id'] == device_id)]
  154. if charge_row.empty:
  155. charge_amount = 0
  156. clear_amount = 0
  157. else:
  158. charge_row = charge_row.iloc[0]
  159. charge_amount = charge_row['charge_amount']
  160. clear_amount = charge_row['clear_amount']
  161. # device type
  162. # WATER_MATER(1, "冷水表",DictConstant.FeeItemEnum.WATER_FEE),
  163. # RWATER_MATER(2, "中水表",DictConstant.FeeItemEnum.RWATER_FEE),
  164. # HWATER_MATER(3, "热水表",DictConstant.FeeItemEnum.HWATER_FEE),
  165. # ELE_MATER(4, "电表",DictConstant.FeeItemEnum.ELECTRIC_FEE),
  166. if device_type == 1:
  167. device_type = '冷水费'
  168. elif device_type == 2:
  169. device_type = '中水费'
  170. elif device_type == 3:
  171. device_type = '热水费'
  172. elif device_type == 4:
  173. device_type = '电费'
  174. else:
  175. device_type = '?'
  176. # print('device_type', device_type)
  177. # print('begin_amount', begin_amount)
  178. # print('end_amount', end_amount)
  179. # print('amount', amount)
  180. # print('balance', balance)
  181. # print('cost', cost)
  182. # ensure key exists
  183. if device_type not in data:
  184. data[device_type] = {}
  185. data[device_type][day] = {
  186. 'device_type': device_type,
  187. 'begin_amount': begin_amount,
  188. 'end_amount': end_amount,
  189. 'amount': amount,
  190. 'balance': balance,
  191. 'cost': cost,
  192. 'charge_amount': charge_amount,
  193. 'clear_amount': clear_amount,
  194. 'device_id': device_id
  195. }
  196. return data
  197. def transform(conn, td_cursor, data) -> pd.DataFrame:
  198. # 字段命名 功能说明 举例
  199. # 房源所属门店 显示当前房源维护人所属的门店 如木繁公寓南山店、寓信公寓 house_base里的dept_id
  200. # 房源地址 调取合同关联房源的完整地址 深圳市福田区香蜜湖街道乐山集团1栋1单元-203
  201. # 合同所属门店/部门 显示当前合同维护人所属的门店或部门 如木繁公寓南山店、寓信公寓
  202. # 合同ID 系统给合同分配的唯一ID 2467347234871234772
  203. # 合同维护人 显示合同和子合同管理的合同的合同维护人 刘飞
  204. # 租客姓名 显示合同和子合同关联的租客信息 租客合同显示租客;包租子合同显示承租人
  205. # 合同开始日期 显示合同和子合同关联的合约开始日期 -
  206. # 合同结束日期 显示合同和子合同关联的合约结束日期; -
  207. # 退租类型 若已退租,则显示退租类型 违约退;正常退,若未退租则显示“-”
  208. # 退租日期 若已退租,则显示退租日期 -
  209. # 费用类型 显示所选能耗费账单费用类型 电费、热水费、冷水费、中水费
  210. # target columns
  211. columns = ['tenant_id', 'theday', 'house_dept_id', 'house_dept_name', 'address', 'contract_dept_id', 'contract_dept_name',
  212. 'contract_id', 'contract_maintainer', 'renter_name', 'contract_terminate_type',
  213. 'begin_time', 'end_time', 'quit_time', 'kind', 'begin_amount', 'end_amount',
  214. 'amount', 'balance', 'cost', 'charge_amount', 'clear_amount']
  215. # target data
  216. target_data = []
  217. for index, row in data.iterrows():
  218. # 查找续租合同
  219. renewed_end_time = find_renewal_contract(conn, row['合同ID'])
  220. begin_time = row['begin_time']
  221. if renewed_end_time is None:
  222. end_time = row['quite_date'] if row['quite_date'] is not None and not pd.isnull(
  223. row['quite_date']) else row['end_time']
  224. else:
  225. end_time = renewed_end_time
  226. meter_data = query_meter_read_record(conn, td_cursor,
  227. row['合同ID'], row['house_id'], begin_time, end_time)
  228. if meter_data is None:
  229. continue
  230. the_time = begin_time
  231. while the_time < end_time:
  232. # 查询电表读数
  233. for device_type in meter_data.keys():
  234. key = the_time.strftime('%Y-%m-%d')
  235. if key not in meter_data[device_type]:
  236. continue
  237. meter_record = meter_data[device_type][key]
  238. if meter_record is None:
  239. continue
  240. target_data.append({
  241. 'tenant_id': row['tenant_id'],
  242. 'theday': the_time,
  243. 'house_dept_id': row['房源所属门店ID'],
  244. 'house_dept_name': row['房源所属门店名称'],
  245. 'address': row['房源地址'],
  246. 'contract_dept_id': row['合同所属门店/部门ID'],
  247. 'contract_dept_name': row['合同所属门店/部门'],
  248. 'contract_id': row['合同ID'],
  249. 'contract_maintainer': row['合同维护人'],
  250. 'renter_name': row['租客姓名'],
  251. 'contract_terminate_type': row['退租类型'],
  252. 'begin_time': begin_time,
  253. 'end_time': end_time,
  254. 'quit_time': row['quite_date'],
  255. 'kind': meter_record['device_type'],
  256. 'begin_amount': meter_record['begin_amount'],
  257. 'end_amount': meter_record['end_amount'],
  258. 'amount': meter_record['amount'],
  259. 'balance': meter_record['balance'],
  260. 'cost': meter_record['cost'],
  261. 'charge_amount': meter_record['charge_amount'],
  262. 'clear_amount': meter_record['clear_amount']
  263. })
  264. the_time = the_time + relativedelta(days=1)
  265. pass
  266. df = pd.DataFrame(columns=columns, data=target_data)
  267. return df
  268. def load(conn, df: pd.DataFrame, target_db) -> None:
  269. """ Loads data into a MySQL database
  270. """
  271. # Define the column types for the table
  272. dtypes = {key: value[0] for key, value in cols.items()}
  273. # create target table with df.dtypes
  274. df.to_sql(target_db, con=conn, if_exists='append',
  275. index=False, dtype=dtypes)
  276. pass
  277. def etl():
  278. config = load_config()
  279. target_db = 'bi_utility_'
  280. # connect TDEngine
  281. td_conn = taos.connect(url=config['tdengine']['url'],
  282. user=config['tdengine']['username'],
  283. password=config['tdengine']['password'],
  284. timeout=30)
  285. td_cursor = td_conn.cursor()
  286. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  287. .format(username=config['mysql']['username'],
  288. pwd=quote_plus(config['mysql']['password']),
  289. host=config['mysql']['host'],
  290. port=config['mysql']['port'],
  291. db=config['mysql']['db'])
  292. engine = create_engine(connection_string)
  293. with engine.begin() as conn:
  294. # Get the total number of rows
  295. total = query_total(conn)
  296. # Define the batch size
  297. batch_size = 100
  298. truncate_target_db(conn, target_db)
  299. if debug:
  300. total = 1000
  301. batch_size = 100
  302. print('total', total)
  303. # Write the data to the table in batches
  304. for i in tqdm(range(0, total, batch_size)):
  305. data = extract(conn, td_cursor, batch_size, i)
  306. data = transform(conn, td_cursor, data)
  307. # if debug:
  308. # print(data.head())
  309. # else:
  310. load(conn, data, target_db)
  311. update_column_comment(conn, target_db, cols)
  312. td_conn.close()
  313. pass
# Script entry point: the ETL runs as soon as this module is executed
# (and, because the call is unguarded, also whenever the module is imported).
etl()