import math
import traceback

import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from tqdm import tqdm

from common.log_utils import logFactory
from common import constant
from common import value_dict
from common.database_utils import database_util

click_client = database_util.get_client()
logger = logFactory("preprocess data").log

column_names = [
    "THEMONTH_V", "uuid", "TAG_AREA_C", "EVENT_CPNAME_C", "EVENT_SPNAME_C",
    "EVENT_CATEGORYNAME_C", "EVENT_CHANNEL_C", "EVENT_CHANNEL_BELONGTO_C",
    "EVENT_ORDER_MONTH_C", "EVENT_ORDER_MONTH_V", "EVENT_CANCEL_DIFF_C",
    "EVENT_CANCEL_DIFF_V", "EVENT_ORDER_PRICE_C", "EVENT_ORDER_PRICE_V",
    "EVENT_ORDER_FLOWP_C", "EVENT_ORDER_FLOWP_V", "EVENT_ORDER_STATE_C",
    "EVENT_ORDER_SUMMONTH_C", "EVENT_ORDER_SUMMONTH_V", "TAG_INTIME_C",
    "TAG_INTIME_V", "TAG_PACKAGE_C", "TAG_PACKAGE_PIC_C", "EVENT_FLUX_C",
    "EVENT_FLUX_V", "EVENT_CONSUM_C", "EVENT_CONSUM_V", "EVENT_PHONETYPE_C",
    "EVENT_VIDEO_FLUX_C", "EVENT_VIDEO_FLUX_V", "EVENT_IS_ACCT_C",
    "TAG_GENDER_C", "TAG_PROVINCE_C", "TAG_NETTYPE_C", "TAG_AGE_C",
    "EVENT_APP_USE.C", "EVENT_APP_USE.V", "EVENT_USER_OSTATE_C",
]

# Columns removed from every fetched page before preprocessing.
drop_columns = [
    "THEMONTH_V", "uuid", "EVENT_CPNAME_C", "EVENT_SPNAME_C",
    "EVENT_CATEGORYNAME_C", "EVENT_CHANNEL_C", "EVENT_CHANNEL_BELONGTO_C",
    "EVENT_ORDER_MONTH_C", "EVENT_ORDER_MONTH_V", "EVENT_CANCEL_DIFF_C",
    "EVENT_CANCEL_DIFF_V", "EVENT_ORDER_PRICE_C", "EVENT_ORDER_PRICE_V",
    "EVENT_ORDER_FLOWP_C", "EVENT_ORDER_FLOWP_V", "EVENT_ORDER_STATE_C",
    "EVENT_ORDER_SUMMONTH_C", "EVENT_ORDER_SUMMONTH_V", "TAG_INTIME_V",
    "TAG_PACKAGE_C", "TAG_PACKAGE_PIC_C", "EVENT_PHONETYPE_C", "TAG_AREA_C",
    "EVENT_APP_USE.V", "I_appuse",
]

# Rows fetched from ClickHouse per page.
fetch_size = 50000


def _lookup(mapping, value, default=0):
    """Return ``mapping[value]``, or *default* when *value* is not a known label.

    Unhashable values (e.g. a list) can never equal one of the string labels,
    so they also fall back to *default* instead of raising ``TypeError``.
    """
    try:
        return mapping.get(value, default)
    except TypeError:
        return default


# Product category label -> ordinal code.
_PRODUCT_TYPE_CODES = {
    "无限畅视": 10,
    "电商卡": 9,
    "畅视后向": 8,
    "畅视流量": 7,
    "沃畅视": 6,
    "畅视随心选": 5,
    "沃视频": 4,
    "小娱八斗": 3,
    "畅视互娱": 2,
    "其它": 1,
}


def convert_product_type(value):
    """Encode a product-category label as 10..1; unknown labels map to 0."""
    return _lookup(_PRODUCT_TYPE_CODES, value)


# Channel category label -> ordinal code.
_CHANNEL_CODES = {
    '互联网渠道': 6,
    '集团渠道': 5,
    '自有线上渠道': 4,
    '省分线下渠道': 3,
    '社会代理和其他渠道': 2,
    '省分线上渠道': 1,
}


