|
- import pandas as pd
- from clickhouse_driver import Client
- from common.log_utils import logFactory
- from common import value_dict
- from common.database_utils import database_util
- from tqdm import tqdm
- from sklearn.preprocessing import OneHotEncoder
- import numpy as np
- import math
- from common import constant
- from sklearn import metrics
- import matplotlib.pyplot as plt
# Database client returned by database_util.get_client(); get_data() and
# insert_to_database() run their queries through it.
click_client = database_util.get_client()
logger = logFactory("preprocess data").log

# Number of rows requested per query in get_data().
fetch_size = 100000

# Columns removed from every fetched batch before preprocessing starts.
drop_columns = [
    "THEMONTH_V", "uuid",
    # EVENT_* product / channel descriptors
    "EVENT_CPNAME_C", "EVENT_SPNAME_C", "EVENT_CATEGORYNAME_C",
    "EVENT_CHANNEL_C", "EVENT_CHANNEL_BELONGTO_C",
    # EVENT_* order / cancel fields (both _C and _V variants)
    "EVENT_ORDER_MONTH_C", "EVENT_ORDER_MONTH_V",
    "EVENT_CANCEL_DIFF_C", "EVENT_CANCEL_DIFF_V",
    "EVENT_ORDER_PRICE_C", "EVENT_ORDER_PRICE_V",
    "EVENT_ORDER_FLOWP_C", "EVENT_ORDER_FLOWP_V",
    "EVENT_ORDER_STATE_C",
    "EVENT_ORDER_SUMMONTH_C", "EVENT_ORDER_SUMMONTH_V",
    # remaining TAG_* / EVENT_* fields not used as features
    "TAG_INTIME_V", "TAG_PACKAGE_C", "TAG_PACKAGE_PIC_C",
    "EVENT_PHONETYPE_C", "TAG_AREA_C",
    "EVENT_APP_USE.V", "I_appuse",
]
_PRODUCT_TYPE_CODES = {
    "无限畅视": 10,
    "电商卡": 9,
    "畅视后向": 8,
    "畅视流量": 7,
    "沃畅视": 6,
    "畅视随心选": 5,
    "沃视频": 4,
    "小娱八斗": 3,
    "畅视互娱": 2,
    "其它": 1,
}


def convert_product_type(value):
    """Return the integer code (1-10) for a product-category label.

    Any value that is not one of the known labels maps to 0.
    """
    return _PRODUCT_TYPE_CODES.get(value, 0)
# Channel category
_CHANNEL_CODES = {
    '互联网渠道': 6,
    '集团渠道': 5,
    '自有线上渠道': 4,
    '省分线下渠道': 3,
    '社会代理和其他渠道': 2,
    '省分线上渠道': 1,
}


def convert_channel(value):
    """Return the integer code (1-6) for a channel-category label.

    Any value that is not one of the known labels maps to 0.
    """
    return _CHANNEL_CODES.get(value, 0)
# Order month
_ORDER_MONTH_CODES = {
    '订购1月内': 1,
    '1-3月': 2,
    '3-6月': 3,
    '6-12月': 4,
    '12-24月': 5,
    '24月以上': 6,
}


def convert_order_month(value):
    """Return the integer code (1-6) for an order-month bucket label.

    Any value that is not one of the known labels maps to 0.
    """
    return _ORDER_MONTH_CODES.get(value, 0)
# Time elapsed since cancellation
_CANCEL_DIFF_CODES = {
    '退订1月内': 1,
    '退订1-3月': 2,
    '退订3-6月': 3,
    '退订6-12月': 4,
    '退订1年以上': 5,
    '未退订': 6,
}


def convert_cancel_diff(value):
    """Return the integer code (1-6) for a time-since-cancellation label.

    Any value that is not one of the known labels maps to 0.
    """
    return _CANCEL_DIFF_CODES.get(value, 0)
# Order price
_ORDER_PRICE_CODES = {
    '0元': 1,
    '1-3元': 2,
    '3-8元': 3,
    '8-15元': 4,
    '15-20元': 5,
    '20以上': 6,
}


def convert_order_price(value):
    """Return the integer code (1-6) for an order-price bucket label.

    Any value that is not one of the known labels maps to 0.
    """
    return _ORDER_PRICE_CODES.get(value, 0)
# Traffic usage ratio
_ORDER_FLOW_CODES = {
    '5-20%': 2,
    '20-50%': 3,
    '50-80%': 4,
    '80%以上': 5,
}


