import pandas as pd

from common import constant
from common.log_utils import logFactory
from common.process_util import ProcessUtil
from tqdm import tqdm

logger = logFactory("preprocess data").log

# (column, ProcessUtil converter, target dtype); dtype None keeps the mapped values as-is.
COLUMN_CONVERSIONS = [
    ('TAG_PROVINCE_C', ProcessUtil.convert_province, None),
    ('TAG_INTIME_C', ProcessUtil.convert_intime, 'int'),
    ('EVENT_FLUX_C', ProcessUtil.convert_flux, 'int'),
    ('EVENT_FLUX_V', ProcessUtil.convert_flux_value, 'float'),
    ('MAvg_TOTAL_FLUX_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MPer1_TOTAL_FLUX_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MAvg_TOTAL_VIDEO_FLUX_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MPer1_TOTAL_VIDEO_FLUX_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MAvg_Flow_kuaishou_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MPer1_Flow_kuaishou_zs', ProcessUtil.convert_flux_value, 'float'),
    ('Div_kuaishou_vFlux_1_3', ProcessUtil.convert_flux_value, 'float'),
    ('EVENT_CONSUM_C', ProcessUtil.convert_consume, 'int'),
    ('EVENT_CONSUM_V', ProcessUtil.convert_consume_value, 'float'),
    # ('EVENT_ORDER_MONTH_C', ProcessUtil.convert_event_order_c, 'int'),
    ('EVENT_VIDEO_FLUX_C', ProcessUtil.convert_video_flux, 'int'),
    ('EVENT_VIDEO_FLUX_V', ProcessUtil.convert_video_flux_value, 'float'),
    ('TAG_GENDER_C', ProcessUtil.convert_gender, 'int'),
    ('TAG_NETTYPE_C', ProcessUtil.convert_nettype, 'int'),
    ('TAG_AGE_C', ProcessUtil.convert_age, 'int'),
    ('EVENT_IS_ACCT_C', ProcessUtil.convert_acct, 'int'),
]


def process_month_data_frame(month_df, path):
    logger.info(f"Start preprocessing {path}")
    month_df.drop(columns=constant.general_drop_columns, inplace=True)
    # Normalize each raw column with its converter, then cast to the target dtype.
    for column, converter, dtype in COLUMN_CONVERSIONS:
        month_df[column] = month_df[column].map(converter)
        if dtype is not None:
            month_df[column] = month_df[column].astype(dtype)
    # Derive per-app usage columns (e.g. app_use_kuaishou), then drop the raw fields.
    ProcessUtil.convert_appuse(month_df)
    month_df.drop(columns=['EVENT_APP_USE.C', 'EVENT_USER_OSTATE_C'], inplace=True)
    logger.info(f"Finished preprocessing {path}")
    return month_df


def convert_onehot(df):
    logger.info("Start one-hot encoding")
    return ProcessUtil.convert_onehot(df)
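
# The converters referenced in COLUMN_CONVERSIONS live in common.process_util.
# For readers without that module, the sketch below illustrates the assumed
# contract: each converter takes one raw cell value and returns a normalized
# numeric value, so the astype() cast afterwards cannot fail.
# _example_convert_flux_value is a hypothetical stand-in, not the real code.
def _example_convert_flux_value(raw):
    """Hypothetical stand-in for ProcessUtil.convert_flux_value (illustration only)."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return 0.0  # assumed default for missing/unparseable values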


def merge_df_for_lgbm(dataf1, dataf2):
    df_general = dataf1[['uuid']]
    df_1 = dataf1.drop(columns=['uuid']).add_prefix('first_')
    df_2 = dataf2.drop(columns=['uuid']).add_prefix('second_')
    df_merge = pd.concat([df_general, df_1, df_2], axis=1)
    # Keep the demographic one-hot columns only from the first month and
    # drop the second-month copies.
    second_month_onehots = (
        [f'second_TAG_GENDER_C_{i}' for i in range(3)]
        + [f'second_TAG_NETTYPE_C_{i}' for i in range(5)]
        + [f'second_TAG_AGE_C_{i}' for i in range(6)]
        + [f'second_TAG_INTIME_C_{i}' for i in range(6)]
        + [f'second_TAG_PROVINCE_C_{i}' for i in range(32)]
    )
    df_merge.drop(columns=second_month_onehots, inplace=True)
    return df_merge


def gen_new_feature(df):
    # Month-over-month deltas as additional features.
    df['diff_EVENT_FLUX_V'] = df['second_EVENT_FLUX_V'] - df['first_EVENT_FLUX_V']
    df['diff_EVENT_CONSUM_V'] = df['second_EVENT_CONSUM_V'] - df['first_EVENT_CONSUM_V']
    df['diff_EVENT_VIDEO_FLUX_V'] = df['second_EVENT_VIDEO_FLUX_V'] - df['first_EVENT_VIDEO_FLUX_V']
    df['diff_kuaishou_use'] = df['second_app_use_kuaishou'] - df['first_app_use_kuaishou']
    df.drop(columns=['second_app_use_kuaishou', 'first_app_use_kuaishou'], inplace=True)
    return df


# Write one pickle per uuid (both months stacked) and record each file path
# together with its sample label.
def merge_df_for_rnn(df1, df2, index, sample_type):
    df1_uuids = df1['uuid'].values.tolist()
    with open("./data/csv_lstm_data/path_dict.txt", "a") as f:
        for uuid in tqdm(df1_uuids):
            df_temp1 = df1[df1.uuid == uuid].drop_duplicates()
            df_temp2 = df2[df2.uuid == uuid].drop_duplicates()
            df_merge = pd.concat([df_temp1, df_temp2], axis=0)
            output_pkl_path = f"./data/csv_lstm_data/{index}_{uuid}.pkl"
            df_merge.to_pickle(output_pkl_path)
            # One "<pkl_path> <label>" record per line.
            f.write(output_pkl_path + ' ' + sample_type + '\n')
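

# Minimal sketch (not part of the original pipeline) of how the per-uuid
# pickles and the "<pkl_path> <label>" index written by merge_df_for_rnn can
# be read back for RNN/LSTM training. load_rnn_samples is a hypothetical
# helper introduced here for illustration.
def load_rnn_samples(index_path="./data/csv_lstm_data/path_dict.txt"):
    samples = []
    with open(index_path) as f:
        for line in f:
            pkl_path, label = line.strip().rsplit(' ', 1)
            frame = pd.read_pickle(pkl_path)  # two stacked month rows for one uuid
            samples.append((frame.drop(columns=['uuid']), int(label)))
    return samples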


def handle_data(pkl_path, output_name, isRNN, sample_type):
    sample_df = []
    for i, path_dict in enumerate(tqdm(pkl_path)):
        path1 = path_dict['path1']
        path2 = path_dict['path2']
        first_month_data_frame = pd.read_pickle(path1)
        second_month_data_frame = pd.read_pickle(path2)
        month1_df_processed = process_month_data_frame(first_month_data_frame, path1)
        dataf1 = convert_onehot(month1_df_processed)
        month2_df_processed = process_month_data_frame(second_month_data_frame, path2)
        dataf2 = convert_onehot(month2_df_processed)
        # Merge the two months of data
        if isRNN:
            merge_df_for_rnn(dataf1, dataf2, i, sample_type)
            logger.info(f"Finished processing batch {i}")
            continue
        new_merge_df = merge_df_for_lgbm(dataf1, dataf2)
        # Generate the derived (delta) features
        new_merge_df = gen_new_feature(new_merge_df)
        sample_df.append(new_merge_df)
    if not isRNN:
        logger.info("Start merging training data")
        total_train = pd.concat(sample_df, axis=0)
        total_train.to_pickle(output_name)


if __name__ == '__main__':
    pkl_path = [
        {
            "path1": "./data/pkl/0_202106_train_pos.pkl",
            "path2": "./data/pkl/0_202107_train_pos.pkl"
        },
        {
            "path1": "./data/pkl/1_202107_train_pos.pkl",
            "path2": "./data/pkl/1_202108_train_pos.pkl"
        },
        {
            "path1": "./data/pkl/2_202108_train_pos.pkl",
            "path2": "./data/pkl/2_202109_train_pos.pkl"
        },
        # {
        #     "path1": "./data/pkl/3_202109_train_pos.pkl",
        #     "path2": "./data/pkl/3_202110_train_pos.pkl"
        # },
    ]
    handle_data(pkl_path=pkl_path, output_name="./data/pkl/train_pos_rnn_all.pkl",
                isRNN=True, sample_type="0")

    # pkl_path = [
    #     {
    #         "path1": "./data/pkl/0_202106_train_neg.pkl",
    #         "path2": "./data/pkl/0_202107_train_neg.pkl"
    #     },
    #     {
    #         "path1": "./data/pkl/1_202107_train_neg.pkl",
    #         "path2": "./data/pkl/1_202108_train_neg.pkl"
    #     },
    #     {
    #         "path1": "./data/pkl/2_202108_train_neg.pkl",
    #         "path2": "./data/pkl/2_202109_train_neg.pkl"
    #     },
    #     # {
    #     #     "path1": "./data/pkl/3_202109_train_neg.pkl",
    #     #     "path2": "./data/pkl/3_202110_train_neg.pkl"
    #     # },
    # ]
    # handle_data(pkl_path=pkl_path, output_name="./data/pkl/train_neg_rnn_all.pkl",
    #             isRNN=True, sample_type="1")  # sample_type "1" assumed for negative samples
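
    # Illustrative sketch (hypothetical output path, not part of the original
    # run configuration): with isRNN=False, handle_data builds a single merged
    # LGBM feature table instead of per-uuid RNN pickles.
    # handle_data(pkl_path=pkl_path, output_name="./data/pkl/train_pos_lgbm_all.pkl",
    #             isRNN=False, sample_type="0")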