# data_preprocess.py
  1. import pandas as pd
  2. from clickhouse_driver import Client
  3. from common.log_utils import logFactory
  4. from common import value_dict
  5. from common.database_utils import database_util
  6. from tqdm import tqdm
  7. from sklearn.preprocessing import OneHotEncoder
  8. import numpy as np
  9. import math
  10. from common import constant
  11. from sklearn import metrics
  12. import matplotlib.pyplot as plt
  13. click_client = database_util.get_client()
  14. logger = logFactory("preprocess data").log
  15. drop_columns = [
  16. "THEMONTH_V",
  17. "uuid",
  18. "EVENT_CPNAME_C",
  19. "EVENT_SPNAME_C",
  20. "EVENT_CATEGORYNAME_C",
  21. "EVENT_CHANNEL_C",
  22. "EVENT_CHANNEL_BELONGTO_C",
  23. "EVENT_ORDER_MONTH_C",
  24. "EVENT_ORDER_MONTH_V",
  25. "EVENT_CANCEL_DIFF_C",
  26. "EVENT_CANCEL_DIFF_V",
  27. "EVENT_ORDER_PRICE_C",
  28. "EVENT_ORDER_PRICE_V",
  29. "EVENT_ORDER_FLOWP_C",
  30. "EVENT_ORDER_FLOWP_V",
  31. "EVENT_ORDER_STATE_C",
  32. "EVENT_ORDER_SUMMONTH_C",
  33. "EVENT_ORDER_SUMMONTH_V",
  34. "TAG_INTIME_V",
  35. "TAG_PACKAGE_C",
  36. "TAG_PACKAGE_PIC_C",
  37. "EVENT_PHONETYPE_C",
  38. "TAG_AREA_C",
  39. "EVENT_APP_USE.V",
  40. "I_appuse"
  41. ]
  42. fetch_size = 100000
  43. def convert_product_type(value):
  44. if value == "无限畅视":
  45. value = 10
  46. elif value == "电商卡":
  47. value = 9
  48. elif value == "畅视后向":
  49. value = 8
  50. elif value == "畅视流量":
  51. value = 7
  52. elif value == "沃畅视":
  53. value = 6
  54. elif value == "畅视随心选":
  55. value = 5
  56. elif value == "沃视频":
  57. value = 4
  58. elif value == "小娱八斗":
  59. value = 3
  60. elif value == "畅视互娱":
  61. value = 2
  62. elif value == "其它":
  63. value = 1
  64. else:
  65. value = 0
  66. return value
  67. # 渠道大类
  68. def convert_channel(value):
  69. if value == '互联网渠道':
  70. value = 6
  71. elif value == '集团渠道':
  72. value = 5
  73. elif value == '自有线上渠道':
  74. value = 4
  75. elif value == '省分线下渠道':
  76. value = 3
  77. elif value == '社会代理和其他渠道':
  78. value = 2
  79. elif value == '省分线上渠道':
  80. value = 1
  81. else:
  82. value = 0
  83. return value
  84. # 订购月份
  85. def convert_order_month(value):
  86. if value == '订购1月内':
  87. value = 1
  88. elif value == '1-3月':
  89. value = 2
  90. elif value == '3-6月':
  91. value = 3
  92. elif value == '6-12月':
  93. value = 4
  94. elif value == '12-24月':
  95. value = 5
  96. elif value == '24月以上':
  97. value = 6
  98. else:
  99. value = 0
  100. return value
  101. # 退订距今时长
  102. def convert_cancel_diff(value):
  103. if value == '退订1月内':
  104. value = 1
  105. elif value == '退订1-3月':
  106. value = 2
  107. elif value == '退订3-6月':
  108. value = 3
  109. elif value == '退订6-12月':
  110. value = 4
  111. elif value == '退订1年以上':
  112. value = 5
  113. elif value == '未退订':
  114. value = 6
  115. else:
  116. value = 0
  117. return value
  118. # 订购价格
  119. def convert_order_price(value):
  120. if value == '0元':
  121. value = 1
  122. elif value == '1-3元':
  123. value = 2
  124. elif value == '3-8元':
  125. value = 3
  126. elif value == '8-15元':
  127. value = 4
  128. elif value == '15-20元':
  129. value = 5
  130. elif value == '20以上':
  131. value = 6
  132. else:
  133. value = 0
  134. return value
  135. # 流量使用比
  136. def convert_order_flow(value):
  137. if str(value) in ['5%以下']:
  138. value = 1
  139. elif value == '5-20%':
  140. value = 2
  141. elif value == '20-50%':
  142. value = 3
  143. elif value == '50-80%':
  144. value = 4
  145. elif value == '80%以上':
  146. value = 5
  147. else:
  148. value = 0
  149. return value
  150. # 产品订购总时长
  151. def convert_order_sum_month(value):
  152. if value == '0-1月':
  153. value = 1
  154. elif value == '2-6月':
  155. value = 2
  156. elif value == '6-12月':
  157. value = 3
  158. elif value == '12-24月':
  159. value = 4
  160. elif value == '24月以上':
  161. value = 5
  162. else:
  163. value = 0
  164. return value
  165. # 入网时长
  166. def convert_intime(value):
  167. if value == '1年-2年':
  168. value = 1
  169. elif value == '2年-5年':
  170. value = 2
  171. elif value == '5年-10年':
  172. value = 3
  173. elif value == '10年-20年':
  174. value = 4
  175. elif value == '20年以上':
  176. value = 5
  177. else:
  178. value = 0
  179. return value
  180. # 当月使用流量
  181. def convert_flux(value):
  182. if value == '0-0.5G':
  183. value = 2
  184. elif value == '0.5G-2G':
  185. value = 3
  186. elif value == '2G-5G':
  187. value = 4
  188. elif value == '5G-10G':
  189. value = 5
  190. elif value == '10G-15G':
  191. value = 6
  192. elif value == '15-30G':
  193. value = 7
  194. elif value == '30G以上':
  195. value = 8
  196. elif value == '小于等于0':
  197. value = 1
  198. else:
  199. value = 0
  200. return value
  201. def convert_flux_value(value):
  202. if value is None or str(value) == "nan":
  203. value = 0
  204. elif not constant.is_number(value):
  205. value = 0
  206. return value
  207. def convert_consume_value(value):
  208. if value is None or str(value) == "nan":
  209. value = 0
  210. elif not constant.is_number(value):
  211. value = 0
  212. return value
  213. # 月消费
  214. def convert_consume(value):
  215. if value == '10元以下':
  216. value = 1
  217. elif value == '10-20元':
  218. value = 2
  219. elif value == '20-60元':
  220. value = 3
  221. elif value == '60-100元':
  222. value = 4
  223. elif value == '100-150元':
  224. value = 5
  225. elif value == '150-300元':
  226. value = 6
  227. elif value == '300元以上':
  228. value = 7
  229. else:
  230. value = 0
  231. return value
  232. def convert_video_flux_value(value):
  233. if value is None or str(value) == "nan":
  234. value = 0
  235. elif not constant.is_number(value):
  236. value = 0
  237. return value
  238. # 月视频使用流量
  239. def convert_video_flux(value):
  240. if value == '0-0.5G':
  241. value = 1
  242. elif value == '0.5-2G':
  243. value = 2
  244. elif value == '2-5G':
  245. value = 3
  246. elif value == '5-10G':
  247. value = 4
  248. elif value == '10-15G':
  249. value = 5
  250. elif value == '15-30G':
  251. value = 6
  252. elif value == '30G以上':
  253. value = 7
  254. else:
  255. value = 0
  256. return value
  257. # 性别
  258. def convert_gender(value):
  259. if value == '男':
  260. value = 0
  261. elif value == '女':
  262. value = 1
  263. else:
  264. value = 2
  265. return value
  266. # 网络类型
  267. def convert_nettype(value):
  268. if value == '2G':
  269. value = 1
  270. elif value == '3G':
  271. value = 2
  272. elif value == '4G':
  273. value = 3
  274. elif value == '5G':
  275. value = 4
  276. else:
  277. value = 0
  278. return value
  279. # 是否出账
  280. def convert_acct(value):
  281. if value == '已出账':
  282. value = 0
  283. elif value == '未出账':
  284. value = 1
  285. else:
  286. value = 2
  287. return value
  288. # 年龄
  289. def convert_age(value):
  290. if value == '70前':
  291. value = 1
  292. elif value == '70后':
  293. value = 2
  294. elif value == '80后':
  295. value = 3
  296. elif value == '90后':
  297. value = 4
  298. elif value == '00后':
  299. value = 5
  300. else:
  301. value = 0
  302. return value
  303. def get_range_value(range_value):
  304. if range_value == '0.1G-0.5G':
  305. value = 1
  306. elif range_value == '0.5G-2G':
  307. value = 2
  308. elif range_value == '2G-5G':
  309. value = 3
  310. elif range_value == '5G-10G':
  311. value = 4
  312. elif range_value == '10G-20G':
  313. value = 5
  314. elif range_value == '20G以上':
  315. value = 6
  316. else:
  317. value = 0
  318. return value
  319. def convert_app_use_info(value):
  320. value = value.tolist()[-2]
  321. tencent_app_use = 7
  322. kuaishou_app_use = 7
  323. mangguo_app_use = 7
  324. youku_app_use = 7
  325. iqiyi_app_use = 7
  326. bilibili_app_use = 7
  327. if str(value).lower() == 'nan':
  328. pass
  329. else:
  330. use_list = value
  331. for apptype_range in use_list:
  332. if len(apptype_range.split("_")) > 1:
  333. apptype = apptype_range.split("_")[0]
  334. apprange = apptype_range.split("_")[1]
  335. if "腾讯视频" in apptype:
  336. tencent_app_use = get_range_value(apprange)
  337. if "哔哩哔哩" in apptype:
  338. bilibili_app_use = get_range_value(apprange)
  339. if "爱奇艺" in apptype:
  340. iqiyi_app_use = get_range_value(apprange)
  341. if "快手" in apptype:
  342. kuaishou_app_use = get_range_value(apprange)
  343. if "优酷" in apptype:
  344. youku_app_use = get_range_value(apprange)
  345. if "芒果" in apptype:
  346. mangguo_app_use = get_range_value(apprange)
  347. return int(tencent_app_use), int(mangguo_app_use), int(youku_app_use), int(iqiyi_app_use), int(
  348. bilibili_app_use), int(kuaishou_app_use)
  349. # APP使用情况
  350. def convert_appuse(df):
  351. app_use_columns = ['app_use_tencent', 'app_use_mangguo', 'app_use_youku', 'app_use_iqiyi', 'app_use_bilibili',
  352. 'app_use_kuaishou']
  353. df[app_use_columns] = df.apply(convert_app_use_info, axis=1, result_type="expand")
  354. def convert_cpname(value):
  355. return value_dict.cp_name_dict[value]
  356. def convert_province(value):
  357. return value_dict.area_dict[value]
  358. def convert_label(data):
  359. logger.info("开始预处理")
  360. # data['EVENT_CPNAME_C'] = data['EVENT_CPNAME_C'].map(convert_cpname)
  361. # data['EVENT_CPNAME_C'] = data['EVENT_CPNAME_C'].astype('int')
  362. # data['EVENT_CATEGORYNAME_C'] = data['EVENT_CATEGORYNAME_C'].map(convert_product_type)
  363. # data['EVENT_CATEGORYNAME_C'] = data['EVENT_CATEGORYNAME_C'].astype('int')
  364. # data['EVENT_CHANNEL_BELONGTO_C'] = data['EVENT_CHANNEL_BELONGTO_C'].map(convert_channel)
  365. # data['EVENT_CHANNEL_BELONGTO_C'] = data['EVENT_CHANNEL_BELONGTO_C'].astype('int')
  366. # data['EVENT_ORDER_MONTH_C'] = data['EVENT_ORDER_MONTH_C'].map(convert_order_month)
  367. # data['EVENT_CANCEL_DIFF_C'] = data['EVENT_CANCEL_DIFF_C'].map(convert_cancel_diff)
  368. # data['EVENT_CANCEL_DIFF_C'] = data['EVENT_CANCEL_DIFF_C'].astype('int')
  369. # data['EVENT_ORDER_PRICE_C'] = data['EVENT_ORDER_PRICE_C'].map(convert_order_price)
  370. # data['EVENT_ORDER_PRICE_C'] = data['EVENT_ORDER_PRICE_C'].astype('int')
  371. # data['EVENT_ORDER_PRICE_V'] = data['EVENT_ORDER_PRICE_V'].astype('float')
  372. # data['EVENT_ORDER_FLOWP_C'] = data['EVENT_ORDER_FLOWP_C'].map(convert_order_flow)
  373. # data['EVENT_ORDER_FLOWP_C'] = data['EVENT_ORDER_FLOWP_C'].astype('int')
  374. # data['EVENT_ORDER_SUMMONTH_C'] = data['EVENT_ORDER_SUMMONTH_C'].map(convert_order_sum_month)
  375. # data['EVENT_ORDER_SUMMONTH_C'] = data['EVENT_ORDER_SUMMONTH_C'].astype('int')
  376. data['TAG_PROVINCE_C'] = data['TAG_PROVINCE_C'].map(convert_province)
  377. data['TAG_INTIME_C'] = data['TAG_INTIME_C'].map(convert_intime)
  378. data['TAG_INTIME_C'] = data['TAG_INTIME_C'].astype('int')
  379. data['EVENT_FLUX_C'] = data['EVENT_FLUX_C'].map(convert_flux)
  380. data['EVENT_FLUX_V'] = data['EVENT_FLUX_V'].map(convert_flux_value)
  381. data['EVENT_FLUX_V'] = data['EVENT_FLUX_V'].astype('float')
  382. data['EVENT_FLUX_C'] = data['EVENT_FLUX_C'].astype('int')
  383. data['EVENT_CONSUM_C'] = data['EVENT_CONSUM_C'].map(convert_consume)
  384. data['EVENT_CONSUM_V'] = data['EVENT_CONSUM_V'].map(convert_consume_value)
  385. data['EVENT_CONSUM_C'] = data['EVENT_CONSUM_C'].astype('int')
  386. data['EVENT_CONSUM_V'] = data['EVENT_CONSUM_V'].astype('float')
  387. data['EVENT_VIDEO_FLUX_C'] = data['EVENT_VIDEO_FLUX_C'].map(convert_video_flux)
  388. data['EVENT_VIDEO_FLUX_V'] = data['EVENT_VIDEO_FLUX_V'].map(convert_video_flux_value)
  389. data['EVENT_VIDEO_FLUX_V'] = data['EVENT_VIDEO_FLUX_V'].astype('float')
  390. data['EVENT_VIDEO_FLUX_C'] = data['EVENT_VIDEO_FLUX_C'].astype('int')
  391. data['TAG_GENDER_C'] = data['TAG_GENDER_C'].map(convert_gender)
  392. data['TAG_GENDER_C'] = data['TAG_GENDER_C'].astype('int')
  393. data['TAG_NETTYPE_C'] = data['TAG_NETTYPE_C'].map(convert_nettype)
  394. data['TAG_NETTYPE_C'] = data['TAG_NETTYPE_C'].astype('int')
  395. data['TAG_AGE_C'] = data['TAG_AGE_C'].map(convert_age)
  396. data['TAG_AGE_C'] = data['TAG_AGE_C'].astype('int')
  397. data['EVENT_IS_ACCT_C'] = data['EVENT_IS_ACCT_C'].map(convert_acct)
  398. data['EVENT_IS_ACCT_C'] = data['EVENT_IS_ACCT_C'].astype('int')
  399. # app使用
  400. convert_appuse(data)
  401. data.drop(columns=['EVENT_APP_USE.C'], inplace=True)
  402. data.drop(columns=['EVENT_USER_OSTATE_C'], inplace=True)
  403. # 性别:0,1,2
  404. # 网络类型: 1,2,3,4,5
  405. # 性别:0,1,2
  406. # 网络类型: 1,2,3,4,5
  407. np_cates = list(np.array([
  408. [x for x in range(0, 3)],
  409. [x for x in range(0, 5)],
  410. [x for x in range(0, 6)],
  411. [x for x in range(0, 6)],
  412. [x for x in range(0, 3)],
  413. [x for x in range(0, 9)],
  414. [x for x in range(0, 8)],
  415. [x for x in range(0, 8)],
  416. [x for x in range(0, 32)],
  417. [x for x in range(0, 8)],
  418. [x for x in range(0, 8)],
  419. [x for x in range(0, 8)],
  420. [x for x in range(0, 8)],
  421. [x for x in range(0, 8)],
  422. [x for x in range(0, 8)]
  423. ]))
  424. enc = OneHotEncoder(categories=np_cates)
  425. one_hot_feature_columns = [
  426. "TAG_GENDER_C",
  427. "TAG_NETTYPE_C",
  428. "TAG_AGE_C",
  429. "TAG_INTIME_C",
  430. "EVENT_IS_ACCT_C",
  431. "EVENT_FLUX_C",
  432. "EVENT_CONSUM_C",
  433. "EVENT_VIDEO_FLUX_C",
  434. "TAG_PROVINCE_C",
  435. 'app_use_tencent', 'app_use_mangguo', 'app_use_youku', 'app_use_iqiyi', 'app_use_bilibili',
  436. 'app_use_kuaishou'
  437. ]
  438. def convert_onehot(df):
  439. logger.info("开始onehot编码")
  440. enc.fit(df[one_hot_feature_columns])
  441. after_onehot_features = enc.get_feature_names(one_hot_feature_columns)
  442. one_hot_dataframe = pd.DataFrame(enc.fit_transform(df[one_hot_feature_columns]).toarray().astype("int"),
  443. columns=after_onehot_features)
  444. df.drop(columns=one_hot_feature_columns, inplace=True)
  445. df = pd.concat([df, one_hot_dataframe], axis=1)
  446. return df
  447. def insert_to_database(dataf, row_index):
  448. nums = str(len(dataf))
  449. logger.info(f"开始写入数据库,共{nums}条")
  450. # 增加自增列
  451. next_index = row_index + len(dataf)
  452. dataf['row_id'] = range(row_index, next_index)
  453. dataf['month'] = '202107'
  454. tb_name = constant.insert_pos_tb_name
  455. # tb_name = 'lu_example_data'
  456. cols = ",".join(dataf.columns.tolist())
  457. data = dataf.to_dict("records")
  458. click_client.execute(f"INSERT INTO {tb_name} ({cols}) VALUES ", data, types_check=True)
  459. return next_index
  460. def draw_roc_auc(y_label, y_test):
  461. # ROC曲线绘制
  462. fpr, tpr, thresholds = metrics.roc_curve(y_label, y_test)
  463. ##计算曲线下面积
  464. roc_auc = metrics.auc(fpr, tpr)
  465. ##绘图
  466. plt.clf()
  467. plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
  468. plt.plot([0, 1], [0, 1], 'k--')
  469. plt.xlim([0.0, 1.0])
  470. plt.ylim([0.0, 1.0])
  471. plt.legend(loc="lower right")
  472. plt.show()
  473. def get_data():
  474. logger.info("开始处理数据")
  475. for_nums = math.ceil(2100000 / fetch_size)
  476. # for_nums = math.ceil(2000000 / fetch_size)
  477. # for_nums = int(119000000/fetch_size)
  478. row_index = 0
  479. data_all = []
  480. try:
  481. for i in tqdm(range(0, for_nums)):
  482. step = str(i)
  483. logger.info(f"第{step}次循环开始处理")
  484. start = i * fetch_size
  485. # sql = f"select t.* from Z_USER_TAG_FLAT_out t where EVENT_CPNAME_C='快手' and EVENT_USER_OSTATE_C='在订' limit 50 "
  486. # sql = f"select t.* from Z_USER_TAG_FLAT_out t where EVENT_CPNAME_C!='快手' and EVENT_USER_OSTATE_C!='在订' limit {start},{fetch_size} "
  487. # sql = f"select t.* from l_pos_origin_06 t order by uuid limit {fetch_size} offset {start}"
  488. sql = f"select t.* from l_pos_origin_07 t order by uuid limit {fetch_size} offset {start}"
  489. all_data = click_client.execute(sql)
  490. dataf = pd.DataFrame(all_data, columns=constant.origin_column_names)
  491. dataf.drop(columns=drop_columns, inplace=True)
  492. # 预处理
  493. convert_label(dataf)
  494. # onehot编码
  495. dataf = convert_onehot(dataf)
  496. # 写入数据库
  497. row_index = insert_to_database(dataf, row_index)
  498. logger.info(f"第{step}次循环处理完毕")
  499. data_all.append(dataf)
  500. except Exception as e:
  501. data_last = pd.concat(data_all, axis=0)
  502. # data_last.to_pickle("data_pos_0213_07.pkl")
  503. data_last = pd.concat(data_all, axis=0)
  504. data_last.to_pickle("data_pos_0218_07.pkl")
  505. logger.info(str(data_last.shape))
  506. if __name__ == "__main__":
  507. get_data()
  508. # df = pd.read_pickle("data_pos_0213_07.pkl")
  509. pass