def convert_order_flow(value):
    """Return the integer code (1-5) for a traffic-usage-ratio label.

    The lowest bucket is matched on the string form of ``value``; the other
    buckets are matched on ``value`` itself. Anything else maps to 0.
    """
    if str(value) == '5%以下':
        return 1
    return _ORDER_FLOW_CODES.get(value, 0)
# Total product subscription duration
_ORDER_SUM_MONTH_CODES = {
    '0-1月': 1,
    '2-6月': 2,
    '6-12月': 3,
    '12-24月': 4,
    '24月以上': 5,
}


def convert_order_sum_month(value):
    """Return the integer code (1-5) for a total-subscription-duration label.

    Any value that is not one of the known labels maps to 0.
    """
    return _ORDER_SUM_MONTH_CODES.get(value, 0)
# Network tenure
_INTIME_CODES = {
    '1年-2年': 1,
    '2年-5年': 2,
    '5年-10年': 3,
    '10年-20年': 4,
    '20年以上': 5,
}


def convert_intime(value):
    """Return the integer code (1-5) for a network-tenure label.

    Any value that is not one of the known labels maps to 0.
    """
    return _INTIME_CODES.get(value, 0)
# Traffic used in the current month
_FLUX_CODES = {
    '小于等于0': 1,
    '0-0.5G': 2,
    '0.5G-2G': 3,
    '2G-5G': 4,
    '5G-10G': 5,
    '10G-15G': 6,
    '15-30G': 7,
    '30G以上': 8,
}


def convert_flux(value):
    """Return the integer code (1-8) for a monthly-traffic bucket label.

    Any value that is not one of the known labels maps to 0.
    """
    return _FLUX_CODES.get(value, 0)
def convert_flux_value(value):
    """Replace a missing or non-numeric traffic value with 0.

    ``None``, anything whose string form is "nan", and anything rejected by
    ``constant.is_number`` become 0; every other value is returned unchanged.
    """
    is_missing = value is None or str(value) == "nan"
    if is_missing or not constant.is_number(value):
        return 0
    return value
def convert_consume_value(value):
    """Replace a missing or non-numeric consumption value with 0.

    ``None``, anything whose string form is "nan", and anything rejected by
    ``constant.is_number`` become 0; every other value is returned unchanged.
    """
    is_missing = value is None or str(value) == "nan"
    if is_missing or not constant.is_number(value):
        return 0
    return value
# Monthly consumption
_CONSUME_CODES = {
    '10元以下': 1,
    '10-20元': 2,
    '20-60元': 3,
    '60-100元': 4,
    '100-150元': 5,
    '150-300元': 6,
    '300元以上': 7,
}


def convert_consume(value):
    """Return the integer code (1-7) for a monthly-consumption bucket label.

    Any value that is not one of the known labels maps to 0.
    """
    return _CONSUME_CODES.get(value, 0)
def convert_video_flux_value(value):
    """Replace a missing or non-numeric video-traffic value with 0.

    ``None``, anything whose string form is "nan", and anything rejected by
    ``constant.is_number`` become 0; every other value is returned unchanged.
    """
    is_missing = value is None or str(value) == "nan"
    if is_missing or not constant.is_number(value):
        return 0
    return value
# Monthly video traffic
_VIDEO_FLUX_CODES = {
    '0-0.5G': 1,
    '0.5-2G': 2,
    '2-5G': 3,
    '5-10G': 4,
    '10-15G': 5,
    '15-30G': 6,
    '30G以上': 7,
}


def convert_video_flux(value):
    """Return the integer code (1-7) for a monthly-video-traffic bucket label.

    Any value that is not one of the known labels maps to 0.
    """
    return _VIDEO_FLUX_CODES.get(value, 0)
# Gender
_GENDER_CODES = {'男': 0, '女': 1}


def convert_gender(value):
    """Return 0 for '男', 1 for '女', and 2 for any other value."""
    return _GENDER_CODES.get(value, 2)
# Network type
_NETTYPE_CODES = {'2G': 1, '3G': 2, '4G': 3, '5G': 4}


def convert_nettype(value):
    """Return the integer code (1-4) for a network-type label ('2G'..'5G').

    Any other value maps to 0.
    """
    return _NETTYPE_CODES.get(value, 0)
# Whether the account has been billed
_ACCT_CODES = {'已出账': 0, '未出账': 1}