def convert_channel(value):
    """Encode a channel-category label as 6..1; unknown labels map to 0."""
    return _lookup(_CHANNEL_CODES, value)


# Subscription-age bucket -> ordinal code.
_ORDER_MONTH_CODES = {
    '订购1月内': 1,
    '1-3月': 2,
    '3-6月': 3,
    '6-12月': 4,
    '12-24月': 5,
    '24月以上': 6,
}


def convert_order_month(value):
    """Encode the order-month bucket as 1..6; unknown labels map to 0."""
    return _lookup(_ORDER_MONTH_CODES, value)


# Time-since-unsubscribe bucket -> ordinal code.
_CANCEL_DIFF_CODES = {
    '退订1月内': 1,
    '退订1-3月': 2,
    '退订3-6月': 3,
    '退订6-12月': 4,
    '退订1年以上': 5,
    '未退订': 6,
}


def convert_cancel_diff(value):
    """Encode the time-since-unsubscribe bucket as 1..6; unknown labels map to 0."""
    return _lookup(_CANCEL_DIFF_CODES, value)


# Order price bucket -> ordinal code.
_ORDER_PRICE_CODES = {
    '0元': 1,
    '1-3元': 2,
    '3-8元': 3,
    '8-15元': 4,
    '15-20元': 5,
    '20以上': 6,
}


def convert_order_price(value):
    """Encode the order-price bucket as 1..6; unknown labels map to 0."""
    return _lookup(_ORDER_PRICE_CODES, value)


# Traffic-usage-ratio bucket -> ordinal code ('5%以下' is handled separately).
_ORDER_FLOW_CODES = {
    '5-20%': 2,
    '20-50%': 3,
    '50-80%': 4,
    '80%以上': 5,
}


def convert_order_flow(value):
    """Encode the traffic-usage-ratio bucket as 1..5; unknown labels map to 0."""
    # The lowest bucket is matched on str(value), the others on the raw value.
    if str(value) == '5%以下':
        return 1
    return _lookup(_ORDER_FLOW_CODES, value)


# Total subscription duration bucket -> ordinal code.
_ORDER_SUM_MONTH_CODES = {
    '0-1月': 1,
    '2-6月': 2,
    '6-12月': 3,
    '12-24月': 4,
    '24月以上': 5,
}


def convert_order_sum_month(value):
    """Encode the total-subscription-duration bucket as 1..5; unknown labels map to 0."""
    return _lookup(_ORDER_SUM_MONTH_CODES, value)


# Network tenure bucket -> ordinal code.
_INTIME_CODES = {
    '1年-2年': 1,
    '2年-5年': 2,
    '5年-10年': 3,
    '10年-20年': 4,
    '20年以上': 5,
}


def convert_intime(value):
    """Encode the network-tenure bucket as 1..5; unknown labels map to 0."""
    return _lookup(_INTIME_CODES, value)


# Monthly traffic bucket -> ordinal code.
# NOTE(review): '15-30G' lacks the 'G' the neighbouring labels carry; kept
# as-is because it must match the source data exactly — confirm against data.
_FLUX_CODES = {
    '小于等于0': 1,
    '0-0.5G': 2,
    '0.5G-2G': 3,
    '2G-5G': 4,
    '5G-10G': 5,
    '10G-15G': 6,
    '15-30G': 7,
    '30G以上': 8,
}


def convert_flux(value):
    """Encode the monthly-traffic bucket as 1..8; unknown labels map to 0."""
    return _lookup(_FLUX_CODES, value)


# Billing status -> code.
_ACCT_CODES = {
    '已出账': 0,
    '未出账': 1,
}


def convert_acct(value):
    """Encode billing status: billed 0, not billed 1, anything else 2."""
    return _lookup(_ACCT_CODES, value, default=2)


# Monthly spend bucket -> ordinal code.
_CONSUME_CODES = {
    '10元以下': 1,
    '10-20元': 2,
    '20-60元': 3,
    '60-100元': 4,
    '100-150元': 5,
    '150-300元': 6,
    '300元以上': 7,
}


def convert_consume(value):
    """Encode the monthly-spend bucket as 1..7; unknown labels map to 0."""
    return _lookup(_CONSUME_CODES, value)


