package com.chinacreator.process.job; import com.alibaba.fastjson.JSONObject; import com.chinacreator.common.util.DESUtil; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.QueryCapOrderDao; import com.chinacreator.process.exception.BusinessException; import com.chinacreator.process.util.URLUtil; import com.chinacreator.process.util.fakeid.MD5; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import java.sql.SQLException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; @PersistJobDataAfterExecution @DisallowConcurrentExecution public class QueryCapOrderJob { private Logger log = Logger.getLogger("queryCapOrder"); @Autowired private DictionaryDao dictionaryDao; @Autowired private QueryCapOrderDao queryCapOrderDao; private String channel="kcqueryorder"; private String pwd="315b5de2"; public void doProcess() throws Exception { List capList = queryCapOrderDao.findCapList(); if (capList!=null && capList.size()>0){ log.info("需查询cap的条数=="+capList.size()); CountDownLatch threadSignal = new CountDownLatch(capList.size());//一个线程等待其他线程各自执行完毕后再执行。 ExecutorService executorService = new ThreadPoolExecutor(10, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); for (HashMap hashMap : capList) { QueryCapOrderService queryCapOrderService = new QueryCapOrderService(hashMap, capList.size(), threadSignal); executorService.execute(queryCapOrderService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } log.info("处理完成"); } } class QueryCapOrderService implements Runnable{ private Logger log = Logger.getLogger("queryCapOrder"); private Map params; private int totalSize; private CountDownLatch threadSignal; public QueryCapOrderService(Map params, int totalSize, CountDownLatch threadSignal) { this.params = params; this.totalSize = totalSize; this.threadSignal = threadSignal; } @Override public void run() { String resultCode="2"; String resp=""; try { String url = dictionaryDao.getValue("queryCapOrderUrl"); String userid = String.valueOf(params.get("USERID")); String cpid = String.valueOf(params.get("CPID")); String spid = String.valueOf(params.get("SPID")); String timestamp = System.currentTimeMillis()/1000+""; userid= DESUtil.encode(userid,pwd); String signature= MD5.MD5Encode(cpid + spid + userid + channel + timestamp + pwd); HashMap map = new HashMap(); map.put("userid",userid); map.put("cpid",cpid); map.put("spid",spid); map.put("channel",channel); map.put("timestamp",timestamp); map.put("signature",signature); resp = URLUtil.postJson(url, JSONObject.toJSONString(map)); Map respMap= JSONObject.parseObject(resp, Map.class); if (respMap!=null &&resp.length()>0){ resultCode = String.valueOf(respMap.get("resultCode")); if (!("0".equals(resultCode)||"-1".equals(resultCode)||"1".equals(resultCode))){ throw new BusinessException(resultCode,String.valueOf(respMap.get("resultInfo"))); } }else { throw new BusinessException("9001","查询cap返回结果为null"); } queryCapOrderDao.updateStatus(resultCode, String.valueOf(params.get("ID"))); } catch (Exception e) { e.printStackTrace(); try { queryCapOrderDao.updateStatus("3", String.valueOf(params.get("ID"))); } catch (SQLException ex) { ex.printStackTrace(); } } finally { threadSignal.countDown(); log.info("需处理的参数:"+params+"查询cap返回结果: "+resp); } } } }