123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package com.chinacreator.process.job;
- import com.chinacreator.common.support.util.SpringContextUtil;
- import com.chinacreator.common.util.URLUtil;
- import com.chinacreator.process.bean.OrderBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.service.CSorderService;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.video.queue.MessageService;
- import com.chinacreator.video.queue.bean.MessagePipe;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.*;
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class CSactivityJob {
- private Logger logger = Logger.getLogger(CSactivityJob.class);
-
- @Autowired
- private DictionaryDao dictionaryDao;
-
- @Autowired
- private MessageService messageService;
-
-
- public void doProcess() throws Exception {
- logger.info("接收畅视活动队列JOB启动");
- if(dictionaryDao.getValue("recivemq").equals("0")){
- long beginTime = System.currentTimeMillis();
- List<MessagePipe> list = messageService
- .reciveBatchMessage("csorder", 500);
- Map<String,String> map = new HashMap<String, String>();
- logger.info("接收订购队列花费时间:"+(System.currentTimeMillis()-beginTime));
- CountDownLatch threadSignal = new CountDownLatch(list.size());
- ExecutorService executorService = new ThreadPoolExecutor(3, 6, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- if(list != null && list.size() > 0){
- for (MessagePipe messagePipe : list) {
- OrderBean orderBean = transBean(messagePipe.getBody());
- if(map.get(orderBean.getUserid()) == null){
- CSsyncHandle csSyncHandle = new CSsyncHandle(orderBean,list.size(),threadSignal);
- executorService.execute(csSyncHandle);
- }else{
- try {
- Map<String, String> outmap = new HashMap<String, String>();
- outmap.put("id", orderBean.getId());
- outmap.put("userid", orderBean.getUserid());
- outmap.put("cpid", orderBean.getCpid());
- outmap.put("spid", orderBean.getSpid());
- outmap.put("channel", orderBean.getChannel());
- outmap.put("orderType", orderBean.getOrderType());
- outmap.put("type", "csorder");
- //URLUtil.post("http://10.199.99.144:8090/mq-service/recive.do", JsonUtil.objectToJson(map));
- URLUtil.post(dictionaryDao.getValue("mqReciveUrl"), JsonUtil.objectToJson(map));
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(1L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else{
- logger.info("停止接收畅视活动队列消息");
- }
- }
-
- public OrderBean transBean(Map<String, Object> body) {
- String jsonStr = JsonUtil.objectToJson(body);
- return (OrderBean) JsonUtil.jsonToBean(jsonStr, OrderBean.class);
- }
-
- public static void main(String[] args) {
- String[] strs = new String[]{"1","2"};
- for (String string : strs) {
- System.out.println(string);
- }
- }
- }
- class CSsyncHandle implements Runnable {
-
- private Logger log = Logger.getLogger(CSsyncHandle.class);
-
- private OrderBean bean;
- private int totalSize;
- private CountDownLatch threadSignal;
-
- public CSsyncHandle(OrderBean bean,int totalSize,CountDownLatch threadSignal){
- this.bean = bean;
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- }
-
- @Override
- public void run() {
- long start = System.currentTimeMillis();
- CSorderService csOrderService = SpringContextUtil.getBean(CSorderService.class);
- csOrderService.handle(bean);
- try{
- threadSignal.countDown();
- log.info("[" + (totalSize - threadSignal.getCount()) + "/" + totalSize + "]" + (bean.getUserid()) + " 操作完成,花费时间:" + (System.currentTimeMillis() - start) + " ms");
- }catch(Exception e){
- e.printStackTrace();
- }
- }
-
- }
|