from common.log_utils import logFactory
from common.database_utils import database_util
from common import constant
import pandas as pd
from tqdm import tqdm
from random import shuffle
from sklearn.model_selection import train_test_split
import lightgbm as lgb
from sklearn import metrics
import matplotlib.pyplot as plt
import seaborn as sns

logger = logFactory("local preprocess data").log

# click_client = database_util.get_client()
# # Fetch the positive/negative sample counts to prepare random row ids
# tb_pos = constant.insert_pos_tb_name
# tb_neg = constant.insert_neg_tb_name
# positive_sql = f"select count(1) from {tb_pos}"
# positive_sql_result = click_client.execute(positive_sql)[0][0]
# positive_ids = range(0, positive_sql_result)
# negative_sql = f"select count(1) from {tb_neg}"
# negative_sql_result = click_client.execute(negative_sql)[0][0]
# negative_ids = range(0, negative_sql_result)

# Number of partitions for the positive and negative samples
batch_num_pos = 1
batch_num_neg = 13


# Compute the row ids of each batch from the per-batch data volume
def partition_preserve_order(list_in, n):
    """Randomly split list_in into n partitions, keeping the original order inside each partition."""
    indices = list(range(len(list_in)))
    shuffle(indices)
    index_partitions = [sorted(indices[i::n]) for i in range(n)]
    return [[list_in[i] for i in index_partition] for index_partition in index_partitions]


def gen_train_tuple(pos_row_ids, neg_row_ids):
    """Pair every negative partition with a positive partition (cycled) to form the training batches."""
    result = []
    for i, neg_partition in enumerate(neg_row_ids):
        pos_index = i % batch_num_pos
        result.append((pos_row_ids[pos_index], neg_partition))
    return result


# batch_num_pos = 10
# batch_num_neg = 5
total_pos = pd.read_pickle("data_pos_0218_06.pkl")
total_neg = pd.read_pickle("data_neg_part_1_0218_06.pkl")
positive_ids = range(0, total_pos.shape[0])
negative_ids = range(0, total_neg.shape[0])
pos_row_ids = partition_preserve_order(positive_ids, batch_num_pos)
neg_row_ids = partition_preserve_order(negative_ids, batch_num_neg)
train_tuple = gen_train_tuple(pos_row_ids, neg_row_ids)


def draw_roc_auc(y_label, y_test):
    """Plot the ROC curve; y_test holds the predicted probabilities."""
    fpr, tpr, thresholds = metrics.roc_curve(y_label, y_test)
    # Compute the area under the curve
    roc_auc = metrics.auc(fpr, tpr)
    # Plot
    plt.clf()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.0])
    plt.legend(loc="lower right")
    plt.show()


drop_columns = [
    'row_id',
    'month',
    'mark',
    'EVENT_FLUX_V',
    'EVENT_CONSUM_V',
    'EVENT_VIDEO_FLUX_V',
    # "EVENT_FLUX_C_0",
    # "EVENT_FLUX_C_1",
    # "EVENT_FLUX_C_2",
    # "EVENT_FLUX_C_3",
    # "EVENT_FLUX_C_4",
    # "EVENT_FLUX_C_5",
    # "EVENT_FLUX_C_6",
    # "EVENT_FLUX_C_7",
    # "EVENT_FLUX_C_8",
    # "EVENT_CONSUM_C_0",
    # "EVENT_CONSUM_C_1",
    # "EVENT_CONSUM_C_2",
    # "EVENT_CONSUM_C_3",
    # "EVENT_CONSUM_C_4",
    # "EVENT_CONSUM_C_5",
    # "EVENT_CONSUM_C_6",
    # "EVENT_CONSUM_C_7",
    # "EVENT_VIDEO_FLUX_C_0",
    # "EVENT_VIDEO_FLUX_C_1",
    # "EVENT_VIDEO_FLUX_C_2",
    # "EVENT_VIDEO_FLUX_C_3",
    # "EVENT_VIDEO_FLUX_C_4",
    # "EVENT_VIDEO_FLUX_C_5",
    # "EVENT_VIDEO_FLUX_C_6",
    # "EVENT_VIDEO_FLUX_C_7",
]


def start_train():
    lgb_model = None
    train_params = {
        'task': 'train',
        'objective': 'binary',
        'boosting_type': 'gbdt',
        'learning_rate': 0.1,
        'num_leaves': 10,
        'tree_learner': 'serial',
        'metric': {'binary_logloss', 'auc', 'average_precision'},  # l1: mae, l2: mse
        'max_bin': 20,  # a smaller max_bin speeds up training, a larger one improves accuracy
        'max_depth': 6,
        # 'min_child_samples': 5,
        # "min_data_in_leaf": 10,
        "bagging_fraction": 0.8,  # row sampling ratio, as in XGBoost; lowering it helps prevent overfitting and speeds up training
        "feature_fraction": 0.8,  # feature sampling ratio, as in XGBoost; lowering it helps prevent overfitting and speeds up training
        "n_jobs": 8,
        "boost_from_average": False,
        'seed': 2022,
        "lambda_l1": 1e-5,
        "lambda_l2": 1e-5,
    }
    for i, data_tuple in enumerate(tqdm(train_tuple)):
        step_num = str(i)
        logger.info(f"Starting round {step_num}")
        pos_ids = data_tuple[0]
        neg_ids = data_tuple[1]
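        # Each round pairs one positive partition with one negative partition. Feeding the
        # previous booster back in through init_model (with keep_training_booster=True below)
        # continues training across batches instead of refitting from scratch every round.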
        train_data_frame_pos = total_pos[total_pos.row_id.isin(pos_ids)].copy()
        train_data_frame_neg = total_neg[total_neg.row_id.isin(neg_ids)].copy()
        # Label convention used throughout this script: positives are 0, negatives are 1
        train_data_frame_pos['mark'] = 0
        train_data_frame_neg['mark'] = 1
        total_train_data = pd.concat([train_data_frame_pos, train_data_frame_neg], axis=0)
        # Split into training and test sets
        data_train, data_test = train_test_split(total_train_data, train_size=0.6)
        train_y_data = data_train[['mark']]
        train_x_data = data_train.drop(columns=drop_columns)
        test_y_data = data_test[['mark']]
        test_x_data = data_test.drop(columns=drop_columns)
        # Build the LightGBM datasets.
        # NOTE: the silent=, early_stopping_rounds= and verbose_eval= keyword arguments
        # require LightGBM < 4.0; newer versions expect the early_stopping/log_evaluation
        # callbacks instead.
        lgb_train = lgb.Dataset(train_x_data, train_y_data.values, silent=True)
        lgb_eval = lgb.Dataset(test_x_data, test_y_data.values, reference=lgb_train, silent=True)
        lgb_model = lgb.train(params=train_params,
                              train_set=lgb_train,
                              num_boost_round=100,
                              valid_sets=lgb_eval,
                              init_model=lgb_model,
                              feature_name=train_x_data.columns.tolist(),
                              early_stopping_rounds=20,
                              verbose_eval=False,
                              keep_training_booster=True)
        # Log the model's evaluation scores
        score_train = str(dict([(s[1], s[2]) for s in lgb_model.eval_train()]))
        score_valid = str(dict([(s[1], s[2]) for s in lgb_model.eval_valid()]))
        logger.info(f"Results of round {step_num}:")
        logger.info(f"On the training set: {score_train}")
        logger.info(f"On the test set: {score_valid}")
        if i == len(train_tuple) - 1:
            # Final round: report classification metrics, the ROC curve and feature importances
            test_predict = lgb_model.predict(test_x_data)
            test_result = [0 if x < 0.5 else 1 for x in test_predict]
            result = metrics.classification_report(test_y_data.values, test_result)
            logger.info(result)
            draw_roc_auc(test_y_data.values.reshape([-1, ]), test_predict)
            sns.set(rc={'figure.figsize': (50, 50)})
            sns.barplot(y=train_x_data.columns, x=lgb_model.feature_importance())
            plt.show()
    lgb_model.save_model("temp0218-2.model")


def draw_confusion_matrix(y_label, y_test):
    # Plot the confusion matrix
    confusion_data = metrics.confusion_matrix(y_label, y_test)
    print(confusion_data)
    sns.heatmap(confusion_data, cmap="Greens", annot=True)
    plt.xlabel("Predicted labels")
    plt.ylabel("True labels")
    plt.tight_layout()
    plt.show()


def test_data_valid():
    logger.info("Preparing to load the validation data")
    # # Fetch the positive/negative sample counts to prepare random row ids
    tb_pos = constant.insert_pos_tb_test_name
    tb_neg = constant.insert_neg_tb_test_name
    dataf_pos = pd.read_pickle('data_pos_0218_07.pkl')
    dataf_pos['mark'] = 0
    dataf_neg = pd.read_pickle('data_neg_part_1_0218_07.pkl')
    dataf_neg['mark'] = 1
    test_all_data = pd.concat([dataf_pos, dataf_neg], axis=0)
    # Extract the labels before dropping columns in place
    test_x_data = test_all_data
    test_y_data = test_all_data[['mark']]
    test_x_data.drop(columns=drop_columns, inplace=True)
    lgb_model = lgb.Booster(model_file='temp0218-2.model')
    test_predict = lgb_model.predict(test_x_data, num_iteration=lgb_model.best_iteration)
    test_result = [0 if x < 0.5 else 1 for x in test_predict]
    result = metrics.classification_report(test_y_data.values, test_result)
    logger.info(result)
    draw_confusion_matrix(test_y_data.values, test_result)
    draw_roc_auc(test_y_data.values.reshape([-1, ]), test_predict)


if __name__ == "__main__":
    # start_train()
    test_data_valid()
    # data1 = pd.read_pickle("./data_neg_part_1_2013.pkl")
    # data2 = pd.read_pickle("./data_neg_part_2_2013.pkl")
    # data3 = pd.read_pickle("./data_neg_part_3_2013.pkl")
    # data4 = pd.read_pickle("./data_neg_part_4_2013.pkl")
    # data5 = pd.read_pickle("./data_neg_part_5_2013.pkl")
    # data_all = pd.concat([data1, data2, data3, data4], axis=0)
    # data_all.to_pickle("./data_neg_2013.pkl")
    pass