def convert_acct(value):
    """Return 0 for '已出账', 1 for '未出账', and 2 for any other value."""
    return _ACCT_CODES.get(value, 2)
# Age
_AGE_CODES = {
    '70前': 1,
    '70后': 2,
    '80后': 3,
    '90后': 4,
    '00后': 5,
}


def convert_age(value):
    """Return the integer code (1-5) for an age-group label.

    Any value that is not one of the known labels maps to 0.
    """
    return _AGE_CODES.get(value, 0)
_USAGE_RANGE_CODES = {
    '0.1G-0.5G': 1,
    '0.5G-2G': 2,
    '2G-5G': 3,
    '5G-10G': 4,
    '10G-20G': 5,
    '20G以上': 6,
}


def get_range_value(range_value):
    """Return the integer code (1-6) for an app-usage range label.

    Any value that is not one of the known labels maps to 0.
    """
    return _USAGE_RANGE_CODES.get(range_value, 0)
def convert_app_use_info(value):
    """Derive per-app usage codes from one DataFrame row.

    The usage data is read from the second-to-last cell of the row
    (NOTE(review): presumably the 'EVENT_APP_USE.C' column -- confirm the
    column order). It is expected to be an iterable of "<app>_<range>"
    strings; each range is encoded with ``get_range_value``.

    Args:
        value: a row object exposing ``tolist()`` (e.g. a pandas Series).

    Returns:
        A 6-tuple of ints ordered as (tencent, mangguo, youku, iqiyi,
        bilibili, kuaishou). An app with no matching entry keeps the
        default code 7.
    """
    raw_usage = value.tolist()[-2]

    # Default code 7 means "no entry for this app in the row".
    usage = {
        "tencent": 7,
        "mangguo": 7,
        "youku": 7,
        "iqiyi": 7,
        "bilibili": 7,
        "kuaishou": 7,
    }
    # Substring looked for in the app name -> key in ``usage``.
    keywords = (
        ("腾讯视频", "tencent"),
        ("哔哩哔哩", "bilibili"),
        ("爱奇艺", "iqiyi"),
        ("快手", "kuaishou"),
        ("优酷", "youku"),
        ("芒果", "mangguo"),
    )

    # A missing cell (None or NaN) carries no usage entries; previously a
    # None cell fell through to the loop and raised TypeError.
    if raw_usage is None or str(raw_usage).lower() == 'nan':
        entries = ()
    else:
        entries = raw_usage

    for entry in entries:
        parts = entry.split("_")
        if len(parts) < 2:
            continue  # no "<app>_<range>" separator, nothing to decode
        app_name, usage_range = parts[0], parts[1]
        for keyword, key in keywords:
            if keyword in app_name:
                usage[key] = get_range_value(usage_range)

    return (
        int(usage["tencent"]),
        int(usage["mangguo"]),
        int(usage["youku"]),
        int(usage["iqiyi"]),
        int(usage["bilibili"]),
        int(usage["kuaishou"]),
    )
# App usage
def convert_appuse(df):
    """Add six per-app usage columns to ``df`` in place.

    Each row is passed to ``convert_app_use_info``; the returned tuple is
    expanded into the columns listed below. Nothing is returned.
    """
    expanded = df.apply(convert_app_use_info, axis=1, result_type="expand")
    df[[
        'app_use_tencent',
        'app_use_mangguo',
        'app_use_youku',
        'app_use_iqiyi',
        'app_use_bilibili',
        'app_use_kuaishou',
    ]] = expanded
def convert_cpname(value):
    """Look up ``value`` in ``value_dict.cp_name_dict`` and return the entry.

    Uses subscript lookup, so a name that is not a key of the mapping raises
    (KeyError for a plain dict) instead of falling back to a default.
    """
    cp_codes = value_dict.cp_name_dict
    return cp_codes[value]
def convert_province(value):
    """Look up ``value`` in ``value_dict.area_dict`` and return the entry.

    Uses subscript lookup, so a province that is not a key of the mapping
    raises (KeyError for a plain dict) instead of falling back to a default.
    """
    area_codes = value_dict.area_dict
    return area_codes[value]
