"""Preprocess paired monthly user frames and export one pickle per UUID.

For every configured pair of monthly pickles, a worker process cleans both
frames, one-hot encodes them, and writes one merged frame per UUID under
``./data/csv_lstm_data/{pos,neg}/``.  Each written path and its label are
appended to ``./data/csv_lstm_data/path_dict.txt``.
"""
import multiprocessing
import os
from collections import defaultdict
from multiprocessing import Process

import pandas as pd
from tqdm import tqdm

from common.database_utils import database_util
from common.log_utils import logFactory
from common import constant
from common.process_util import ProcessUtil

logger = logFactory("preprocess data").log

# (column, element-wise converter, dtype to cast to afterwards or None).
# Each column is mapped through its converter and then, when a dtype is given,
# cast with ``astype``.  The per-column steps are independent of each other.
# NOTE: EVENT_ORDER_MONTH_C (ProcessUtil.convert_event_order_c -> int) was
# disabled in the original script and is intentionally not listed here.
_COLUMN_CONVERSIONS = (
    ('TAG_PROVINCE_C', ProcessUtil.convert_province, None),
    ('TAG_INTIME_C', ProcessUtil.convert_intime, 'int'),
    ('EVENT_FLUX_C', ProcessUtil.convert_flux, 'int'),
    ('EVENT_FLUX_V', ProcessUtil.convert_flux_value, 'float'),
    ('MAvg_TOTAL_FLUX_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MPer1_TOTAL_FLUX_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MAvg_TOTAL_VIDEO_FLUX_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MPer1_TOTAL_VIDEO_FLUX_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MAvg_Flow_kuaishou_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
    ('MPer1_Flow_kuaishou_zs', ProcessUtil.convert_flux_value, 'float'),
    ('Div_kuaishou_vFlux_1_3', ProcessUtil.convert_flux_value, 'float'),
    ('EVENT_CONSUM_C', ProcessUtil.convert_consume, 'int'),
    ('EVENT_CONSUM_V', ProcessUtil.convert_consume_value, 'float'),
    ('EVENT_VIDEO_FLUX_C', ProcessUtil.convert_video_flux, 'int'),
    ('EVENT_VIDEO_FLUX_V', ProcessUtil.convert_video_flux_value, 'float'),
    ('TAG_GENDER_C', ProcessUtil.convert_gender, 'int'),
    ('TAG_NETTYPE_C', ProcessUtil.convert_nettype, 'int'),
    ('TAG_AGE_C', ProcessUtil.convert_age, 'int'),
    ('EVENT_IS_ACCT_C', ProcessUtil.convert_acct, 'int'),
)

# Number of ``<feature>_<n>`` columns (n = 0 .. count-1) that are removed from
# the second-month half of the LGBM merge.
_SECOND_MONTH_DROP_COUNTS = {
    'TAG_GENDER_C': 3,
    'TAG_NETTYPE_C': 5,
    'TAG_AGE_C': 6,
    'TAG_INTIME_C': 6,
    'TAG_PROVINCE_C': 32,
}

_LSTM_DATA_DIR = "./data/csv_lstm_data"

# Month pairs for the positive samples (label "0", process indices 0..2).
POS_PATH_PAIRS = [
    {
        "path1": "./data/pkl/0_202106_train_pos.pkl",
        "path2": "./data/pkl/0_202107_train_pos.pkl"
    },
    {
        "path1": "./data/pkl/1_202107_train_pos.pkl",
        "path2": "./data/pkl/1_202108_train_pos.pkl"
    },
    {
        "path1": "./data/pkl/2_202108_train_pos.pkl",
        "path2": "./data/pkl/2_202109_train_pos.pkl"
    },
    # Disabled pair:
    # {
    #     "path1": "./data/pkl/3_202109_train_pos.pkl",
    #     "path2": "./data/pkl/3_202110_train_pos.pkl"
    # },
]

# Month pairs for the negative samples (label "1", process indices 3..5).
NEG_PATH_PAIRS = [
    {
        "path1": "./data/pkl/0_202106_train_neg.pkl",
        "path2": "./data/pkl/0_202107_train_neg.pkl"
    },
    {
        "path1": "./data/pkl/1_202107_train_neg.pkl",
        "path2": "./data/pkl/1_202108_train_neg.pkl"
    },
    {
        "path1": "./data/pkl/2_202108_train_neg.pkl",
        "path2": "./data/pkl/2_202109_train_neg.pkl"
    },
    # Disabled pair:
    # {
    #     "path1": "./data/pkl/3_202109_train_neg.pkl",
    #     "path2": "./data/pkl/3_202110_train_neg.pkl"
    # },
]


def process_month_data_frame(month_df, path):
    """Clean one monthly frame in place and return it.

    Drops ``constant.general_drop_columns``, runs every column listed in
    ``_COLUMN_CONVERSIONS`` through its ``ProcessUtil`` converter (casting to
    the listed dtype where one is given), calls ``ProcessUtil.convert_appuse``
    on the frame, and finally drops ``EVENT_APP_USE.C`` and
    ``EVENT_USER_OSTATE_C``.

    Args:
        month_df: Frame for a single month.  It is mutated in place.
        path: Source path of the frame; only used in log messages.

    Returns:
        The same ``month_df`` object after cleaning.

    Raises:
        KeyError: If an expected column is missing from ``month_df``.
    """
    logger.info(f"开始预处理{path}")
    month_df.drop(columns=constant.general_drop_columns, inplace=True)

    for column, converter, dtype in _COLUMN_CONVERSIONS:
        converted = month_df[column].map(converter)
        if dtype is not None:
            converted = converted.astype(dtype)
        month_df[column] = converted

    # convert_appuse works on the whole frame; the raw column it is presumably
    # derived from is dropped right afterwards.
    ProcessUtil.convert_appuse(month_df)
    month_df.drop(columns=['EVENT_APP_USE.C'], inplace=True)
    month_df.drop(columns=['EVENT_USER_OSTATE_C'], inplace=True)
    logger.info(f"完成预处理{path}")
    return month_df


def convert_onehot(df):
    """Log the start of one-hot encoding and delegate to ``ProcessUtil.convert_onehot``."""
    logger.info("开始onehot编码")
    return ProcessUtil.convert_onehot(df)


def merge_df_for_lgbm(dataf1, dataf2):
    """Place two monthly frames side by side as ``first_*`` / ``second_*`` columns.

    The ``uuid`` column is taken from ``dataf1`` only.  All other columns of
    ``dataf1`` get the prefix ``first_`` and those of ``dataf2`` the prefix
    ``second_``.  The ``second_<feature>_<n>`` columns described by
    ``_SECOND_MONTH_DROP_COUNTS`` are then removed.

    NOTE(review): ``pd.concat(axis=1)`` aligns rows on the index, not on
    ``uuid`` -- this assumes both frames share the same index in the same user
    order; confirm against the caller.

    Args:
        dataf1: First-month frame containing a ``uuid`` column.
        dataf2: Second-month frame containing a ``uuid`` column.

    Returns:
        The merged frame.

    Raises:
        KeyError: If any of the columns to drop is absent.
    """
    df_general = dataf1[['uuid']]
    df_1 = dataf1.drop(columns=['uuid']).add_prefix("first_")
    df_2 = dataf2.drop(columns=['uuid']).add_prefix('second_')
    df_merge = pd.concat([df_general, df_1, df_2], axis=1)

    redundant_columns = [
        f'second_{feature}_{n}'
        for feature, count in _SECOND_MONTH_DROP_COUNTS.items()
        for n in range(count)
    ]
    df_merge.drop(columns=redundant_columns, inplace=True)
    return df_merge


def gen_new_feature(df):
    """Add second-minus-first difference columns to a merged frame, in place.

    Adds ``diff_EVENT_FLUX_V``, ``diff_EVENT_CONSUM_V``,
    ``diff_EVENT_VIDEO_FLUX_V`` and ``diff_kuaishou_use``, then drops the two
    ``*_app_use_kuaishou`` source columns.

    Args:
        df: Frame with ``first_*`` / ``second_*`` columns.  It is mutated.

    Returns:
        The same ``df`` object.
    """
    df['diff_EVENT_FLUX_V'] = df['second_EVENT_FLUX_V'] - df['first_EVENT_FLUX_V']
    df['diff_EVENT_CONSUM_V'] = df['second_EVENT_CONSUM_V'] - df['first_EVENT_CONSUM_V']
    df['diff_EVENT_VIDEO_FLUX_V'] = df['second_EVENT_VIDEO_FLUX_V'] - df['first_EVENT_VIDEO_FLUX_V']
    df['diff_kuaishou_use'] = df['second_app_use_kuaishou'] - df['first_app_use_kuaishou']
    df.drop(columns=['second_app_use_kuaishou', 'first_app_use_kuaishou'], inplace=True)
    return df


def _row_positions_by_uuid(df):
    """Map each non-null ``uuid`` value to the list of its row positions in ``df``."""
    positions = defaultdict(list)
    for pos, uuid in enumerate(df['uuid'].values.tolist()):
        # Null uuids never compare equal to anything in a pandas ``==`` mask,
        # so they are left out to keep lookups consistent with that behaviour.
        if not pd.isna(uuid):
            positions[uuid].append(pos)
    return dict(positions)


def merge_df_for_rnn(df1, df2, index, sample_type):
    """Write one pickle per UUID and record its path and label.

    For every value in ``df1['uuid']`` (duplicates are visited again, in
    order), the de-duplicated rows of ``df1`` and of ``df2`` with that uuid
    are stacked vertically and pickled to
    ``./data/csv_lstm_data/<pos|neg>/<index>_<uuid>.pkl``.  A line
    ``"<path> <sample_type>"`` is appended to
    ``./data/csv_lstm_data/path_dict.txt``.

    Args:
        df1: First-month frame with a ``uuid`` column.
        df2: Second-month frame with a ``uuid`` column.
        index: Process/pair index, used as the file-name prefix.
        sample_type: ``"0"`` writes under ``pos``; anything else under ``neg``.
    """
    df1_uuids = df1['uuid'].values.tolist()

    subdir = "pos" if sample_type == "0" else "neg"
    output_dir = f"{_LSTM_DATA_DIR}/{subdir}"
    # to_pickle does not create missing directories.
    os.makedirs(output_dir, exist_ok=True)

    # Index the rows once instead of scanning both frames for every uuid.
    rows1 = _row_positions_by_uuid(df1)
    rows2 = _row_positions_by_uuid(df2)

    # Several worker processes append to this file at the same time.  Line
    # buffering flushes each record as soon as it is complete, so a record is
    # not split at an arbitrary buffer boundary and interleaved with another
    # process's output.
    with open(f"{_LSTM_DATA_DIR}/path_dict.txt", "a", buffering=1,
              encoding="utf-8") as f:
        for uuid in tqdm(df1_uuids):
            df_temp1 = df1.iloc[rows1.get(uuid, [])].drop_duplicates()
            df_temp2 = df2.iloc[rows2.get(uuid, [])].drop_duplicates()
            df_merge = pd.concat([df_temp1, df_temp2], axis=0)
            output_pkl_path = f"{output_dir}/{index}_{uuid}.pkl"
            df_merge.to_pickle(output_pkl_path)
            f.write(output_pkl_path + ' ' + sample_type + "\n")


def handle_data(index, path_dict, sample_type):
    """Worker entry point: preprocess one month pair and export per-UUID files.

    Args:
        index: Process/pair index used in logs and output file names.
        path_dict: Mapping with keys ``path1`` and ``path2`` pointing at the
            first- and second-month pickles.
        sample_type: Label string forwarded to ``merge_df_for_rnn``.
    """
    logger.info(f'current_process is {index}')
    path1 = path_dict['path1']
    path2 = path_dict['path2']
    # NOTE(review): read_pickle can execute arbitrary code; only load trusted files.
    first_month_data_frame = pd.read_pickle(path1)
    second_month_data_frame = pd.read_pickle(path2)

    month1_df_processed = process_month_data_frame(first_month_data_frame, path1)
    dataf1 = convert_onehot(month1_df_processed)
    month2_df_processed = process_month_data_frame(second_month_data_frame, path2)
    dataf2 = convert_onehot(month2_df_processed)

    # Combine both months into per-UUID sample files.
    merge_df_for_rnn(dataf1, dataf2, index, sample_type)
    logger.info(f"处理完第{index}_process的整体数据")


def _start_workers(path_pairs, sample_type, index_offset=0):
    """Start one ``handle_data`` process per month pair.

    Returns:
        A list of ``(index, process)`` tuples for the started workers.
    """
    workers = []
    for i, pkl_path in enumerate(tqdm(path_pairs)):
        index = i + index_offset
        p = Process(target=handle_data, args=(index, pkl_path, sample_type))
        p.start()
        # Report the same index the worker itself logs and uses in file names.
        logger.info(f"{index}_process已开启")
        workers.append((index, p))
    return workers


def main():
    """Launch the configured workers and wait for all of them to finish."""
    workers = []
    # Positive samples are disabled by default; enable this line to export them.
    # workers += _start_workers(POS_PATH_PAIRS, "0")
    workers += _start_workers(NEG_PATH_PAIRS, "1", index_offset=3)

    for index, p in workers:
        p.join()
        if p.exitcode != 0:
            logger.info(f"{index}_process exited with code {p.exitcode}")


if __name__ == '__main__':
    main()