123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- """
- 寓信水电费收入ETL
- 水电费收入报表
- """
- import yaml
- import pandas as pd
- from sqlalchemy import create_engine, text
- from sqlalchemy.types import NVARCHAR, Date, DECIMAL, INTEGER
- from datetime import datetime, timedelta
- from tqdm import tqdm
- import json
- from dateutil.relativedelta import relativedelta
- import calendar
- import taosrest as taos
- from taosrest import connect, TaosRestConnection, TaosRestCursor
- from urllib.parse import quote_plus
- from utils import load_config, truncate_target_db
# Set to True to restrict every contract query to a single contract while debugging.
debug = False

# Extra SQL predicate appended to the contract queries; empty in normal runs.
# Other contract IDs that have been handy for debugging specific cases:
#   1634463949758156801 - charge amount
#   1635124251453575170 - renewed contract
#   1649702247584620545 - closing meter reading
# The active debug ID below targets the move-out (quit) date case.
debug_condition = " and crc.id='1674347182259716097'" if debug else ''
def query_total(conn) -> int:
    """Count the renter contracts this ETL will process.

    A contract is counted when it is not soft-deleted (is_delete=0), its
    contract_status is not 4 (invalid) and its sign_type is not 2 (renewal).
    The module-level ``debug_condition`` filter is appended as-is.

    Args:
        conn: SQLAlchemy connection to the MySQL source database.

    Returns:
        The number of matching contracts.
    """
    sql = """
select count(1) from yuxin_contract.cont_renter_contract crc
where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
""".format(debug_condition=debug_condition)
    statement = text(sql)
    result = conn.execute(statement)
    return result.scalar()
def extract(conn, td_cursor, batch_size, i) -> pd.DataFrame:
    """Extract one page of renter contracts with their house and department info.

    Contracts are filtered exactly like ``query_total``: not deleted, status
    not 4 (invalid), sign type not 2 (renewal), plus ``debug_condition``.

    Args:
        conn: SQLAlchemy connection to the MySQL source database.
        td_cursor: Unused here; kept so existing callers keep working.
        batch_size: Page size (SQL ``LIMIT``).
        i: Row offset of the page (SQL ``OFFSET``).

    Returns:
        DataFrame whose columns are the Chinese aliases selected below plus
        ``begin_time``, ``end_time``, ``quite_date`` and ``house_id``.
    """
    # ORDER BY is required for stable LIMIT/OFFSET paging: without it MySQL
    # may return rows in a different order per query, so successive pages
    # can overlap or skip contracts.
    # NOTE(review): the employee-dept and renter-info joins could yield more
    # than one row per contract, while query_total counts contracts only —
    # confirm those joins are 1:1, otherwise trailing rows are never paged.
    query = """
select hhb.dept_id '房源所属门店ID', house_sd.name '房源所属门店名称', hhr.address '房源地址',
crc_sd.id '合同所属门店/部门ID', crc_sd.name '合同所属门店/部门',
crc.id '合同ID', crc_ei.name '合同维护人', cri.name '租客姓名', crc.terminate_type '退租类型',
crc.begin_time, crc.end_time, crc.quite_date, crc.house_id
from yuxin_contract.cont_renter_contract crc
left join yuxin_house.hse_house_room hhr on hhr.is_delete=0 and hhr.id=crc.house_id
left join yuxin_house.hse_house_base hhb on hhb.is_delete=0 and hhb.id=crc.house_id
left join yuxin_setting.setting_department house_sd on house_sd.id=hhb.dept_id and house_sd.is_delete=0
left join yuxin_setting.setting_employee_dept crc_ed on crc_ed.emp_id=crc.maintainer_id and crc_ed.is_delete=0
left join yuxin_setting.setting_department crc_sd on crc_sd.id=crc_ed.dept_id and crc_sd.is_delete=0
left join yuxin_setting.setting_employee_info crc_ei on crc_ei.is_delete=0 and crc_ei.id=crc.maintainer_id
left join yuxin_contract.cont_renter_info cri on cri.is_delete=0 and cri.customer_type=1 and cri.contract_id=crc.id
where crc.is_delete=0 and crc.contract_status<>4 and crc.sign_type<>2 {debug_condition}
order by crc.id
limit {batch_size} offset {offset}
""".format(batch_size=int(batch_size), offset=int(i), debug_condition=debug_condition)
    return pd.read_sql(query, con=conn)
# Column indexes into the renewal-chain row selected by find_renewal_contract,
# ordered by precedence: newest generation first, and within a generation the
# move-out date (quite_date) before the scheduled end_time.
_RENEWAL_DATE_PRECEDENCE = (
    11,  # crc3.quite_date
    10,  # crc3.end_time
    7,   # crc2.quite_date
    6,   # crc2.end_time
    3,   # crc1.quite_date
    2,   # crc1.end_time
)