def convert_label(data):
    """Encode the categorical and numeric feature columns of ``data`` in place.

    Each listed column is mapped through its converter and cast to the given
    dtype, the per-app usage columns are added, and the raw
    'EVENT_APP_USE.C' and 'EVENT_USER_OSTATE_C' columns are dropped.
    Nothing is returned.

    Conversions for the order-related columns (CP name, category, channel,
    order month, cancel diff, order price, flow ratio, total months) are
    not applied here; those columns are listed in ``drop_columns``.
    """
    logger.info("开始预处理")

    # Province is mapped only; unlike the other columns it gets no dtype cast.
    data['TAG_PROVINCE_C'] = data['TAG_PROVINCE_C'].map(convert_province)

    # (column, converter, target dtype)
    conversions = (
        ('TAG_INTIME_C', convert_intime, 'int'),
        ('EVENT_FLUX_C', convert_flux, 'int'),
        ('EVENT_FLUX_V', convert_flux_value, 'float'),
        ('EVENT_CONSUM_C', convert_consume, 'int'),
        ('EVENT_CONSUM_V', convert_consume_value, 'float'),
        ('EVENT_VIDEO_FLUX_C', convert_video_flux, 'int'),
        ('EVENT_VIDEO_FLUX_V', convert_video_flux_value, 'float'),
        ('TAG_GENDER_C', convert_gender, 'int'),
        ('TAG_NETTYPE_C', convert_nettype, 'int'),
        ('TAG_AGE_C', convert_age, 'int'),
        ('EVENT_IS_ACCT_C', convert_acct, 'int'),
    )
    for column, converter, dtype in conversions:
        data[column] = data[column].map(converter).astype(dtype)

    # App usage: derive the app_use_* columns, then drop the raw source
    # column together with the subscription-state column.
    convert_appuse(data)
    data.drop(columns=['EVENT_APP_USE.C', 'EVENT_USER_OSTATE_C'], inplace=True)
# Category lists for the one-hot encoder, one per entry of
# one_hot_feature_columns and in the same order. Each list is
# 0 .. count-1, e.g. gender: 0,1,2 and network type: 0,1,2,3,4.
#
# Built as plain Python lists: wrapping these unequal-length lists in
# np.array() raises ValueError on NumPy >= 1.24 (ragged input).
_category_counts = [
    3,   # TAG_GENDER_C
    5,   # TAG_NETTYPE_C
    6,   # TAG_AGE_C
    6,   # TAG_INTIME_C
    3,   # EVENT_IS_ACCT_C
    9,   # EVENT_FLUX_C
    8,   # EVENT_CONSUM_C
    8,   # EVENT_VIDEO_FLUX_C
    32,  # TAG_PROVINCE_C
    8,   # app_use_tencent
    8,   # app_use_mangguo
    8,   # app_use_youku
    8,   # app_use_iqiyi
    8,   # app_use_bilibili
    8,   # app_use_kuaishou
]
np_cates = [list(range(count)) for count in _category_counts]
# Encoder with fixed category lists: np_cates[i] holds the categories
# expected in one_hot_feature_columns[i].
enc = OneHotEncoder(categories=np_cates)

# Columns that convert_onehot() replaces with one-hot indicator columns.
one_hot_feature_columns = [
    # TAG_* columns
    "TAG_GENDER_C", "TAG_NETTYPE_C", "TAG_AGE_C", "TAG_INTIME_C",
    # EVENT_* columns
    "EVENT_IS_ACCT_C", "EVENT_FLUX_C", "EVENT_CONSUM_C", "EVENT_VIDEO_FLUX_C",
    "TAG_PROVINCE_C",
    # per-app usage codes added by convert_appuse()
    "app_use_tencent", "app_use_mangguo", "app_use_youku",
    "app_use_iqiyi", "app_use_bilibili", "app_use_kuaishou",
]
def convert_onehot(df):
    """Replace the categorical feature columns of ``df`` with one-hot columns.

    The columns in ``one_hot_feature_columns`` are encoded with the
    module-level ``enc`` and removed from ``df`` in place; the indicator
    columns are appended on the right.

    Args:
        df: DataFrame containing every column in ``one_hot_feature_columns``.

    Returns:
        A new DataFrame made of the remaining columns of ``df`` followed by
        the integer one-hot columns.
    """
    logger.info("开始onehot编码")

    # Fit once and transform in the same pass (the encoder used to be fitted twice).
    encoded = enc.fit_transform(df[one_hot_feature_columns])

    # get_feature_names() was removed in scikit-learn 1.2; prefer the
    # replacement and fall back for older releases.
    if hasattr(enc, "get_feature_names_out"):
        after_onehot_features = enc.get_feature_names_out(one_hot_feature_columns)
    else:
        after_onehot_features = enc.get_feature_names(one_hot_feature_columns)

    # Reuse df's index so concat lines rows up even when df does not carry
    # a default RangeIndex.
    one_hot_dataframe = pd.DataFrame(
        encoded.toarray().astype("int"),
        columns=after_onehot_features,
        index=df.index,
    )
    df.drop(columns=one_hot_feature_columns, inplace=True)
    return pd.concat([df, one_hot_dataframe], axis=1)
