12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package com.chinacreator.process.job;
- import com.chinacreator.common.support.util.SpringContextUtil;
- import com.chinacreator.process.bean.ChannelOrderBean;
- import com.chinacreator.process.dao.ChannelOrderDao;
- import com.chinacreator.process.service.ChannelOrderService;
- import com.chinacreator.process.util.WriteLogUtil;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import java.util.*;
- import java.util.concurrent.*;
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class ChannelOrderJob {
- private static Logger log = Logger.getLogger(ChannelOrderJob.class);
-
- @Autowired
- private ChannelOrderDao channelOrderDao;
-
-
- public void doProcess() throws Exception {
- //log.info("渠道订购开始【======");
- WriteLogUtil.writeLong("渠道订购开始【======", log, "ChannelOrderDao");
- List<ChannelOrderBean> list = null;
- long stime = System.currentTimeMillis();
- try {
- list = channelOrderDao.queryNeedDo();
- if(list != null && list.size() > 0){
- log.info("需要处理的数据条数===="+list.size());
- List<ChannelOrderBean> newlist = getNotRepeatData(list);
- CountDownLatch threadSignal = new CountDownLatch(newlist.size());
- ExecutorService executorService = new ThreadPoolExecutor(8, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (ChannelOrderBean channelOrderBean : newlist) {
- ChannelService channelService = new ChannelService(newlist.size(),threadSignal,channelOrderBean);
- executorService.execute(channelService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(1L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- //log.info("渠道订购结束===="+((System.currentTimeMillis())-stime));
- WriteLogUtil.writeLong("渠道订购结束===="+((System.currentTimeMillis())-stime), log, "ChannelOrderDao");
- }
-
- public List<ChannelOrderBean> getNotRepeatData( List<ChannelOrderBean> oldnewChannelOrderBean){
- List<ChannelOrderBean> newChannelOrderBean = new ArrayList<ChannelOrderBean>();
- Map<String,ChannelOrderBean> map = new HashMap<String, ChannelOrderBean>();
- for (int i = oldnewChannelOrderBean.size()-1; i >= 0; i--) {
- map.put(oldnewChannelOrderBean.get(i).getUserid(), oldnewChannelOrderBean.get(i));
- }
- Iterator<String> it = map.keySet().iterator();
- while (it.hasNext()) {
- String key = it.next().toString();
- newChannelOrderBean.add(map.get(key));
- }
- log.info("渠道反向通知:old="+oldnewChannelOrderBean.size()+" new:"+newChannelOrderBean.size());
- return newChannelOrderBean;
- }
-
- }
- class ChannelService implements Runnable {
-
- private static Logger logger = Logger.getLogger(ChannelService.class);
- private int totalSize;
- private CountDownLatch threadSignal;
- private ChannelOrderBean channelOrderBean;
-
-
- public ChannelService(int totalSize,CountDownLatch threadSignal,ChannelOrderBean channelOrderBean){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.channelOrderBean = channelOrderBean;
- }
-
- public void run() {
- long startime = System.currentTimeMillis();
- ChannelOrderService channelOrderService = SpringContextUtil.getBean(ChannelOrderService.class);
- int result = channelOrderService.doLocalData(channelOrderBean);
- threadSignal.countDown();
- logger.info("["+channelOrderBean.getId()+"]:结果="+result+"["+ totalSize+"/"+(totalSize - threadSignal.getCount())+"],花费时间:"+(System.currentTimeMillis()-startime+"/ms"));
- }
- }
-
|