10132575cb3bfb5a822e7af433393a22c9b27c24.svn-base 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package com.chinacreator.process.job;
  2. import java.text.SimpleDateFormat;
  3. import java.util.ArrayList;
  4. import java.util.Arrays;
  5. import java.util.Calendar;
  6. import java.util.Date;
  7. import java.util.HashMap;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.concurrent.CountDownLatch;
  11. import java.util.concurrent.ExecutorService;
  12. import java.util.concurrent.LinkedBlockingQueue;
  13. import java.util.concurrent.ThreadPoolExecutor;
  14. import java.util.concurrent.TimeUnit;
  15. import org.apache.log4j.Logger;
  16. import org.quartz.DisallowConcurrentExecution;
  17. import org.quartz.PersistJobDataAfterExecution;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.util.StringUtils;
  20. import com.chinacreator.common.exception.BusinessException;
  21. import com.chinacreator.process.bean.KuaishouPushBean;
  22. import com.chinacreator.process.dao.DictionaryDao;
  23. import com.chinacreator.process.dao.KuaishouDao;
  24. import com.chinacreator.process.util.JsonUtil;
  25. import com.chinacreator.process.util.WriteLogUtil;
  26. /**
  27. * 快手订购推送
  28. * 替换KuaishouPushOrderJob
  29. * 订购流量未耗尽的定时任务(KuaishouPushOrderNewJob)把快手订购表的未处理的数据判断是否要推送,是,则添加到推送表,否,则不处理。
  30. * 获取TD_KUAISHOU_FIRSTMONTH表CALCULATEDATE为空的数据(通过订购关系表的触发器在订购时添加到表)
  31. * 查询推送记录表TD_KAFKA_KUAISHOU_PUSH,看本月是否已有推送过流量未耗尽
  32. * 无,则把数据添加到推送记录表
  33. * TD_KUAISHOU_FIRSTMONTH来源于订购关系表的触发器,在新增和再次订购时会触发把数据添加到此表
  34. * @author xu.zhou
  35. * @date 20210331
  36. */
  37. @PersistJobDataAfterExecution
  38. @DisallowConcurrentExecution
  39. public class KuaishouPushOrderNewJob {
  40. private static Logger logger = Logger.getLogger("kuaishouorderpush");
  41. @Autowired
  42. private KuaishouDao kuaishouDao; // = new KuaishouDao();
  43. @Autowired
  44. private DictionaryDao dictionaryDao; // = new DictionaryDao();
  45. public void doProcess() throws Exception {
  46. //logger.info(Thread.currentThread().getName()+"KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务开始");
  47. WriteLogUtil.writeLong("KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务开始", logger, "KuaishouPushOrderNewJob");
  48. int count = 0;
  49. long beginTime = System.currentTimeMillis();
  50. try {
  51. int rows = 200; //每次取数据条数
  52. String kuaishoufmrows = dictionaryDao.getValue("kuaishouorrows");
  53. if (!StringUtils.isEmpty(kuaishoufmrows)) {
  54. try {
  55. rows = Integer.parseInt(kuaishoufmrows);
  56. } catch (Exception e) {
  57. rows = 200;
  58. }
  59. }
  60. List<HashMap> dataList = kuaishouDao.getOrderPushNew(rows);
  61. count = (dataList != null ? dataList.size() : 0);
  62. if(dataList != null && dataList.size() > 0){
  63. dataList = paraseData(dataList);
  64. logger.info("去重复后数据条数:"+ (dataList == null ? "0" : dataList.size())+",去重复前用户数:"+count );
  65. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  66. ExecutorService executorService = new ThreadPoolExecutor(20, 30, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  67. //把数据更新为正在处理状态
  68. for(HashMap hm : dataList){
  69. boolean res = kuaishouDao.updFirstMonthExecing(hm.get("ID").toString());
  70. //System.out.println(res);
  71. //logger.info("更新结果 :"+res);
  72. }
  73. for(HashMap dataMap : dataList){
  74. KuaishouPushOrderNewService continueService = new KuaishouPushOrderNewService(dataList.size(),threadSignal,dataMap,kuaishouDao);
  75. executorService.execute(continueService);
  76. }
  77. executorService.shutdown();
  78. try {
  79. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  80. } catch (InterruptedException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. } catch (Exception e) {
  85. logger.info("执行出现异常,"+e.getMessage());
  86. e.printStackTrace();
  87. }
  88. //logger.info(Thread.currentThread().getName()+"KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  89. WriteLogUtil.writeLong("KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, "KuaishouPushOrderNewJob");
  90. }
  91. /**
  92. * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
  93. * @param dataList
  94. * @return
  95. */
  96. private List<HashMap> paraseData(List<HashMap> dataList){
  97. //去重复后的数据集
  98. List<HashMap> reDataList = new ArrayList<HashMap>();
  99. HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
  100. for (HashMap dataMap : dataList) {
  101. if(tmpMap.containsKey(dataMap.get("USERID"))){
  102. logger.info("重复数据,"+dataMap);
  103. }else{
  104. tmpMap.put(dataMap.get("USERID"), dataMap);
  105. reDataList.add(dataMap);
  106. }
  107. }
  108. return reDataList;
  109. }
  110. public static void main(String[] args) throws Exception {
  111. KuaishouPushOrderNewJob job = new KuaishouPushOrderNewJob();
  112. job.doProcess();
  113. }
  114. }
  115. class KuaishouPushOrderNewService implements Runnable {
  116. private static Logger logger = Logger.getLogger("kuaishouorderpush");
  117. private int totalSize;
  118. private CountDownLatch threadSignal;
  119. private HashMap hm;
  120. private KuaishouDao kuaishouDao;
  121. public KuaishouPushOrderNewService(int totalSize,CountDownLatch threadSignal,HashMap hm,KuaishouDao kuaishouDao){
  122. this.totalSize = totalSize;
  123. this.threadSignal = threadSignal;
  124. this.hm = hm;
  125. this.kuaishouDao = kuaishouDao;
  126. }
  127. @Override
  128. public void run() {
  129. long startime = System.currentTimeMillis();
  130. Map logMap = new HashMap();
  131. logMap.put("userinfo", hm);
  132. String resultcode = "-1";
  133. String errorinfo = "";
  134. String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送
  135. String id = hm.get("ID").toString();
  136. String userid = hm.get("USERID").toString();
  137. String spid = hm.get("SPID").toString();
  138. String calculatedate = getCalculatedate(); //下次推送时间,一般为下月的第一天0点30分01秒
  139. try {
  140. //获取订购关系
  141. HashMap orderRealMap = kuaishouDao.getOrderRealById(id);
  142. if(orderRealMap == null){
  143. throw new BusinessException("9056","订购关系表无订购数据");
  144. }
  145. //查询SPID是否在TB_SP_AOP_CONFIG配置,因为快手流量消耗数据是根据元素ID与此表匹配得到SPID,如果SPID没有此表内,代表流量数据是不会从KAFKA推送过来,也就是这种快手订购不要触发流量耗尽或有流量的推送
  146. if(!kuaishouDao.hasAopConf(spid)){
  147. throw new BusinessException("9057",spid+"在TB_SP_AOP_CONFIG表无配置");
  148. }
  149. //当前月份
  150. String currmonth = new SimpleDateFormat("yyyyMM").format(new Date());
  151. //订购月份
  152. String pushmonth = orderRealMap.get("ORDERTIME").toString().substring(0, 6);
  153. //订购月份小于当前月份,计算时间设置为当月,防止在都零点时收到上个月的数据
  154. if(Integer.parseInt(pushmonth) < Integer.parseInt(currmonth)){
  155. calculatedate = currmonth+"01003001";
  156. }
  157. logMap.put("spids", spid);
  158. //判断CP是否已同步完成
  159. //if(!kuaishouDao.hasSync(userid, spid)){
  160. if(!kuaishouDao.hasSync(id)){
  161. throw new BusinessException("9055","数据CP同步未完成,暂不处理");
  162. }
  163. //判断同一月份是否推送过相同的SPID数据
  164. List<HashMap> pushList = kuaishouDao.queryPush(userid, pushmonth);
  165. logger.info("pushList=>"+pushList);
  166. if(pushList != null && pushList.size() > 0){
  167. List<String> spidsList = null;
  168. boolean haspush = false;
  169. for(HashMap tmpMap : pushList){
  170. spidsList = Arrays.asList(tmpMap.get("SPIDS").toString().split("#"));
  171. if(spidsList.contains(spid)){
  172. haspush = true;
  173. break;
  174. }
  175. }
  176. logMap.put("spidList", spidsList);
  177. //当月已推送过,且不是后向产品(后向产品可以重复订购)
  178. if(haspush && !kuaishouDao.hasBackBusi(spid)){
  179. throw new BusinessException("9054","同一月份已推送相同数据且不是后向产品");
  180. }
  181. }
  182. KuaishouPushBean pushBean = new KuaishouPushBean();
  183. pushBean.setSerial_number(userid);
  184. pushBean.setId(kuaishouDao.getNo());
  185. pushBean.setPushmonth(pushmonth);
  186. pushBean.setPushtype("1"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
  187. pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败
  188. pushBean.setResultinfo("待处理");
  189. pushBean.setSpids(spid);
  190. //添加数据到推送表
  191. kuaishouDao.insertPush(pushBean);
  192. realflag = "1";
  193. errorinfo = "添加订购推送到推送表";
  194. //未出现异常,设置为成功
  195. resultcode = "0";
  196. if("".equals(errorinfo)){
  197. errorinfo = "ok";
  198. }
  199. } catch (Exception e) {
  200. if (e instanceof BusinessException) {
  201. errorinfo = ((BusinessException) e).getMessage();
  202. resultcode = ((BusinessException) e).getCode();
  203. }else{
  204. e.printStackTrace();
  205. resultcode = "8000";
  206. errorinfo = "处理数据出现异常,"+e.getMessage();
  207. }
  208. }finally {
  209. threadSignal.countDown();
  210. try{
  211. boolean res = false;
  212. //暂未同步或者处理出现异常,恢复初始状态,下次再处理
  213. if("9055".equals(resultcode) || "8000".equals(resultcode)){
  214. res = kuaishouDao.updFirstMonthRc(id,"1",errorinfo,null, null);
  215. }else{
  216. resultcode = "0";
  217. //更新TD_KUAISHOU_FIRSTMONTH表为月初推送数据,由KuaishouPushMonthNewJob处理
  218. res = kuaishouDao.updFirstMonthRc(id,resultcode,errorinfo, realflag, calculatedate);
  219. }
  220. //logger.info(id+",更新执行结果:"+res);
  221. }catch(Exception e){
  222. e.printStackTrace();
  223. errorinfo = "更新数据出现异常,"+e.getMessage();
  224. resultcode = "8001";
  225. }
  226. //写日志
  227. logMap.put("jobname", "KuaishouPushOrderNewJob");
  228. logMap.put("resultcode", resultcode);
  229. logMap.put("errorinfo", errorinfo);
  230. logMap.put("realflag", realflag);
  231. logMap.put("calculatedate", calculatedate);
  232. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  233. logger.info(JsonUtil.objectToJson(logMap));
  234. }
  235. }
  236. /**
  237. * 获取下次推送的默认时间(下个月1号零点三十分零一秒)
  238. * @return
  239. */
  240. private String getCalculatedate() {
  241. SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd");
  242. Calendar calendar = Calendar.getInstance();
  243. calendar.add(Calendar.MONTH, 1);
  244. calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
  245. return dft.format(calendar.getTime())+"003001";
  246. }
  247. }