def _number_or_zero(value):
    """Return *value* unchanged when it is numeric, otherwise 0.

    None, the string form "nan", and anything ``constant.is_number`` rejects
    all become 0.
    """
    if value is None or str(value) == "nan" or not constant.is_number(value):
        return 0
    return value


def convert_flux_value(value):
    """Sanitize the raw monthly-traffic value (non-numeric -> 0)."""
    return _number_or_zero(value)


def convert_consume_value(value):
    """Sanitize the raw monthly-spend value (non-numeric -> 0)."""
    return _number_or_zero(value)


def convert_video_flux_value(value):
    """Sanitize the raw monthly video-traffic value (non-numeric -> 0)."""
    return _number_or_zero(value)


# Monthly video traffic bucket -> ordinal code.
_VIDEO_FLUX_CODES = {
    '0-0.5G': 1,
    '0.5-2G': 2,
    '2-5G': 3,
    '5-10G': 4,
    '10-15G': 5,
    '15-30G': 6,
    '30G以上': 7,
}


def convert_video_flux(value):
    """Encode the monthly video-traffic bucket as 1..7; unknown labels map to 0."""
    return _lookup(_VIDEO_FLUX_CODES, value)


# Gender -> code.
_GENDER_CODES = {
    '男': 0,
    '女': 1,
}


def convert_gender(value):
    """Encode gender: male 0, female 1, anything else 2."""
    return _lookup(_GENDER_CODES, value, default=2)


# Network type -> ordinal code.
_NETTYPE_CODES = {
    '2G': 1,
    '3G': 2,
    '4G': 3,
    '5G': 4,
}


def convert_nettype(value):
    """Encode the network type as 1..4; unknown labels map to 0."""
    return _lookup(_NETTYPE_CODES, value)


# Age cohort -> ordinal code.
_AGE_CODES = {
    '70前': 1,
    '70后': 2,
    '80后': 3,
    '90后': 4,
    '00后': 5,
}


def convert_age(value):
    """Encode the age cohort as 1..5; unknown labels map to 0."""
    return _lookup(_AGE_CODES, value)


# Per-app usage range -> ordinal code.
_APP_RANGE_CODES = {
    '0.1G-0.5G': 1,
    '0.5G-2G': 2,
    '2G-5G': 3,
    '5G-10G': 4,
    '10G-20G': 5,
    '20G以上': 6,
}


def get_range_value(range_value):
    """Encode a per-app usage range as 1..6; unknown ranges map to 0."""
    return _lookup(_APP_RANGE_CODES, range_value)


# App-name keywords, in the order convert_app_use_info returns their codes
# (tencent, mangguo, youku, iqiyi, bilibili, kuaishou).
_APP_USE_KEYWORDS = ("腾讯视频", "芒果", "优酷", "爱奇艺", "哔哩哔哩", "快手")
# Code used for an app that never appears in the row's usage list.
_APP_USE_ABSENT = 7


def convert_app_use_info(value):
    """Extract six per-app usage codes from one DataFrame row.

    *value* is the row Series that ``DataFrame.apply(axis=1)`` passes in.
    The usage list is read from the row's second-to-last field; its entries
    are expected to look like ``"<app name>_<range>"``.
    NOTE(review): the positional ``[-2]`` lookup assumes that field is
    ``EVENT_APP_USE.C`` given ``constant.origin_column_names`` minus
    ``drop_columns`` — confirm if the column order ever changes.

    Returns a tuple of ints (tencent, mangguo, youku, iqiyi, bilibili,
    kuaishou). Each is ``get_range_value`` of the app's range, or 7 when the
    app is absent. When an app appears several times, the last entry wins.
    """
    usage_list = value.tolist()[-2]
    codes = dict.fromkeys(_APP_USE_KEYWORDS, _APP_USE_ABSENT)
    # Missing data (None / NaN) leaves every app at the "absent" code.
    if usage_list is not None and str(usage_list).lower() != 'nan':
        for apptype_range in usage_list:
            parts = apptype_range.split("_")
            if len(parts) < 2:
                continue
            apptype, apprange = parts[0], parts[1]
            for keyword in _APP_USE_KEYWORDS:
                if keyword in apptype:
                    codes[keyword] = get_range_value(apprange)
    return tuple(int(codes[keyword]) for keyword in _APP_USE_KEYWORDS)