def _latest_renewal_date(result):
    """Return the first non-None date of a renewal-chain row by precedence.

    Args:
        result: 12-column row of (id, begin_time, end_time, quite_date) for
            renewal generations 1, 2 and 3.

    Returns:
        The chosen date, or None when every candidate column is None.
    """
    for idx in _RENEWAL_DATE_PRECEDENCE:
        if result[idx] is not None:
            return result[idx]
    return None


def find_renewal_contract(conn, contract_id):
    """Find the effective end date of the renewal chain of a contract.

    Follows renewals (sign_type=2, not deleted, status not 4) linked through
    ``contract_pid``, up to three generations deep. Deeper renewals are not
    examined.

    Args:
        conn: SQLAlchemy connection to the MySQL source database.
        contract_id: ID of the original (non-renewal) contract.

    Returns:
        The latest generation's quite_date, else its end_time (see
        ``_latest_renewal_date``), or None if the contract was never renewed.
    """
    query = """
select crc1.id, crc1.begin_time, crc1.end_time, crc1.quite_date,
crc2.id, crc2.begin_time, crc2.end_time, crc2.quite_date,
crc3.id, crc3.begin_time, crc3.end_time, crc3.quite_date
from yuxin_contract.cont_renter_contract crc1
left join yuxin_contract.cont_renter_contract crc2 on crc2.contract_pid=crc1.id and crc2.is_delete=0 and crc2.contract_status<>4 and crc2.sign_type=2
left join yuxin_contract.cont_renter_contract crc3 on crc3.contract_pid=crc2.id and crc3.is_delete=0 and crc3.contract_status<>4 and crc3.sign_type=2
where crc1.is_delete=0 and crc1.contract_status<>4 and crc1.sign_type=2
and crc1.contract_pid=:contract_id
"""
    # Bound as a string parameter instead of being formatted into the SQL;
    # str() mirrors the quoted literal the query previously compared against.
    result = conn.execute(text(query), {'contract_id': str(contract_id)}).fetchone()
    if result is None:
        return None
    return _latest_renewal_date(result)
def query_charge_data(conn, contract_id, room_id):
    """Fetch per-day, per-device charge totals for one contract and room.

    Only rows of yuxin_finance.fin_finance_charge_record with status=1 and
    charge_status=2 are summed. They are grouped by the DATE_FORMAT-ed
    charge_create_time and device_id.

    Args:
        conn: Database connection accepted by ``pd.read_sql``.
        contract_id: Contract to filter on (interpolated as a quoted literal).
        room_id: Room to filter on (interpolated as a quoted literal).

    Returns:
        DataFrame with columns ``time``, ``charge_amount``, ``clear_amount``
        and ``device_id``.
    """
    # NOTE(review): the doubled '%%' presumably escapes the driver's
    # pyformat '%' handling — confirm against the pandas/pymysql versions in use.
    sql_template = """
select DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d') as 'time', sum(total_price) as 'charge_amount', sum(clear_amount) as 'clear_amount', device_id
from yuxin_finance.fin_finance_charge_record
where status=1 and charge_status=2 and contract_id='{contract_id}' and room_id='{room_id}'
group by DATE_FORMAT(charge_create_time,'%%Y-%%m-%%d'), device_id
"""
    sql = sql_template.format(contract_id=contract_id, room_id=room_id)
    return pd.read_sql(sql, con=conn)
# TDengine device_type code -> fee kind label written to the report.
# Codes per the upstream enum: 1 WATER_MATER (cold water meter),
# 2 RWATER_MATER (reclaimed water meter), 3 HWATER_MATER (hot water meter),
# 4 ELE_MATER (electricity meter). Unknown codes map to '?'.
_DEVICE_TYPE_NAMES = {
    1: '冷水费',
    2: '中水费',
    3: '热水费',
    4: '电费',
}


def _to_datetime(value):
    """Coerce a datetime, a date, or a 'YYYY-MM-DD' string to a datetime."""
    from datetime import date

    if isinstance(value, datetime):  # also covers pandas Timestamp
        return value
    if isinstance(value, date):
        return datetime(value.year, value.month, value.day)
    return datetime.strptime(value, '%Y-%m-%d')


