package com.chinacreator.process.job;

import com.chinacreator.process.bean.*;
import com.chinacreator.process.dao.*;
import com.chinacreator.process.service.*;
import com.chinacreator.process.util.JsonUtil;
import com.chinacreator.video.queue.MessageService;
import com.chinacreator.video.queue.bean.MessagePipe;
import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Job that generates activity relationships from reverse-notification
 * subscribe/unsubscribe orders.
 *
 * <p>Each run pulls one batch of messages from the {@code orderoffline} queue,
 * converts every message body into an {@link OrderBean} and hands it to a
 * {@link ReciveMQService} task running on a thread pool created for that run.
 *
 * <p>Queue name: orderoffline
 *
 * @author xu.zhou 20210610
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ReciveOfflineMQJob {

    /** Name of the queue this job consumes. */
    private static final String QUEUE_NAME = "orderoffline";

    /** Maximum number of messages requested from the queue per run. */
    private static final int BATCH_SIZE = 500;

    /** Dictionary key of the switch that enables or disables consumption. */
    private static final String RECEIVE_SWITCH_KEY = "recivemq";

    /** Value of the switch for which messages are consumed. */
    private static final String RECEIVE_ENABLED = "0";

    private static final int CORE_POOL_SIZE = 20;
    private static final int MAX_POOL_SIZE = 30;
    private static final long KEEP_ALIVE_MINUTES = 10L;

    /** How long a run waits for its submitted tasks before giving up. */
    private static final long AWAIT_MINUTES = 5L;

    // Bound to this class so log lines are attributed to the offline job
    // (it was previously bound to ReciveOnlineMQJob by mistake).
    private static final Logger logger = Logger.getLogger(ReciveOfflineMQJob.class);

    @Autowired
    private MessageService messageService;
    @Autowired
    private ActivityConfigDao activityConfigDao;
    @Autowired
    private BlackWhiteDao blackWhiteDao;
    @Autowired
    private CallerAreacodeDao callerAreacodeDao;
    @Autowired
    private DictionaryDao dictionaryDao;
    @Autowired
    private ComOrderService comOrderService;
    @Autowired
    private BestvOrderService bestvOrderService;
    @Autowired
    private CctvOrderService cctvOrderService;
    @Autowired
    private AqiyOrderService aqiyOrderService;
    @Autowired
    private AqiyMonthOrderService aqiyMonthOrderService;
    @Autowired
    private ActivityHandleService activityHandleService;
    @Autowired
    private FhGoodsHandleService fhGoodsHandleService;
    @Autowired
    private YoutuOrderService youtuOrderService;
    @Autowired
    private OrderSendVipDao orderSendVipDao;
    @Autowired
    private SPDao spDao;
    @Autowired
    private CPDao cpDao;

    /**
     * Runs one consumption cycle.
     *
     * <p>When the {@code recivemq} dictionary value is {@code "0"}, fetches up to
     * {@value #BATCH_SIZE} messages from the queue and dispatches them to worker
     * tasks; otherwise (including when the value is missing) only logs that
     * consumption is stopped.
     *
     * @throws Exception if reading the switch or fetching the batch fails
     */
    public void doProcess() throws Exception {
        logger.info("接收" + QUEUE_NAME + "订购队列JOB启动");

        // Constant-first comparison: a missing dictionary entry means "disabled"
        // instead of a NullPointerException.
        if (!RECEIVE_ENABLED.equals(dictionaryDao.getValue(RECEIVE_SWITCH_KEY))) {
            logger.info("停止接收队列消息");
            return;
        }

        long beginTime = System.currentTimeMillis();

        // Fetch the queue data.
        List<MessagePipe> list = messageService.reciveBatchMessage(QUEUE_NAME, BATCH_SIZE);
        int fetched = (list == null) ? 0 : list.size();
        logger.info("接收" + QUEUE_NAME + "订购队列花费时间:" + (System.currentTimeMillis() - beginTime)
                + ",获取数据条数:" + fetched);

        if (fetched > 0) {
            dispatch(list);
        }

        logger.info(Thread.currentThread().getName() + "," + QUEUE_NAME + "定时任务完成,耗时:"
                + (System.currentTimeMillis() - beginTime) / 1000 + " 秒");
    }

    /**
     * Submits one {@link ReciveMQService} task per message to a fresh thread pool
     * and waits up to {@value #AWAIT_MINUTES} minutes for them to finish.
     *
     * <p>A message whose body cannot be converted, or whose task cannot be
     * created, is logged and skipped; the remaining messages are still processed.
     *
     * @param messages non-empty batch fetched from the queue
     */
    private void dispatch(List<MessagePipe> messages) {
        // Handed to every task; this job itself waits on awaitTermination below,
        // not on the latch.
        CountDownLatch threadSignal = new CountDownLatch(messages.size());

        // NOTE: with an unbounded work queue the pool never grows past the core
        // size, and the rejection policy is not triggered while the pool runs.
        ExecutorService executorService = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_MINUTES, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (MessagePipe messagePipe : messages) {
                OrderBean orderBean = null;
                try {
                    orderBean = transBean(messagePipe.getBody());
                    ReciveMQService service = new ReciveMQService(orderBean, blackWhiteDao, callerAreacodeDao,
                            activityConfigDao, orderSendVipDao, spDao, cpDao,
                            dictionaryDao, fhGoodsHandleService, comOrderService, bestvOrderService,
                            cctvOrderService, aqiyOrderService, aqiyMonthOrderService,
                            activityHandleService, youtuOrderService, threadSignal, QUEUE_NAME, messageService);
                    executorService.execute(service);
                } catch (Exception e) {
                    // Pass the throwable so the stack trace goes to the log
                    // rather than to stderr.
                    logger.error("执行出错:" + (orderBean == null ? "解析失败" : orderBean.toString()) + "=>" + e, e);
                }
            }
        } finally {
            // Always release the worker threads, even if the loop above is
            // aborted by something other than an Exception.
            executorService.shutdown();
        }

        try {
            if (!executorService.awaitTermination(AWAIT_MINUTES, TimeUnit.MINUTES)) {
                logger.warn(QUEUE_NAME + "线程池在" + AWAIT_MINUTES + "分钟内未执行完毕,仍有任务在运行");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler thread can observe it.
            Thread.currentThread().interrupt();
            logger.error(QUEUE_NAME + "等待线程池结束时被中断", e);
        }
    }

    /**
     * Converts a message body into an {@link OrderBean} by serialising the map to
     * JSON and reading the JSON back as a bean.
     *
     * @param body message body taken from the queue message
     * @return the bean produced by {@code JsonUtil.jsonToBean}
     */
    private OrderBean transBean(Map<?, ?> body) {
        String jsonStr = JsonUtil.objectToJson(body);
        return (OrderBean) JsonUtil.jsonToBean(jsonStr, OrderBean.class);
    }
}