def convert_appuse(df):
    """Add the six ``app_use_*`` int columns to *df* in place (APP usage)."""
    app_use_columns = ['app_use_tencent', 'app_use_mangguo', 'app_use_youku',
                       'app_use_iqiyi', 'app_use_bilibili', 'app_use_kuaishou']
    if df.empty:
        # apply(result_type="expand") on an empty frame returns the frame's
        # own columns, not six new ones, so the assignment below would fail.
        for column in app_use_columns:
            df[column] = pd.Series(dtype="int64")
        return
    df[app_use_columns] = df.apply(convert_app_use_info, axis=1, result_type="expand")


def convert_province(value):
    """Map a province label through ``value_dict.area_dict``.

    Raises KeyError for labels missing from the dict.
    """
    return value_dict.area_dict[value]


# (column, converter, target dtype) applied in place by convert_label.
_LABEL_CONVERTERS = (
    ('TAG_INTIME_C', convert_intime, 'int'),
    ('EVENT_FLUX_C', convert_flux, 'int'),
    ('EVENT_FLUX_V', convert_flux_value, 'float'),
    ('EVENT_CONSUM_C', convert_consume, 'int'),
    ('EVENT_CONSUM_V', convert_consume_value, 'float'),
    ('EVENT_VIDEO_FLUX_C', convert_video_flux, 'int'),
    ('EVENT_VIDEO_FLUX_V', convert_video_flux_value, 'float'),
    ('TAG_GENDER_C', convert_gender, 'int'),
    ('TAG_NETTYPE_C', convert_nettype, 'int'),
    ('TAG_AGE_C', convert_age, 'int'),
    ('EVENT_IS_ACCT_C', convert_acct, 'int'),
)


def convert_label(data):
    """Encode the label columns of *data* in place (returns None).

    Order-related columns (EVENT_CATEGORYNAME_C, EVENT_CHANNEL_BELONGTO_C,
    EVENT_ORDER_*_C, EVENT_CANCEL_DIFF_C) are already removed by
    ``drop_columns``, so their converters are not applied here.
    Adds the ``app_use_*`` columns, then drops EVENT_APP_USE.C and
    EVENT_USER_OSTATE_C.
    """
    logger.info("开始预处理")
    data['TAG_PROVINCE_C'] = data['TAG_PROVINCE_C'].map(convert_province)
    for column, converter, dtype in _LABEL_CONVERTERS:
        data[column] = data[column].map(converter).astype(dtype)
    # App usage
    convert_appuse(data)
    data.drop(columns=['EVENT_APP_USE.C', 'EVENT_USER_OSTATE_C'], inplace=True)


# (column, number of codes) for one-hot encoding. Codes run 0..size-1 and
# must cover every value the matching converter can produce.
_ONE_HOT_SPEC = (
    ("TAG_GENDER_C", 3),         # convert_gender: 0..2
    ("TAG_NETTYPE_C", 5),        # convert_nettype: 0..4
    ("TAG_AGE_C", 6),            # convert_age: 0..5
    ("TAG_INTIME_C", 6),         # convert_intime: 0..5
    ("EVENT_IS_ACCT_C", 3),      # convert_acct: 0..2
    ("EVENT_FLUX_C", 9),         # convert_flux: 0..8
    ("EVENT_CONSUM_C", 8),       # convert_consume: 0..7
    ("EVENT_VIDEO_FLUX_C", 8),   # convert_video_flux: 0..7
    ("TAG_PROVINCE_C", 32),      # assumes area_dict codes are 0..31 — TODO confirm
    ('app_use_tencent', 8),      # convert_app_use_info: 0..7
    ('app_use_mangguo', 8),
    ('app_use_youku', 8),
    ('app_use_iqiyi', 8),
    ('app_use_bilibili', 8),
    ('app_use_kuaishou', 8),
)
one_hot_feature_columns = [column for column, _ in _ONE_HOT_SPEC]
# Plain list of lists: wrapping these ragged lists in np.array() raises
# ValueError on NumPy >= 1.24.
np_cates = [list(range(size)) for _, size in _ONE_HOT_SPEC]
enc = OneHotEncoder(categories=np_cates)