def _build_meter_data(rows, charge_data):
    """Fold daily TDengine meter windows into {fee_kind: {'YYYY-MM-DD': record}}.

    Args:
        rows: Result rows of (device_type, first(ts), first(amount),
            last(amount), sum(cost), last(balance), device_id), one per
            device per day window.
        charge_data: DataFrame with ``time``, ``device_id``,
            ``charge_amount`` and ``clear_amount`` columns, as returned by
            ``query_charge_data``.

    Returns:
        Nested dict keyed by fee kind label, then by day string.
    """
    # First matching charge row per (day, device) wins, like .iloc[0] did.
    charges = {}
    for day, device_id, charge_amount, clear_amount in zip(
            charge_data['time'], charge_data['device_id'],
            charge_data['charge_amount'], charge_data['clear_amount']):
        charges.setdefault((day, device_id), (charge_amount, clear_amount))

    data = {}
    last_reading = {}  # (device_type, device_id) -> last(amount) of previous window
    # Walk windows chronologically; the stable sort keeps per-device order.
    for row in sorted(rows, key=lambda r: r[1]):
        device_type, window_ts, first_amount, end_amount, cost, balance, device_id = row[:7]
        device_key = (device_type, device_id)
        # Opening reading: the same device's previous window close if any,
        # otherwise this window's first reading.
        begin_amount = last_reading.get(device_key, first_amount)
        last_reading[device_key] = end_amount
        day = window_ts.strftime('%Y-%m-%d')
        kind = _DEVICE_TYPE_NAMES.get(device_type, '?')
        charge_amount, clear_amount = charges.get((day, device_id), (0, 0))
        # NOTE(review): two devices of the same type on the same day overwrite
        # each other here — confirm a room has at most one meter per type.
        data.setdefault(kind, {})[day] = {
            'device_type': kind,
            'begin_amount': begin_amount,
            'end_amount': end_amount,
            'amount': end_amount - begin_amount,
            'balance': balance,
            'cost': cost,
            'charge_amount': charge_amount,
            'clear_amount': clear_amount,
            'device_id': device_id,
        }
    return data


def query_meter_read_record(conn, td_cursor, contract_id, room_id, start_date, end_date):
    """Load daily prepaid meter usage (pay_method=2) for a room over a date range.

    The TDengine window starts one day before ``start_date``, so the first
    day's opening reading can come from the previous day. It ends one day
    after ``end_date``, so the end day is included.

    Args:
        conn: MySQL connection, used for the charge lookup.
        td_cursor: TDengine REST cursor.
        contract_id: Contract whose charges are joined in. Meter readings are
            not filtered by contract, because renewals create new contract IDs.
        room_id: Room whose meters are read.
        start_date, end_date: datetime, date, or 'YYYY-MM-DD' string.

    Returns:
        {fee_kind: {'YYYY-MM-DD': record}} (see ``_build_meter_data``), or
        None when TDengine returns no rows.
    """
    start_date_obj = _to_datetime(start_date) - timedelta(days=1)
    end_date_obj = _to_datetime(end_date) + timedelta(days=1)
    query = """
SELECT device_type, first(ts), first(amount), last(amount), sum(cost), last(`balance`), device_id
FROM device.meter_read_record
WHERE room_id='{room_id}' and ts >= '{start_date}' AND ts < '{end_date}' and pay_method=2
PARTITION BY device_type, device_id
INTERVAL(1d)
""".format(room_id=room_id,
           start_date=start_date_obj.strftime('%Y-%m-%d'),
           end_date=end_date_obj.strftime('%Y-%m-%d'))
    td_cursor.execute(query)
    if td_cursor.rowcount == 0:
        return None
    rows = td_cursor.fetchall()
    charge_data = query_charge_data(conn, contract_id, room_id)
    return _build_meter_data(rows, charge_data)
