123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import pandas as pd
- from common.database_utils import database_util
- from common.log_utils import logFactory
- from common import constant
- from tqdm import tqdm
# Shared ClickHouse connection used by every query in this module.
# NOTE(review): `database_util.get_client()` is a project helper — presumably
# returns a clickhouse-driver Client (it exposes `.execute`); confirm.
click_client = database_util.get_client()
# Project logging factory; `.log` appears to be the underlying logger object
# (it exposes `.info`) — TODO confirm against common.log_utils.
logger = logFactory("preprocess data").log
def get_df_by_id_month(uuid_list, month):
    """Fetch and inner-merge the two feature tables for one month of users.

    Runs two ClickHouse queries: one against the per-month flat event table
    (filtered to active Kuaishou subscribers, keeping the row with the
    longest app-use string per (uuid, SP name)), and one against the
    low-dimension flux-feature table with all features non-null. The two
    result frames are inner-joined on ``uuid``.

    NOTE(review): both statements are assembled by f-string interpolation of
    ``uuid_list`` — safe only while the ids come from trusted local CSVs;
    parameterized queries would be preferable for untrusted input.
    NOTE(review): the merge assumes ``constant.low_dim_cols`` labels the
    USERID column as 'uuid' so the join key exists on both sides — confirm.

    Returns:
        pandas.DataFrame: merged feature rows for ``uuid_list`` in ``month``.
    """
    logger.info(f"开始读取{month}数据")
    event_query = f"select *, length(EVENT_APP_USE.C) as I_appuse from Z_USER_TAG_FLAT_out_{month} where uuid in {uuid_list} and EVENT_CATEGORYNAME_C not in ('电商卡', '无限畅视') and EVENT_VIDEO_FLUX_V is not null and EVENT_ORDER_MONTH_C IN ('订购1月内', '1-3月', '3-6月') and EVENT_CANCEL_DIFF_C='未退订'and EVENT_CPNAME_C = '快手' order by I_appuse desc limit 1 by uuid,EVENT_SPNAME_C"
    event_rows = click_client.execute(event_query)
    feature_query = f"select USERID,MAvg_TOTAL_FLUX_1_3_zs,MPer1_TOTAL_FLUX_zs,MAvg_TOTAL_VIDEO_FLUX_1_3_zs,MPer1_TOTAL_VIDEO_FLUX_zs,MAvg_Flow_kuaishou_1_3_zs,MPer1_Flow_kuaishou_zs,Div_kuaishou_vFlux_1_3 from ALS_XXH_CANCEL_NEW_C4_lowDim where USERID in {uuid_list} and TARGET_MONTH='{month}' and MAvg_TOTAL_FLUX_1_3_zs is not null and MPer1_TOTAL_FLUX_zs is not null and MAvg_TOTAL_VIDEO_FLUX_1_3_zs is not null and MPer1_TOTAL_VIDEO_FLUX_zs is not null and MAvg_Flow_kuaishou_1_3_zs is not null and MPer1_Flow_kuaishou_zs is not null and Div_kuaishou_vFlux_1_3 is not null"
    feature_rows = click_client.execute(feature_query)

    event_df = pd.DataFrame(event_rows, columns=constant.origin_column_names)
    logger.info(f"data1的shape{event_df.shape}")
    feature_df = pd.DataFrame(feature_rows, columns=constant.low_dim_cols)
    logger.info(f"data2的shape{feature_df.shape}")
    logger.info("读取完成")

    merged = pd.merge(left=event_df, right=feature_df, how='inner',
                      left_on='uuid', right_on='uuid')
    logger.info(f"merge后的shape{merged.shape}")
    return merged
def write_df_to_pickle(data, filename):
    """Serialize a DataFrame to ``./data/pkl/<filename>`` with pandas pickle.

    Args:
        data: pandas.DataFrame to persist.
        filename: target file name, e.g. ``"0_202106_train_pos.pkl"``
            (the ``./data/pkl/`` directory must already exist).
    """
    logger.info("开始写入pickle")
    # BUG FIX: `filename` was ignored and a hard-coded path was written
    # instead, so every call clobbered the same file; interpolate the
    # argument so each (index, month, pos/neg) split gets its own pickle.
    data.to_pickle(f"./data/pkl/{filename}")
    logger.info(f"写入pickle完成,文件名{filename},文件大小{data.shape}")
def init_data(data_path_list):
    """Build and persist training pickles for each (path, month1, month2) spec.

    For every entry, loads the positive/negative user-id CSVs under
    ``entry['path']``, de-duplicates the ids, then for each of the two
    months fetches the merged feature frame and writes it to
    ``{index}_{month}_train_{pos|neg}.pkl``.

    Args:
        data_path_list: list of dicts with keys 'path', 'month1', 'month2'.
    """
    logger.info("开始处理数据")
    for idx, spec in enumerate(tqdm(data_path_list)):
        csv_dir = spec['path']
        # Flatten each CSV to a de-duplicated id list.
        pos_ids = list(set(pd.read_csv(csv_dir + "/pos.csv").values.reshape(-1).tolist()))
        neg_ids = list(set(pd.read_csv(csv_dir + "/neg.csv").values.reshape(-1).tolist()))
        # Same fetch/write sequence for both months of the window.
        for month in (spec['month1'], spec['month2']):
            pos_frame = get_df_by_id_month(pos_ids, month)
            neg_frame = get_df_by_id_month(neg_ids, month)
            write_df_to_pickle(pos_frame, f"{idx}_{month}_train_pos.pkl")
            write_df_to_pickle(neg_frame, f"{idx}_{month}_train_neg.pkl")
if __name__ == '__main__':
    # Sliding three-month windows: (csv directory, first month, second month).
    window_specs = [
        ('./data/csv/678', '202106', '202107'),
        ('./data/csv/789', '202107', '202108'),
        ('./data/csv/8910', '202108', '202109'),
        ('./data/csv/91011', '202109', '202110'),
    ]
    data_path = [
        {'path': path, 'month1': first, 'month2': second}
        for path, first, second in window_specs
    ]
    init_data(data_path)
|