42165f9a66d2fd73d9d7144427003a7e568543b4.svn-base 7.3 KB


  1. package com.chinacreator.process.job;
  2. import java.sql.SQLException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.HashMap;
  7. import java.util.Iterator;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.Set;
  11. import java.util.concurrent.CountDownLatch;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.LinkedBlockingQueue;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15. import java.util.concurrent.TimeUnit;
  16. import org.apache.log4j.Logger;
  17. import org.quartz.DisallowConcurrentExecution;
  18. import org.quartz.PersistJobDataAfterExecution;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import com.alibaba.fastjson.JSON;
  21. import com.alibaba.fastjson.JSONObject;
  22. import com.chinacreator.common.exception.BusinessException;
  23. import com.chinacreator.common.util.MD5;
  24. import com.chinacreator.process.bean.ContinueBean;
  25. import com.chinacreator.process.bean.ContinueLogBean;
  26. import com.chinacreator.process.bean.KuaishouPushBean;
  27. import com.chinacreator.process.dao.ContinueOrderDao;
  28. import com.chinacreator.process.dao.DictionaryDao;
  29. import com.chinacreator.process.dao.KuaishouDao;
  30. import com.chinacreator.process.util.DesUtil;
  31. import com.chinacreator.process.util.HttpInvoke;
  32. import com.chinacreator.process.util.JsonUtil;
  33. import com.chinacreator.process.util.SpringUtils;
  34. import com.chinacreator.process.util.URLUtil;
  35. /**
  36. * 已废弃,不再使用
  37. * 快手月初推送
  38. * 月初推送定时任务(KuaishouPushMonthJob)只负责把订购关系表(TD_ORDER_RELATIONS)当前有订购快手业务且未失效的数据添加到推送表就结束,状态为待处理(resultcode=1)
  39. * 每月1号凌晨开始执行
  40. * @author xu.zhou
  41. * @date 20200515
  42. */
  43. @PersistJobDataAfterExecution
  44. @DisallowConcurrentExecution
  45. public class KuaishouPushMonthJob {
  46. private static Logger logger = Logger.getLogger("kuaishoupush");
  47. @Autowired
  48. private KuaishouDao kuaishouDao;
  49. @Autowired
  50. private DictionaryDao dictionaryDao;
  51. public void doProcess() throws Exception {
  52. logger.info(Thread.currentThread().getName()+"KuaishouPushMonthJob月初快手流量未耗尽定时任务开始");
  53. long beginTime = System.currentTimeMillis();
  54. //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
  55. String partition = "T_HASH_P";
  56. for(int i = 1; i <= 50; i++){
  57. partition = "T_HASH_P";
  58. try {
  59. if(i < 10){
  60. partition = partition + "0" + i;
  61. }else{
  62. partition = partition + i;
  63. }
  64. logger.info("执行分区:"+partition);
  65. //按分区标识获取订购数据
  66. List<HashMap> list = kuaishouDao.getFirstMonthRealByPart(partition);
  67. if(list != null && list.size() > 0){
  68. //推送月份
  69. String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date());
  70. CountDownLatch threadSignal = new CountDownLatch(list.size());
  71. ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  72. HashMap<String, List<String>> dataMap = paraseData(list);
  73. logger.info(partition+",数据库有效订购的用户数:"+dataMap.size());
  74. Set<String> keySet = dataMap.keySet();
  75. Iterator<String> it = keySet.iterator();
  76. String userid = "";
  77. while(it.hasNext()){
  78. userid = it.next();
  79. KuaishouPushMonthService continueService = new KuaishouPushMonthService(list.size(),threadSignal,userid,dataMap.get(userid),pushmonth,kuaishouDao,dictionaryDao);
  80. executorService.execute(continueService);
  81. }
  82. executorService.shutdown();
  83. try {
  84. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  85. } catch (InterruptedException e) {
  86. e.printStackTrace();
  87. }
  88. }else{
  89. logger.info(partition+", 无订购数据");
  90. }
  91. } catch (Exception e) {
  92. logger.info(partition+",执行出现异常,"+e.getMessage());
  93. e.printStackTrace();
  94. }
  95. Thread.sleep(5000);
  96. }
  97. logger.info(Thread.currentThread().getName()+"KuaishouPushMonthJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  98. }
  99. /**
  100. * 数据重组,每个号码一个key,值是spid
  101. * @param dataList
  102. * @return
  103. */
  104. private HashMap<String, List<String>> paraseData(List<HashMap> dataList){
  105. HashMap<String, List<String>> reData = new HashMap<String, List<String>>();
  106. for (HashMap dataMap : dataList) {
  107. if(reData.containsKey(dataMap.get("USERID"))){
  108. reData.get(dataMap.get("USERID")).add(dataMap.get("SPID").toString());
  109. }else{
  110. List<String> tmpList = new ArrayList<String>();
  111. tmpList.add(dataMap.get("SPID").toString());
  112. reData.put(dataMap.get("USERID").toString(), tmpList);
  113. }
  114. }
  115. return reData;
  116. }
  117. }
  118. class KuaishouPushMonthService implements Runnable {
  119. private static Logger logger = Logger.getLogger("kuaishoupush");
  120. private int totalSize;
  121. private CountDownLatch threadSignal;
  122. private String userid;
  123. private List<String> spidsList;
  124. private String pushmonth;
  125. private DictionaryDao dictionaryDao;
  126. private KuaishouDao kuaishouDao;
  127. public KuaishouPushMonthService(int totalSize,CountDownLatch threadSignal,String userid,List<String> spidsList,String pushmonth,KuaishouDao kuaishouDao,DictionaryDao dictionaryDao){
  128. this.totalSize = totalSize;
  129. this.threadSignal = threadSignal;
  130. this.userid = userid;
  131. this.spidsList = spidsList;
  132. this.pushmonth = pushmonth;
  133. this.kuaishouDao = kuaishouDao;
  134. this.dictionaryDao = dictionaryDao;
  135. }
  136. @Override
  137. public void run() {
  138. long startime = System.currentTimeMillis();
  139. Map logMap = new HashMap();
  140. logMap.put("userid", userid);
  141. logMap.put("pushmonth", pushmonth);
  142. String pushid = ""; //推送记录表ID
  143. String resultcode = "-1";
  144. String errorinfo = "";
  145. boolean hasinvoke = false; //是否调了接口
  146. try {
  147. String spids = "";
  148. for(String tmp : spidsList){
  149. if("".equals(spids)){
  150. spids += tmp;
  151. }else{
  152. spids += "#"+tmp;
  153. }
  154. }
  155. logMap.put("spids", spids);
  156. List<HashMap> dataList = kuaishouDao.queryPush(userid, pushmonth, "2");
  157. if(dataList != null && dataList.size()>0){
  158. throw new BusinessException("9061","重复推送");
  159. }
  160. KuaishouPushBean pushBean = new KuaishouPushBean();
  161. pushid = kuaishouDao.getNo();
  162. pushBean.setSerial_number(userid);
  163. pushBean.setId(pushid);
  164. pushBean.setPushmonth(pushmonth);
  165. pushBean.setPushtype("2"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
  166. pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败
  167. pushBean.setResultinfo("待处理");
  168. pushBean.setSpids(spids);
  169. //添加数据到推送表
  170. kuaishouDao.insertPush(pushBean);
  171. resultcode = "0";
  172. errorinfo = "ok";
  173. } catch (Exception e) {
  174. if (e instanceof BusinessException) {
  175. errorinfo = ((BusinessException) e).getMessage();
  176. resultcode = ((BusinessException) e).getCode();
  177. }else{
  178. e.printStackTrace();
  179. resultcode = "8000";
  180. errorinfo = "处理数据出现异常,"+e.getMessage();
  181. }
  182. }finally {
  183. threadSignal.countDown();
  184. //写日志
  185. logMap.put("jobname", "KuaishouPushMonthJob");
  186. logMap.put("resultcode", resultcode);
  187. logMap.put("errorinfo", errorinfo);
  188. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  189. logger.info(JsonUtil.objectToJson(logMap));
  190. }
  191. }
  192. }