6fdcd18c685666708942563fe9063d1550bb2b6f.svn-base 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package com.chinacreator.process.job;
  2. import com.chinacreator.common.support.util.SpringContextUtil;
  3. import com.chinacreator.process.bean.VacOrderBean;
  4. import com.chinacreator.process.dao.DictionaryDao;
  5. import com.chinacreator.process.service.NewVacOrderService;
  6. import com.chinacreator.process.util.JsonUtil;
  7. import com.chinacreator.process.util.URLUtil;
  8. import com.chinacreator.video.queue.MessageService;
  9. import com.chinacreator.video.queue.bean.MessagePipe;
  10. import net.sf.json.JSONSerializer;
  11. import org.apache.log4j.Logger;
  12. import org.quartz.DisallowConcurrentExecution;
  13. import org.quartz.PersistJobDataAfterExecution;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import java.util.*;
  16. import java.util.concurrent.*;
  17. @PersistJobDataAfterExecution
  18. @DisallowConcurrentExecution
  19. public class VacMQJob {
  20. private Logger logger = Logger.getLogger(VacMQJob.class);
  21. @Autowired
  22. private DictionaryDao dictionaryDao;
  23. @Autowired
  24. private MessageService messageService;
  25. public void doProcess() throws Exception {
  26. logger.info("接收VAC订购队列JOB启动");
  27. if (dictionaryDao.getValue("recivemq").equals("0")) {
  28. long beginTime = System.currentTimeMillis();
  29. List<MessagePipe> list = messageService.reciveBatchMessage("vacmq", 500);
  30. logger.info("接收VAC订购队列花费时间:" + (System.currentTimeMillis() - beginTime));
  31. List<VacOrderBean> listvac = new ArrayList<VacOrderBean>();
  32. for (MessagePipe messagePipe : list) {
  33. VacOrderBean vacOrderBean = transBean(messagePipe.getBody());
  34. listvac.add(vacOrderBean);
  35. }
  36. List<VacOrderBean> notrepeatList = getNotRepeatData(listvac);
  37. if(notrepeatList != null && notrepeatList.size()>0){
  38. CountDownLatch threadSignal = new CountDownLatch(notrepeatList.size());
  39. ExecutorService executorService = new ThreadPoolExecutor(8, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  40. for (VacOrderBean vacOrderBean : notrepeatList) {
  41. VacService vacService = new VacService(notrepeatList.size(),threadSignal,vacOrderBean);
  42. executorService.execute(vacService);
  43. }
  44. executorService.shutdown();
  45. try {
  46. executorService.awaitTermination(1L, TimeUnit.HOURS);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. } else {
  52. logger.info("停止VAC订购接收队列消息");
  53. }
  54. }
  55. public void insertvacMq(VacOrderBean vacOrderBean) {
  56. try {
  57. Map<String, String> map = new HashMap<String, String>();
  58. map.put("userid", vacOrderBean.getUserid());
  59. map.put("cpid", vacOrderBean.getCpid());
  60. map.put("spid", vacOrderBean.getSpid());
  61. map.put("province", vacOrderBean.getProvince());
  62. map.put("area", vacOrderBean.getArea());
  63. if (vacOrderBean.getOrderstatus().equals("0")) {
  64. map.put("channel", vacOrderBean.getChannel());
  65. map.put("ordertime", vacOrderBean.getOrdertime());
  66. } else {
  67. map.put("channel", vacOrderBean.getChannel());
  68. map.put("canceltime", vacOrderBean.getCanceltime());
  69. }
  70. map.put("endtime", "");
  71. map.put("ordertype", vacOrderBean.getOrdertype());
  72. map.put("orderstatus", vacOrderBean.getOrderstatus());
  73. map.put("channe2", vacOrderBean.getChannel2()); //二级渠道
  74. map.put("staffid", vacOrderBean.getStaffid()); //工号
  75. map.put("departid", vacOrderBean.getDepartid()); //渠道号
  76. map.put("type", "vacmq");
  77. //URLUtil.post("http://10.199.99.144:8090/mq-service/recive.do", JSONSerializer.toJSON(map).toString(),1000);
  78. URLUtil.post(dictionaryDao.getValue("mqReciveUrl"), JSONSerializer.toJSON(map).toString(),1000);
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. public List<VacOrderBean> getNotRepeatData( List<VacOrderBean> oldnewChannelOrderBean){
  84. for (VacOrderBean vacOrderBean : oldnewChannelOrderBean) {
  85. System.out.println("==>"+JsonUtil.objectToJson(vacOrderBean));
  86. }
  87. List<VacOrderBean> newChannelOrderBean = new ArrayList<VacOrderBean>();
  88. Map<String,VacOrderBean> map = new HashMap<String, VacOrderBean>();
  89. for (VacOrderBean vacOrderBean : oldnewChannelOrderBean) {
  90. if(map.containsKey(vacOrderBean.getUserid())){
  91. insertvacMq(vacOrderBean);
  92. }else{
  93. map.put(vacOrderBean.getUserid(), vacOrderBean);
  94. }
  95. }
  96. Iterator<String> it = map.keySet().iterator();
  97. while (it.hasNext()) {
  98. String key = it.next().toString();
  99. newChannelOrderBean.add(map.get(key));
  100. }
  101. logger.info("腾讯20元渠道反向通知:old="+oldnewChannelOrderBean.size()+" new:"+newChannelOrderBean.size());
  102. for (VacOrderBean vacOrderBean : newChannelOrderBean) {
  103. System.out.println("-->"+JsonUtil.objectToJson(vacOrderBean));
  104. }
  105. return newChannelOrderBean;
  106. }
  107. public VacOrderBean transBean(Map<String, Object> body) {
  108. String jsonStr = JsonUtil.objectToJson(body);
  109. return (VacOrderBean) JsonUtil.jsonToBean(jsonStr, VacOrderBean.class);
  110. }
  111. }
  112. class VacService implements Runnable {
  113. private static Logger logger = Logger.getLogger(VacService.class);
  114. private int totalSize;
  115. private CountDownLatch threadSignal;
  116. private VacOrderBean vacOrderBean;
  117. public VacService(int totalSize,CountDownLatch threadSignal,VacOrderBean vacOrderBean){
  118. this.totalSize = totalSize;
  119. this.threadSignal = threadSignal;
  120. this.vacOrderBean = vacOrderBean;
  121. }
  122. public void run() {
  123. long startime = System.currentTimeMillis();
  124. NewVacOrderService vacOrderService = SpringContextUtil.getBean(NewVacOrderService.class);
  125. int result = vacOrderService.handle(vacOrderBean);
  126. threadSignal.countDown();
  127. logger.info("["+vacOrderBean.getUserid()+"]:结果="+result+"["+ totalSize+"/"+(totalSize - threadSignal.getCount())+"],花费时间:"+(System.currentTimeMillis()-startime+"/ms"));
  128. }
  129. }