from clickhouse_driver import Client
from common.log_utils import logFactory
from common.database_utils import database_util
from common import constant
import numpy as np
import math
import pandas as pd
from random import shuffle
from tqdm import tqdm
import optuna
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
from optuna.integration import LightGBMPruningCallback
import lightgbm as lgb
from lightgbm.sklearn import LGBMClassifier
from lightgbm import plot_importance
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns
import random

click_client = database_util.get_client()
logger = logFactory("preprocess data").log

# # Count the positive/negative samples and prepare random row ids
# tb_pos = constant.insert_pos_tb_name
# tb_neg = constant.insert_neg_tb_name
# positive_sql = f"select count(1) from {tb_pos}"
# positive_sql_result = click_client.execute(positive_sql)[0][0]
# positive_ids = range(0, positive_sql_result)
# negative_sql = f"select count(1) from {tb_neg}"
# negative_sql_result = click_client.execute(negative_sql)[0][0]
# negative_ids = range(0, negative_sql_result)
#
# # Rows per batch
# # batch_size = 100000
# batch_size = 10
# # Number of batches
# batch_num_pos = 1
# batch_num_neg = 1
#
# # Derive each batch's row_id range from the batch size


def partition_preserve_order(list_in, n):
    # Shuffle the indices, split them into n groups, and sort each group so
    # every partition keeps the original relative order of its elements.
    indices = list(range(len(list_in)))
    shuffle(indices)
    index_partitions = [sorted(indices[i::n]) for i in range(n)]
    return [[list_in[i] for i in index_partition]
            for index_partition in index_partitions]


# def gen_train_tuple(pos_row_ids, neg_row_ids):
#     result = []
#     for i, e in enumerate(neg_row_ids):
#         pos_index = i % batch_num_pos
#         result.append((pos_row_ids[pos_index], neg_row_ids[i]))
#     return result
#
#
# pos_row_ids = partition_preserve_order(positive_ids, batch_num_pos)
# neg_row_ids = partition_preserve_order(negative_ids, batch_num_neg)
# neg_row_ids = [random.sample(neg_row_ids[0], 1000000)]
# pos_row_ids = [random.sample(pos_row_ids[0], 1000000)]
# train_tuple = gen_train_tuple(pos_row_ids, neg_row_ids)

# Columns excluded from the feature matrix before training and prediction.
DROP_COLUMNS = ['row_id', 'month',
                'EVENT_CATEGORYNAME_C_0', 'EVENT_CATEGORYNAME_C_1',
                'EVENT_CATEGORYNAME_C_2', 'EVENT_CATEGORYNAME_C_3',
                'EVENT_CATEGORYNAME_C_4', 'EVENT_CATEGORYNAME_C_5',
                'EVENT_CATEGORYNAME_C_6', 'EVENT_CATEGORYNAME_C_7',
                'EVENT_CATEGORYNAME_C_8', 'EVENT_CATEGORYNAME_C_9',
                'EVENT_CATEGORYNAME_C_10',
                'EVENT_CANCEL_DIFF_C_0', 'EVENT_CANCEL_DIFF_C_1',
                'EVENT_CANCEL_DIFF_C_2', 'EVENT_CANCEL_DIFF_C_3',
                'EVENT_CANCEL_DIFF_C_4', 'EVENT_CANCEL_DIFF_C_5',
                'EVENT_CANCEL_DIFF_C_6']


def draw_roc_auc(y_true, y_score):
    # Plot the ROC curve
    fpr, tpr, thresholds = metrics.roc_curve(y_true, y_score)
    # Area under the curve
    roc_auc = metrics.auc(fpr, tpr)
    # Draw the plot
    plt.clf()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.0])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc="lower right")
    plt.show()


def draw_confusion_matrix(y_true, y_pred):
    # Plot the confusion matrix
    confusion_data = metrics.confusion_matrix(y_true, y_pred)
    print(confusion_data)
    sns.heatmap(confusion_data, cmap="Greens", annot=True)
    plt.xlabel("Predicted labels")
    plt.ylabel("True labels")
    plt.tight_layout()
    plt.show()
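
# --- Hyperparameter tuning sketch ----------------------------------------
# Not part of the original pipeline: optuna, LightGBMPruningCallback and
# log_loss are imported above but never used. This is a minimal sketch of
# how they could tune the LightGBM parameters; the search ranges, the trial
# count and the helper name `tune_train_params` are assumptions, not values
# taken from the original code.
def tune_train_params(train_x, train_y, valid_x, valid_y, n_trials=20):
    def objective(trial):
        params = {
            'objective': 'binary',
            'boosting_type': 'gbdt',
            'metric': 'binary_logloss',
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
            'num_leaves': trial.suggest_int('num_leaves', 15, 63),
            'max_depth': trial.suggest_int('max_depth', 3, 8),
            # bagging_fraction only takes effect when bagging_freq > 0
            'bagging_freq': 1,
            'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 1.0),
            'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 1.0),
            'seed': 2022,
        }
        lgb_train = lgb.Dataset(train_x, train_y)
        lgb_eval = lgb.Dataset(valid_x, valid_y, reference=lgb_train)
        # Prune unpromising trials on the validation binary_logloss
        pruning_cb = LightGBMPruningCallback(trial, 'binary_logloss')
        booster = lgb.train(params, lgb_train, num_boost_round=100,
                            valid_sets=[lgb_eval], callbacks=[pruning_cb],
                            verbose_eval=False)
        return log_loss(valid_y, booster.predict(valid_x))

    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=n_trials)
    return study.best_params
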
def start_train():
    logger.info("Start loading the training data")
    lgb_model = None
    train_params = {
        'task': 'train',
        'objective': 'binary',
        'boosting_type': 'gbdt',
        'learning_rate': 0.1,
        'num_leaves': 30,
        'tree_learner': 'serial',
        'metric': {'binary_logloss', 'auc', 'average_precision'},  # l1: mae, l2: mse
        # 'max_bin': 3,  # a smaller max_bin trains faster, a larger one is more accurate
        'max_depth': 6,
        # 'min_child_samples': 5,
        # Row (bagging) sampling ratio, as in XGBoost; lowering it reduces overfitting
        # and speeds up training (note: it only takes effect when bagging_freq > 0).
        "bagging_fraction": 0.8,
        # Column (feature) sampling ratio, as in XGBoost; lowering it reduces
        # overfitting and speeds up training.
        "feature_fraction": 0.8,
        "n_jobs": 8,
        "boost_from_average": False,
        'seed': 2022,
        "lambda_l1": 1e-5,
        "lambda_l2": 1e-5,
    }
    total_train_data = pd.read_pickle('total_train_data.pkl')
    # Split into training and test sets
    data_train, data_test = train_test_split(total_train_data, train_size=0.7)
    # data_train, data_test = train_test_split(total_train_data, total_train_data['mark'].values,
    #                                          train_size=0.8, random_state=0,
    #                                          stratify=total_train_data['mark'].values)
    # df1 = data_train[data_train['mark'] == 0]
    # df2 = data_train[data_train['mark'] == 1]
    # df11 = data_test[data_test['mark'] == 0]
    # df12 = data_test[data_test['mark'] == 1]
    train_x_data = data_train[constant.feature_column_names].drop(columns=DROP_COLUMNS)
    train_y_data = data_train[['mark']]
    test_x_data = data_test[constant.feature_column_names].drop(columns=DROP_COLUMNS)
    test_y_data = data_test[['mark']]
    feature_col = [x for x in constant.feature_column_names if x not in DROP_COLUMNS]
    # Build the LightGBM datasets
    lgb_train = lgb.Dataset(train_x_data, train_y_data.values, silent=True)
    lgb_eval = lgb.Dataset(test_x_data, test_y_data.values, reference=lgb_train, silent=True)
    # clf = LGBMClassifier()
    # clf.fit(train_x_data, train_y_data.values)
    # train_predict = clf.predict(train_x_data)
    # test_predict = clf.predict(test_x_data)
    # train_predict_score = metrics.accuracy_score(train_y_data.values, train_predict)
    # test_predict_score = metrics.accuracy_score(test_y_data.values, test_predict)
    # print(train_predict_score)
    # print(test_predict_score)
    # confusion_data = metrics.confusion_matrix(test_y_data.values, test_predict)
    # print(confusion_data)
    # sns.heatmap(confusion_data, cmap="Greens", annot=True)
    # plt.xlabel("Predicted labels")
    # plt.ylabel("True labels")
    # plt.tight_layout()
    # plt.show()
    lgb_model = lgb.train(params=train_params, train_set=lgb_train, num_boost_round=100,
                          valid_sets=lgb_eval, init_model=lgb_model, feature_name=feature_col,
                          verbose_eval=False, keep_training_booster=True)
    # lgb_model = lgb.cv(params=train_params, train_set=lgb_train, num_boost_round=1000,
    #                    nfold=5, stratified=False, shuffle=True,
    #                    early_stopping_rounds=50, verbose_eval=50, show_stdv=True)
    lgb_model.save_model("temp.model")
    # Log the model's evaluation scores
    score_train = str(dict([(s[1], s[2]) for s in lgb_model.eval_train()]))
    score_valid = str(dict([(s[1], s[2]) for s in lgb_model.eval_valid()]))
    logger.info(f"On the training set: {score_train}")
    logger.info(f"On the test set: {score_valid}")
    # Plot the feature importances, sorted ascending
    importance_df = pd.DataFrame({
        'column': feature_col,
        'importance': lgb_model.feature_importance(),
    }).sort_values(by='importance')
    sns.barplot(y=importance_df['column'], x=importance_df['importance'])
    plt.tight_layout()
    plt.show()
    # Evaluate on the held-out split with a 0.5 probability threshold
    test_predict = lgb_model.predict(test_x_data)
    test_result = [0 if p < 0.5 else 1 for p in test_predict]
    report = metrics.classification_report(test_y_data.values, test_result)
    logger.info(report)
    draw_confusion_matrix(test_y_data.values.reshape([-1, ]), test_result)
    draw_roc_auc(test_y_data.values.reshape([-1, ]), test_predict)
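
# --- Threshold selection sketch -------------------------------------------
# Not part of the original pipeline: both start_train and test_data_valid
# hard-code a 0.5 cut-off on the predicted probability. This is a minimal
# sketch of choosing the threshold that maximises Youden's J (tpr - fpr) on
# a validation set instead; the helper name `choose_threshold` is an
# assumption. Example usage (inside start_train, after predicting):
#     threshold = choose_threshold(test_y_data.values.reshape([-1, ]), test_predict)
def choose_threshold(y_true, y_score):
    fpr, tpr, thresholds = metrics.roc_curve(y_true, y_score)
    # Index of the ROC point furthest above the diagonal
    best_index = np.argmax(tpr - fpr)
    return thresholds[best_index]
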
def test_data_valid():
    logger.info("Start loading the hold-out test data")
    # Load up to 1,000,000 positive and 1,000,000 negative test rows from ClickHouse
    tb_pos = constant.insert_pos_tb_test_name
    tb_neg = constant.insert_neg_tb_test_name
    positive_sql = f"select t.* from {tb_pos} t limit 1000000"
    positive_sql_result = click_client.execute(positive_sql)
    negative_sql = f"select t.* from {tb_neg} t limit 1000000"
    negative_sql_result = click_client.execute(negative_sql)
    dataf_pos = pd.DataFrame(positive_sql_result, columns=constant.feature_column_names)
    dataf_pos['mark'] = 0
    dataf_neg = pd.DataFrame(negative_sql_result, columns=constant.feature_column_names)
    dataf_neg['mark'] = 1
    test_all_data = pd.concat([dataf_pos, dataf_neg], axis=0)
    test_x_data = test_all_data[constant.feature_column_names].drop(columns=DROP_COLUMNS)
    test_y_data = test_all_data[['mark']]
    # Reload the booster saved by start_train and score the hold-out data
    lgb_model = lgb.Booster(model_file='temp.model')
    test_predict = lgb_model.predict(test_x_data, num_iteration=lgb_model.best_iteration)
    test_result = [0 if p < 0.5 else 1 for p in test_predict]
    report = metrics.classification_report(test_y_data.values, test_result)
    logger.info(report)
    draw_confusion_matrix(test_y_data.values.reshape([-1, ]), test_result)
    draw_roc_auc(test_y_data.values.reshape([-1, ]), test_predict)


if __name__ == "__main__":
    # start_train()
    test_data_valid()