from random import shuffle

import lightgbm as lgb
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from lightgbm.sklearn import LGBMClassifier  # used by the optional sklearn-API block below
from sklearn import metrics                  # used by the optional sklearn-API block below
from sklearn.model_selection import train_test_split
from tqdm import tqdm

from common import constant
from common.database_utils import database_util
from common.log_utils import logFactory
click_client = database_util.get_client()
logger = logFactory("preprocess data").log

# Get the positive/negative sample counts and prepare the candidate row ids
tb_pos = constant.insert_pos_tb_name
tb_neg = constant.insert_neg_tb_name
positive_sql = f"select count(1) from {tb_pos}"
positive_count = click_client.execute(positive_sql)[0][0]
positive_ids = range(0, positive_count)
negative_sql = f"select count(1) from {tb_neg}"
negative_count = click_client.execute(negative_sql)[0][0]
negative_ids = range(0, negative_count)
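# NOTE: the two range() calls above assume row_id values in both tables are
# contiguous integers starting at 0 (i.e. 0 .. count-1); if row_id has gaps,
# the batched "where row_id in (...)" queries below will silently miss rows.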
# Rows per batch (currently unused below)
# batch_size = 100000
batch_size = 10
# Number of batches to split the positive / negative row ids into
batch_num_pos = 20
batch_num_neg = 368
# Shuffle the row ids, split them into n batches, and keep each batch internally sorted
def partition_preserve_order(list_in, n):
    indices = list(range(len(list_in)))
    shuffle(indices)
    index_partitions = [sorted(indices[i::n]) for i in range(n)]
    return [[list_in[i] for i in index_partition]
            for index_partition in index_partitions]
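# A quick sketch of what partition_preserve_order returns (the output depends on
# the shuffle, so the grouping shown is just one possible result):
#
#   >>> partition_preserve_order(list(range(7)), 3)
#   [[0, 3, 5], [1, 6], [2, 4]]   # 3 sorted batches that together cover every id once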
# Pair every negative batch with a positive batch; positives are recycled
# round-robin because there are far fewer positive batches than negative ones
def gen_train_tuple(pos_row_ids, neg_row_ids):
    result = []
    for i, neg_ids in enumerate(neg_row_ids):
        pos_index = i % batch_num_pos
        result.append((pos_row_ids[pos_index], neg_ids))
    return result
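# For example, with batch_num_pos = 20 and batch_num_neg = 368, negative batch 0
# is paired with positive batch 0, negative batch 20 with positive batch 0 again,
# and so on; each positive batch is reused roughly 368 / 20 ≈ 18 times.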
pos_row_ids = partition_preserve_order(positive_ids, batch_num_pos)
neg_row_ids = partition_preserve_order(negative_ids, batch_num_neg)
train_tuple = gen_train_tuple(pos_row_ids, neg_row_ids)
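# train_tuple is a list of batch_num_neg (368) pairs of (pos_id_list, neg_id_list);
# each pair becomes one incremental-training round in start_train below.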
# Columns excluded from the feature matrix
DROP_COLUMNS = (['row_id', 'month']
                + [f'EVENT_CATEGORYNAME_C_{i}' for i in range(11)]
                + [f'EVENT_CANCEL_DIFF_C_{i}' for i in range(7)])
def start_train():
    logger.info("Starting the batched training loop")
    lgb_model = None
    train_params = {
        'task': 'train',
        'objective': 'binary',
        'boosting_type': 'gbdt',
        'learning_rate': 0.1,
        # 'num_leaves': 5,
        'tree_learner': 'serial',
        'metric': {'binary_logloss', 'auc', 'average_precision'},
        # 'max_bin': 3,  # a smaller max_bin trains faster, a larger one is more accurate
        # 'max_depth': 3,
        # 'min_child_samples': 5,
        # "bagging_fraction": 1,  # row subsampling ratio (as in XGBoost); lowering it can curb overfitting and speed up training
        # "feature_fraction": 1,  # feature subsampling ratio; same trade-off as bagging_fraction
        "n_jobs": 8,
        "boost_from_average": True,
        'seed': 2022,
        "lambda_l1": 1e-5,
        "lambda_l2": 1e-5,
    }
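    # Each lgb.train call below resumes from the previous round's booster
    # (init_model=lgb_model) and keeps it trainable (keep_training_booster=True),
    # so the 368 batch pairs form one continued-training run rather than 368
    # independent models.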
    for i, data_tuple in enumerate(tqdm(train_tuple)):
        step_num = str(i)
        logger.info(f"Starting round {step_num}")
        pos_ids, neg_ids = data_tuple
        tb_pos = constant.insert_pos_tb_name
        tb_neg = constant.insert_neg_tb_name
        # Render the id lists as tuples so the SQL reads "in (1, 2, 3)";
        # interpolating a Python list would produce square brackets and break the query
        pos_data_sql = f"select * from {tb_pos} where row_id in {tuple(pos_ids)}"
        neg_data_sql = f"select * from {tb_neg} where row_id in {tuple(neg_ids)}"
        pos_result = click_client.execute(pos_data_sql)
        neg_result = click_client.execute(neg_data_sql)
        train_data_frame_pos = pd.DataFrame(pos_result, columns=constant.feature_column_names)
        train_data_frame_neg = pd.DataFrame(neg_result, columns=constant.feature_column_names)
        # Rows from the positive table are labeled 0, rows from the negative table 1
        train_data_frame_pos['mark'] = 0
        train_data_frame_neg['mark'] = 1
        total_train_data = pd.concat([train_data_frame_pos, train_data_frame_neg], axis=0)
        # Split into train and test sets
        data_train, data_test = train_test_split(total_train_data, train_size=0.7)
        # data_train, data_test = train_test_split(total_train_data, train_size=0.8, random_state=0,
        #                                          stratify=total_train_data['mark'].values)
        feature_col = [x for x in constant.feature_column_names if x not in DROP_COLUMNS]
        # Index with the feature list directly instead of dropping columns in place,
        # which would trigger pandas' SettingWithCopyWarning on these slices
        train_x_data = data_train[feature_col]
        train_y_data = data_train[['mark']]
        test_x_data = data_test[feature_col]
        test_y_data = data_test[['mark']]
        # Build the LightGBM datasets. Note that silent=, early_stopping_rounds=
        # and verbose_eval= here follow the pre-4.0 LightGBM API; LightGBM 4.x
        # replaces them with callbacks (lgb.early_stopping, lgb.log_evaluation)
        lgb_train = lgb.Dataset(train_x_data, train_y_data.values, silent=True)
        lgb_eval = lgb.Dataset(test_x_data, test_y_data.values, reference=lgb_train, silent=True)
        # Optional sklearn-API sanity check with a confusion-matrix heatmap:
        # clf = LGBMClassifier()
        # clf.fit(train_x_data, train_y_data.values.ravel())
        # train_predict = clf.predict(train_x_data)
        # test_predict = clf.predict(test_x_data)
        # print(metrics.accuracy_score(train_y_data.values, train_predict))
        # print(metrics.accuracy_score(test_y_data.values, test_predict))
        # confusion_data = metrics.confusion_matrix(test_y_data.values, test_predict)
        # sns.heatmap(confusion_data, cmap="Greens", annot=True)
        # plt.xlabel("Predicted labels")
        # plt.ylabel("True labels")
        # plt.tight_layout()
        # plt.show()
        lgb_model = lgb.train(params=train_params, train_set=lgb_train, num_boost_round=1000,
                              valid_sets=lgb_eval, init_model=lgb_model, feature_name=feature_col,
                              early_stopping_rounds=20, verbose_eval=False, keep_training_booster=True)
        # Log the evaluation scores for this round
        score_train = str(dict([(s[1], s[2]) for s in lgb_model.eval_train()]))
        score_valid = str(dict([(s[1], s[2]) for s in lgb_model.eval_valid()]))
        logger.info(f"Results for round {step_num}:")
        logger.info(f"train set: {score_train}")
        logger.info(f"validation set: {score_valid}")
        # Plot feature importance, sorted so the strongest features stand out
        result = pd.DataFrame({
            'column': feature_col,
            'importance': lgb_model.feature_importance(),
        }).sort_values(by='importance')
        sns.barplot(y=result['column'], x=result['importance'])
        plt.tight_layout()
        plt.show()
if __name__ == "__main__":
    start_train()