123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- package com.chinacreator.process.job;
- import com.chinacreator.common.support.util.SpringContextUtil;
- import com.chinacreator.process.bean.VacOrderBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.service.NewVacOrderService;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.URLUtil;
- import com.chinacreator.video.queue.MessageService;
- import com.chinacreator.video.queue.bean.MessagePipe;
- import net.sf.json.JSONSerializer;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import java.util.*;
- import java.util.concurrent.*;
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class VacMQJob {
- private Logger logger = Logger.getLogger(VacMQJob.class);
- @Autowired
- private DictionaryDao dictionaryDao;
- @Autowired
- private MessageService messageService;
- public void doProcess() throws Exception {
- logger.info("接收VAC订购队列JOB启动");
- if (dictionaryDao.getValue("recivemq").equals("0")) {
- long beginTime = System.currentTimeMillis();
- List<MessagePipe> list = messageService.reciveBatchMessage("vacmq", 500);
- logger.info("接收VAC订购队列花费时间:" + (System.currentTimeMillis() - beginTime));
- List<VacOrderBean> listvac = new ArrayList<VacOrderBean>();
- for (MessagePipe messagePipe : list) {
- VacOrderBean vacOrderBean = transBean(messagePipe.getBody());
- listvac.add(vacOrderBean);
- }
- List<VacOrderBean> notrepeatList = getNotRepeatData(listvac);
- if(notrepeatList != null && notrepeatList.size()>0){
- CountDownLatch threadSignal = new CountDownLatch(notrepeatList.size());
- ExecutorService executorService = new ThreadPoolExecutor(8, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (VacOrderBean vacOrderBean : notrepeatList) {
- VacService vacService = new VacService(notrepeatList.size(),threadSignal,vacOrderBean);
- executorService.execute(vacService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(1L, TimeUnit.HOURS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } else {
- logger.info("停止VAC订购接收队列消息");
- }
- }
-
- public void insertvacMq(VacOrderBean vacOrderBean) {
- try {
- Map<String, String> map = new HashMap<String, String>();
- map.put("userid", vacOrderBean.getUserid());
- map.put("cpid", vacOrderBean.getCpid());
- map.put("spid", vacOrderBean.getSpid());
- map.put("province", vacOrderBean.getProvince());
- map.put("area", vacOrderBean.getArea());
- if (vacOrderBean.getOrderstatus().equals("0")) {
- map.put("channel", vacOrderBean.getChannel());
- map.put("ordertime", vacOrderBean.getOrdertime());
- } else {
- map.put("channel", vacOrderBean.getChannel());
- map.put("canceltime", vacOrderBean.getCanceltime());
- }
- map.put("endtime", "");
- map.put("ordertype", vacOrderBean.getOrdertype());
- map.put("orderstatus", vacOrderBean.getOrderstatus());
- map.put("channe2", vacOrderBean.getChannel2()); //二级渠道
- map.put("staffid", vacOrderBean.getStaffid()); //工号
- map.put("departid", vacOrderBean.getDepartid()); //渠道号
- map.put("type", "vacmq");
- //URLUtil.post("http://10.199.99.144:8090/mq-service/recive.do", JSONSerializer.toJSON(map).toString(),1000);
- URLUtil.post(dictionaryDao.getValue("mqReciveUrl"), JSONSerializer.toJSON(map).toString(),1000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
-
- public List<VacOrderBean> getNotRepeatData( List<VacOrderBean> oldnewChannelOrderBean){
- for (VacOrderBean vacOrderBean : oldnewChannelOrderBean) {
- System.out.println("==>"+JsonUtil.objectToJson(vacOrderBean));
- }
- List<VacOrderBean> newChannelOrderBean = new ArrayList<VacOrderBean>();
- Map<String,VacOrderBean> map = new HashMap<String, VacOrderBean>();
- for (VacOrderBean vacOrderBean : oldnewChannelOrderBean) {
- if(map.containsKey(vacOrderBean.getUserid())){
- insertvacMq(vacOrderBean);
- }else{
- map.put(vacOrderBean.getUserid(), vacOrderBean);
- }
- }
- Iterator<String> it = map.keySet().iterator();
- while (it.hasNext()) {
- String key = it.next().toString();
- newChannelOrderBean.add(map.get(key));
- }
- logger.info("腾讯20元渠道反向通知:old="+oldnewChannelOrderBean.size()+" new:"+newChannelOrderBean.size());
- for (VacOrderBean vacOrderBean : newChannelOrderBean) {
- System.out.println("-->"+JsonUtil.objectToJson(vacOrderBean));
- }
- return newChannelOrderBean;
- }
- public VacOrderBean transBean(Map<String, Object> body) {
- String jsonStr = JsonUtil.objectToJson(body);
- return (VacOrderBean) JsonUtil.jsonToBean(jsonStr, VacOrderBean.class);
- }
- }
- class VacService implements Runnable {
-
- private static Logger logger = Logger.getLogger(VacService.class);
- private int totalSize;
- private CountDownLatch threadSignal;
- private VacOrderBean vacOrderBean;
-
-
- public VacService(int totalSize,CountDownLatch threadSignal,VacOrderBean vacOrderBean){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.vacOrderBean = vacOrderBean;
- }
-
- public void run() {
- long startime = System.currentTimeMillis();
- NewVacOrderService vacOrderService = SpringContextUtil.getBean(NewVacOrderService.class);
- int result = vacOrderService.handle(vacOrderBean);
- threadSignal.countDown();
- logger.info("["+vacOrderBean.getUserid()+"]:结果="+result+"["+ totalSize+"/"+(totalSize - threadSignal.getCount())+"],花费时间:"+(System.currentTimeMillis()-startime+"/ms"));
- }
- }
|