"""
Yuxin utility-fee income ETL (water / electricity fee income report).

Reads rental contracts from MySQL, per-day prepaid meter readings from
TDengine and charge (top-up) records from MySQL, and writes one row per
contract / fee type / day into the ``bi_utility_`` report table.
"""
import yaml
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
from datetime import date, datetime, timedelta
from tqdm import tqdm
import json
from dateutil.relativedelta import relativedelta
import calendar
import taosrest as taos
from taosrest import connect, TaosRestConnection, TaosRestCursor
from urllib.parse import quote_plus
from utils import load_config, truncate_target_db

debug = False
if debug:
    # debug_condition = " and crc.id='1634463949758156801'"  # debug charge amount
    # debug_condition = " and crc.id='1635124251453575170'"  # debug renewed contract
    # debug_condition = " and crc.id='1649702247584620545'"  # debug closing meter reading
    debug_condition = " and crc.id='1674347182259716097'"  # debug move-out (quit) date
else:
    debug_condition = ''

# Meter device-type codes mapped to the fee names written to the report.
# Backend enum: WATER_MATER(1, cold water), RWATER_MATER(2, reclaimed water),
# HWATER_MATER(3, hot water), ELE_MATER(4, electricity).
_DEVICE_TYPE_NAMES = {
    1: '冷水费',
    2: '中水费',
    3: '热水费',
    4: '电费',
}


def query_total(conn) -> int:
    """
    Count the contracts to process.

    Counts non-deleted contracts whose status is not 4 (invalid) and whose
    sign type is not 2 (renewal). Renewals are reached through their parent
    contract in ``find_renewal_contract`` instead.
    """
    query = """
        select count(1)
        from yuxin_contract.cont_renter_contract crc
        where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
    """.format(debug_condition=debug_condition)
    return conn.execute(text(query)).scalar()


def extract(conn, td_cursor, batch_size, i) -> pd.DataFrame:
    """
    Extract one page of contracts with house, department, maintainer and renter info.

    :param conn: SQLAlchemy connection to MySQL.
    :param td_cursor: unused; kept for interface compatibility.
    :param batch_size: page size (SQL ``limit``).
    :param i: row offset of the page (SQL ``offset``).
    :return: DataFrame with the Chinese-aliased columns below plus
        ``begin_time``, ``end_time``, ``quite_date`` and ``house_id``.
    """
    # ``order by`` makes limit/offset paging deterministic; without it MySQL
    # may return overlapping or missing rows across pages.
    # NOTE(review): the left joins on setting_employee_dept / cont_renter_info
    # may yield several rows per contract — confirm they are one-to-one.
    query = """
        select
            hhb.dept_id '房源所属门店ID',
            house_sd.name '房源所属门店名称',
            hhr.address '房源地址',
            crc_sd.id '合同所属门店/部门ID',
            crc_sd.name '合同所属门店/部门',
            crc.id '合同ID',
            crc_ei.name '合同维护人',
            cri.name '租客姓名',
            crc.terminate_type '退租类型',
            crc.begin_time,
            crc.end_time,
            crc.quite_date,
            crc.house_id
        from yuxin_contract.cont_renter_contract crc
        left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=crc.house_id
        left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=crc.house_id
        left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
        left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=crc.maintainer_id and crc_ed.is_delete=0
        left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
        left join yuxin_setting.setting_employee_info crc_ei on crc_ei.is_delete=0 and crc_ei.id=crc.maintainer_id
        left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=crc.id
        where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
        order by crc.id
        limit {batch_size} offset {offset}
    """.format(batch_size=batch_size, offset=i, debug_condition=debug_condition)
    return pd.read_sql(query, con=conn)


def find_renewal_contract(conn, contract_id):
    """
    Return the effective end date of a contract's renewal chain.

    Follows up to three levels of renewals (sign_type=2, linked through
    ``contract_pid``). For the deepest renewal found it returns the quit date
    if set, otherwise its end time.

    :return: the date value as returned by the driver, or None when the
        contract has no renewal.
    """
    query = """
        select
            crc1.id, crc1.begin_time, crc1.end_time, crc1.quite_date,
            crc2.id, crc2.begin_time, crc2.end_time, crc2.quite_date,
            crc3.id, crc3.begin_time, crc3.end_time, crc3.quite_date
        from yuxin_contract.cont_renter_contract crc1
        left join yuxin_contract.cont_renter_contract crc2
            on crc2.contract_pid=crc1.id and crc2.is_delete=0 and crc2.contract_status<>4 and crc2.sign_type=2
        left join yuxin_contract.cont_renter_contract crc3
            on crc3.contract_pid=crc2.id and crc3.is_delete=0 and crc3.contract_status<>4 and crc3.sign_type=2
        where crc1.is_delete=0 and crc1.contract_status<>4 and crc1.sign_type=2
            and crc1.contract_pid=:contract_id
    """
    # Bound parameter instead of string formatting (no quoting/injection issues).
    result = conn.execute(text(query), {'contract_id': contract_id}).fetchone()
    if result is None:
        return None
    # Deepest level first: (quite_date, end_time) of crc3, then crc2, then crc1.
    for index in (11, 10, 7, 6, 3, 2):
        if result[index] is not None:
            return result[index]
    return None


