ceb32c6b8bc84e94033a5bb88a6faed98bc846ed.svn-base 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package com.chinacreator.process.job;
  2. import com.chinacreator.common.support.util.SpringContextUtil;
  3. import com.chinacreator.common.util.URLUtil;
  4. import com.chinacreator.process.bean.OrderBean;
  5. import com.chinacreator.process.dao.DictionaryDao;
  6. import com.chinacreator.process.service.CSorderService;
  7. import com.chinacreator.process.util.JsonUtil;
  8. import com.chinacreator.video.queue.MessageService;
  9. import com.chinacreator.video.queue.bean.MessagePipe;
  10. import org.apache.log4j.Logger;
  11. import org.quartz.DisallowConcurrentExecution;
  12. import org.quartz.PersistJobDataAfterExecution;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import java.util.HashMap;
  15. import java.util.List;
  16. import java.util.Map;
  17. import java.util.concurrent.*;
  18. @PersistJobDataAfterExecution
  19. @DisallowConcurrentExecution
  20. public class CSactivityJob {
  21. private Logger logger = Logger.getLogger(CSactivityJob.class);
  22. @Autowired
  23. private DictionaryDao dictionaryDao;
  24. @Autowired
  25. private MessageService messageService;
  26. public void doProcess() throws Exception {
  27. logger.info("接收畅视活动队列JOB启动");
  28. if(dictionaryDao.getValue("recivemq").equals("0")){
  29. long beginTime = System.currentTimeMillis();
  30. List<MessagePipe> list = messageService
  31. .reciveBatchMessage("csorder", 500);
  32. Map<String,String> map = new HashMap<String, String>();
  33. logger.info("接收订购队列花费时间:"+(System.currentTimeMillis()-beginTime));
  34. CountDownLatch threadSignal = new CountDownLatch(list.size());
  35. ExecutorService executorService = new ThreadPoolExecutor(3, 6, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  36. if(list != null && list.size() > 0){
  37. for (MessagePipe messagePipe : list) {
  38. OrderBean orderBean = transBean(messagePipe.getBody());
  39. if(map.get(orderBean.getUserid()) == null){
  40. CSsyncHandle csSyncHandle = new CSsyncHandle(orderBean,list.size(),threadSignal);
  41. executorService.execute(csSyncHandle);
  42. }else{
  43. try {
  44. Map<String, String> outmap = new HashMap<String, String>();
  45. outmap.put("id", orderBean.getId());
  46. outmap.put("userid", orderBean.getUserid());
  47. outmap.put("cpid", orderBean.getCpid());
  48. outmap.put("spid", orderBean.getSpid());
  49. outmap.put("channel", orderBean.getChannel());
  50. outmap.put("orderType", orderBean.getOrderType());
  51. outmap.put("type", "csorder");
  52. //URLUtil.post("http://10.199.99.144:8090/mq-service/recive.do", JsonUtil.objectToJson(map));
  53. URLUtil.post(dictionaryDao.getValue("mqReciveUrl"), JsonUtil.objectToJson(map));
  54. } catch (Exception e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }
  59. }
  60. executorService.shutdown();
  61. try {
  62. executorService.awaitTermination(1L, TimeUnit.MINUTES);
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }else{
  67. logger.info("停止接收畅视活动队列消息");
  68. }
  69. }
  70. public OrderBean transBean(Map<String, Object> body) {
  71. String jsonStr = JsonUtil.objectToJson(body);
  72. return (OrderBean) JsonUtil.jsonToBean(jsonStr, OrderBean.class);
  73. }
  74. public static void main(String[] args) {
  75. String[] strs = new String[]{"1","2"};
  76. for (String string : strs) {
  77. System.out.println(string);
  78. }
  79. }
  80. }
  81. class CSsyncHandle implements Runnable {
  82. private Logger log = Logger.getLogger(CSsyncHandle.class);
  83. private OrderBean bean;
  84. private int totalSize;
  85. private CountDownLatch threadSignal;
  86. public CSsyncHandle(OrderBean bean,int totalSize,CountDownLatch threadSignal){
  87. this.bean = bean;
  88. this.totalSize = totalSize;
  89. this.threadSignal = threadSignal;
  90. }
  91. @Override
  92. public void run() {
  93. long start = System.currentTimeMillis();
  94. CSorderService csOrderService = SpringContextUtil.getBean(CSorderService.class);
  95. csOrderService.handle(bean);
  96. try{
  97. threadSignal.countDown();
  98. log.info("[" + (totalSize - threadSignal.getCount()) + "/" + totalSize + "]" + (bean.getUserid()) + " 操作完成,花费时间:" + (System.currentTimeMillis() - start) + " ms");
  99. }catch(Exception e){
  100. e.printStackTrace();
  101. }
  102. }
  103. }