# preprocess_data.py
  1. import pandas as pd
  2. from common.database_utils import database_util
  3. from common.log_utils import logFactory
  4. from common import constant
  5. from common.process_util import ProcessUtil
  6. from tqdm import tqdm
  7. logger = logFactory("preprocess data").log
  8. def process_month_data_frame(month_df, path):
  9. logger.info(f"开始预处理{path}")
  10. month_df.drop(columns=constant.general_drop_columns, inplace=True)
  11. month_df['TAG_PROVINCE_C'] = month_df['TAG_PROVINCE_C'].map(ProcessUtil.convert_province)
  12. month_df['TAG_INTIME_C'] = month_df['TAG_INTIME_C'].map(ProcessUtil.convert_intime)
  13. month_df['TAG_INTIME_C'] = month_df['TAG_INTIME_C'].astype('int')
  14. month_df['EVENT_FLUX_C'] = month_df['EVENT_FLUX_C'].map(ProcessUtil.convert_flux)
  15. month_df['EVENT_FLUX_V'] = month_df['EVENT_FLUX_V'].map(ProcessUtil.convert_flux_value)
  16. month_df['MAvg_TOTAL_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_FLUX_1_3_zs'].map(ProcessUtil.convert_flux_value)
  17. month_df['MAvg_TOTAL_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_FLUX_1_3_zs'].astype('float')
  18. month_df['MPer1_TOTAL_FLUX_zs'] = month_df['MPer1_TOTAL_FLUX_zs'].map(ProcessUtil.convert_flux_value)
  19. month_df['MPer1_TOTAL_FLUX_zs'] = month_df['MPer1_TOTAL_FLUX_zs'].astype('float')
  20. month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'].map(
  21. ProcessUtil.convert_flux_value)
  22. month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'].astype('float')
  23. month_df['MPer1_TOTAL_VIDEO_FLUX_zs'] = month_df['MPer1_TOTAL_VIDEO_FLUX_zs'].map(ProcessUtil.convert_flux_value)
  24. month_df['MPer1_TOTAL_VIDEO_FLUX_zs'] = month_df['MPer1_TOTAL_VIDEO_FLUX_zs'].astype('float')
  25. month_df['MAvg_Flow_kuaishou_1_3_zs'] = month_df['MAvg_Flow_kuaishou_1_3_zs'].map(ProcessUtil.convert_flux_value)
  26. month_df['MAvg_Flow_kuaishou_1_3_zs'] = month_df['MAvg_Flow_kuaishou_1_3_zs'].astype('float')
  27. month_df['MPer1_Flow_kuaishou_zs'] = month_df['MPer1_Flow_kuaishou_zs'].map(ProcessUtil.convert_flux_value)
  28. month_df['MPer1_Flow_kuaishou_zs'] = month_df['MPer1_Flow_kuaishou_zs'].astype('float')
  29. month_df['Div_kuaishou_vFlux_1_3'] = month_df['Div_kuaishou_vFlux_1_3'].map(ProcessUtil.convert_flux_value)
  30. month_df['Div_kuaishou_vFlux_1_3'] = month_df['Div_kuaishou_vFlux_1_3'].astype('float')
  31. month_df['EVENT_FLUX_V'] = month_df['EVENT_FLUX_V'].astype('float')
  32. month_df['EVENT_FLUX_C'] = month_df['EVENT_FLUX_C'].astype('int')
  33. month_df['EVENT_CONSUM_C'] = month_df['EVENT_CONSUM_C'].map(ProcessUtil.convert_consume)
  34. month_df['EVENT_CONSUM_V'] = month_df['EVENT_CONSUM_V'].map(ProcessUtil.convert_consume_value)
  35. month_df['EVENT_CONSUM_C'] = month_df['EVENT_CONSUM_C'].astype('int')
  36. month_df['EVENT_CONSUM_V'] = month_df['EVENT_CONSUM_V'].astype('float')
  37. # month_df['EVENT_ORDER_MONTH_C'] = month_df['EVENT_ORDER_MONTH_C'].map(ProcessUtil.convert_event_order_c)
  38. # month_df['EVENT_ORDER_MONTH_C'] = month_df['EVENT_ORDER_MONTH_C'].astype('int')
  39. month_df['EVENT_VIDEO_FLUX_C'] = month_df['EVENT_VIDEO_FLUX_C'].map(ProcessUtil.convert_video_flux)
  40. month_df['EVENT_VIDEO_FLUX_V'] = month_df['EVENT_VIDEO_FLUX_V'].map(ProcessUtil.convert_video_flux_value)
  41. month_df['EVENT_VIDEO_FLUX_V'] = month_df['EVENT_VIDEO_FLUX_V'].astype('float')
  42. month_df['EVENT_VIDEO_FLUX_C'] = month_df['EVENT_VIDEO_FLUX_C'].astype('int')
  43. month_df['TAG_GENDER_C'] = month_df['TAG_GENDER_C'].map(ProcessUtil.convert_gender)
  44. month_df['TAG_GENDER_C'] = month_df['TAG_GENDER_C'].astype('int')
  45. month_df['TAG_NETTYPE_C'] = month_df['TAG_NETTYPE_C'].map(ProcessUtil.convert_nettype)
  46. month_df['TAG_NETTYPE_C'] = month_df['TAG_NETTYPE_C'].astype('int')
  47. month_df['TAG_AGE_C'] = month_df['TAG_AGE_C'].map(ProcessUtil.convert_age)
  48. month_df['TAG_AGE_C'] = month_df['TAG_AGE_C'].astype('int')
  49. month_df['EVENT_IS_ACCT_C'] = month_df['EVENT_IS_ACCT_C'].map(ProcessUtil.convert_acct)
  50. month_df['EVENT_IS_ACCT_C'] = month_df['EVENT_IS_ACCT_C'].astype('int')
  51. ProcessUtil.convert_appuse(month_df)
  52. month_df.drop(columns=['EVENT_APP_USE.C'], inplace=True)
  53. month_df.drop(columns=['EVENT_USER_OSTATE_C'], inplace=True)
  54. logger.info(f"完成预处理{path}")
  55. return month_df
  56. def convert_onehot(df):
  57. logger.info("开始onehot编码")
  58. return ProcessUtil.convert_onehot(df)
  59. def merge_df_for_lgbm(dataf1, dataf2):
  60. df_general = dataf1[['uuid']]
  61. df_1 = dataf1.drop(columns=['uuid'])
  62. df_1 = df_1.add_prefix("first_")
  63. df_2 = dataf2.drop(columns=['uuid'])
  64. df_2 = df_2.add_prefix('second_')
  65. df_merge = pd.concat([df_general, df_1, df_2], axis=1)
  66. df_merge.drop(columns=[
  67. 'second_TAG_GENDER_C_0',
  68. 'second_TAG_GENDER_C_1',
  69. 'second_TAG_GENDER_C_2',
  70. 'second_TAG_NETTYPE_C_0',
  71. 'second_TAG_NETTYPE_C_1',
  72. 'second_TAG_NETTYPE_C_2',
  73. 'second_TAG_NETTYPE_C_3',
  74. 'second_TAG_NETTYPE_C_4',
  75. 'second_TAG_AGE_C_0',
  76. 'second_TAG_AGE_C_1',
  77. 'second_TAG_AGE_C_2',
  78. 'second_TAG_AGE_C_3',
  79. 'second_TAG_AGE_C_4',
  80. 'second_TAG_AGE_C_5',
  81. 'second_TAG_INTIME_C_0',
  82. 'second_TAG_INTIME_C_1',
  83. 'second_TAG_INTIME_C_2',
  84. 'second_TAG_INTIME_C_3',
  85. 'second_TAG_INTIME_C_4',
  86. 'second_TAG_INTIME_C_5',
  87. 'second_TAG_PROVINCE_C_0',
  88. 'second_TAG_PROVINCE_C_1',
  89. 'second_TAG_PROVINCE_C_2',
  90. 'second_TAG_PROVINCE_C_3',
  91. 'second_TAG_PROVINCE_C_4',
  92. 'second_TAG_PROVINCE_C_5',
  93. 'second_TAG_PROVINCE_C_6',
  94. 'second_TAG_PROVINCE_C_7',
  95. 'second_TAG_PROVINCE_C_8',
  96. 'second_TAG_PROVINCE_C_9',
  97. 'second_TAG_PROVINCE_C_10',
  98. 'second_TAG_PROVINCE_C_11',
  99. 'second_TAG_PROVINCE_C_12',
  100. 'second_TAG_PROVINCE_C_13',
  101. 'second_TAG_PROVINCE_C_14',
  102. 'second_TAG_PROVINCE_C_15',
  103. 'second_TAG_PROVINCE_C_16',
  104. 'second_TAG_PROVINCE_C_17',
  105. 'second_TAG_PROVINCE_C_18',
  106. 'second_TAG_PROVINCE_C_19',
  107. 'second_TAG_PROVINCE_C_20',
  108. 'second_TAG_PROVINCE_C_21',
  109. 'second_TAG_PROVINCE_C_22',
  110. 'second_TAG_PROVINCE_C_23',
  111. 'second_TAG_PROVINCE_C_24',
  112. 'second_TAG_PROVINCE_C_25',
  113. 'second_TAG_PROVINCE_C_26',
  114. 'second_TAG_PROVINCE_C_27',
  115. 'second_TAG_PROVINCE_C_28',
  116. 'second_TAG_PROVINCE_C_29',
  117. 'second_TAG_PROVINCE_C_30',
  118. 'second_TAG_PROVINCE_C_31',
  119. ], inplace=True)
  120. return df_merge
  121. def gen_new_feature(df):
  122. df['diff_EVENT_FLUX_V'] = df['second_EVENT_FLUX_V'] - df['first_EVENT_FLUX_V']
  123. df['diff_EVENT_CONSUM_V'] = df['second_EVENT_CONSUM_V'] - df['first_EVENT_CONSUM_V']
  124. df['diff_EVENT_VIDEO_FLUX_V'] = df['second_EVENT_VIDEO_FLUX_V'] - df['first_EVENT_VIDEO_FLUX_V']
  125. df['diff_kuaishou_use'] = df['second_app_use_kuaishou'] - df['first_app_use_kuaishou']
  126. df.drop(columns=['second_app_use_kuaishou', 'first_app_use_kuaishou'], inplace=True)
  127. return df
  128. # 按每个UUID输出一个文件,并将该文件及其标签记录下来
  129. def merge_df_for_rnn(df1, df2, index, sample_type):
  130. df1_uuids = df1['uuid'].values.tolist()
  131. with open("./data/csv_lstm_data/path_dict.txt", "a") as f:
  132. for uuid in tqdm(df1_uuids):
  133. df_temp1 = df1[(df1.uuid == uuid)].drop_duplicates()
  134. df_temp2 = df2[(df2.uuid == uuid)].drop_duplicates()
  135. df_merge = pd.concat([df_temp1, df_temp2], axis=0)
  136. output_pkl_path = f"./data/csv_lstm_data/{index}_{uuid}.pkl"
  137. df_merge.to_pickle(output_pkl_path)
  138. f.write(output_pkl_path + ' ' + sample_type)
  139. continue
  140. def handle_data(pkl_path, output_name, isRNN, sample_type):
  141. sample_df = []
  142. for i, path_dict in enumerate(tqdm(pkl_path)):
  143. path1 = path_dict['path1']
  144. path2 = path_dict['path2']
  145. first_month_data_frame = pd.read_pickle(path1)
  146. second_month_data_frame = pd.read_pickle(path2)
  147. month1_df_processed = process_month_data_frame(first_month_data_frame, path1)
  148. dataf1 = convert_onehot(month1_df_processed)
  149. month2_df_processed = process_month_data_frame(second_month_data_frame, path2)
  150. dataf2 = convert_onehot(month2_df_processed)
  151. #
  152. # # 合成数据
  153. if not isRNN:
  154. new_merge_df = merge_df_for_lgbm(dataf1, dataf2)
  155. else:
  156. merge_df_for_rnn(dataf1, dataf2, i, sample_type)
  157. logger.info(f"处理完第{i}轮整体数据")
  158. continue
  159. # 新特征生成
  160. if not isRNN:
  161. new_merge_df = gen_new_feature(new_merge_df)
  162. sample_df.append(new_merge_df)
  163. if not isRNN:
  164. logger.info("开始合并训练数据")
  165. total_train = pd.concat(sample_df, axis=0)
  166. total_train.to_pickle(output_name)
  167. if __name__ == '__main__':
  168. pkl_path = [
  169. {
  170. "path1": "./data/pkl/0_202106_train_pos.pkl",
  171. "path2": "./data/pkl/0_202107_train_pos.pkl"
  172. },
  173. {
  174. "path1": "./data/pkl/1_202107_train_pos.pkl",
  175. "path2": "./data/pkl/1_202108_train_pos.pkl"
  176. },
  177. {
  178. "path1": "./data/pkl/2_202108_train_pos.pkl",
  179. "path2": "./data/pkl/2_202109_train_pos.pkl"
  180. },
  181. # {
  182. # "path1": "./data/pkl/3_202109_train_pos.pkl",
  183. # "path2": "./data/pkl/3_202110_train_pos.pkl"
  184. # },
  185. ]
  186. handle_data(pkl_path=pkl_path, output_name="./data/pkl/train_pos_rnn_all.pkl", isRNN=True,sample_type="0")
  187. # pkl_path = [
  188. # {
  189. # "path1": "./data/pkl/0_202106_train_neg.pkl",
  190. # "path2": "./data/pkl/0_202107_train_neg.pkl"
  191. # },
  192. # {
  193. # "path1": "./data/pkl/1_202107_train_neg.pkl",
  194. # "path2": "./data/pkl/1_202108_train_neg.pkl"
  195. # },
  196. # {
  197. # "path1": "./data/pkl/2_202108_train_neg.pkl",
  198. # "path2": "./data/pkl/2_202109_train_neg.pkl"
  199. # },
  200. # # {
  201. # # "path1": "./data/pkl/3_202109_train_neg.pkl",
  202. # # "path2": "./data/pkl/3_202110_train_neg.pkl"
  203. # # },
  204. # ]
  205. #
  206. # handle_data(pkl_path=pkl_path, output_name="./data/pkl/train_neg_rnn_all.pkl", isRNN=True)