package com.chinacreator.process.job; import com.chinacreator.common.support.util.SpringContextUtil; import com.chinacreator.common.util.URLUtil; import com.chinacreator.process.bean.OrderBean; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.service.CSorderService; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.video.queue.MessageService; import com.chinacreator.video.queue.bean.MessagePipe; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; @PersistJobDataAfterExecution @DisallowConcurrentExecution public class CSactivityJob { private Logger logger = Logger.getLogger(CSactivityJob.class); @Autowired private DictionaryDao dictionaryDao; @Autowired private MessageService messageService; public void doProcess() throws Exception { logger.info("接收畅视活动队列JOB启动"); if(dictionaryDao.getValue("recivemq").equals("0")){ long beginTime = System.currentTimeMillis(); List list = messageService .reciveBatchMessage("csorder", 500); Map map = new HashMap(); logger.info("接收订购队列花费时间:"+(System.currentTimeMillis()-beginTime)); CountDownLatch threadSignal = new CountDownLatch(list.size()); ExecutorService executorService = new ThreadPoolExecutor(3, 6, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); if(list != null && list.size() > 0){ for (MessagePipe messagePipe : list) { OrderBean orderBean = transBean(messagePipe.getBody()); if(map.get(orderBean.getUserid()) == null){ CSsyncHandle csSyncHandle = new CSsyncHandle(orderBean,list.size(),threadSignal); executorService.execute(csSyncHandle); }else{ try { Map outmap = new HashMap(); outmap.put("id", orderBean.getId()); outmap.put("userid", orderBean.getUserid()); outmap.put("cpid", orderBean.getCpid()); outmap.put("spid", orderBean.getSpid()); outmap.put("channel", orderBean.getChannel()); outmap.put("orderType", orderBean.getOrderType()); outmap.put("type", "csorder"); //URLUtil.post("http://10.199.99.144:8090/mq-service/recive.do", JsonUtil.objectToJson(map)); URLUtil.post(dictionaryDao.getValue("mqReciveUrl"), JsonUtil.objectToJson(map)); } catch (Exception e) { e.printStackTrace(); } } } } executorService.shutdown(); try { executorService.awaitTermination(1L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } }else{ logger.info("停止接收畅视活动队列消息"); } } public OrderBean transBean(Map body) { String jsonStr = JsonUtil.objectToJson(body); return (OrderBean) JsonUtil.jsonToBean(jsonStr, OrderBean.class); } public static void main(String[] args) { String[] strs = new String[]{"1","2"}; for (String string : strs) { System.out.println(string); } } } class CSsyncHandle implements Runnable { private Logger log = Logger.getLogger(CSsyncHandle.class); private OrderBean bean; private int totalSize; private CountDownLatch threadSignal; public CSsyncHandle(OrderBean bean,int totalSize,CountDownLatch threadSignal){ this.bean = bean; this.totalSize = totalSize; this.threadSignal = threadSignal; } @Override public void run() { long start = System.currentTimeMillis(); CSorderService csOrderService = SpringContextUtil.getBean(CSorderService.class); csOrderService.handle(bean); try{ threadSignal.countDown(); log.info("[" + (totalSize - threadSignal.getCount()) + "/" + totalSize + "]" + (bean.getUserid()) + " 操作完成,花费时间:" + (System.currentTimeMillis() - start) + " ms"); }catch(Exception e){ e.printStackTrace(); } } }