package com.chinacreator.process.job; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import com.alibaba.fastjson.JSONObject; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.common.util.DESUtil; import com.chinacreator.common.util.MD5; import com.chinacreator.process.dao.ChannelOrderAsynDao; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.process.util.URLUtil; /** * 已弃用,葫芦侠已改为通用CP同步 * 合作方订购数据同步 * @author xu.zhou * @date 20210115 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class ChannelOrderAsynJob { private static Logger logger = Logger.getLogger("channelordersyn"); @Autowired private ChannelOrderAsynDao asyndao; public void doProcess() throws Exception { //try{ logger.info(Thread.currentThread().getName()+"定时任务开始"); long beginTime = System.currentTimeMillis(); //asyndao = new ChannelOrderAsynDao(); //获取要处理的数据 List list = asyndao.getAsynData(); //获取业务配置数据 List confList = asyndao.getAsynConf(); if(list != null && list.size() > 0 && confList != null && confList.size() > 0){ //去除重复数据 List dataList = paraseData(list); logger.info("有效用户数:"+dataList.size()); //更新为正处理状态 asyndao.batchUpdSyncStatus(getIds(dataList)); CountDownLatch threadSignal = new CountDownLatch(dataList.size()); ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); for(HashMap busimap : dataList){ ChannelOrderAsynService continueService = new ChannelOrderAsynService(list.size(),threadSignal,busimap,asyndao,confList); executorService.execute(continueService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); //} catch (Exception e) { //logger.error("定时任务启动处理出现异常,"+e.getMessage(),e); //} } /** * 拼接ID,用于更新数据 * @param dataList * @return */ private String getIds(List dataList){ String ids = ""; if(dataList != null && dataList.size()>0){ for(HashMap tmphm : dataList){ ids += ",'"+tmphm.get("ID")+"'"; } } if(!"".equals(ids)){ ids = ids.substring(1); } logger.info("ids=>"+ids); return ids; } /** * 去除重复数据 * @param dataList * @return */ private List paraseData(List dataList){ logger.info("去重复前数据条数:"+dataList.size()); //去重复后的数据集 List reDataList = new ArrayList(); HashMap tmpMap = new HashMap(); for (HashMap dataMap : dataList) { if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){ logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID")); }else{ reDataList.add(dataMap); List tmpList = new ArrayList(); tmpList.add(dataMap.get("SPID")); tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList); } } logger.info("去重复后数据条数:"+reDataList.size()); return reDataList; } /** public static void main(String[] args) { ChannelOrderAsynJob job = new ChannelOrderAsynJob(); try { job.doProcess(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } ***/ } class ChannelOrderAsynService implements Runnable { private static Logger log = Logger.getLogger("channelordersyn"); private int totalSize; private CountDownLatch threadSignal; private HashMap busimap; private ChannelOrderAsynDao asyndao; private List confList; public ChannelOrderAsynService(int totalSize,CountDownLatch threadSignal,HashMap busimap,ChannelOrderAsynDao asyndao,List confList){ this.totalSize = totalSize; this.threadSignal = threadSignal; this.busimap = busimap; this.asyndao = asyndao; this.confList = confList; } @Override public void run() { long startime = System.currentTimeMillis(); Map logMap = new HashMap(); String id = busimap.get("ID").toString(); logMap.put("busimap", busimap); String resultcode = "-1"; String errorinfo = ""; try { String result = invokeFace(); log.info("调接口返回结果:"+result); //{"resultcode":"10006","errorInfo":"签名错误"} logMap.put("result", result); if (!StringUtils.isEmpty(result)) { Map map = JsonUtil.jsonToMap(result); resultcode = (String) map.get("resultcode"); errorinfo = (String) map.get("errorInfo"); }else{ resultcode = "5000"; errorinfo = "无返回信息"; } //log.error("resultcode1=>"+resultcode); } catch (Exception e) { //log.error("resultcode1.1=>"+resultcode); if (e instanceof BusinessException) { errorinfo = ((BusinessException) e).getMessage(); resultcode = ((BusinessException) e).getCode(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "同步出现异常,"+e.getMessage(); log.error("同步出现异常,"+e.getMessage(),e); } //log.error("resultcode2=>"+resultcode); }finally { //log.error("resultcode3=>"+resultcode); threadSignal.countDown(); String time = System.currentTimeMillis()-startime+""; try{ if(errorinfo != null && errorinfo.length() > 500){ errorinfo = errorinfo.substring(0,500); } //更新赠送会员结果出现异常 asyndao.updSyncRes(id, resultcode, errorinfo, time); }catch(Exception e){ log.error(busimap.get("USERID")+"更新处理结果出现异常,"+e.getMessage()); resultcode = "8000"; errorinfo = "更新处理结果出现异常,"+e.getMessage(); } //写日志 logMap.put("resultcode", resultcode); logMap.put("errorinfo", errorinfo); logMap.put("time", time); //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount())); log.info(JsonUtil.objectToJson(logMap)); } } /** * 调接口 * @return * @throws Exception */ private String invokeFace() throws Exception { String result = ""; try { String invokeurl = ""; //接口地址 String pwd = ""; //加密KEY String method = ""; //方法,POST/GET int timeout = 10; //超时时间 String canceltime = busimap.get("CANCELTIME") != null ? busimap.get("CANCELTIME").toString() : ""; String ordertime = busimap.get("ORDERTIME") != null ? busimap.get("ORDERTIME").toString() : ""; String endtime = busimap.get("ENDTIME") != null ? busimap.get("ENDTIME").toString() : ""; String orderid = busimap.get("ORDERID") != null ? busimap.get("ORDERID").toString() : ""; String channel = busimap.get("CHANNEL").toString(); String type = busimap.get("TYPE").toString(); String cpid = busimap.get("CPID").toString(); String spid = busimap.get("SPID").toString(); String userid = busimap.get("USERID").toString(); for(HashMap confMap : confList){ if(spid.equals(confMap.get("SPID"))){ invokeurl = confMap.get("INVOKEURL").toString(); pwd = confMap.get("PWD").toString(); method = confMap.get("METHOD").toString(); timeout = Integer.parseInt(confMap.get("TIMEOUT").toString()); break; } } if (StringUtils.isEmpty(invokeurl) || StringUtils.isEmpty(pwd) || StringUtils.isEmpty(method)) { throw new BusinessException("5001", "接口参数不完整(invokeurl、pwd、method、timeout)" , new String[0]); } String timestamp = (System.currentTimeMillis())/1000+""; userid = DESUtil.encode(userid, pwd); String signature = orderid+cpid+spid+userid+type+channel+timestamp+pwd; signature = MD5.MD5Encode(orderid+cpid+spid+userid+type+channel+timestamp+pwd); if("POST".equals(method)){ JSONObject params = new JSONObject(); params.put("orderid", orderid); params.put("type", type); params.put("userid", userid); params.put("channel", channel); params.put("cpid", cpid); params.put("spid", spid); params.put("timestamp", timestamp); params.put("signature", signature); params.put("canceltime", canceltime); params.put("ordertime", ordertime); params.put("endtime", endtime); log.info("调接口地址:"+invokeurl+",调接口参数:"+params.toJSONString()); //调接口 result = URLUtil.postJson(invokeurl,params.toJSONString(),timeout*1000); }else{ String params = "?endtime="+endtime+"&ordertime="+ordertime+"&canceltime="+canceltime+"&orderid="+orderid+"&type="+type+"&userid="+URLEncoder.encode(userid,"utf-8")+"&channel="+channel+"&cpid="+cpid+"&spid="+spid+"×tamp="+timestamp+"&signature="+signature; invokeurl += params; //调接口 result = URLUtil.get(invokeurl,timeout*1000); } } catch (Exception e) { e.printStackTrace(); log.info("调接口出现异常,"+e.getMessage()); throw e; } return result; } }