import pandas as pd from common.database_utils import database_util from common.log_utils import logFactory from common import constant from tqdm import tqdm click_client = database_util.get_client() logger = logFactory("preprocess data").log def get_df_by_id_month(uuid_list, month): logger.info(f"开始读取{month}数据") sql1 = f"select *, length(EVENT_APP_USE.C) as I_appuse from Z_USER_TAG_FLAT_out_{month} where uuid in {uuid_list} and EVENT_CATEGORYNAME_C not in ('电商卡', '无限畅视') and EVENT_VIDEO_FLUX_V is not null and EVENT_ORDER_MONTH_C IN ('订购1月内', '1-3月', '3-6月') and EVENT_CANCEL_DIFF_C='未退订'and EVENT_CPNAME_C = '快手' order by I_appuse desc limit 1 by uuid,EVENT_SPNAME_C" all_data1 = click_client.execute(sql1) sql2 = f"select USERID,MAvg_TOTAL_FLUX_1_3_zs,MPer1_TOTAL_FLUX_zs,MAvg_TOTAL_VIDEO_FLUX_1_3_zs,MPer1_TOTAL_VIDEO_FLUX_zs,MAvg_Flow_kuaishou_1_3_zs,MPer1_Flow_kuaishou_zs,Div_kuaishou_vFlux_1_3 from ALS_XXH_CANCEL_NEW_C4_lowDim where USERID in {uuid_list} and TARGET_MONTH='{month}' and MAvg_TOTAL_FLUX_1_3_zs is not null and MPer1_TOTAL_FLUX_zs is not null and MAvg_TOTAL_VIDEO_FLUX_1_3_zs is not null and MPer1_TOTAL_VIDEO_FLUX_zs is not null and MAvg_Flow_kuaishou_1_3_zs is not null and MPer1_Flow_kuaishou_zs is not null and Div_kuaishou_vFlux_1_3 is not null" all_data2 = click_client.execute(sql2) data_frame1 = pd.DataFrame(all_data1, columns=constant.origin_column_names) logger.info(f"data1的shape{data_frame1.shape}") data_frame2 = pd.DataFrame(all_data2, columns=constant.low_dim_cols) logger.info(f"data2的shape{data_frame2.shape}") logger.info(f"读取完成") df_merge = pd.merge(left=data_frame1, right=data_frame2, how='inner', left_on='uuid', right_on='uuid') logger.info(f"merge后的shape{df_merge.shape}") return df_merge def write_df_to_pickle(data, filename): logger.info(f"开始写入pickle") data.to_pickle(f"./data/pkl/{filename}") logger.info(f"写入pickle完成,文件名{filename},文件大小{data.shape}", ) def init_data(data_path_list): logger.info("开始处理数据") for i, dict_value in enumerate(tqdm(data_path_list)): path = dict_value['path'] month1 = dict_value['month1'] month2 = dict_value['month2'] pos_uuid_list = list(set(pd.read_csv(path + "/pos.csv").values.reshape(-1).tolist())) neg_uuid_list = list(set(pd.read_csv(path + "/neg.csv").values.reshape(-1).tolist())) # 处理第一个月数据 first_month_df_pos = get_df_by_id_month(pos_uuid_list, month1) first_month_df_neg = get_df_by_id_month(neg_uuid_list, month1) write_df_to_pickle(first_month_df_pos, f"{i}_{month1}_train_pos.pkl") write_df_to_pickle(first_month_df_neg, f"{i}_{month1}_train_neg.pkl") # 处理第二个月数据 second_month_df_pos = get_df_by_id_month(pos_uuid_list, month2) second_month_df_neg = get_df_by_id_month(neg_uuid_list, month2) write_df_to_pickle(second_month_df_pos, f"{i}_{month2}_train_pos.pkl") write_df_to_pickle(second_month_df_neg, f"{i}_{month2}_train_neg.pkl") # 合并 if __name__ == '__main__': data_path = [ { 'path': './data/csv/678', 'month1': '202106', 'month2': '202107', }, { 'path': './data/csv/789', 'month1': '202107', 'month2': '202108', }, { 'path': './data/csv/8910', 'month1': '202108', 'month2': '202109', }, { 'path': './data/csv/91011', 'month1': '202109', 'month2': '202110', } ] # data_path = [ # { # 'path': './data/csv/678', # 'month1': '202109', # 'month2': '202110', # } # ] init_data(data_path)