from clickhouse_driver import Client
from common.log_utils import logFactory
from common.database_utils import database_util
from common import constant
import numpy as np
import math
import pandas as pd
from random import shuffle
from tqdm import tqdm
import optuna
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
from optuna.integration import LightGBMPruningCallback
import lightgbm as lgb
from lightgbm.sklearn import LGBMClassifier
from lightgbm import plot_importance
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns
import random

click_client = database_util.get_client()
logger = logFactory("preprocess data").log
#
# # Get the positive and negative sample counts and prepare random row ids
# tb_pos = constant.insert_pos_tb_name
# tb_neg = constant.insert_neg_tb_name
# positive_sql = f"select count(1) from {tb_pos}"
# positive_sql_result = click_client.execute(positive_sql)[0][0]
# positive_ids = range(0, positive_sql_result)
# negative_sql = f"select count(1) from {tb_neg}"
# negative_sql_result = click_client.execute(negative_sql)[0][0]
# negative_ids = range(0, negative_sql_result)
#
# # Number of rows per batch
# # batch_size = 100000
# batch_size = 10
# # Number of batches
# batch_num_pos = 1
# batch_num_neg = 1


# Work out the row_ids of each batch from the per-batch row count
def partition_preserve_order(list_in, n):
    """Randomly partition list_in into n batches while preserving the
    original relative order of the elements within each batch."""
    indices = list(range(len(list_in)))
    shuffle(indices)
    index_partitions = [sorted(indices[i::n]) for i in range(n)]
    return [[list_in[i] for i in index_partition]
            for index_partition in index_partitions]
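

# Illustrative usage (the output shown is one possible result; the split is
# random): partition_preserve_order(list(range(6)), 2) might yield
# [[0, 2, 5], [1, 3, 4]] -- each id lands in exactly one batch and stays in
# ascending order inside it.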
#
#
# def gen_train_tuple(pos_row_ids, neg_row_ids):
#     result = []
#     for i, e in enumerate(neg_row_ids):
#         pos_index = i % batch_num_pos
#         result.append((pos_row_ids[pos_index], neg_row_ids[i]))
#     return result
#
#
# pos_row_ids = partition_preserve_order(positive_ids, batch_num_pos)
# neg_row_ids = partition_preserve_order(negative_ids, batch_num_neg)
# neg_row_ids = [random.sample(neg_row_ids[0], 1000000)]
# pos_row_ids = [random.sample(pos_row_ids[0], 1000000)]
# train_tuple = gen_train_tuple(pos_row_ids, neg_row_ids)

DROP_COLUMNS = ['row_id', 'month',
                'EVENT_CATEGORYNAME_C_0',
                'EVENT_CATEGORYNAME_C_1',
                'EVENT_CATEGORYNAME_C_2',
                'EVENT_CATEGORYNAME_C_3',
                'EVENT_CATEGORYNAME_C_4',
                'EVENT_CATEGORYNAME_C_5',
                'EVENT_CATEGORYNAME_C_6',
                'EVENT_CATEGORYNAME_C_7',
                'EVENT_CATEGORYNAME_C_8',
                'EVENT_CATEGORYNAME_C_9',
                'EVENT_CATEGORYNAME_C_10',
                'EVENT_CANCEL_DIFF_C_0',
                'EVENT_CANCEL_DIFF_C_1',
                'EVENT_CANCEL_DIFF_C_2',
                'EVENT_CANCEL_DIFF_C_3',
                'EVENT_CANCEL_DIFF_C_4',
                'EVENT_CANCEL_DIFF_C_5',
                'EVENT_CANCEL_DIFF_C_6',
                ]
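# Assumption based on the column names alone: 'row_id' and 'month' are
# bookkeeping fields, and the EVENT_* columns appear to be one-hot expansions
# of categorical events, so they are dropped from the feature matrix.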


def draw_roc_auc(y_label, y_test):
    # Plot the ROC curve
    fpr, tpr, thresholds = metrics.roc_curve(y_label, y_test)
    # Area under the curve
    roc_auc = metrics.auc(fpr, tpr)
    # Draw
    plt.clf()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.0])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc="lower right")
    plt.show()


def draw_confusion_matrix(y_label, y_test):
    # Plot the confusion matrix
    confusion_data = metrics.confusion_matrix(y_label, y_test)
    print(confusion_data)
    sns.heatmap(confusion_data, cmap="Greens", annot=True)
    plt.xlabel("Predicted labels")
    plt.ylabel("True labels")
    plt.tight_layout()
    plt.show()
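# Optional tweak (not in the call above): seaborn's annot uses the '.2g'
# format by default, so large cell counts render in scientific notation;
# passing fmt='d' to sns.heatmap prints exact integer counts instead.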


def start_train():
    logger.info("Preparing to load the data")
    lgb_model = None
    train_params = {
        'task': 'train',
        'objective': 'binary',
        'boosting_type': 'gbdt',
        'learning_rate': 0.1,
        'num_leaves': 30,
        'tree_learner': 'serial',
        'metric': {'binary_logloss', 'auc', 'average_precision'},
        # 'max_bin': 3,  # a smaller max_bin is faster, a larger one is more accurate
        'max_depth': 6,
        # 'min_child_samples': 5,
        "bagging_fraction": 0.8,  # row (sample) subsampling ratio, as in XGBoost; lowering it curbs overfitting and speeds up training
        "feature_fraction": 0.8,  # feature (column) subsampling ratio; lowering it curbs overfitting and speeds up training
        "n_jobs": 8,
        "boost_from_average": False,
        'seed': 2022,
        "lambda_l1": 1e-5,
        "lambda_l2": 1e-5,
    }
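    # Note: LightGBM only applies bagging_fraction when bagging_freq is set to
    # a positive value, so row subsampling is effectively off here. If it is
    # wanted, adding e.g. "bagging_freq": 1 to train_params would enable it
    # (the value 1 is illustrative, not from the original code).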
    total_train_data = pd.read_pickle('total_train_data.pkl')
    # Split into training and test sets
    data_train, data_test = train_test_split(total_train_data, train_size=0.7)
    # data_train, data_test = train_test_split(total_train_data, total_train_data['mark'].values, train_size=0.8,
    #                                          random_state=0, stratify=total_train_data['mark'].values)
    # df1 = data_train[data_train['mark'] == 0]
    # df2 = data_train[data_train['mark'] == 1]
    # df11 = data_test[data_test['mark'] == 0]
    # df12 = data_test[data_test['mark'] == 1]
    train_x_data = data_train[constant.feature_column_names].drop(columns=DROP_COLUMNS)
    train_y_data = data_train[['mark']]
    test_x_data = data_test[constant.feature_column_names].drop(columns=DROP_COLUMNS)
    test_y_data = data_test[['mark']]
    feature_col = [x for x in constant.feature_column_names if x not in DROP_COLUMNS]
    # Build the LightGBM datasets
    lgb_train = lgb.Dataset(train_x_data, train_y_data.values, silent=True)
    lgb_eval = lgb.Dataset(test_x_data, test_y_data.values, reference=lgb_train, silent=True)
    # clf = LGBMClassifier()
    # clf.fit(train_x_data, train_y_data.values)
    # train_predict = clf.predict(train_x_data)
    # test_predict = clf.predict(test_x_data)
    # train_predict_score = metrics.accuracy_score(train_y_data.values, train_predict)
    # test_predict_score = metrics.accuracy_score(test_y_data.values, test_predict)
    # print(train_predict_score)
    # print(test_predict_score)
    # confusion_data = metrics.confusion_matrix(test_y_data.values, test_predict)
    # print(confusion_data)
    # sns.heatmap(confusion_data, cmap="Greens", annot=True)
    # plt.xlabel("Predicted labels")
    # plt.ylabel("True labels")
    # plt.tight_layout()
    # plt.show()
    lgb_model = lgb.train(params=train_params, train_set=lgb_train, num_boost_round=100, valid_sets=lgb_eval,
                          init_model=lgb_model, feature_name=feature_col,
                          verbose_eval=False, keep_training_booster=True)
    # lgb_model = lgb.cv(params=train_params, train_set=lgb_train, num_boost_round=1000, nfold=5, stratified=False,
    #                    shuffle=True, early_stopping_rounds=50, verbose_eval=50, show_stdv=True)
    lgb_model.save_model("temp.model")
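    # Sketch (assumption, mirroring the commented-out batching scaffolding near
    # the top of the file): since init_model and keep_training_booster are
    # passed, training could continue batch by batch instead of on a single
    # DataFrame. load_batch below is hypothetical, not an existing helper:
    #   for pos_ids, neg_ids in train_tuple:
    #       batch_x, batch_y = load_batch(pos_ids, neg_ids)
    #       lgb_batch = lgb.Dataset(batch_x, batch_y)
    #       lgb_model = lgb.train(params=train_params, train_set=lgb_batch,
    #                             init_model=lgb_model, keep_training_booster=True)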
    # Log the model's evaluation scores
    score_train = str(dict([(s[1], s[2]) for s in lgb_model.eval_train()]))
    score_valid = str(dict([(s[1], s[2]) for s in lgb_model.eval_valid()]))
    logger.info(f"On the training set: {score_train}")
    logger.info(f"On the test set: {score_valid}")
    # Plot the feature importances in ascending order
    importance = pd.DataFrame({
        'column': feature_col,
        'importance': lgb_model.feature_importance(),
    }).sort_values(by='importance')
    sns.barplot(y=importance['column'], x=importance['importance'])
    plt.tight_layout()
    plt.show()
    # Binarise the predicted probabilities at a 0.5 threshold
    test_predict = lgb_model.predict(test_x_data)
    test_result = np.where(test_predict < 0.5, 0, 1)
    report = metrics.classification_report(test_y_data.values.reshape([-1]), test_result)
    logger.info(report)
    draw_confusion_matrix(test_y_data.values.reshape([-1]), test_result)
    draw_roc_auc(test_y_data.values.reshape([-1]), test_predict)


def test_data_valid():
    logger.info("Preparing to load the data")
    # Fetch the positive and negative test samples
    tb_pos = constant.insert_pos_tb_test_name
    tb_neg = constant.insert_neg_tb_test_name
    positive_sql = f"select t.* from {tb_pos} t limit 1000000"
    positive_sql_result = click_client.execute(positive_sql)
    negative_sql = f"select t.* from {tb_neg} t limit 1000000"
    negative_sql_result = click_client.execute(negative_sql)
    dataf_pos = pd.DataFrame(positive_sql_result, columns=constant.feature_column_names)
    dataf_pos['mark'] = 0
    dataf_neg = pd.DataFrame(negative_sql_result, columns=constant.feature_column_names)
    dataf_neg['mark'] = 1
    test_all_data = pd.concat([dataf_pos, dataf_neg], axis=0)
    test_x_data = test_all_data[constant.feature_column_names].drop(columns=DROP_COLUMNS)
    test_y_data = test_all_data[['mark']]
    lgb_model = lgb.Booster(model_file='temp.model')
    # best_iteration is only meaningful for models trained with early stopping;
    # otherwise LightGBM falls back to using all iterations
    test_predict = lgb_model.predict(test_x_data, num_iteration=lgb_model.best_iteration)
    # Binarise the predicted probabilities at a 0.5 threshold
    test_result = np.where(test_predict < 0.5, 0, 1)
    report = metrics.classification_report(test_y_data.values.reshape([-1]), test_result)
    logger.info(report)
    draw_confusion_matrix(test_y_data.values.reshape([-1]), test_result)
    draw_roc_auc(test_y_data.values.reshape([-1]), test_predict)


if __name__ == "__main__":
    # start_train()
    test_data_valid()