def convert_onehot(df):
    """Replace ``one_hot_feature_columns`` in *df* with 0/1 int indicator columns.

    Drops the source columns from *df* in place and returns a new frame with
    the indicator columns appended, aligned on *df*'s index.
    """
    logger.info("开始onehot编码")
    encoded = enc.fit_transform(df[one_hot_feature_columns])
    if hasattr(enc, "get_feature_names_out"):
        after_onehot_features = enc.get_feature_names_out(one_hot_feature_columns)
    else:  # scikit-learn < 1.0
        after_onehot_features = enc.get_feature_names(one_hot_feature_columns)
    one_hot_dataframe = pd.DataFrame(encoded.toarray().astype("int"),
                                     columns=after_onehot_features,
                                     index=df.index)
    df.drop(columns=one_hot_feature_columns, inplace=True)
    return pd.concat([df, one_hot_dataframe], axis=1)


def insert_to_database(dataf, row_index, month="202107"):
    """Insert *dataf* into ``constant.insert_neg_tb_name`` and return the next row id.

    Adds ``row_id`` (row_index .. row_index+len-1) and ``month`` columns to
    *dataf* in place before inserting.
    """
    logger.info(f"开始写入数据库,共{len(dataf)}条")
    # Sequential id column
    next_index = row_index + len(dataf)
    dataf['row_id'] = range(row_index, next_index)
    dataf['month'] = month
    tb_name = constant.insert_neg_tb_name
    cols = ",".join(dataf.columns.tolist())
    data = dataf.to_dict("records")
    click_client.execute(f"INSERT INTO {tb_name} ({cols}) VALUES ", data, types_check=True)
    return next_index


# The source table is processed as 760 pages of fetch_size rows, in four runs:
#   pages   0-190 -> row_index = 0
#   pages 190-380 -> row_index = 9500000
#   pages 380-570 -> row_index = 19000000
#   pages 570-760 -> row_index = 28500000
def get_data(row_index, range_start, range_end, part):
    """Process pages ``range_start .. range_end - 1`` of ``l_neg_origin_07``.

    For each page (fetch_size rows, ordered by uuid), the steps are:
    - drop ``drop_columns``;
    - encode the labels;
    - one-hot encode;
    - insert into the database.

    Row ids continue from *row_index*. Processing stops early at the first
    empty page. On an error the loop stops and the traceback is logged. In
    every case, the pages processed so far are pickled once, to
    ``data_neg_part_{part}_0218_07.pkl``.
    """
    logger.info("开始处理数据")
    data_all = []
    try:
        for i in tqdm(range(range_start, range_end)):
            step = str(i)
            logger.info(f"第{step}次循环开始处理")
            start = i * fetch_size
            sql = f"select t.* from l_neg_origin_07 t order by t.uuid limit {fetch_size} offset {start}"
            all_data = click_client.execute(sql)
            if not all_data:
                logger.info(f"第{step}次循环无数据,提前结束")
                break
            dataf = pd.DataFrame(all_data, columns=constant.origin_column_names)
            dataf.drop(columns=drop_columns, inplace=True)
            # Preprocess
            convert_label(dataf)
            # One-hot encode
            dataf = convert_onehot(dataf)
            # Write to the database
            row_index = insert_to_database(dataf, row_index)
            logger.info(f"第{step}次循环处理完毕")
            data_all.append(dataf)
    except Exception as e:
        # Best effort: keep whatever was processed before the failure.
        logger.error(f"{e}\n{traceback.format_exc()}")

    if not data_all:
        logger.info("没有可保存的数据")
        return
    data_last = pd.concat(data_all, axis=0)
    data_last.to_pickle(f"data_neg_part_{part}_0218_07.pkl")


if __name__ == "__main__":
    get_data(0, 0, 27, "1")
    # get_data(9500000, 190, 285, "2")
    # get_data(14250000, 285, 380, "3")
    # get_data(9500000, 190, 380, "4")
    # get_data(19000000, 380, 475, "5")
    # get_data(19000000, 475, 570, "6")
    # get_data(28500000, 570, 760)