def query_charge_data(conn, contract_id, room_id):
    """
    Load per-day, per-device charge totals for a contract's room.

    :return: DataFrame with columns ``time`` ('YYYY-MM-DD' string),
        ``charge_amount`` (sum of total_price), ``clear_amount`` and
        ``device_id``.
    """
    # NOTE(review): status=1 / charge_status=2 presumably select successful
    # charges — confirm against the finance enums.
    # '%%' escapes '%' for the pyformat-style MySQL driver.
    query = """
        select
            DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d') as 'time',
            sum(total_price) as 'charge_amount',
            sum(clear_amount) as 'clear_amount',
            device_id
        from yuxin_finance.fin_finance_charge_record
        where status=1 and charge_status=2 and contract_id='{contract_id}' and room_id='{room_id}'
        group by DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d'), device_id
    """.format(contract_id=contract_id, room_id=room_id)
    return pd.read_sql(query, con=conn)


def _to_datetime(value):
    """Coerce a ``datetime`` / ``date`` / ``'YYYY-MM-DD'`` string to ``datetime``."""
    if isinstance(value, datetime):  # also matches pandas.Timestamp
        return value
    if isinstance(value, date):
        return datetime.combine(value, datetime.min.time())
    return datetime.strptime(value, '%Y-%m-%d')


def query_meter_read_record(conn, td_cursor, contract_id, room_id, start_date, end_date):
    """
    Build per-day meter usage and charges for a room between two dates.

    Reads daily windows from TDengine ``device.meter_read_record`` for
    prepaid meters (``pay_method=2``) and joins the day's charge totals
    from ``query_charge_data``.

    :param start_date: first day (datetime, date or 'YYYY-MM-DD').
    :param end_date: last day (datetime, date or 'YYYY-MM-DD').
    :return: ``{fee_name: {'YYYY-MM-DD': record_dict}}``, or None when
        TDengine returns no rows.
    """
    # One extra day on each side: the day before the start supplies the
    # opening reading of the first day; +1 day makes the exclusive upper
    # bound cover the whole end day.
    start_date_obj = _to_datetime(start_date) - timedelta(days=1)
    end_date_obj = _to_datetime(end_date) + timedelta(days=1)

    # Filter by room only: a renewed stay spans several contract ids.
    query = """
        SELECT device_type, first(ts), first(amount), last(amount), sum(cost), last(`balance`), device_id
        FROM device.meter_read_record
        WHERE room_id='{room_id}' and ts >= '{start_date}' AND ts < '{end_date}' and pay_method=2
        PARTITION BY device_type, device_id INTERVAL(1d)
    """.format(room_id=room_id,
               start_date=start_date_obj.strftime('%Y-%m-%d'),
               end_date=end_date_obj.strftime('%Y-%m-%d'))
    td_cursor.execute(query)
    if td_cursor.rowcount == 0:
        return None
    rows = td_cursor.fetchall()

    charge_data = query_charge_data(conn, contract_id, room_id)

    data = {}
    # Closing reading of the previous window, per meter. Rows of different
    # meters share one result set, so the previous *row* may belong to another
    # meter at partition boundaries.
    previous_close = {}
    for row in rows:
        raw_type, window_start, first_amount, last_amount, cost, balance, device_id = row
        day = window_start.strftime('%Y-%m-%d')

        meter = (raw_type, device_id)
        # Opening reading: previous window's last reading of the same meter,
        # otherwise this window's first reading.
        begin_amount = previous_close[meter] if meter in previous_close else first_amount
        previous_close[meter] = last_amount
        amount = last_amount - begin_amount

        charge_row = charge_data.loc[(charge_data['time'] == day) & (charge_data['device_id'] == device_id)]
        if charge_row.empty:
            charge_amount = 0
            clear_amount = 0
        else:
            first_charge = charge_row.iloc[0]
            charge_amount = first_charge['charge_amount']
            clear_amount = first_charge['clear_amount']

        fee_name = _DEVICE_TYPE_NAMES.get(raw_type, '?')
        # NOTE(review): two meters of the same type in one room on the same
        # day overwrite each other here — confirm that cannot happen.
        data.setdefault(fee_name, {})[day] = {
            'device_type': fee_name,
            'begin_amount': begin_amount,
            'end_amount': last_amount,
            'amount': amount,
            'balance': balance,
            'cost': cost,
            'charge_amount': charge_amount,
            'clear_amount': clear_amount,
            'device_id': device_id,
        }
    return data


