d9b18abf908084599b13d217e46e50fcee59629f.svn-base 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package com.chinacreator.process.job;
  2. import java.math.BigDecimal;
  3. import java.math.RoundingMode;
  4. import java.sql.SQLException;
  5. import java.text.SimpleDateFormat;
  6. import java.util.ArrayList;
  7. import java.util.Arrays;
  8. import java.util.Calendar;
  9. import java.util.Date;
  10. import java.util.HashMap;
  11. import java.util.Iterator;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.Set;
  15. import java.util.concurrent.CountDownLatch;
  16. import java.util.concurrent.ExecutorService;
  17. import java.util.concurrent.LinkedBlockingQueue;
  18. import java.util.concurrent.ThreadPoolExecutor;
  19. import java.util.concurrent.TimeUnit;
  20. import org.apache.log4j.Logger;
  21. import org.quartz.DisallowConcurrentExecution;
  22. import org.quartz.PersistJobDataAfterExecution;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.util.StringUtils;
  25. import com.alibaba.fastjson.JSONArray;
  26. import com.alibaba.fastjson.JSONObject;
  27. import com.chinacreator.common.exception.BusinessException;
  28. import com.chinacreator.process.bean.KuaishouPushBean;
  29. import com.chinacreator.process.bean.TdActiveclientRecBean;
  30. import com.chinacreator.process.dao.DictionaryDao;
  31. import com.chinacreator.process.dao.KuaishouDao;
  32. import com.chinacreator.process.dao.KuaishouFlowMonthDao;
  33. import com.chinacreator.process.util.HttpInvoke;
  34. import com.chinacreator.process.util.JsonUtil;
  35. import com.chinacreator.process.util.WriteLogUtil;
  36. /**
  37. * 快手月末查流量
  38. * @author xu.zhou
  39. * @date 20220708
  40. */
  41. @PersistJobDataAfterExecution
  42. @DisallowConcurrentExecution
  43. public class KuaishouFlowMonthPart2Job {
  44. private static Logger logger = Logger.getLogger("kuaishouFlowMonth");
  45. @Autowired
  46. private KuaishouFlowMonthDao kuaishouDao; // = new KuaishouFlowMonthDao();
  47. @Autowired
  48. private DictionaryDao dictionaryDao; // = new DictionaryDao();
  49. public void doProcess() throws Exception {
  50. //执行开始时间
  51. long beginTime = System.currentTimeMillis();
  52. int startpart = 26;
  53. int endpart = 50;
  54. String jobname = "KuaishouFlowMonthPart2Job";
  55. try {
  56. WriteLogUtil.writeLong(jobname+"定时任务开始", logger, jobname);
  57. int count = 0;
  58. int rows = 800; //每次取数据条数
  59. String confrows = dictionaryDao.getValue("kuaishouflowrows");
  60. if (!StringUtils.isEmpty(confrows)) {
  61. try {
  62. rows = Integer.parseInt(confrows);
  63. } catch (Exception e) {
  64. rows = 800;
  65. }
  66. }
  67. //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
  68. String partition = "T_HASH_P";
  69. for(int i = startpart; i <= endpart; i++){
  70. count = 0;
  71. partition = "T_HASH_P";
  72. try {
  73. if(i < 10){
  74. partition = partition + "0" + i;
  75. }else{
  76. partition = partition + i;
  77. }
  78. //按分区标识获取订购数据
  79. List<HashMap> dataList = kuaishouDao.getFlowMonthByPart(partition,rows);
  80. count = (dataList != null ? dataList.size() : 0);
  81. if(dataList != null && dataList.size() > 0){
  82. dataList = paraseData(dataList); //去重复数据
  83. logger.info(partition+",去重复前用户数:"+count+",去重复后用户数:"+dataList.size());
  84. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  85. ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  86. //把数据更新为正在处理状态,执行次数加1
  87. for(HashMap hm : dataList){
  88. boolean res = kuaishouDao.updFlowMonthExecing(hm.get("ID").toString());
  89. }
  90. for(HashMap hm : dataList){
  91. KuaishouFlowMonthService continueService = new KuaishouFlowMonthService(dataList.size(), threadSignal, hm, kuaishouDao, jobname);
  92. executorService.execute(continueService);
  93. }
  94. executorService.shutdown();
  95. try {
  96. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  97. } catch (InterruptedException e) {
  98. e.printStackTrace();
  99. }
  100. }
  101. } catch (Exception e) {
  102. logger.info(partition+",执行出现异常,"+e.getMessage());
  103. e.printStackTrace();
  104. }
  105. //每取一轮数据,休眠100毫秒
  106. Thread.sleep(100);
  107. //更新执行状态为2且执行时间超过50分钟的数据为待处理
  108. int res = kuaishouDao.updExecTimeout(partition);
  109. if(res > 0){
  110. logger.info("更新状态为2处理中且执行时间超过50分钟的数据为待处理的异常数据条数【"+res+"】");
  111. }
  112. //初始化新增数据的FLOWCALCULATEDATE值
  113. res = kuaishouDao.updFlowMonthInit(partition);
  114. if(res > 0){
  115. logger.info("初始化新增数据的FLOWCALCULATEDATE值的数据条数【"+res+"】");
  116. }
  117. }
  118. } catch (Exception e) {
  119. e.printStackTrace();
  120. logger.info(jobname+"定时任务分区2执行出现异常,"+e.getMessage());
  121. } finally {
  122. WriteLogUtil.writeLong(jobname+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, jobname);
  123. }
  124. }
  125. /**
  126. * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
  127. * @param dataList
  128. * @return
  129. */
  130. private List<HashMap> paraseData(List<HashMap> dataList){
  131. //去重复后的数据集
  132. List<HashMap> reDataList = new ArrayList<HashMap>();
  133. HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
  134. for (HashMap dataMap : dataList) {
  135. if(tmpMap.containsKey(dataMap.get("USERID"))){
  136. logger.info("重复数据,"+dataMap);
  137. }else{
  138. tmpMap.put(dataMap.get("USERID"), dataMap);
  139. reDataList.add(dataMap);
  140. }
  141. }
  142. return reDataList;
  143. }
  144. public static void main(String[] args) {
  145. KuaishouFlowMonthPart2Job job = new KuaishouFlowMonthPart2Job();
  146. try {
  147. job.doProcess();
  148. } catch (Exception e) {
  149. e.printStackTrace();
  150. }
  151. }
  152. }