train.py

from clickhouse_driver import Client
from common.log_utils import logFactory
from common.database_utils import database_util
from common import constant
import numpy as np
import math
import pandas as pd
from random import shuffle
from tqdm import tqdm
import optuna
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
from optuna.integration import LightGBMPruningCallback
import lightgbm as lgb
from lightgbm.sklearn import LGBMClassifier
from lightgbm import plot_importance
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns

click_client = database_util.get_client()
logger = logFactory("preprocess data").log

# Fetch the positive/negative sample counts and prepare the row id ranges for random sampling
tb_pos = constant.insert_pos_tb_name
tb_neg = constant.insert_neg_tb_name
positive_sql = f"select count(1) from {tb_pos}"
positive_sql_result = click_client.execute(positive_sql)[0][0]
positive_ids = range(0, positive_sql_result)
negative_sql = f"select count(1) from {tb_neg}"
negative_sql_result = click_client.execute(negative_sql)[0][0]
negative_ids = range(0, negative_sql_result)

# Number of rows per batch
# batch_size = 100000
batch_size = 10
# Number of partitions per class
batch_num_pos = 20
batch_num_neg = 368


# Compute the row_ids for each batch: shuffle the ids and split them into n partitions, each kept in sorted order
def partition_preserve_order(list_in, n):
    indices = list(range(len(list_in)))
    shuffle(indices)
    index_partitions = [sorted(indices[i::n]) for i in range(n)]
    return [[list_in[i] for i in index_partition]
            for index_partition in index_partitions]


def gen_train_tuple(pos_row_ids, neg_row_ids):
    result = []
    for i, e in enumerate(neg_row_ids):
        pos_index = i % batch_num_pos
        result.append((pos_row_ids[pos_index], neg_row_ids[i]))
    return result


pos_row_ids = partition_preserve_order(positive_ids, batch_num_pos)
neg_row_ids = partition_preserve_order(negative_ids, batch_num_neg)
train_tuple = gen_train_tuple(pos_row_ids, neg_row_ids)
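# Each entry of train_tuple pairs one negative-id partition with one positive-id
# partition (cycling through the positive partitions via i % batch_num_pos), so
# every training round below sees rows from both tables.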

DROP_COLUMNS = ['row_id', 'month',
                'EVENT_CATEGORYNAME_C_0',
                'EVENT_CATEGORYNAME_C_1',
                'EVENT_CATEGORYNAME_C_2',
                'EVENT_CATEGORYNAME_C_3',
                'EVENT_CATEGORYNAME_C_4',
                'EVENT_CATEGORYNAME_C_5',
                'EVENT_CATEGORYNAME_C_6',
                'EVENT_CATEGORYNAME_C_7',
                'EVENT_CATEGORYNAME_C_8',
                'EVENT_CATEGORYNAME_C_9',
                'EVENT_CATEGORYNAME_C_10',
                "EVENT_CANCEL_DIFF_C_0",
                "EVENT_CANCEL_DIFF_C_1",
                "EVENT_CANCEL_DIFF_C_2",
                "EVENT_CANCEL_DIFF_C_3",
                "EVENT_CANCEL_DIFF_C_4",
                "EVENT_CANCEL_DIFF_C_5",
                "EVENT_CANCEL_DIFF_C_6"
                ]
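# Everything in constant.feature_column_names that is not listed above is kept
# as a model feature (see feature_col inside start_train below).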


def start_train():
    logger.info("Starting the batch-by-batch data loading loop")
    lgb_model = None
    train_params = {
        'task': 'train',
        'objective': 'binary',
        'boosting_type': 'gbdt',
        'learning_rate': 0.1,
        # 'num_leaves': 5,
        'tree_learner': 'serial',
        'metric': {'binary_logloss', 'auc', 'average_precision'},  # l1: mae, l2: mse
        # 'max_bin': 3,  # a smaller max_bin trains faster, a larger value improves accuracy
        # 'max_depth': 3,
        # 'min_child_samples': 5,
        # "bagging_fraction": 1,  # row subsampling ratio, as in XGBoost; lowering it helps prevent overfitting and speeds up training
        # "feature_fraction": 1,  # feature subsampling ratio, as in XGBoost; lowering it helps prevent overfitting and speeds up training
        "n_jobs": 8,
        "boost_from_average": True,
        'seed': 2022,
        "lambda_l1": 1e-5,
        "lambda_l2": 1e-5,
    }
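    # The loop below trains incrementally: each round passes the previous booster
    # back in via init_model (with keep_training_booster=True), so new boosting
    # rounds are added on top of the trees fitted on earlier batches.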
    for i, data_tuple in enumerate(tqdm(train_tuple)):
        step_num = str(i)
        logger.info(f"Starting round {step_num}")
        pos_ids = data_tuple[0]
        neg_ids = data_tuple[1]
        tb_pos = constant.insert_pos_tb_name
        tb_neg = constant.insert_neg_tb_name
        # Cast the id lists to tuples so they render as a valid SQL IN list
        pos_data_sql = f"select * from {tb_pos} where row_id in {tuple(pos_ids)}"
        neg_data_sql = f"select * from {tb_neg} where row_id in {tuple(neg_ids)}"
        pos_result = click_client.execute(pos_data_sql)
        neg_result = click_client.execute(neg_data_sql)
        train_data_frame_pos = pd.DataFrame(pos_result, columns=constant.feature_column_names)
        train_data_frame_neg = pd.DataFrame(neg_result, columns=constant.feature_column_names)
        # Label: rows from the positive table get 0, rows from the negative table get 1
        train_data_frame_pos['mark'] = 0
        train_data_frame_neg['mark'] = 1
        total_train_data = pd.concat([train_data_frame_pos, train_data_frame_neg], axis=0)
        # Split into training and test sets
        data_train, data_test = train_test_split(total_train_data, train_size=0.7)
        # data_train, data_test = train_test_split(total_train_data, total_train_data['mark'].values, train_size=0.8,
        #                                          random_state=0, stratify=total_train_data['mark'].values)
        # df1 = data_train[data_train['mark'] == 0]
        # df2 = data_train[data_train['mark'] == 1]
        # df11 = data_test[data_test['mark'] == 0]
        # df12 = data_test[data_test['mark'] == 1]
        # Drop identifier/excluded columns without mutating the split frames in place
        train_x_data = data_train[constant.feature_column_names].drop(columns=DROP_COLUMNS)
        train_y_data = data_train[['mark']]
        test_x_data = data_test[constant.feature_column_names].drop(columns=DROP_COLUMNS)
        test_y_data = data_test[['mark']]
        feature_col = [x for x in constant.feature_column_names if x not in DROP_COLUMNS]
        # Build the LightGBM Dataset objects
        lgb_train = lgb.Dataset(train_x_data, train_y_data.values, silent=True)
        lgb_eval = lgb.Dataset(test_x_data, test_y_data.values, reference=lgb_train, silent=True)
        # clf = LGBMClassifier()
        # clf.fit(train_x_data, train_y_data.values)
        # train_predict = clf.predict(train_x_data)
        # test_predict = clf.predict(test_x_data)
        # train_predict_score = metrics.accuracy_score(train_y_data.values, train_predict)
        # test_predict_score = metrics.accuracy_score(test_y_data.values, test_predict)
        # print(train_predict_score)
        # print(test_predict_score)
        # confusion_data = metrics.confusion_matrix(test_y_data.values, test_predict)
        # print(confusion_data)
        # sns.heatmap(confusion_data, cmap="Greens", annot=True)
        # plt.xlabel("Predicted labels")
        # plt.ylabel("True labels")
        # plt.tight_layout()
        # plt.show()
        lgb_model = lgb.train(params=train_params, train_set=lgb_train, num_boost_round=1000, valid_sets=lgb_eval,
                              init_model=lgb_model, feature_name=feature_col, early_stopping_rounds=20,
                              verbose_eval=False, keep_training_booster=True)
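        # eval_train()/eval_valid() return (dataset_name, metric_name, value, is_higher_better)
        # tuples, hence the s[1]/s[2] indexing below.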
        # Log the model evaluation scores
        score_train = str(dict([(s[1], s[2]) for s in lgb_model.eval_train()]))
        score_valid = str(dict([(s[1], s[2]) for s in lgb_model.eval_valid()]))
        logger.info(f"Round {step_num} results:")
        logger.info(f"On the training set: {score_train}")
        logger.info(f"On the test set: {score_valid}")
        result = pd.DataFrame({
            'column': feature_col,
            'importance': lgb_model.feature_importance(),
        }).sort_values(by='importance')
        sns.barplot(y=train_x_data.columns, x=lgb_model.feature_importance())
        plt.tight_layout()
        plt.show()
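    # Note: the booster only lives in memory once the loop finishes. A minimal
    # sketch for persisting it (the "model.txt" path is an assumption, not part
    # of the original script):
    # lgb_model.save_model("model.txt")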


if __name__ == "__main__":
    start_train()