9c1919d690bda17e65ca1087df53f387339a4449.svn-base 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. package com.chinacreator.process.job;
  2. import java.math.BigDecimal;
  3. import java.math.RoundingMode;
  4. import java.sql.SQLException;
  5. import java.text.SimpleDateFormat;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.Calendar;
  9. import java.util.Date;
  10. import java.util.HashMap;
  11. import java.util.Iterator;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.Set;
  15. import java.util.concurrent.CountDownLatch;
  16. import java.util.concurrent.ExecutorService;
  17. import java.util.concurrent.LinkedBlockingQueue;
  18. import java.util.concurrent.ThreadPoolExecutor;
  19. import java.util.concurrent.TimeUnit;
  20. import org.apache.log4j.Logger;
  21. import org.quartz.DisallowConcurrentExecution;
  22. import org.quartz.PersistJobDataAfterExecution;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.util.StringUtils;
  25. import com.alibaba.fastjson.JSONArray;
  26. import com.alibaba.fastjson.JSONObject;
  27. import com.chinacreator.common.exception.BusinessException;
  28. import com.chinacreator.process.bean.KuaishouPushBean;
  29. import com.chinacreator.process.bean.TdActiveclientRecBean;
  30. import com.chinacreator.process.dao.DictionaryDao;
  31. import com.chinacreator.process.dao.KuaishouDao;
  32. import com.chinacreator.process.dao.KuaishouFlowMonthDao;
  33. import com.chinacreator.process.util.HttpInvoke;
  34. import com.chinacreator.process.util.JsonUtil;
  35. import com.chinacreator.process.util.WriteLogUtil;
  36. /**
  37. * 快手月末查流量
  38. * @author xu.zhou
  39. * @date 20220708
  40. */
  41. @PersistJobDataAfterExecution
  42. @DisallowConcurrentExecution
  43. public class KuaishouFlowMonthJob {
  44. private static Logger logger = Logger.getLogger("kuaishouFlowMonth");
  45. @Autowired
  46. private KuaishouFlowMonthDao kuaishouDao; // = new KuaishouFlowMonthDao();
  47. @Autowired
  48. private DictionaryDao dictionaryDao; // = new DictionaryDao();
  49. public void doProcess() throws Exception {
  50. //执行开始时间
  51. long beginTime = System.currentTimeMillis();
  52. int startpart = 1;
  53. int endpart = 25;
  54. String jobname = "KuaishouFlowMonthJob";
  55. try {
  56. WriteLogUtil.writeLong(jobname+"定时任务开始", logger, jobname);
  57. int count = 0;
  58. int rows = 800; //每次取数据条数
  59. String confrows = dictionaryDao.getValue("kuaishouflowrows");
  60. if (!StringUtils.isEmpty(confrows)) {
  61. try {
  62. rows = Integer.parseInt(confrows);
  63. } catch (Exception e) {
  64. rows = 800;
  65. }
  66. }
  67. //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
  68. String partition = "T_HASH_P";
  69. for(int i = startpart; i <= endpart; i++){
  70. count = 0;
  71. partition = "T_HASH_P";
  72. try {
  73. if(i < 10){
  74. partition = partition + "0" + i;
  75. }else{
  76. partition = partition + i;
  77. }
  78. //按分区标识获取订购数据
  79. List<HashMap> dataList = kuaishouDao.getFlowMonthByPart(partition,rows);
  80. count = (dataList != null ? dataList.size() : 0);
  81. if(dataList != null && dataList.size() > 0){
  82. dataList = paraseData(dataList); //去重复数据
  83. logger.info(partition+",去重复前用户数:"+count+",去重复后用户数:"+dataList.size());
  84. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  85. ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  86. //把数据更新为正在处理状态,执行次数加1
  87. for(HashMap hm : dataList){
  88. boolean res = kuaishouDao.updFlowMonthExecing(hm.get("ID").toString());
  89. }
  90. for(HashMap hm : dataList){
  91. KuaishouFlowMonthService continueService = new KuaishouFlowMonthService(dataList.size(), threadSignal, hm, kuaishouDao, jobname);
  92. executorService.execute(continueService);
  93. }
  94. executorService.shutdown();
  95. try {
  96. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  97. } catch (InterruptedException e) {
  98. e.printStackTrace();
  99. }
  100. }
  101. } catch (Exception e) {
  102. logger.info(partition+",执行出现异常,"+e.getMessage());
  103. e.printStackTrace();
  104. }
  105. //每取一轮数据,休眠100毫秒
  106. Thread.sleep(100);
  107. //更新执行状态为2且执行时间超过50分钟的数据为待处理
  108. int res = kuaishouDao.updExecTimeout(partition);
  109. if(res > 0){
  110. logger.info("更新状态为2处理中且执行时间超过50分钟的数据为待处理的异常数据条数【"+res+"】");
  111. }
  112. //初始化新增数据的FLOWCALCULATEDATE值
  113. res = kuaishouDao.updFlowMonthInit(partition);
  114. if(res > 0){
  115. logger.info("初始化新增数据的FLOWCALCULATEDATE值的数据条数【"+res+"】");
  116. }
  117. }
  118. } catch (Exception e) {
  119. e.printStackTrace();
  120. logger.info(jobname+"定时任务执行出现异常,"+e.getMessage());
  121. } finally {
  122. WriteLogUtil.writeLong(jobname+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, jobname);
  123. }
  124. }
  125. /**
  126. * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
  127. * @param dataList
  128. * @return
  129. */
  130. private List<HashMap> paraseData(List<HashMap> dataList){
  131. //去重复后的数据集
  132. List<HashMap> reDataList = new ArrayList<HashMap>();
  133. HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
  134. for (HashMap dataMap : dataList) {
  135. if(tmpMap.containsKey(dataMap.get("USERID"))){
  136. logger.info("重复数据,"+dataMap);
  137. }else{
  138. tmpMap.put(dataMap.get("USERID"), dataMap);
  139. reDataList.add(dataMap);
  140. }
  141. }
  142. return reDataList;
  143. }
  144. public static void main(String[] args) {
  145. KuaishouFlowMonthJob job = new KuaishouFlowMonthJob();
  146. try {
  147. job.doProcess();
  148. } catch (Exception e) {
  149. e.printStackTrace();
  150. }
  151. }
  152. }
  153. /**
  154. * 处理类
  155. * @author xu.zhou
  156. * @date 20220708
  157. */
  158. class KuaishouFlowMonthService implements Runnable {
  159. private static Logger logger = Logger.getLogger("kuaishouFlowMonth");
  160. private int totalSize;
  161. private CountDownLatch threadSignal;
  162. private List<String> spidsList;
  163. private KuaishouFlowMonthDao kuaishouFMDao;
  164. private HashMap hm; //用户数据,ID和USERID
  165. private String jobname;
  166. public KuaishouFlowMonthService(int totalSize,CountDownLatch threadSignal,HashMap hm,KuaishouFlowMonthDao kuaishouFMDao, String jobname){
  167. this.totalSize = totalSize;
  168. this.threadSignal = threadSignal;
  169. this.hm = hm;
  170. this.kuaishouFMDao = kuaishouFMDao;
  171. this.jobname = jobname;
  172. }
  173. @Override
  174. public void run() {
  175. long startime = System.currentTimeMillis();
  176. Map logMap = new HashMap();
  177. logMap.put("data", hm);
  178. String resultcode = "5000";
  179. String errorinfo = "";
  180. String useflow = ""; //useflow
  181. String maxflow = ""; //最大流量
  182. String surflow = ""; //surflow
  183. boolean hasexists = false;
  184. boolean haskafka = false; //是否从KAFKA配置表得到的产品ID
  185. String id = hm.get("ID").toString();
  186. long starttime = System.currentTimeMillis();
  187. //String calculatedate = getCalculatedate(); //下次推送时间,一般为下月的第一天0点30分01秒
  188. TdActiveclientRecBean bean = new TdActiveclientRecBean();
  189. try {
  190. //判断流量查询日期是否已跨月
  191. if(!valiFlowCalculateDate()){
  192. throw new BusinessException("9052","当前月份不等于流量查询月份FLOWCALCULATEDATE");
  193. }
  194. String spid = hm.get("SPID") == null ? null : hm.get("SPID")+"";
  195. //获取订购关系
  196. HashMap orderRealMap = kuaishouFMDao.getOrderRealById(id);
  197. if(orderRealMap == null){
  198. throw new BusinessException("9055","订购关系表无订购数据");
  199. }
  200. bean.setProvince(orderRealMap.get("PROVINCE")+"");
  201. bean.setArea(orderRealMap.get("AREA")+"");
  202. bean.setSpid(orderRealMap.get("SPID").toString());
  203. bean.setUserid(orderRealMap.get("USERID").toString());
  204. //订购关系已失效
  205. if("2".equals(orderRealMap.get("STATUS"))){
  206. throw new BusinessException("9056","当月无有效订购关系");
  207. }
  208. //SPID为空取订购关系的SPID
  209. if(StringUtils.isEmpty(spid) || "null".equals(spid)){
  210. spid = orderRealMap.get("SPID").toString();
  211. }
  212. //获取SPNAME
  213. bean.setSpname(kuaishouFMDao.qrySpname(spid));
  214. //获取产品ID
  215. String productid = kuaishouFMDao.qryAopProductid(spid);
  216. if(StringUtils.isEmpty(productid)){
  217. throw new BusinessException("9053","无产品ID配置信息");
  218. //从kafka同步的订购关系,在TB_SP_AOP_INFO表没有,要到TB_KAFKA_PRODUCT_CONF表查询
  219. /***
  220. * 从KAFKA落的快手订购关系,不包含为活跃客户
  221. productid = kuaishouFMDao.qryKafkaProductid(spid);
  222. if(StringUtils.isEmpty(productid)){
  223. throw new BusinessException("9053","无产品ID配置信息");
  224. }
  225. haskafka = true;
  226. ***/
  227. }
  228. logMap.put("productid", productid);
  229. //调接口获取流量信息
  230. String result = invoke();
  231. //logMap.put("result", result);
  232. if(!StringUtils.isEmpty(result)){
  233. JSONObject resObj = new JSONObject().parseObject(result);
  234. if("0".equals(resObj.get("resultCode")+"")){//接口返回成功
  235. JSONArray resdata = resObj.getJSONArray("data");
  236. if(resdata != null && resObj.getJSONArray("data").size()>0){
  237. hasexists = true;
  238. JSONObject tmpobj = null;
  239. /***筛选逻辑
  240. 1、FEE_POLICY_NAME包含“快手”筛选
  241. 2、ADDUP_UPPER(免流总量)不为0筛选
  242. 3、根据以上条件筛选后,取X_USED_VALUE(使用量)
  243. 1)有一个使用量,直接取值
  244. 2)有多个使用量,若使用量相同,直接取相同值
  245. 3)有多个使用量,若使用量不同。
  246. a、排除非查询号码的值 //移到最上面,与1和2并列
  247. b、若还有多个值,取最大值
  248. ***/
  249. for(int i = 0; i< resdata.size(); i++){
  250. tmpobj = (JSONObject)resdata.get(i);
  251. if(productid.equals(tmpobj.getString("PRODUCT_ID"))){
  252. if(logMap.get("result") == null){
  253. List<String> tmpList = new ArrayList<String>();
  254. tmpList.add(tmpobj.toJSONString());
  255. logMap.put("result", tmpList);
  256. logMap.put("resultsize", 1);
  257. }else{
  258. List<String> tmpList = (List<String>)logMap.get("result");
  259. tmpList.add(tmpobj.toJSONString());
  260. logMap.put("result", tmpList);
  261. logMap.put("resultsize", (Integer)logMap.get("resultsize")+1);
  262. }
  263. //从kafka同步的订购关系,REAL_RESOURCE_TYPE是13
  264. if(haskafka && !"13".equals(tmpobj.get("REAL_RESOURCE_TYPE"))){
  265. continue;
  266. }
  267. //产品名称不包含快手
  268. if(tmpobj.getString("FEE_POLICY_NAME").indexOf("快手") == -1){
  269. continue;
  270. }
  271. //ADDUP_UPPER(免流总量)不为0筛选
  272. if("0".equals(tmpobj.get("ADDUP_UPPER"))){
  273. continue;
  274. }
  275. //手机号码不相同,不取值
  276. if(!hm.get("USERID").equals(tmpobj.get("USER_NUMBER"))){
  277. continue;
  278. }
  279. //最大流量
  280. maxflow = tmpobj.getString("ADDUP_UPPER");
  281. //剩余流量
  282. surflow = tmpobj.getString("X_CANUSE_VALUE");
  283. //已使用流量
  284. if("".equals(useflow)){
  285. useflow = tmpobj.getString("X_USED_VALUE");
  286. }else{
  287. //取最大值
  288. try {
  289. if(Integer.parseInt(tmpobj.getString("X_USED_VALUE")) > Integer.parseInt(useflow)){
  290. useflow = tmpobj.getString("X_USED_VALUE");
  291. }
  292. } catch (Exception e) {
  293. e.printStackTrace();
  294. }
  295. }
  296. }
  297. }
  298. }
  299. }else{
  300. throw new BusinessException("9059","调接口失败");
  301. }
  302. }
  303. if(!hasexists){
  304. throw new BusinessException("9057","接口返回数据列表为空");
  305. }
  306. //有返回流量数据,但没有对应产品的信息
  307. if("".equals(useflow)){
  308. throw new BusinessException("9058","接口返回数据列表无"+productid+"产品ID数据");
  309. }
  310. //未出现异常,设置为成功
  311. resultcode = "0";
  312. //设置已使用流量值,由KB转换成MB,保留三位小数
  313. bean.setFlowtotal(formatDouble(Double.parseDouble(useflow)/1024));
  314. } catch (Exception e) {
  315. if (e instanceof BusinessException) {
  316. errorinfo = ((BusinessException) e).getMessage();
  317. resultcode = ((BusinessException) e).getCode();
  318. }else{
  319. e.printStackTrace();
  320. resultcode = "8000";
  321. errorinfo = "处理数据出现异常,"+e.getMessage();
  322. }
  323. }finally {
  324. threadSignal.countDown();
  325. try{
  326. //执行SQL是否成功
  327. boolean res = false;
  328. //暂未同步或者处理出现异常,恢复初始状态,下次再处理
  329. if("0".equals(resultcode)){//成功
  330. //更新状态为成功
  331. res = kuaishouFMDao.updFlowMonthSucc(id);
  332. //添加数据到活跃表
  333. kuaishouFMDao.insertActive(bean);
  334. logMap.put("insertActive", true );
  335. }else{//失败
  336. res = kuaishouFMDao.updFlowMonthFailEnd(hm,resultcode,errorinfo);
  337. }
  338. logMap.put("updFlowMonth", res );
  339. }catch(Exception e){
  340. logMap.put("updFlowMonth", false );
  341. e.printStackTrace();
  342. errorinfo = "更新数据出现异常,"+e.getMessage();
  343. resultcode = "8001";
  344. }
  345. //写日志
  346. logMap.put("haskafka", haskafka);
  347. logMap.put("resultcode", resultcode);
  348. logMap.put("errorinfo", errorinfo);
  349. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  350. logMap.put("times", System.currentTimeMillis() - starttime);
  351. logger.info(JsonUtil.objectToJson(logMap));
  352. }
  353. }
  354. /**
  355. * 验证计算月份是否为当月
  356. * @return
  357. */
  358. private boolean valiFlowCalculateDate(){
  359. boolean res = false;
  360. SimpleDateFormat dft = new SimpleDateFormat("yyyyMM");
  361. String currMonth = dft.format(new Date()); //当前月
  362. String flowCalculateDate = hm.get("FLOWCALCULATEDATE").toString();
  363. if(currMonth.equals(flowCalculateDate.substring(0, 6))){
  364. res = true;
  365. }
  366. return res;
  367. }
  368. /**
  369. * 调接口
  370. * @param reqBean
  371. * @return
  372. * @throws Exception
  373. */
  374. private String invoke() throws Exception{
  375. String result = ""; //调快手接口返回结果
  376. HashMap confMap = kuaishouFMDao.getInvokeUrlInfo("shareorder","selectallow", "2");
  377. if(confMap == null || confMap.size() == 0){
  378. throw new BusinessException("9054","无接口配置数据");
  379. }
  380. String invokeurl = confMap.get("INVOKEURL").toString();
  381. String userId = hm.get("USERID").toString();
  382. SimpleDateFormat dft = new SimpleDateFormat("yyyyMM");
  383. //查询周期,当月
  384. String cycleId = dft.format(new Date());
  385. String params = "userId="+userId+"&cycleId="+cycleId;
  386. int timeout = 20 * 1000; //超时时间
  387. //logger.info("invokeurl=>"+invokeurl+", params=>"+params);
  388. if(invokeurl.startsWith("https")){
  389. result = HttpInvoke.sendhttpsReq("GET", invokeurl, params, getProperty(), timeout);
  390. }else{
  391. result = HttpInvoke.sendHttpByGet(invokeurl+"?"+params, timeout);
  392. }
  393. //{"result":"0","errorcode":"","host-name":"bjm7-rs1892.jxq"}
  394. //logger.info(params+", 调快接口返回结果:"+result);
  395. //去空格、换行符号
  396. if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", "");
  397. return result;
  398. }
  399. /**
  400. * 获取请求属性性
  401. * @return
  402. */
  403. private Map getProperty(){
  404. Map reqProperty = new HashMap();
  405. reqProperty.put("Content-type", "application/json;charset=UTF-8");
  406. return reqProperty;
  407. }
  408. /**
  409. * 获取下次推送的默认时间(下个月1号零点三十分零一秒)
  410. * @return
  411. */
  412. private String getCalculatedate() {
  413. SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd");
  414. Calendar calendar = Calendar.getInstance();
  415. calendar.add(Calendar.MONTH, 1);
  416. calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
  417. return dft.format(calendar.getTime())+"003001";
  418. }
  419. /**
  420. * double类型如果小数点后为零则显示整数否则保留三位小数,比如23.0,转换后为23
  421. * @param d
  422. * @return
  423. */
  424. public String formatDouble(double d) {
  425. String res = "";
  426. String tmp = "";
  427. BigDecimal bg = new BigDecimal(d).setScale(4, RoundingMode.UP);
  428. double num = bg.doubleValue();
  429. //防止double值大于8位数出现科学计数法格式
  430. java.text.NumberFormat nf = java.text.NumberFormat.getInstance();
  431. nf.setGroupingUsed(false);
  432. tmp = nf.format(num);
  433. //以小数点分隔为两部分
  434. String [] arry = tmp.split("\\.");
  435. //如果有小数点
  436. if(arry.length == 2){
  437. //小数点前面的是0,则小数点后面的置为空,不要了
  438. if("0".equals(arry[1])){
  439. arry[1] = "";
  440. }else if(arry[1].length() > 3){//小数点后面超过3位数,只要前三位
  441. arry[1] = arry[1].substring(0,3);
  442. }
  443. //去除小数点后的无效0(最后一位为0,去除),执行三次,因为前面保留了最多小数点后的三位小数
  444. if(arry[1].lastIndexOf("0") == arry[1].length()-1){
  445. arry[1] = arry[1].substring(0, arry[1].length()-1);
  446. }
  447. if(arry[1].lastIndexOf("0") == arry[1].length()-1){
  448. arry[1] = arry[1].substring(0, arry[1].length()-1);
  449. }
  450. if(arry[1].lastIndexOf("0") == arry[1].length()-1){
  451. arry[1] = arry[1].substring(0, arry[1].length()-1);
  452. }
  453. //如果第二部分为空,则只返回第一部分的数据
  454. if("".equals(arry[1])){
  455. res = arry[0];
  456. }else{
  457. res = arry[0]+"."+arry[1];
  458. }
  459. }else{
  460. res = tmp;
  461. }
  462. return res;
  463. }
  464. }