# Source file: etl_utility.py (15 KB)
  1. """
  2. 寓信水电费收入ETL
  3. 水电费收入报表
  4. """
  5. import yaml
  6. import pandas as pd
  7. from sqlalchemy import create_engine, text
  8. from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
  9. from datetime import datetime, timedelta
  10. from tqdm import tqdm
  11. import json
  12. from dateutil.relativedelta import relativedelta
  13. import calendar
  14. import taosrest as taos
  15. from taosrest import connect, TaosRestConnection, TaosRestCursor
  16. from urllib.parse import quote_plus
  17. from utils import load_config, truncate_target_db
  18. debug = False
  19. if debug:
  20. # debug_condition = " and crc.id='1634463949758156801'" # debug charge amount
  21. # debug_condition = " and crc.id='1635124251453575170'" # debug renew contract
  22. # debug_condition = " and crc.id='1649702247584620545'" # debug 期末读数
  23. debug_condition = " and crc.id='1674347182259716097'" # debug 退租日期
  24. else:
  25. debug_condition = ''
  26. def query_total(conn) -> int:
  27. """ Queries the total number of records to be processed
  28. 合同状态不为无效4,签约类型不为续签2的合同
  29. """
  30. query = """
  31. select count(1) from yuxin_contract.cont_renter_contract crc
  32. where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
  33. """.format(debug_condition=debug_condition)
  34. return conn.execute(text(query)).scalar()
  35. def extract(conn, td_cursor, batch_size, i) -> pd.DataFrame:
  36. """ This API extracts data from
  37. """
  38. query = """
  39. select hhb.dept_id '房源所属门店ID', house_sd.name '房源所属门店名称', hhr.address '房源地址',
  40. crc_sd.id '合同所属门店/部门ID', crc_sd.name '合同所属门店/部门',
  41. crc.id '合同ID', crc_ei.name '合同维护人', cri.name '租客姓名', crc.terminate_type '退租类型',
  42. crc.begin_time, crc.end_time, crc.quite_date, crc.house_id
  43. from yuxin_contract.cont_renter_contract crc
  44. left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=crc.house_id
  45. left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=crc.house_id
  46. left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
  47. left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=crc.maintainer_id and crc_ed.is_delete=0
  48. left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
  49. left join yuxin_setting.setting_employee_info crc_ei on crc_ei.is_delete=0 and crc_ei.id=crc.maintainer_id
  50. left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=crc.id
  51. where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
  52. limit {batch_size} offset {offset}
  53. """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
  54. source_data = pd.read_sql(query, con=conn)
  55. return source_data
  56. def find_renewal_contract(conn, contract_id):
  57. query = """
  58. select crc1.id, crc1.begin_time, crc1.end_time, crc1.quite_date,
  59. crc2.id, crc2.begin_time, crc2.end_time, crc2.quite_date,
  60. crc3.id, crc3.begin_time, crc3.end_time, crc3.quite_date
  61. from yuxin_contract.cont_renter_contract crc1
  62. left join yuxin_contract.cont_renter_contract crc2 on crc2.contract_pid=crc1.id and crc2.is_delete=0 and crc2.contract_status<>4 and crc2.sign_type=2
  63. left join yuxin_contract.cont_renter_contract crc3 on crc3.contract_pid=crc2.id and crc3.is_delete=0 and crc3.contract_status<>4 and crc3.sign_type=2
  64. where crc1.is_delete=0 and crc1.contract_status<>4 and crc1.sign_type=2
  65. and crc1.contract_pid='{contract_id}'
  66. """.format(contract_id=contract_id)
  67. result = conn.execute(text(query)).fetchone()
  68. if result is None:
  69. return None
  70. if result[11] is not None:
  71. return result[11]
  72. elif result[10] is not None:
  73. return result[10]
  74. elif result[7] is not None:
  75. return result[7]
  76. elif result[6] is not None:
  77. return result[6]
  78. elif result[3] is not None:
  79. return result[3]
  80. elif result[2] is not None:
  81. return result[2]
  82. return None
  83. def query_charge_data(conn, contract_id, room_id):
  84. query = """
  85. select DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d') as 'time', sum(total_price) as 'charge_amount', sum(clear_amount) as 'clear_amount', device_id
  86. from yuxin_finance.fin_finance_charge_record
  87. where status=1 and charge_status=2 and contract_id='{contract_id}' and room_id='{room_id}'
  88. group by DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d'), device_id
  89. """.format(contract_id=contract_id, room_id=room_id)
  90. data = pd.read_sql(query, con=conn)
  91. return data
  92. def query_meter_read_record(conn, td_cursor, contract_id, room_id, start_date, end_date):
  93. end_date_obj = end_date if isinstance(end_date, datetime) else datetime.strptime(end_date, '%Y-%m-%d')
  94. end_date_obj = end_date_obj + timedelta(days=1)
  95. start_date_obj = start_date if isinstance(start_date, datetime) else datetime.strptime(start_date, '%Y-%m-%d')
  96. start_date_obj = start_date_obj - timedelta(days=1)
  97. # 获取meter信息
  98. # 忽略合同ID,续约情况下合同ID会有多个
  99. query = """
  100. SELECT device_type, first(ts), first(amount), last(amount), sum(cost), last(`balance`), device_id
  101. FROM device.meter_read_record
  102. WHERE room_id='{room_id}' and ts >= '{start_date}' AND ts < '{end_date}' and pay_method=2
  103. PARTITION BY device_type, device_id
  104. INTERVAL(1d)
  105. """.format(contract_id=contract_id, room_id=room_id,
  106. start_date=start_date_obj.strftime('%Y-%m-%d'), end_date=end_date)
  107. td_cursor.execute(query)
  108. # get total rows
  109. if td_cursor.rowcount == 0:
  110. return None
  111. # get rows
  112. rows = td_cursor.fetchall()
  113. # 获取充值信息
  114. charge_data = query_charge_data(conn, contract_id, room_id)
  115. data = {}
  116. for idx, row in enumerate(rows):
  117. if idx == 0:
  118. prev_row = None
  119. else:
  120. prev_row = rows[idx - 1]
  121. day = row[1].strftime('%Y-%m-%d') # ts
  122. device_type = row[0] # 设备类型
  123. begin_amount = prev_row[3] if prev_row else row[2] # 期初读数, 取上个时间窗口的最后一个读数,否则取当前时间窗口的最早一个读数
  124. end_amount = row[3] # 期末读数
  125. amount = end_amount - begin_amount # 使用量
  126. cost = row[4] # 使用金额
  127. balance = row[5] # 期末余额
  128. device_id = row[6]
  129. charge_row = charge_data.loc[(charge_data['time'] == day) & (charge_data['device_id'] == device_id)]
  130. if charge_row.empty:
  131. charge_amount = 0
  132. clear_amount = 0
  133. else:
  134. charge_row = charge_row.iloc[0]
  135. charge_amount = charge_row['charge_amount']
  136. clear_amount = charge_row['clear_amount']
  137. # device type
  138. # WATER_MATER(1, "冷水表",DictConstant.FeeItemEnum.WATER_FEE),
  139. # RWATER_MATER(2, "中水表",DictConstant.FeeItemEnum.RWATER_FEE),
  140. # HWATER_MATER(3, "热水表",DictConstant.FeeItemEnum.HWATER_FEE),
  141. # ELE_MATER(4, "电表",DictConstant.FeeItemEnum.ELECTRIC_FEE),
  142. if device_type == 1:
  143. device_type = '冷水费'
  144. elif device_type == 2:
  145. device_type = '中水费'
  146. elif device_type == 3:
  147. device_type = '热水费'
  148. elif device_type == 4:
  149. device_type = '电费'
  150. else:
  151. device_type = '?'
  152. # print('device_type', device_type)
  153. # print('begin_amount', begin_amount)
  154. # print('end_amount', end_amount)
  155. # print('amount', amount)
  156. # print('balance', balance)
  157. # print('cost', cost)
  158. # ensure key exists
  159. if device_type not in data:
  160. data[device_type] = {}
  161. data[device_type][day] = {
  162. 'device_type': device_type,
  163. 'begin_amount': begin_amount,
  164. 'end_amount': end_amount,
  165. 'amount': amount,
  166. 'balance': balance,
  167. 'cost': cost,
  168. 'charge_amount': charge_amount,
  169. 'clear_amount': clear_amount,
  170. 'device_id': device_id
  171. }
  172. return data
  173. def transform(conn, td_cursor, data) -> pd.DataFrame:
  174. # 字段命名 功能说明 举例
  175. # 房源所属门店 显示当前房源维护人所属的门店 如木繁公寓南山店、寓信公寓 house_base里的dept_id
  176. # 房源地址 调取合同关联房源的完整地址 深圳市福田区香蜜湖街道乐山集团1栋1单元-203
  177. # 合同所属门店/部门 显示当前合同维护人所属的门店或部门 如木繁公寓南山店、寓信公寓
  178. # 合同ID 系统给合同分配的唯一ID 2467347234871234772
  179. # 合同维护人 显示合同和子合同管理的合同的合同维护人 刘飞
  180. # 租客姓名 显示合同和子合同关联的租客信息 租客合同显示租客;包租子合同显示承租人
  181. # 合同开始日期 显示合同和子合同关联的合约开始日期 -
  182. # 合同结束日期 显示合同和子合同关联的合约结束日期; -
  183. # 退租类型 若已退租,则显示退租类型 违约退;正常退,若未退租则显示“-”
  184. # 退租日期 若已退租,则显示退租日期 -
  185. # 费用类型 显示所选能耗费账单费用类型 电费、热水费、冷水费、中水费
  186. # target columns
  187. columns = ['theday', 'house_dept_id', 'house_dept_name', 'address', 'contract_dept_id', 'contract_dept_name',
  188. 'contract_id', 'contract_maintainer', 'renter_name', 'contract_terminate_type',
  189. 'begin_time', 'end_time', 'quit_time', 'kind', 'begin_amount', 'end_amount',
  190. 'amount', 'balance', 'cost', 'charge_amount', 'clear_amount']
  191. # target data
  192. target_data = []
  193. for index, row in data.iterrows():
  194. # 查找续租合同
  195. renewed_end_time = find_renewal_contract(conn, row['合同ID'])
  196. begin_time = row['begin_time']
  197. if renewed_end_time is None:
  198. end_time = row['quite_date'] if row['quite_date'] is not None and not pd.isnull(
  199. row['quite_date']) else row['end_time']
  200. else:
  201. end_time = renewed_end_time
  202. meter_data = query_meter_read_record(conn, td_cursor,
  203. row['合同ID'], row['house_id'], begin_time, end_time)
  204. if meter_data is None:
  205. continue
  206. the_time = begin_time
  207. while the_time < end_time:
  208. # 查询电表读数
  209. for device_type in meter_data.keys():
  210. key = the_time.strftime('%Y-%m-%d')
  211. if key not in meter_data[device_type]:
  212. continue
  213. meter_record = meter_data[device_type][key]
  214. if meter_record is None:
  215. continue
  216. target_data.append({
  217. 'theday': the_time,
  218. 'house_dept_id': row['房源所属门店ID'],
  219. 'house_dept_name': row['房源所属门店名称'],
  220. 'address': row['房源地址'],
  221. 'contract_dept_id': row['合同所属门店/部门ID'],
  222. 'contract_dept_name': row['合同所属门店/部门'],
  223. 'contract_id': row['合同ID'],
  224. 'contract_maintainer': row['合同维护人'],
  225. 'renter_name': row['租客姓名'],
  226. 'contract_terminate_type': row['退租类型'],
  227. 'begin_time': begin_time,
  228. 'end_time': end_time,
  229. 'quit_time': row['quite_date'],
  230. 'kind': meter_record['device_type'],
  231. 'begin_amount': meter_record['begin_amount'],
  232. 'end_amount': meter_record['end_amount'],
  233. 'amount': meter_record['amount'],
  234. 'balance': meter_record['balance'],
  235. 'cost': meter_record['cost'],
  236. 'charge_amount': meter_record['charge_amount'],
  237. 'clear_amount': meter_record['clear_amount']
  238. })
  239. the_time = the_time + relativedelta(days=1)
  240. pass
  241. df = pd.DataFrame(columns=columns, data=target_data)
  242. return df
  243. def load(conn, df: pd.DataFrame, target_db) -> None:
  244. """ Loads data into a MySQL database
  245. """
  246. # Define the column types for the table
  247. dtype = {
  248. 'theday': Date,
  249. 'house_dept_id': NVARCHAR(32),
  250. 'house_dept_name': NVARCHAR(32),
  251. 'address': NVARCHAR(255),
  252. 'contract_dept_id': NVARCHAR(32),
  253. 'contract_dept_name': NVARCHAR(32),
  254. 'contract_id': NVARCHAR(36),
  255. 'contract_maintainer': NVARCHAR(32),
  256. 'renter_name': NVARCHAR(50),
  257. 'contract_terminate_type': INTEGER(),
  258. 'begin_time': Date,
  259. 'end_time': Date,
  260. 'quit_time': Date,
  261. 'kind': NVARCHAR(32),
  262. 'begin_amount': DECIMAL(10, 2),
  263. 'end_amount': DECIMAL(10, 2),
  264. 'amount': DECIMAL(10, 2),
  265. 'balance': DECIMAL(10, 2),
  266. 'cost': DECIMAL(10, 2),
  267. 'charge_amount': DECIMAL(10, 2),
  268. 'clear_amount': DECIMAL(10, 2)
  269. }
  270. # create target table with df.dtypes
  271. df.to_sql(target_db, con=conn, if_exists='append',
  272. index=False, dtype=dtype)
  273. pass
  274. def etl():
  275. config = load_config('production')
  276. target_db = 'bi_utility_'
  277. # connect TDEngine
  278. td_conn = taos.connect(url=config['tdengine']['url'],
  279. user=config['tdengine']['username'],
  280. password=config['tdengine']['password'],
  281. timeout=30)
  282. td_cursor = td_conn.cursor()
  283. connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
  284. .format(username=config['mysql']['username'],
  285. pwd=quote_plus(config['mysql']['password']),
  286. host=config['mysql']['host'],
  287. port=config['mysql']['port'],
  288. db=config['mysql']['db'])
  289. engine = create_engine(connection_string)
  290. with engine.begin() as conn:
  291. # Get the total number of rows
  292. total = query_total(conn)
  293. # Define the batch size
  294. batch_size = 100
  295. truncate_target_db(conn, target_db)
  296. if debug:
  297. total = 1000
  298. batch_size = 100
  299. print('total', total)
  300. # Write the data to the table in batches
  301. for i in tqdm(range(0, total, batch_size)):
  302. data = extract(conn, td_cursor, batch_size, i)
  303. data = transform(conn, td_cursor, data)
  304. # if debug:
  305. # print(data.head())
  306. # else:
  307. load(conn, data, target_db)
  308. td_conn.close()
  309. pass
# Entry point: the ETL runs unconditionally whenever this module is executed
# or imported (there is no ``__main__`` guard).
etl()