from common.log_utils import logFactory
from common.database_utils import database_util
from common import constant
import pandas as pd
from tqdm import tqdm
from random import shuffle
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn

logger = logFactory("local preprocess data").log

# click_client = database_util.get_client()
# # Fetch the positive/negative sample counts and prepare random ids
# tb_pos = constant.insert_pos_tb_name
# tb_neg = constant.insert_neg_tb_name
# positive_sql = f"select count(1) from {tb_pos}"
# positive_sql_result = click_client.execute(positive_sql)[0][0]
# positive_ids = range(0, positive_sql_result)
# negative_sql = f"select count(1) from {tb_neg}"
# negative_sql_result = click_client.execute(negative_sql)[0][0]
# negative_ids = range(0, negative_sql_result)

# Number of batches to split each class into
batch_num_pos = 10
batch_num_neg = 150


def partition_preserve_order(list_in, n):
    """Shuffle the indices of list_in, split them into n batches, and sort each
    batch so the original relative order is preserved within a batch."""
    indices = list(range(len(list_in)))
    shuffle(indices)
    index_partitions = [sorted(indices[i::n]) for i in range(n)]
    return [[list_in[i] for i in index_partition] for index_partition in index_partitions]


def gen_train_tuple(pos_row_ids, neg_row_ids):
    """Pair every negative batch with a positive batch, cycling through the
    (fewer) positive batches so each one is reused across the negatives."""
    result = []
    for i, neg_batch in enumerate(neg_row_ids):
        pos_index = i % len(pos_row_ids)
        result.append((pos_row_ids[pos_index], neg_batch))
    return result


# batch_num_pos = 10
# batch_num_neg = 5
total_pos = pd.read_pickle("data_pos_2013.pkl")
total_neg = pd.read_pickle("data_neg_2013.pkl")
# 1862547 rows
positive_ids = range(0, total_pos.shape[0])
# 28500000 rows
negative_ids = range(0, total_neg.shape[0])
pos_row_ids = partition_preserve_order(positive_ids, batch_num_pos)
neg_row_ids = partition_preserve_order(negative_ids, batch_num_neg)
train_tuple = gen_train_tuple(pos_row_ids, neg_row_ids)

'''
Build the network: an input layer of 2 nodes, two hidden layers of 5 nodes
each, and an output layer of 2 nodes (one per class). The input width must
match the number of feature columns left after dropping row_id/month/mark.
'''
net = nn.Sequential(
    nn.Linear(2, 5),  # input layer -> first hidden layer, fully connected
    nn.Sigmoid(),     # sigmoid activation after the first hidden layer
    nn.Linear(5, 5),  # first hidden layer -> second hidden layer, fully connected
    nn.Sigmoid(),     # sigmoid activation after the second hidden layer
    nn.Linear(5, 2),  # second hidden layer -> output layer, fully connected
    # No Softmax here: CrossEntropyLoss applies log-softmax internally,
    # so the network should emit raw logits.
)

# Configure the loss function and optimizer
optimizer = torch.optim.SGD(net.parameters(), lr=0.01)  # plain SGD over the network parameters, learning rate 0.01
loss_func = nn.CrossEntropyLoss()  # cross-entropy loss over the two-class logits


def start_train():
    for i, (pos_ids, neg_ids) in enumerate(tqdm(train_tuple)):
        logger.info(f"Starting step {i}")
        # .copy() avoids pandas' SettingWithCopyWarning when the label column is added
        train_data_frame_pos = total_pos[total_pos.row_id.isin(pos_ids)].copy()
        train_data_frame_neg = total_neg[total_neg.row_id.isin(neg_ids)].copy()
        train_data_frame_pos['mark'] = 0  # original convention: positives are 0, negatives are 1
        train_data_frame_neg['mark'] = 1
        total_train_data = pd.concat([train_data_frame_pos, train_data_frame_neg], axis=0)
        # Split into training and test sets
        data_train, data_test = train_test_split(total_train_data, train_size=0.7)
        train_y_data = data_train[['mark']]
        train_x_data = data_train.drop(columns=['row_id', 'month', 'mark'])
        x_train = torch.from_numpy(train_x_data.values).float()            # Linear layers expect float32
        y_train = torch.from_numpy(train_y_data.values).squeeze(1).long()  # CrossEntropyLoss expects shape (N,)
        test_x_data = data_test.drop(columns=['row_id', 'month', 'mark'])  # held out; not used yet
        test_y_data = data_test[['mark']]
        y_p = net(x_train)
        loss = loss_func(y_p, y_train)  # compute the loss
        optimizer.zero_grad()           # clear accumulated gradients
        loss.backward()                 # backpropagate the error
        optimizer.step()                # update the network parameters from the gradients
        if i % 1000 == 0:
            print('step: {}, loss: {}'.format(i, loss.data.item()))
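# --- Hedged sketch (not in the original script): evaluating on the held-out
# split. start_train() builds test_x_data/test_y_data on every step but never
# uses them; a minimal accuracy check could look like the helper below.
# evaluate() is a hypothetical name and assumes the same tensor conventions as
# the training path (float32 features, integer class labels, two-way logits).
def evaluate(test_x_data, test_y_data):
    with torch.no_grad():  # no gradients needed for evaluation
        x_test = torch.from_numpy(test_x_data.values).float()
        y_test = torch.from_numpy(test_y_data.values).squeeze(1).long()
        logits = net(x_test)
        preds = logits.argmax(dim=1)  # predicted class per row
        return (preds == y_test).float().mean().item()  # plain accuracy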
pd.read_pickle("./data_neg_part_3_2013.pkl") # data4 = pd.read_pickle("./data_neg_part_4_2013.pkl") # data5 = pd.read_pickle("./data_neg_part_5_2013.pkl") # data_all = pd.concat([data1, data2, data3, data4], axis=0) # data_all.to_pickle("./data_neg_2013.pkl") pass