58643bc7f0b2c83e6fb2c1eadb756d92e41a6cbc.svn-base 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package com.chinacreator.process.job;
  2. import com.chinacreator.process.bean.*;
  3. import com.chinacreator.process.dao.*;
  4. import com.chinacreator.process.service.*;
  5. import com.chinacreator.process.util.JsonUtil;
  6. import com.chinacreator.video.queue.MessageService;
  7. import com.chinacreator.video.queue.bean.MessagePipe;
  8. import org.apache.log4j.Logger;
  9. import org.quartz.DisallowConcurrentExecution;
  10. import org.quartz.PersistJobDataAfterExecution;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.concurrent.CountDownLatch;
  15. import java.util.concurrent.ExecutorService;
  16. import java.util.concurrent.LinkedBlockingQueue;
  17. import java.util.concurrent.ThreadPoolExecutor;
  18. import java.util.concurrent.TimeUnit;
  19. /**
  20. * 反向通知订购退订生成活动关系JOB
  21. * @author xu.zhou 20210610
  22. * 队列名称:orderoffline
  23. */
  24. @PersistJobDataAfterExecution
  25. @DisallowConcurrentExecution
  26. public class ReciveOfflineMQJob {
  27. private Logger logger = Logger.getLogger(ReciveOnlineMQJob.class);
  28. @Autowired
  29. private MessageService messageService;
  30. @Autowired
  31. private ActivityConfigDao activityConfigDao;
  32. @Autowired
  33. private BlackWhiteDao blackWhiteDao;
  34. @Autowired
  35. private CallerAreacodeDao callerAreacodeDao;
  36. @Autowired
  37. private DictionaryDao dictionaryDao;
  38. @Autowired
  39. private ComOrderService comOrderService;
  40. @Autowired
  41. private BestvOrderService bestvOrderService;
  42. @Autowired
  43. private CctvOrderService cctvOrderService;
  44. @Autowired
  45. private AqiyOrderService aqiyOrderService;
  46. @Autowired
  47. private AqiyMonthOrderService aqiyMonthOrderService;
  48. @Autowired
  49. private ActivityHandleService activityHandleService;
  50. @Autowired
  51. private FhGoodsHandleService fhGoodsHandleService;
  52. @Autowired
  53. private YoutuOrderService youtuOrderService;
  54. @Autowired
  55. private OrderSendVipDao orderSendVipDao;
  56. @Autowired
  57. private SPDao spDao;
  58. @Autowired
  59. private CPDao cpDao;
  60. public void doProcess() throws Exception {
  61. String mqname = "orderoffline";
  62. logger.info("接收"+mqname+"订购队列JOB启动");
  63. if (dictionaryDao.getValue("recivemq").equals("0")) {
  64. long beginTime = System.currentTimeMillis();
  65. //获取队列数据
  66. List<MessagePipe> list = messageService.reciveBatchMessage("orderoffline", 500);
  67. logger.info("接收"+mqname+"订购队列花费时间:" + (System.currentTimeMillis() - beginTime)+",获取数据条数:"+(list == null ? "0" : list.size()));
  68. if(list != null && list.size()>0){
  69. CountDownLatch threadSignal = new CountDownLatch(list.size());
  70. ExecutorService executorService = new ThreadPoolExecutor(20, 30, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  71. for (MessagePipe messagePipe : list) {
  72. OrderBean orderBean = null;
  73. try {
  74. orderBean = transBean(messagePipe.getBody());
  75. ReciveMQService service = new ReciveMQService(orderBean,blackWhiteDao,callerAreacodeDao, activityConfigDao,orderSendVipDao,spDao,cpDao,
  76. dictionaryDao,fhGoodsHandleService,comOrderService,bestvOrderService,cctvOrderService,aqiyOrderService,aqiyMonthOrderService,
  77. activityHandleService,youtuOrderService, threadSignal, mqname, messageService);
  78. executorService.execute(service);
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. logger.error("执行出错:" + (orderBean == null ? "解析失败" : orderBean.toString()) + "=>" + e);
  82. }
  83. }
  84. executorService.shutdown();
  85. try {
  86. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  87. } catch (InterruptedException e) {
  88. e.printStackTrace();
  89. }
  90. }
  91. logger.info(Thread.currentThread().getName()+","+mqname+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  92. } else {
  93. logger.info("停止接收队列消息");
  94. }
  95. }
  96. private OrderBean transBean(Map<String, Object> body) {
  97. String jsonStr = JsonUtil.objectToJson(body);
  98. return (OrderBean) JsonUtil.jsonToBean(jsonStr, OrderBean.class);
  99. }
  100. }