local_process.py

from common.log_utils import logFactory
from common.database_utils import database_util
from common import constant
import pandas as pd
from tqdm import tqdm
from random import shuffle
from sklearn.model_selection import train_test_split
import lightgbm as lgb
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns

logger = logFactory("local preprocess data").log

# click_client = database_util.get_client()
# # Get the positive/negative sample counts and prepare random ids
# tb_pos = constant.insert_pos_tb_name
# tb_neg = constant.insert_neg_tb_name
# positive_sql = f"select count(1) from {tb_pos}"
# positive_sql_result = click_client.execute(positive_sql)[0][0]
# positive_ids = range(0, positive_sql_result)
# negative_sql = f"select count(1) from {tb_neg}"
# negative_sql_result = click_client.execute(negative_sql)[0][0]
# negative_ids = range(0, negative_sql_result)

# Number of batches per class
batch_num_pos = 1
batch_num_neg = 13
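# Note (assumption): the 1:13 ratio suggests the negative set is roughly 13x
# the positive set, so splitting negatives into 13 batches and reusing the
# single positive batch keeps each training round roughly class-balanced.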


# Compute the row_ids for each batch from the per-batch data volume
def partition_preserve_order(list_in, n):
    # Shuffle the indices, split them into n strided partitions, then sort
    # each partition so the original relative order is preserved within it
    indices = list(range(len(list_in)))
    shuffle(indices)
    index_partitions = [sorted(indices[i::n]) for i in range(n)]
    return [[list_in[i] for i in index_partition]
            for index_partition in index_partitions]
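
# A minimal illustration (hypothetical output, since shuffle() is random):
# partition_preserve_order(list(range(6)), 2) might return
# [[0, 2, 5], [1, 3, 4]] -- a random split into 2 batches, each batch kept
# in its original (ascending) order.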


def gen_train_tuple(pos_row_ids, neg_row_ids):
    # Pair every negative batch with a positive batch; positive batches are
    # reused round-robin when there are fewer of them than negative batches
    result = []
    for i in range(len(neg_row_ids)):
        pos_index = i % batch_num_pos
        result.append((pos_row_ids[pos_index], neg_row_ids[i]))
    return result
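
# Sketch of the pairing: with batch_num_pos = 1 and batch_num_neg = 13,
# i % batch_num_pos is always 0, so all 13 tuples look like
# (pos_row_ids[0], neg_row_ids[i]) -- the single positive batch is reused
# against each negative batch.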


# batch_num_pos = 10
# batch_num_neg = 5
total_pos = pd.read_pickle("data_pos_0218_06.pkl")
total_neg = pd.read_pickle("data_neg_part_1_0218_06.pkl")
positive_ids = range(0, total_pos.shape[0])
negative_ids = range(0, total_neg.shape[0])
pos_row_ids = partition_preserve_order(positive_ids, batch_num_pos)
neg_row_ids = partition_preserve_order(negative_ids, batch_num_neg)
train_tuple = gen_train_tuple(pos_row_ids, neg_row_ids)
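# Each tuple in train_tuple drives one incremental round in start_train()
# below: the booster from the previous round is passed back in via
# init_model, so the model sees every negative batch exactly once.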


def draw_roc_auc(y_label, y_test):
    # Plot the ROC curve
    fpr, tpr, thresholds = metrics.roc_curve(y_label, y_test)
    # Compute the area under the curve
    roc_auc = metrics.auc(fpr, tpr)
    # Draw the figure
    plt.clf()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.0])
    plt.legend(loc="lower right")
    plt.show()
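
# Example call (hypothetical values): the function expects true binary labels
# and predicted probabilities, e.g.
#   draw_roc_auc([0, 1, 1, 0], [0.2, 0.9, 0.6, 0.3])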


drop_columns = [
    'row_id',
    'month',
    'mark',
    'EVENT_FLUX_V',
    'EVENT_CONSUM_V',
    'EVENT_VIDEO_FLUX_V',
    # "EVENT_FLUX_C_0",
    # "EVENT_FLUX_C_1",
    # "EVENT_FLUX_C_2",
    # "EVENT_FLUX_C_3",
    # "EVENT_FLUX_C_4",
    # "EVENT_FLUX_C_5",
    # "EVENT_FLUX_C_6",
    # "EVENT_FLUX_C_7",
    # "EVENT_FLUX_C_8",
    # "EVENT_CONSUM_C_0",
    # "EVENT_CONSUM_C_1",
    # "EVENT_CONSUM_C_2",
    # "EVENT_CONSUM_C_3",
    # "EVENT_CONSUM_C_4",
    # "EVENT_CONSUM_C_5",
    # "EVENT_CONSUM_C_6",
    # "EVENT_CONSUM_C_7",
    # "EVENT_VIDEO_FLUX_C_0",
    # "EVENT_VIDEO_FLUX_C_1",
    # "EVENT_VIDEO_FLUX_C_2",
    # "EVENT_VIDEO_FLUX_C_3",
    # "EVENT_VIDEO_FLUX_C_4",
    # "EVENT_VIDEO_FLUX_C_5",
    # "EVENT_VIDEO_FLUX_C_6",
    # "EVENT_VIDEO_FLUX_C_7",
]
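# Note (assumption): the raw *_V value columns are dropped while the bucketed
# *_C_* features stay in, so training presumably runs on the discretized
# versions; 'row_id', 'month' and the label 'mark' are excluded as non-features.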


def start_train():
    lgb_model = None
    train_params = {
        'task': 'train',
        'objective': 'binary',
        'boosting_type': 'gbdt',
        'learning_rate': 0.1,
        'num_leaves': 10,
        'tree_learner': 'serial',
        'metric': {'binary_logloss', 'auc', 'average_precision'},
        'max_bin': 20,  # a smaller max_bin is faster; a larger one improves accuracy
        'max_depth': 6,
        # 'min_child_samples': 5,
        # "min_data_in_leaf": 10,
        "bagging_fraction": 0.8,  # row sampling ratio, as in XGBoost; lower it to curb overfitting and speed up training
        "feature_fraction": 0.8,  # feature (column) sampling ratio; lower it to curb overfitting and speed up training
        "n_jobs": 8,
        "boost_from_average": False,
        'seed': 2022,
        "lambda_l1": 1e-5,
        "lambda_l2": 1e-5,
    }
    for i, data_tuple in enumerate(tqdm(train_tuple)):
        step_num = str(i)
        logger.info(f"Starting round {step_num}")
        pos_ids = data_tuple[0]
        neg_ids = data_tuple[1]
        # .copy() avoids pandas' SettingWithCopyWarning when the label column is added
        train_data_frame_pos = total_pos[total_pos.row_id.isin(pos_ids)].copy()
        train_data_frame_neg = total_neg[total_neg.row_id.isin(neg_ids)].copy()
        train_data_frame_pos['mark'] = 0
        train_data_frame_neg['mark'] = 1
        total_train_data = pd.concat([train_data_frame_pos, train_data_frame_neg], axis=0)
        # Split into training and test sets
        data_train, data_test = train_test_split(total_train_data, train_size=0.6)
        train_y_data = data_train[['mark']]
        train_x_data = data_train.drop(columns=drop_columns)
        test_y_data = data_test[['mark']]
        test_x_data = data_test.drop(columns=drop_columns)
        # Build the LightGBM datasets
        lgb_train = lgb.Dataset(train_x_data, train_y_data.values, silent=True)
        lgb_eval = lgb.Dataset(test_x_data, test_y_data.values, reference=lgb_train, silent=True)
        # init_model/keep_training_booster let each round continue training the
        # booster produced by the previous round
        lgb_model = lgb.train(params=train_params, train_set=lgb_train, num_boost_round=100, valid_sets=lgb_eval,
                              init_model=lgb_model, feature_name=train_x_data.columns.tolist(),
                              early_stopping_rounds=20,
                              verbose_eval=False, keep_training_booster=True)
        # Log the model's evaluation scores
        score_train = str(dict([(s[1], s[2]) for s in lgb_model.eval_train()]))
        score_valid = str(dict([(s[1], s[2]) for s in lgb_model.eval_valid()]))
        logger.info(f"Results of round {step_num}:")
        logger.info(f"On the training set: {score_train}")
        logger.info(f"On the test set: {score_valid}")
        if i == len(train_tuple) - 1:
            # After the final round, report metrics and feature importances
            test_predict = lgb_model.predict(test_x_data)
            # Threshold the predicted probabilities at 0.5
            test_result = [0 if p < 0.5 else 1 for p in test_predict]
            result = metrics.classification_report(test_y_data.values.ravel(), test_result)
            logger.info(result)
            draw_roc_auc(test_y_data.values.reshape([-1, ]), test_predict)
            sns.set(rc={'figure.figsize': (50, 50)})
            sns.barplot(y=train_x_data.columns, x=lgb_model.feature_importance())
            plt.show()
    lgb_model.save_model("temp0218-2.model")


def draw_confusion_matrix(y_label, y_test):
    # Plot the confusion matrix
    confusion_data = metrics.confusion_matrix(y_label, y_test)
    print(confusion_data)
    sns.heatmap(confusion_data, cmap="Greens", annot=True)
    plt.xlabel("Predicted labels")
    plt.ylabel("True labels")
    plt.tight_layout()
    plt.show()


def test_data_valid():
    logger.info("Preparing to load the data")
    # Get the positive/negative test samples (table names kept for reference)
    tb_pos = constant.insert_pos_tb_test_name
    tb_neg = constant.insert_neg_tb_test_name
    dataf_pos = pd.read_pickle('data_pos_0218_07.pkl')
    dataf_pos['mark'] = 0
    dataf_neg = pd.read_pickle('data_neg_part_1_0218_07.pkl')
    dataf_neg['mark'] = 1
    test_all_data = pd.concat([dataf_pos, dataf_neg], axis=0)
    test_y_data = test_all_data[['mark']]
    test_x_data = test_all_data.drop(columns=drop_columns)
    lgb_model = lgb.Booster(model_file='temp0218-2.model')
    test_predict = lgb_model.predict(test_x_data, num_iteration=lgb_model.best_iteration)
    # Threshold the predicted probabilities at 0.5
    test_result = [0 if p < 0.5 else 1 for p in test_predict]
    result = metrics.classification_report(test_y_data.values.ravel(), test_result)
    logger.info(result)
    draw_confusion_matrix(test_y_data.values, test_result)
    draw_roc_auc(test_y_data.values.reshape([-1, ]), test_predict)


if __name__ == "__main__":
    # start_train()
    test_data_valid()
    # data1 = pd.read_pickle("./data_neg_part_1_2013.pkl")
    # data2 = pd.read_pickle("./data_neg_part_2_2013.pkl")
    # data3 = pd.read_pickle("./data_neg_part_3_2013.pkl")
    # data4 = pd.read_pickle("./data_neg_part_4_2013.pkl")
    # data5 = pd.read_pickle("./data_neg_part_5_2013.pkl")
    # data_all = pd.concat([data1, data2, data3, data4], axis=0)
    # data_all.to_pickle("./data_neg_2013.pkl")
    pass