def insert_to_database(dataf, row_index):
    """Insert ``dataf`` into the table named by ``constant.insert_pos_tb_name``.

    Adds two columns to ``dataf`` in place before inserting: 'row_id'
    (consecutive integers starting at ``row_index``) and 'month' (the fixed
    string '202107').

    Args:
        dataf: DataFrame whose columns match the target table.
        row_index: first row id to assign to this batch.

    Returns:
        The row id to use for the next batch (``row_index + len(dataf)``).
    """
    row_count = len(dataf)
    logger.info(f"开始写入数据库,共{row_count}条")

    # Running row id that continues across batches.
    end_index = row_index + row_count
    dataf['row_id'] = range(row_index, end_index)
    dataf['month'] = '202107'

    column_list = ",".join(dataf.columns.tolist())
    statement = f"INSERT INTO {constant.insert_pos_tb_name} ({column_list}) VALUES "
    click_client.execute(statement, dataf.to_dict("records"), types_check=True)
    return end_index
def draw_roc_auc(y_label, y_test):
    """Plot the ROC curve for the given labels and scores and show it.

    Args:
        y_label: first argument passed to ``metrics.roc_curve``.
        y_test: second argument passed to ``metrics.roc_curve``.
    """
    # ROC points and the area under the curve.
    fpr, tpr, _ = metrics.roc_curve(y_label, y_test)
    area = metrics.auc(fpr, tpr)

    # Draw the curve on a cleared figure, with the diagonal as reference.
    plt.clf()
    plt.plot(fpr, tpr, label=f'ROC curve (area = {area:0.2f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.0])
    plt.legend(loc="lower right")
    plt.show()
def get_data():
    """Fetch, preprocess, one-hot encode and store the source rows in batches.

    Rows are read from ``l_pos_origin_07`` in pages of ``fetch_size`` ordered
    by uuid. Each page is preprocessed, one-hot encoded and written back to
    the database. All processed pages are then concatenated and pickled to
    'data_pos_0218_07.pkl'.

    Processing is best-effort: if a batch fails, the error is logged and the
    batches completed so far are still saved.
    """
    import traceback

    logger.info("开始处理数据")
    for_nums = math.ceil(2100000 / fetch_size)
    row_index = 0
    data_all = []
    try:
        for i in tqdm(range(for_nums)):
            logger.info(f"第{i}次循环开始处理")
            start = i * fetch_size
            sql = f"select t.* from l_pos_origin_07 t order by uuid limit {fetch_size} offset {start}"
            all_data = click_client.execute(sql)
            if not all_data:
                # Table exhausted: stop cleanly rather than letting an empty
                # batch fail further down the pipeline.
                logger.info(f"no rows returned at offset {start}; stopping")
                break
            dataf = pd.DataFrame(all_data, columns=constant.origin_column_names)
            dataf.drop(columns=drop_columns, inplace=True)
            # Preprocessing (in place)
            convert_label(dataf)
            # One-hot encoding
            dataf = convert_onehot(dataf)
            # Write to the database
            row_index = insert_to_database(dataf, row_index)
            logger.info(f"第{i}次循环处理完毕")
            data_all.append(dataf)
    except Exception as exc:
        # Keep the best-effort behaviour but record why the loop stopped.
        # Logged through .info because that is the only logger method this
        # module is known to rely on.
        logger.info(f"batch processing aborted: {exc!r}\n{traceback.format_exc()}")

    if not data_all:
        # pd.concat([]) would raise and hide the real cause.
        logger.info("no batches were processed; nothing to save")
        return

    data_last = pd.concat(data_all, axis=0)
    data_last.to_pickle("data_pos_0218_07.pkl")
    logger.info(str(data_last.shape))
if __name__ == "__main__":
    # Run the full preprocessing pipeline when executed as a script.
    get_data()
|