gen_origin_data.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import pandas as pd
  2. from common.database_utils import database_util
  3. from common.log_utils import logFactory
  4. from common import constant
  5. from tqdm import tqdm
  # Module-wide handles shared by every function in this script.
  # The database client comes from the project's own helper; the SQL below
  # uses "limit 1 by", so it is presumably a ClickHouse client - TODO confirm.
  click_client = database_util.get_client()
  # Logger obtained from the project's log factory under the name "preprocess data".
  logger = logFactory("preprocess data").log
  def get_df_by_id_month(uuid_list, month):
      """Fetch the rows for the given ids and month and join the two result sets.

      Two statements are sent through ``click_client.execute``:

      * the first reads ``Z_USER_TAG_FLAT_out_<month>`` and keeps one row per
        ``uuid, EVENT_SPNAME_C`` pair (``limit 1 by``), ordered by the computed
        ``I_appuse`` column in descending order;
      * the second reads ``ALS_XXH_CANCEL_NEW_C4_lowDim`` for the same ids with
        ``TARGET_MONTH`` equal to *month*, skipping rows where any selected
        metric is null.

      Args:
          uuid_list: Ids to select. The value is formatted into both SQL
              statements using its default string form.
          month: Month tag. Used as the table-name suffix of the first query
              and as the ``TARGET_MONTH`` filter of the second.

      Returns:
          The inner merge of both result frames on the ``uuid`` column.
      """
      # NOTE(review): ids and month are interpolated straight into the SQL text;
      # confirm that both always come from trusted input.
      tag_query = f"select *, length(EVENT_APP_USE.C) as I_appuse from Z_USER_TAG_FLAT_out_{month} where uuid in {uuid_list} and EVENT_CATEGORYNAME_C not in ('电商卡', '无限畅视') and EVENT_VIDEO_FLUX_V is not null and EVENT_ORDER_MONTH_C IN ('订购1月内', '1-3月', '3-6月') and EVENT_CANCEL_DIFF_C='未退订'and EVENT_CPNAME_C = '快手' order by I_appuse desc limit 1 by uuid,EVENT_SPNAME_C"
      low_dim_query = f"select USERID,MAvg_TOTAL_FLUX_1_3_zs,MPer1_TOTAL_FLUX_zs,MAvg_TOTAL_VIDEO_FLUX_1_3_zs,MPer1_TOTAL_VIDEO_FLUX_zs,MAvg_Flow_kuaishou_1_3_zs,MPer1_Flow_kuaishou_zs,Div_kuaishou_vFlux_1_3 from ALS_XXH_CANCEL_NEW_C4_lowDim where USERID in {uuid_list} and TARGET_MONTH='{month}' and MAvg_TOTAL_FLUX_1_3_zs is not null and MPer1_TOTAL_FLUX_zs is not null and MAvg_TOTAL_VIDEO_FLUX_1_3_zs is not null and MPer1_TOTAL_VIDEO_FLUX_zs is not null and MAvg_Flow_kuaishou_1_3_zs is not null and MPer1_Flow_kuaishou_zs is not null and Div_kuaishou_vFlux_1_3 is not null"

      logger.info(f"开始读取{month}数据")
      tag_rows = click_client.execute(tag_query)
      low_dim_rows = click_client.execute(low_dim_query)

      # Column names are supplied by the project's constant module.
      tag_frame = pd.DataFrame(tag_rows, columns=constant.origin_column_names)
      logger.info(f"data1的shape{tag_frame.shape}")
      low_dim_frame = pd.DataFrame(low_dim_rows, columns=constant.low_dim_cols)
      logger.info(f"data2的shape{low_dim_frame.shape}")
      logger.info("读取完成")

      # NOTE(review): the second query selects USERID, yet the join key is
      # 'uuid'; this presumably relies on constant.low_dim_cols naming that
      # column 'uuid' - confirm.
      joined = tag_frame.merge(
          low_dim_frame,
          how='inner',
          left_on='uuid',
          right_on='uuid',
      )
      logger.info(f"merge后的shape{joined.shape}")
      return joined
  def write_df_to_pickle(data, filename):
      """Pickle *data* to ``./data/pkl/<filename>``, creating the folder if needed.

      Args:
          data: Object exposing ``to_pickle`` and ``shape`` (a pandas DataFrame
              at every call site in this file).
          filename: File name, relative to ``./data/pkl``.

      Raises:
          OSError: If the target directory cannot be created or the file
              cannot be written.
      """
      # Function-scope import so the block stays self-contained.
      import os

      target = f"./data/pkl/{filename}"
      logger.info("开始写入pickle")
      # DataFrame.to_pickle does not create missing parent directories, so a
      # fresh checkout without ./data/pkl would otherwise fail here.
      os.makedirs(os.path.dirname(target), exist_ok=True)
      data.to_pickle(target)
      # The message labels the value as a size; what is logged is data.shape.
      logger.info(f"写入pickle完成,文件名{filename},文件大小{data.shape}")
  def init_data(data_path_list):
      """Build and pickle the positive/negative frames for every configured entry.

      Each entry of *data_path_list* is a dict with the keys ``'path'``,
      ``'month1'`` and ``'month2'``. For each entry, ``pos.csv`` and ``neg.csv``
      under ``path`` are read, all of their cell values are flattened and
      de-duplicated through a ``set`` (so the original order is not kept), and
      for each of the two months the matching frames are fetched with
      ``get_df_by_id_month`` and written with ``write_df_to_pickle``.

      Output files are named ``<entry index>_<month>_train_pos.pkl`` and
      ``<entry index>_<month>_train_neg.pkl``.

      Args:
          data_path_list: Sequence of dicts as described above.
      """
      logger.info("开始处理数据")
      for idx, entry in enumerate(tqdm(data_path_list)):
          folder = entry['path']
          months = (entry['month1'], entry['month2'])

          # Flatten every cell of each CSV into one list of distinct ids.
          pos_table = pd.read_csv(folder + "/pos.csv")
          pos_ids = list(set(pos_table.values.reshape(-1).tolist()))
          neg_table = pd.read_csv(folder + "/neg.csv")
          neg_ids = list(set(neg_table.values.reshape(-1).tolist()))

          # First month, then second month: query both id lists, then write both.
          for month in months:
              pos_frame = get_df_by_id_month(pos_ids, month)
              neg_frame = get_df_by_id_month(neg_ids, month)
              write_df_to_pickle(pos_frame, f"{idx}_{month}_train_pos.pkl")
              write_df_to_pickle(neg_frame, f"{idx}_{month}_train_neg.pkl")
      # Merge: the original author left this marker here; no merge logic follows it.
  if __name__ == '__main__':
      # One tuple per data window: (csv folder, first month, second month).
      # A previously used single-window configuration was
      # ('./data/csv/678', '202109', '202110'); swap it in for a quick run.
      windows = (
          ('./data/csv/678', '202106', '202107'),
          ('./data/csv/789', '202107', '202108'),
          ('./data/csv/8910', '202108', '202109'),
          ('./data/csv/91011', '202109', '202110'),
      )
      data_path = [
          {'path': folder, 'month1': first, 'month2': second}
          for folder, first, second in windows
      ]
      init_data(data_path)