# preprocess_data_lstm.py — preprocess monthly user pickles into per-uuid LSTM samples.
import pandas as pd
from common.database_utils import database_util
from common.log_utils import logFactory
from common import constant
from common.process_util import ProcessUtil
from tqdm import tqdm
import multiprocessing
from multiprocessing import Process

# Shared module-level logger used by every preprocessing step below.
logger = logFactory("preprocess data").log
  10. def process_month_data_frame(month_df, path):
  11. logger.info(f"开始预处理{path}")
  12. month_df.drop(columns=constant.general_drop_columns, inplace=True)
  13. month_df['TAG_PROVINCE_C'] = month_df['TAG_PROVINCE_C'].map(ProcessUtil.convert_province)
  14. month_df['TAG_INTIME_C'] = month_df['TAG_INTIME_C'].map(ProcessUtil.convert_intime)
  15. month_df['TAG_INTIME_C'] = month_df['TAG_INTIME_C'].astype('int')
  16. month_df['EVENT_FLUX_C'] = month_df['EVENT_FLUX_C'].map(ProcessUtil.convert_flux)
  17. month_df['EVENT_FLUX_V'] = month_df['EVENT_FLUX_V'].map(ProcessUtil.convert_flux_value)
  18. month_df['MAvg_TOTAL_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_FLUX_1_3_zs'].map(ProcessUtil.convert_flux_value)
  19. month_df['MAvg_TOTAL_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_FLUX_1_3_zs'].astype('float')
  20. month_df['MPer1_TOTAL_FLUX_zs'] = month_df['MPer1_TOTAL_FLUX_zs'].map(ProcessUtil.convert_flux_value)
  21. month_df['MPer1_TOTAL_FLUX_zs'] = month_df['MPer1_TOTAL_FLUX_zs'].astype('float')
  22. month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'].map(
  23. ProcessUtil.convert_flux_value)
  24. month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'] = month_df['MAvg_TOTAL_VIDEO_FLUX_1_3_zs'].astype('float')
  25. month_df['MPer1_TOTAL_VIDEO_FLUX_zs'] = month_df['MPer1_TOTAL_VIDEO_FLUX_zs'].map(ProcessUtil.convert_flux_value)
  26. month_df['MPer1_TOTAL_VIDEO_FLUX_zs'] = month_df['MPer1_TOTAL_VIDEO_FLUX_zs'].astype('float')
  27. month_df['MAvg_Flow_kuaishou_1_3_zs'] = month_df['MAvg_Flow_kuaishou_1_3_zs'].map(ProcessUtil.convert_flux_value)
  28. month_df['MAvg_Flow_kuaishou_1_3_zs'] = month_df['MAvg_Flow_kuaishou_1_3_zs'].astype('float')
  29. month_df['MPer1_Flow_kuaishou_zs'] = month_df['MPer1_Flow_kuaishou_zs'].map(ProcessUtil.convert_flux_value)
  30. month_df['MPer1_Flow_kuaishou_zs'] = month_df['MPer1_Flow_kuaishou_zs'].astype('float')
  31. month_df['Div_kuaishou_vFlux_1_3'] = month_df['Div_kuaishou_vFlux_1_3'].map(ProcessUtil.convert_flux_value)
  32. month_df['Div_kuaishou_vFlux_1_3'] = month_df['Div_kuaishou_vFlux_1_3'].astype('float')
  33. month_df['EVENT_FLUX_V'] = month_df['EVENT_FLUX_V'].astype('float')
  34. month_df['EVENT_FLUX_C'] = month_df['EVENT_FLUX_C'].astype('int')
  35. month_df['EVENT_CONSUM_C'] = month_df['EVENT_CONSUM_C'].map(ProcessUtil.convert_consume)
  36. month_df['EVENT_CONSUM_V'] = month_df['EVENT_CONSUM_V'].map(ProcessUtil.convert_consume_value)
  37. month_df['EVENT_CONSUM_C'] = month_df['EVENT_CONSUM_C'].astype('int')
  38. month_df['EVENT_CONSUM_V'] = month_df['EVENT_CONSUM_V'].astype('float')
  39. # month_df['EVENT_ORDER_MONTH_C'] = month_df['EVENT_ORDER_MONTH_C'].map(ProcessUtil.convert_event_order_c)
  40. # month_df['EVENT_ORDER_MONTH_C'] = month_df['EVENT_ORDER_MONTH_C'].astype('int')
  41. month_df['EVENT_VIDEO_FLUX_C'] = month_df['EVENT_VIDEO_FLUX_C'].map(ProcessUtil.convert_video_flux)
  42. month_df['EVENT_VIDEO_FLUX_V'] = month_df['EVENT_VIDEO_FLUX_V'].map(ProcessUtil.convert_video_flux_value)
  43. month_df['EVENT_VIDEO_FLUX_V'] = month_df['EVENT_VIDEO_FLUX_V'].astype('float')
  44. month_df['EVENT_VIDEO_FLUX_C'] = month_df['EVENT_VIDEO_FLUX_C'].astype('int')
  45. month_df['TAG_GENDER_C'] = month_df['TAG_GENDER_C'].map(ProcessUtil.convert_gender)
  46. month_df['TAG_GENDER_C'] = month_df['TAG_GENDER_C'].astype('int')
  47. month_df['TAG_NETTYPE_C'] = month_df['TAG_NETTYPE_C'].map(ProcessUtil.convert_nettype)
  48. month_df['TAG_NETTYPE_C'] = month_df['TAG_NETTYPE_C'].astype('int')
  49. month_df['TAG_AGE_C'] = month_df['TAG_AGE_C'].map(ProcessUtil.convert_age)
  50. month_df['TAG_AGE_C'] = month_df['TAG_AGE_C'].astype('int')
  51. month_df['EVENT_IS_ACCT_C'] = month_df['EVENT_IS_ACCT_C'].map(ProcessUtil.convert_acct)
  52. month_df['EVENT_IS_ACCT_C'] = month_df['EVENT_IS_ACCT_C'].astype('int')
  53. ProcessUtil.convert_appuse(month_df)
  54. month_df.drop(columns=['EVENT_APP_USE.C'], inplace=True)
  55. month_df.drop(columns=['EVENT_USER_OSTATE_C'], inplace=True)
  56. logger.info(f"完成预处理{path}")
  57. return month_df
  58. def convert_onehot(df):
  59. logger.info("开始onehot编码")
  60. return ProcessUtil.convert_onehot(df)
  61. def merge_df_for_lgbm(dataf1, dataf2):
  62. df_general = dataf1[['uuid']]
  63. df_1 = dataf1.drop(columns=['uuid'])
  64. df_1 = df_1.add_prefix("first_")
  65. df_2 = dataf2.drop(columns=['uuid'])
  66. df_2 = df_2.add_prefix('second_')
  67. df_merge = pd.concat([df_general, df_1, df_2], axis=1)
  68. df_merge.drop(columns=[
  69. 'second_TAG_GENDER_C_0',
  70. 'second_TAG_GENDER_C_1',
  71. 'second_TAG_GENDER_C_2',
  72. 'second_TAG_NETTYPE_C_0',
  73. 'second_TAG_NETTYPE_C_1',
  74. 'second_TAG_NETTYPE_C_2',
  75. 'second_TAG_NETTYPE_C_3',
  76. 'second_TAG_NETTYPE_C_4',
  77. 'second_TAG_AGE_C_0',
  78. 'second_TAG_AGE_C_1',
  79. 'second_TAG_AGE_C_2',
  80. 'second_TAG_AGE_C_3',
  81. 'second_TAG_AGE_C_4',
  82. 'second_TAG_AGE_C_5',
  83. 'second_TAG_INTIME_C_0',
  84. 'second_TAG_INTIME_C_1',
  85. 'second_TAG_INTIME_C_2',
  86. 'second_TAG_INTIME_C_3',
  87. 'second_TAG_INTIME_C_4',
  88. 'second_TAG_INTIME_C_5',
  89. 'second_TAG_PROVINCE_C_0',
  90. 'second_TAG_PROVINCE_C_1',
  91. 'second_TAG_PROVINCE_C_2',
  92. 'second_TAG_PROVINCE_C_3',
  93. 'second_TAG_PROVINCE_C_4',
  94. 'second_TAG_PROVINCE_C_5',
  95. 'second_TAG_PROVINCE_C_6',
  96. 'second_TAG_PROVINCE_C_7',
  97. 'second_TAG_PROVINCE_C_8',
  98. 'second_TAG_PROVINCE_C_9',
  99. 'second_TAG_PROVINCE_C_10',
  100. 'second_TAG_PROVINCE_C_11',
  101. 'second_TAG_PROVINCE_C_12',
  102. 'second_TAG_PROVINCE_C_13',
  103. 'second_TAG_PROVINCE_C_14',
  104. 'second_TAG_PROVINCE_C_15',
  105. 'second_TAG_PROVINCE_C_16',
  106. 'second_TAG_PROVINCE_C_17',
  107. 'second_TAG_PROVINCE_C_18',
  108. 'second_TAG_PROVINCE_C_19',
  109. 'second_TAG_PROVINCE_C_20',
  110. 'second_TAG_PROVINCE_C_21',
  111. 'second_TAG_PROVINCE_C_22',
  112. 'second_TAG_PROVINCE_C_23',
  113. 'second_TAG_PROVINCE_C_24',
  114. 'second_TAG_PROVINCE_C_25',
  115. 'second_TAG_PROVINCE_C_26',
  116. 'second_TAG_PROVINCE_C_27',
  117. 'second_TAG_PROVINCE_C_28',
  118. 'second_TAG_PROVINCE_C_29',
  119. 'second_TAG_PROVINCE_C_30',
  120. 'second_TAG_PROVINCE_C_31',
  121. ], inplace=True)
  122. return df_merge
  123. def gen_new_feature(df):
  124. df['diff_EVENT_FLUX_V'] = df['second_EVENT_FLUX_V'] - df['first_EVENT_FLUX_V']
  125. df['diff_EVENT_CONSUM_V'] = df['second_EVENT_CONSUM_V'] - df['first_EVENT_CONSUM_V']
  126. df['diff_EVENT_VIDEO_FLUX_V'] = df['second_EVENT_VIDEO_FLUX_V'] - df['first_EVENT_VIDEO_FLUX_V']
  127. df['diff_kuaishou_use'] = df['second_app_use_kuaishou'] - df['first_app_use_kuaishou']
  128. df.drop(columns=['second_app_use_kuaishou', 'first_app_use_kuaishou'], inplace=True)
  129. return df
  130. # 按每个UUID输出一个文件,并将该文件及其标签记录下来
  131. def merge_df_for_rnn(df1, df2, index, sample_type):
  132. df1_uuids = df1['uuid'].values.tolist()
  133. with open("./data/csv_lstm_data/path_dict.txt", "a") as f:
  134. for uuid in tqdm(df1_uuids):
  135. df_temp1 = df1[(df1.uuid == uuid)].drop_duplicates()
  136. df_temp2 = df2[(df2.uuid == uuid)].drop_duplicates()
  137. df_merge = pd.concat([df_temp1, df_temp2], axis=0)
  138. if sample_type == "0":
  139. output_pkl_path = f"./data/csv_lstm_data/pos/{index}_{uuid}.pkl"
  140. else:
  141. output_pkl_path = f"./data/csv_lstm_data/neg/{index}_{uuid}.pkl"
  142. df_merge.to_pickle(output_pkl_path)
  143. f.write(output_pkl_path + ' ' + sample_type + "\n")
  144. continue
  145. def handle_data(index, path_dict, sample_type):
  146. logger.info(f'current_process is {index}')
  147. path1 = path_dict['path1']
  148. path2 = path_dict['path2']
  149. first_month_data_frame = pd.read_pickle(path1)
  150. second_month_data_frame = pd.read_pickle(path2)
  151. month1_df_processed = process_month_data_frame(first_month_data_frame, path1)
  152. dataf1 = convert_onehot(month1_df_processed)
  153. month2_df_processed = process_month_data_frame(second_month_data_frame, path2)
  154. dataf2 = convert_onehot(month2_df_processed)
  155. #
  156. # # 合成数据
  157. merge_df_for_rnn(dataf1, dataf2, index, sample_type)
  158. logger.info(f"处理完第{index}_process的整体数据")
  159. if __name__ == '__main__':
  160. # pkl_path1 = [
  161. # {
  162. # "path1": "./data/pkl/0_202106_train_pos.pkl",
  163. # "path2": "./data/pkl/0_202107_train_pos.pkl"
  164. # },
  165. # {
  166. # "path1": "./data/pkl/1_202107_train_pos.pkl",
  167. # "path2": "./data/pkl/1_202108_train_pos.pkl"
  168. # },
  169. # {
  170. # "path1": "./data/pkl/2_202108_train_pos.pkl",
  171. # "path2": "./data/pkl/2_202109_train_pos.pkl"
  172. # },
  173. # # {
  174. # # "path1": "./data/pkl/3_202109_train_pos.pkl",
  175. # # "path2": "./data/pkl/3_202110_train_pos.pkl"
  176. # # },
  177. # ]
  178. #
  179. #
  180. # for i, pkl_path in enumerate(tqdm(pkl_path1)):
  181. # p = Process(target=handle_data, args=(i, pkl_path, "0"))
  182. # p.start()
  183. # logger.info(f"{i}_process已开启")
  184. pkl_path2 = [
  185. {
  186. "path1": "./data/pkl/0_202106_train_neg.pkl",
  187. "path2": "./data/pkl/0_202107_train_neg.pkl"
  188. },
  189. {
  190. "path1": "./data/pkl/1_202107_train_neg.pkl",
  191. "path2": "./data/pkl/1_202108_train_neg.pkl"
  192. },
  193. {
  194. "path1": "./data/pkl/2_202108_train_neg.pkl",
  195. "path2": "./data/pkl/2_202109_train_neg.pkl"
  196. },
  197. # {
  198. # "path1": "./data/pkl/3_202109_train_neg.pkl",
  199. # "path2": "./data/pkl/3_202110_train_neg.pkl"
  200. # },
  201. ]
  202. for i, pkl_path in enumerate(tqdm(pkl_path2)):
  203. index = i + 3
  204. p = Process(target=handle_data, args=(index, pkl_path, "1"))
  205. p.start()
  206. logger.info(f"{i}_process已开启")