123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- import pandas as pd
- from common.database_utils import database_util
- from common.log_utils import logFactory
- from common import constant
- from common.process_util import ProcessUtil
- from tqdm import tqdm
# Module-level logger tagged with the preprocessing stage name.
logger = logFactory("preprocess data").log
def process_month_data_frame(month_df, path):
    """Clean and type-convert one month's raw feature frame, in place.

    Drops the project-wide throwaway columns, maps each raw categorical /
    value column through its ProcessUtil converter, casts to the final
    dtype, expands app-usage into per-app columns, and drops the two raw
    columns that convert_appuse consumes.

    :param month_df: raw monthly DataFrame loaded from a pickle; mutated
        in place and also returned for convenience.
    :param path: source path of the frame, used only for log messages.
    :return: the same (now preprocessed) DataFrame.
    """
    logger.info(f"开始预处理{path}")
    month_df.drop(columns=constant.general_drop_columns, inplace=True)
    # Province is mapped to codes but deliberately left uncast (original
    # behavior) — presumably the one-hot step handles it; confirm.
    month_df['TAG_PROVINCE_C'] = month_df['TAG_PROVINCE_C'].map(ProcessUtil.convert_province)
    # (column, converter, target dtype) — each column is mapped through its
    # converter first, then cast, exactly as the original per-column code did.
    conversions = [
        ('TAG_INTIME_C', ProcessUtil.convert_intime, 'int'),
        ('EVENT_FLUX_C', ProcessUtil.convert_flux, 'int'),
        ('EVENT_FLUX_V', ProcessUtil.convert_flux_value, 'float'),
        ('MAvg_TOTAL_FLUX_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
        ('MPer1_TOTAL_FLUX_zs', ProcessUtil.convert_flux_value, 'float'),
        ('MAvg_TOTAL_VIDEO_FLUX_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
        ('MPer1_TOTAL_VIDEO_FLUX_zs', ProcessUtil.convert_flux_value, 'float'),
        ('MAvg_Flow_kuaishou_1_3_zs', ProcessUtil.convert_flux_value, 'float'),
        ('MPer1_Flow_kuaishou_zs', ProcessUtil.convert_flux_value, 'float'),
        ('Div_kuaishou_vFlux_1_3', ProcessUtil.convert_flux_value, 'float'),
        ('EVENT_CONSUM_C', ProcessUtil.convert_consume, 'int'),
        ('EVENT_CONSUM_V', ProcessUtil.convert_consume_value, 'float'),
        # ('EVENT_ORDER_MONTH_C', ProcessUtil.convert_event_order_c, 'int'),  # kept disabled, as in original
        ('EVENT_VIDEO_FLUX_C', ProcessUtil.convert_video_flux, 'int'),
        ('EVENT_VIDEO_FLUX_V', ProcessUtil.convert_video_flux_value, 'float'),
        ('TAG_GENDER_C', ProcessUtil.convert_gender, 'int'),
        ('TAG_NETTYPE_C', ProcessUtil.convert_nettype, 'int'),
        ('TAG_AGE_C', ProcessUtil.convert_age, 'int'),
        ('EVENT_IS_ACCT_C', ProcessUtil.convert_acct, 'int'),
    ]
    for column, converter, dtype in conversions:
        month_df[column] = month_df[column].map(converter).astype(dtype)
    # Expands raw app-usage into feature columns (mutates month_df), after
    # which the two raw source columns are no longer needed.
    ProcessUtil.convert_appuse(month_df)
    month_df.drop(columns=['EVENT_APP_USE.C', 'EVENT_USER_OSTATE_C'], inplace=True)
    logger.info(f"完成预处理{path}")
    return month_df
def convert_onehot(df):
    """One-hot encode the categorical columns of *df* via the shared helper.

    Thin logging wrapper around ProcessUtil.convert_onehot; returns the
    encoded DataFrame.
    """
    logger.info("开始onehot编码")
    encoded = ProcessUtil.convert_onehot(df)
    return encoded
def merge_df_for_lgbm(dataf1, dataf2):
    """Join two consecutive months of one-hot features side by side for LightGBM.

    The uuid column is kept once; month-1 feature columns get a ``first_``
    prefix and month-2 columns a ``second_``. The second month's static
    demographic one-hots (gender / nettype / age / intime / province) are
    then dropped — presumably because they duplicate the first month's
    values; confirm against the feature spec.

    :param dataf1: first month's one-hot frame, must contain ``uuid``.
    :param dataf2: second month's one-hot frame, must contain ``uuid`` and
        all of the static one-hot columns listed below.
    :return: the merged, pruned DataFrame.
    """
    uuid_col = dataf1[['uuid']]
    first = dataf1.drop(columns=['uuid']).add_prefix('first_')
    second = dataf2.drop(columns=['uuid']).add_prefix('second_')
    merged = pd.concat([uuid_col, first, second], axis=1)
    # One-hot widths of the static attribute groups; expands to exactly the
    # 52 'second_<base>_<i>' column names the original listed literally.
    static_onehot_widths = {
        'TAG_GENDER_C': 3,
        'TAG_NETTYPE_C': 5,
        'TAG_AGE_C': 6,
        'TAG_INTIME_C': 6,
        'TAG_PROVINCE_C': 32,
    }
    redundant = [
        f'second_{base}_{i}'
        for base, width in static_onehot_widths.items()
        for i in range(width)
    ]
    merged.drop(columns=redundant, inplace=True)
    return merged
def gen_new_feature(df):
    """Add month-over-month delta features to the merged frame, in place.

    For each value column, ``diff_X = second_X - first_X``. The two raw
    kuaishou-usage columns are consumed into ``diff_kuaishou_use`` and then
    dropped. Returns the mutated frame.
    """
    for base in ('EVENT_FLUX_V', 'EVENT_CONSUM_V', 'EVENT_VIDEO_FLUX_V'):
        df[f'diff_{base}'] = df[f'second_{base}'] - df[f'first_{base}']
    df['diff_kuaishou_use'] = df['second_app_use_kuaishou'] - df['first_app_use_kuaishou']
    df.drop(columns=['second_app_use_kuaishou', 'first_app_use_kuaishou'], inplace=True)
    return df
# Write one pickle per uuid and record each file's path plus its label so
# the RNN/LSTM loader can stream samples later.
def merge_df_for_rnn(df1, df2, index, sample_type):
    """Emit per-uuid two-month sample pickles and append them to the path index.

    For every uuid in *df1*, stacks that uuid's (deduplicated) rows from both
    months vertically and pickles them to ``./data/csv_lstm_data/``, then
    appends ``"<path> <label>"`` to ``path_dict.txt``.

    Fix: each index record is now newline-terminated; previously all
    "path label" pairs ran together on a single unparseable line.

    :param df1: first month's one-hot frame (drives the uuid list).
    :param df2: second month's one-hot frame.
    :param index: batch index, used to keep output filenames unique.
    :param sample_type: label string recorded next to each path (e.g. "0").
    """
    df1_uuids = df1['uuid'].values.tolist()
    with open("./data/csv_lstm_data/path_dict.txt", "a") as f:
        for uuid in tqdm(df1_uuids):
            first_rows = df1[(df1.uuid == uuid)].drop_duplicates()
            second_rows = df2[(df2.uuid == uuid)].drop_duplicates()
            merged = pd.concat([first_rows, second_rows], axis=0)
            output_pkl_path = f"./data/csv_lstm_data/{index}_{uuid}.pkl"
            merged.to_pickle(output_pkl_path)
            f.write(f"{output_pkl_path} {sample_type}\n")
def handle_data(pkl_path, output_name, isRNN, sample_type):
    """Drive preprocessing over month-pair pickles for either model family.

    For each {'path1', 'path2'} entry, loads both months, preprocesses and
    one-hot encodes them, then either writes per-uuid RNN samples
    (``isRNN=True``; *output_name* is unused on that path) or accumulates
    merged LightGBM rows and pickles them all to *output_name*.

    :param pkl_path: list of dicts with keys 'path1'/'path2' (month pair).
    :param output_name: destination pickle for the merged LightGBM frame.
    :param isRNN: True → per-uuid sample files; False → single merged pickle.
    :param sample_type: label passed through to merge_df_for_rnn.
    """
    lgbm_frames = []
    for i, paths in enumerate(tqdm(pkl_path)):
        first_path, second_path = paths['path1'], paths['path2']
        first_encoded = convert_onehot(
            process_month_data_frame(pd.read_pickle(first_path), first_path))
        second_encoded = convert_onehot(
            process_month_data_frame(pd.read_pickle(second_path), second_path))
        if isRNN:
            # RNN path: samples are written to disk per uuid; nothing to merge.
            merge_df_for_rnn(first_encoded, second_encoded, i, sample_type)
            logger.info(f"处理完第{i}轮整体数据")
            continue
        # LightGBM path: merge the two months side by side and derive deltas.
        merged = merge_df_for_lgbm(first_encoded, second_encoded)
        lgbm_frames.append(gen_new_feature(merged))
    if not isRNN:
        logger.info("开始合并训练数据")
        pd.concat(lgbm_frames, axis=0).to_pickle(output_name)
if __name__ == '__main__':
    # Month-pair inputs for the positive training samples; each entry pairs
    # consecutive months for one batch. The 3_2021xx pair is deliberately
    # disabled, as in the original run configuration.
    pkl_path = [
        {
            "path1": "./data/pkl/0_202106_train_pos.pkl",
            "path2": "./data/pkl/0_202107_train_pos.pkl"
        },
        {
            "path1": "./data/pkl/1_202107_train_pos.pkl",
            "path2": "./data/pkl/1_202108_train_pos.pkl"
        },
        {
            "path1": "./data/pkl/2_202108_train_pos.pkl",
            "path2": "./data/pkl/2_202109_train_pos.pkl"
        },
        # {
        #     "path1": "./data/pkl/3_202109_train_pos.pkl",
        #     "path2": "./data/pkl/3_202110_train_pos.pkl"
        # },
    ]
    # RNN mode with label "0"; output_name is unused on the RNN path (samples
    # are written per uuid under ./data/csv_lstm_data/ instead).
    handle_data(pkl_path=pkl_path, output_name="./data/pkl/train_pos_rnn_all.pkl", isRNN=True, sample_type="0")
    # Negative-sample configuration, toggled off in this revision. NOTE(review):
    # this disabled call passes no sample_type, which would raise TypeError if
    # re-enabled as-is — add sample_type (presumably "1") when restoring.
    # pkl_path = [
    #     {
    #         "path1": "./data/pkl/0_202106_train_neg.pkl",
    #         "path2": "./data/pkl/0_202107_train_neg.pkl"
    #     },
    #     {
    #         "path1": "./data/pkl/1_202107_train_neg.pkl",
    #         "path2": "./data/pkl/1_202108_train_neg.pkl"
    #     },
    #     {
    #         "path1": "./data/pkl/2_202108_train_neg.pkl",
    #         "path2": "./data/pkl/2_202109_train_neg.pkl"
    #     },
    #     # {
    #     #     "path1": "./data/pkl/3_202109_train_neg.pkl",
    #     #     "path2": "./data/pkl/3_202110_train_neg.pkl"
    #     # },
    # ]
    #
    # handle_data(pkl_path=pkl_path, output_name="./data/pkl/train_neg_rnn_all.pkl", isRNN=True)
|