def transform(conn, td_cursor, data) -> pd.DataFrame:
    """Expand each contract into one report row per day and fee kind.

    Report fields (from the original field spec):
      - house_dept: store of the house's maintainer (dept_id of house_base)
      - address: full address of the contract's house
      - contract_dept: store/department of the contract maintainer
      - contract_id, contract_maintainer, renter_name
      - begin_time / end_time: contract dates (end extended by renewals)
      - contract_terminate_type / quit_time: move-out type and date
      - kind: fee type — 电费, 热水费, 冷水费 or 中水费

    Args:
        conn: MySQL connection (renewal and charge lookups).
        td_cursor: TDengine REST cursor (meter readings).
        data: DataFrame produced by ``extract``.

    Returns:
        DataFrame with the report columns listed in ``columns``.
    """
    columns = ['theday', 'house_dept_id', 'house_dept_name', 'address', 'contract_dept_id', 'contract_dept_name',
               'contract_id', 'contract_maintainer', 'renter_name', 'contract_terminate_type',
               'begin_time', 'end_time', 'quit_time', 'kind', 'begin_amount', 'end_amount',
               'amount', 'balance', 'cost', 'charge_amount', 'clear_amount']
    target_data = []
    for _, row in data.iterrows():
        contract_id = row['合同ID']
        begin_time = row['begin_time']
        # End of the reporting range: the latest renewal's date if renewed,
        # else the move-out date if the tenant left, else the contract end.
        renewed_end_time = find_renewal_contract(conn, contract_id)
        if renewed_end_time is not None:
            end_time = renewed_end_time
        elif pd.notnull(row['quite_date']):
            end_time = row['quite_date']
        else:
            end_time = row['end_time']

        # A missing bound cannot be compared or formatted; skip the contract
        # instead of aborting the whole batch.
        if pd.isnull(begin_time) or pd.isnull(end_time):
            print('skip contract without date range', contract_id)
            continue

        meter_data = query_meter_read_record(conn, td_cursor,
                                             contract_id, row['house_id'], begin_time, end_time)
        if meter_data is None:
            continue

        # Fields shared by every output row of this contract.
        contract_fields = {
            'house_dept_id': row['房源所属门店ID'],
            'house_dept_name': row['房源所属门店名称'],
            'address': row['房源地址'],
            'contract_dept_id': row['合同所属门店/部门ID'],
            'contract_dept_name': row['合同所属门店/部门'],
            'contract_id': contract_id,
            'contract_maintainer': row['合同维护人'],
            'renter_name': row['租客姓名'],
            'contract_terminate_type': row['退租类型'],
            'begin_time': begin_time,
            'end_time': end_time,
            'quit_time': row['quite_date'],
        }

        the_time = begin_time
        while the_time < end_time:
            day_key = the_time.strftime('%Y-%m-%d')
            for records_by_day in meter_data.values():
                meter_record = records_by_day.get(day_key)
                if meter_record is None:
                    continue
                target_data.append({
                    'theday': the_time,
                    **contract_fields,
                    'kind': meter_record['device_type'],
                    'begin_amount': meter_record['begin_amount'],
                    'end_amount': meter_record['end_amount'],
                    'amount': meter_record['amount'],
                    'balance': meter_record['balance'],
                    'cost': meter_record['cost'],
                    'charge_amount': meter_record['charge_amount'],
                    'clear_amount': meter_record['clear_amount'],
                })
            the_time = the_time + relativedelta(days=1)
    return pd.DataFrame(columns=columns, data=target_data)
def load(conn, df: pd.DataFrame, target_db) -> None:
    """Append the transformed report rows to the target MySQL table.

    Args:
        conn: SQLAlchemy connection to the target database.
        df: Output of ``transform``.
        target_db: Name of the target table (passed to ``to_sql`` as the
            table name).
    """
    # All metered quantities and money amounts share one decimal type.
    decimal_columns = ('begin_amount', 'end_amount', 'amount', 'balance',
                       'cost', 'charge_amount', 'clear_amount')
    column_types = {
        'theday': Date,
        'house_dept_id': NVARCHAR(32),
        'house_dept_name': NVARCHAR(32),
        'address': NVARCHAR(255),
        'contract_dept_id': NVARCHAR(32),
        'contract_dept_name': NVARCHAR(32),
        'contract_id': NVARCHAR(36),
        'contract_maintainer': NVARCHAR(32),
        'renter_name': NVARCHAR(50),
        'contract_terminate_type': INTEGER(),
        'begin_time': Date,
        'end_time': Date,
        'quit_time': Date,
        'kind': NVARCHAR(32),
    }
    column_types.update({name: DECIMAL(10, 2) for name in decimal_columns})
    df.to_sql(target_db, con=conn, if_exists='append', index=False, dtype=column_types)
def etl():
    """Run the utility-fee income ETL end to end.

    Loads the 'production' config, truncates the target table, then pages
    through renter contracts in MySQL. Each page is enriched with TDengine
    meter readings (``transform``) and appended to the target (``load``).
    The TDengine connection and the SQLAlchemy engine are released even when
    a batch raises.
    """
    config = load_config('production')
    target_db = 'bi_utility_'
    # connect TDEngine
    td_conn = taos.connect(url=config['tdengine']['url'],
                           user=config['tdengine']['username'],
                           password=config['tdengine']['password'],
                           timeout=30)
    try:
        td_cursor = td_conn.cursor()
        connection_string = 'mysql+pymysql://{username}:{pwd}@{host}:{port}/{db}?charset=utf8'\
            .format(username=config['mysql']['username'],
                    pwd=quote_plus(config['mysql']['password']),
                    host=config['mysql']['host'],
                    port=config['mysql']['port'],
                    db=config['mysql']['db'])
        engine = create_engine(connection_string)
        try:
            with engine.begin() as conn:
                total = query_total(conn)
                batch_size = 100
                truncate_target_db(conn, target_db)
                if debug:
                    total = 1000
                    batch_size = 100
                print('total', total)
                # Page through the contracts and write each batch.
                for i in tqdm(range(0, total, batch_size)):
                    data = extract(conn, td_cursor, batch_size, i)
                    data = transform(conn, td_cursor, data)
                    load(conn, data, target_db)
        finally:
            engine.dispose()
    finally:
        td_conn.close()
- etl()
|