def transform(conn, td_cursor, data) -> pd.DataFrame:
    """
    Expand each contract into one report row per fee type and day.

    Report columns:
      - house_dept_*: store of the house (hse_house_base.dept_id)
      - address: full address of the contract's house
      - contract_dept_*: store/department of the contract maintainer
      - contract_id, contract_maintainer, renter_name
      - begin_time / end_time: contract start, and the renewal chain's end
        date, else the quit date, else the contract end date
      - contract_terminate_type, quit_time: move-out type and date, if any
      - kind: fee type (electricity, hot water, cold water, reclaimed water)
      - begin/end/amount/balance/cost/charge_amount/clear_amount: daily
        meter figures from ``query_meter_read_record``

    Days are iterated from begin_time (inclusive) to end_time (exclusive).
    """
    columns = ['theday', 'house_dept_id', 'house_dept_name', 'address', 'contract_dept_id',
               'contract_dept_name', 'contract_id', 'contract_maintainer', 'renter_name',
               'contract_terminate_type', 'begin_time', 'end_time', 'quit_time', 'kind',
               'begin_amount', 'end_amount', 'amount', 'balance', 'cost', 'charge_amount',
               'clear_amount']

    target_data = []
    for _, row in data.iterrows():
        contract_id = row['合同ID']
        begin_time = row['begin_time']
        renewed_end_time = find_renewal_contract(conn, contract_id)
        if renewed_end_time is not None:
            end_time = renewed_end_time
        elif pd.notnull(row['quite_date']):
            end_time = row['quite_date']
        else:
            end_time = row['end_time']

        meter_data = query_meter_read_record(conn, td_cursor, contract_id, row['house_id'], begin_time, end_time)
        if meter_data is None:
            continue

        the_time = begin_time
        while the_time < end_time:
            key = the_time.strftime('%Y-%m-%d')
            for daily_records in meter_data.values():
                meter_record = daily_records.get(key)
                if meter_record is None:
                    continue
                target_data.append({
                    'theday': the_time,
                    'house_dept_id': row['房源所属门店ID'],
                    'house_dept_name': row['房源所属门店名称'],
                    'address': row['房源地址'],
                    'contract_dept_id': row['合同所属门店/部门ID'],
                    'contract_dept_name': row['合同所属门店/部门'],
                    'contract_id': contract_id,
                    'contract_maintainer': row['合同维护人'],
                    'renter_name': row['租客姓名'],
                    'contract_terminate_type': row['退租类型'],
                    'begin_time': begin_time,
                    'end_time': end_time,
                    'quit_time': row['quite_date'],
                    'kind': meter_record['device_type'],
                    'begin_amount': meter_record['begin_amount'],
                    'end_amount': meter_record['end_amount'],
                    'amount': meter_record['amount'],
                    'balance': meter_record['balance'],
                    'cost': meter_record['cost'],
                    'charge_amount': meter_record['charge_amount'],
                    'clear_amount': meter_record['clear_amount'],
                })
            the_time = the_time + relativedelta(days=1)

    return pd.DataFrame(columns=columns, data=target_data)


def load(conn, df: pd.DataFrame, target_db) -> None:
    """
    Append the transformed rows to the MySQL table named ``target_db``.

    ``dtype`` fixes the SQL column types used if ``to_sql`` creates the table.
    """
    dtype = {
        'theday': Date,
        'house_dept_id': NVARCHAR(32),
        'house_dept_name': NVARCHAR(32),
        'address': NVARCHAR(255),
        'contract_dept_id': NVARCHAR(32),
        'contract_dept_name': NVARCHAR(32),
        'contract_id': NVARCHAR(36),
        'contract_maintainer': NVARCHAR(32),
        'renter_name': NVARCHAR(50),
        'contract_terminate_type': INTEGER(),
        'begin_time': Date,
        'end_time': Date,
        'quit_time': Date,
        'kind': NVARCHAR(32),
        'begin_amount': DECIMAL(10, 2),
        'end_amount': DECIMAL(10, 2),
        'amount': DECIMAL(10, 2),
        'balance': DECIMAL(10, 2),
        'cost': DECIMAL(10, 2),
        'charge_amount': DECIMAL(10, 2),
        'clear_amount': DECIMAL(10, 2),
    }
    df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=dtype)


def etl():
    """
    Run the whole ETL with the 'production' config.

    The target table is truncated (via ``utils.truncate_target_db``) and then
    refilled batch by batch, all inside one ``engine.begin()`` transaction.
    The TDengine connection is closed even if the run fails.
    """
    config = load_config('production')
    target_db = 'bi_utility_'

    td_conn = taos.connect(url=config['tdengine']['url'],
                           user=config['tdengine']['username'],
                           password=config['tdengine']['password'],
                           timeout=30)
    try:
        td_cursor = td_conn.cursor()

        connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'.format(
            username=config['mysql']['username'],
            pwd=quote_plus(config['mysql']['password']),
            host=config['mysql']['host'],
            port=config['mysql']['port'],
            db=config['mysql']['db'])
        engine = create_engine(connection_string)

        with engine.begin() as conn:
            total = query_total(conn)
            batch_size = 100
            truncate_target_db(conn, target_db)
            if debug:
                total = 1000
                batch_size = 100
            print('total', total)

            for offset in tqdm(range(0, total, batch_size)):
                data = extract(conn, td_cursor, batch_size, offset)
                data = transform(conn, td_cursor, data)
                load(conn, data, target_db)
    finally:
        td_conn.close()


# Runs the ETL whenever this module is executed or imported (original behavior).
etl()