06fc74eb400aba0abd7d5aefa473f700f5dc40af.svn-base 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package com.chinacreator.process.job;
  2. import com.chinacreator.common.support.util.SpringContextUtil;
  3. import com.chinacreator.process.bean.ChannelOrderBean;
  4. import com.chinacreator.process.dao.ChannelOrderDao;
  5. import com.chinacreator.process.service.ChannelOrderService;
  6. import com.chinacreator.process.util.WriteLogUtil;
  7. import org.apache.log4j.Logger;
  8. import org.quartz.DisallowConcurrentExecution;
  9. import org.quartz.PersistJobDataAfterExecution;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import java.util.*;
  12. import java.util.concurrent.*;
  13. @PersistJobDataAfterExecution
  14. @DisallowConcurrentExecution
  15. public class ChannelOrderJob {
  16. private static Logger log = Logger.getLogger(ChannelOrderJob.class);
  17. @Autowired
  18. private ChannelOrderDao channelOrderDao;
  19. public void doProcess() throws Exception {
  20. //log.info("渠道订购开始【======");
  21. WriteLogUtil.writeLong("渠道订购开始【======", log, "ChannelOrderDao");
  22. List<ChannelOrderBean> list = null;
  23. long stime = System.currentTimeMillis();
  24. try {
  25. list = channelOrderDao.queryNeedDo();
  26. if(list != null && list.size() > 0){
  27. log.info("需要处理的数据条数===="+list.size());
  28. List<ChannelOrderBean> newlist = getNotRepeatData(list);
  29. CountDownLatch threadSignal = new CountDownLatch(newlist.size());
  30. ExecutorService executorService = new ThreadPoolExecutor(8, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  31. for (ChannelOrderBean channelOrderBean : newlist) {
  32. ChannelService channelService = new ChannelService(newlist.size(),threadSignal,channelOrderBean);
  33. executorService.execute(channelService);
  34. }
  35. executorService.shutdown();
  36. try {
  37. executorService.awaitTermination(1L, TimeUnit.MINUTES);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. }
  45. //log.info("渠道订购结束===="+((System.currentTimeMillis())-stime));
  46. WriteLogUtil.writeLong("渠道订购结束===="+((System.currentTimeMillis())-stime), log, "ChannelOrderDao");
  47. }
  48. public List<ChannelOrderBean> getNotRepeatData( List<ChannelOrderBean> oldnewChannelOrderBean){
  49. List<ChannelOrderBean> newChannelOrderBean = new ArrayList<ChannelOrderBean>();
  50. Map<String,ChannelOrderBean> map = new HashMap<String, ChannelOrderBean>();
  51. for (int i = oldnewChannelOrderBean.size()-1; i >= 0; i--) {
  52. map.put(oldnewChannelOrderBean.get(i).getUserid(), oldnewChannelOrderBean.get(i));
  53. }
  54. Iterator<String> it = map.keySet().iterator();
  55. while (it.hasNext()) {
  56. String key = it.next().toString();
  57. newChannelOrderBean.add(map.get(key));
  58. }
  59. log.info("渠道反向通知:old="+oldnewChannelOrderBean.size()+" new:"+newChannelOrderBean.size());
  60. return newChannelOrderBean;
  61. }
  62. }
  63. class ChannelService implements Runnable {
  64. private static Logger logger = Logger.getLogger(ChannelService.class);
  65. private int totalSize;
  66. private CountDownLatch threadSignal;
  67. private ChannelOrderBean channelOrderBean;
  68. public ChannelService(int totalSize,CountDownLatch threadSignal,ChannelOrderBean channelOrderBean){
  69. this.totalSize = totalSize;
  70. this.threadSignal = threadSignal;
  71. this.channelOrderBean = channelOrderBean;
  72. }
  73. public void run() {
  74. long startime = System.currentTimeMillis();
  75. ChannelOrderService channelOrderService = SpringContextUtil.getBean(ChannelOrderService.class);
  76. int result = channelOrderService.doLocalData(channelOrderBean);
  77. threadSignal.countDown();
  78. logger.info("["+channelOrderBean.getId()+"]:结果="+result+"["+ totalSize+"/"+(totalSize - threadSignal.getCount())+"],花费时间:"+(System.currentTimeMillis()-startime+"/ms"));
  79. }
  80. }