123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package com.chinacreator.process.job;
- import java.net.URLEncoder;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import org.apache.commons.lang.StringUtils;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.common.util.DESUtil;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.dao.ChannelOrderAsynDao;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.URLUtil;
- /**
- * 已弃用,葫芦侠已改为通用CP同步
- * 合作方订购数据同步
- * @author xu.zhou
- * @date 20210115
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class ChannelOrderAsynJob {
- private static Logger logger = Logger.getLogger("channelordersyn");
-
- @Autowired
- private ChannelOrderAsynDao asyndao;
-
- public void doProcess() throws Exception {
- //try{
- logger.info(Thread.currentThread().getName()+"定时任务开始");
- long beginTime = System.currentTimeMillis();
-
- //asyndao = new ChannelOrderAsynDao();
-
- //获取要处理的数据
- List<HashMap> list = asyndao.getAsynData();
- //获取业务配置数据
- List<HashMap> confList = asyndao.getAsynConf();
- if(list != null && list.size() > 0 && confList != null && confList.size() > 0){
- //去除重复数据
- List<HashMap> dataList = paraseData(list);
- logger.info("有效用户数:"+dataList.size());
-
- //更新为正处理状态
- asyndao.batchUpdSyncStatus(getIds(dataList));
-
- CountDownLatch threadSignal = new CountDownLatch(dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for(HashMap busimap : dataList){
- ChannelOrderAsynService continueService = new ChannelOrderAsynService(list.size(),threadSignal,busimap,asyndao,confList);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
- //} catch (Exception e) {
- //logger.error("定时任务启动处理出现异常,"+e.getMessage(),e);
- //}
- }
-
- /**
- * 拼接ID,用于更新数据
- * @param dataList
- * @return
- */
- private String getIds(List<HashMap> dataList){
- String ids = "";
- if(dataList != null && dataList.size()>0){
- for(HashMap tmphm : dataList){
- ids += ",'"+tmphm.get("ID")+"'";
- }
- }
- if(!"".equals(ids)){
- ids = ids.substring(1);
- }
- logger.info("ids=>"+ids);
- return ids;
- }
-
- /**
- * 去除重复数据
- * @param dataList
- * @return
- */
- private List<HashMap> paraseData(List<HashMap> dataList){
- logger.info("去重复前数据条数:"+dataList.size());
- //去重复后的数据集
- List<HashMap> reDataList = new ArrayList<HashMap>();
- HashMap<String, List> tmpMap = new HashMap<String, List>();
- for (HashMap dataMap : dataList) {
- if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
- logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
- }else{
- reDataList.add(dataMap);
- List tmpList = new ArrayList();
- tmpList.add(dataMap.get("SPID"));
- tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
- }
- }
- logger.info("去重复后数据条数:"+reDataList.size());
- return reDataList;
- }
-
- /**
- public static void main(String[] args) {
- ChannelOrderAsynJob job = new ChannelOrderAsynJob();
- try {
- job.doProcess();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- ***/
- }
- class ChannelOrderAsynService implements Runnable {
- private static Logger log = Logger.getLogger("channelordersyn");
- private int totalSize;
- private CountDownLatch threadSignal;
- private HashMap busimap;
- private ChannelOrderAsynDao asyndao;
- private List<HashMap> confList;
-
- public ChannelOrderAsynService(int totalSize,CountDownLatch threadSignal,HashMap busimap,ChannelOrderAsynDao asyndao,List<HashMap> confList){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.busimap = busimap;
- this.asyndao = asyndao;
- this.confList = confList;
- }
-
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map logMap = new HashMap();
- String id = busimap.get("ID").toString();
- logMap.put("busimap", busimap);
- String resultcode = "-1";
- String errorinfo = "";
- try {
- String result = invokeFace();
- log.info("调接口返回结果:"+result);
- //{"resultcode":"10006","errorInfo":"签名错误"}
- logMap.put("result", result);
- if (!StringUtils.isEmpty(result)) {
- Map<?, ?> map = JsonUtil.jsonToMap(result);
- resultcode = (String) map.get("resultcode");
- errorinfo = (String) map.get("errorInfo");
- }else{
- resultcode = "5000";
- errorinfo = "无返回信息";
- }
- //log.error("resultcode1=>"+resultcode);
- } catch (Exception e) {
- //log.error("resultcode1.1=>"+resultcode);
- if (e instanceof BusinessException) {
- errorinfo = ((BusinessException) e).getMessage();
- resultcode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultcode = "8000";
- errorinfo = "同步出现异常,"+e.getMessage();
- log.error("同步出现异常,"+e.getMessage(),e);
- }
- //log.error("resultcode2=>"+resultcode);
- }finally {
- //log.error("resultcode3=>"+resultcode);
- threadSignal.countDown();
- String time = System.currentTimeMillis()-startime+"";
- try{
- if(errorinfo != null && errorinfo.length() > 500){
- errorinfo = errorinfo.substring(0,500);
- }
- //更新赠送会员结果出现异常
- asyndao.updSyncRes(id, resultcode, errorinfo, time);
- }catch(Exception e){
- log.error(busimap.get("USERID")+"更新处理结果出现异常,"+e.getMessage());
- resultcode = "8000";
- errorinfo = "更新处理结果出现异常,"+e.getMessage();
- }
- //写日志
- logMap.put("resultcode", resultcode);
- logMap.put("errorinfo", errorinfo);
- logMap.put("time", time);
- //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
- log.info(JsonUtil.objectToJson(logMap));
- }
- }
-
- /**
- * 调接口
- * @return
- * @throws Exception
- */
- private String invokeFace() throws Exception {
- String result = "";
- try {
- String invokeurl = ""; //接口地址
- String pwd = ""; //加密KEY
- String method = ""; //方法,POST/GET
- int timeout = 10; //超时时间
- String canceltime = busimap.get("CANCELTIME") != null ? busimap.get("CANCELTIME").toString() : "";
- String ordertime = busimap.get("ORDERTIME") != null ? busimap.get("ORDERTIME").toString() : "";
- String endtime = busimap.get("ENDTIME") != null ? busimap.get("ENDTIME").toString() : "";
- String orderid = busimap.get("ORDERID") != null ? busimap.get("ORDERID").toString() : "";
- String channel = busimap.get("CHANNEL").toString();
- String type = busimap.get("TYPE").toString();
- String cpid = busimap.get("CPID").toString();
- String spid = busimap.get("SPID").toString();
- String userid = busimap.get("USERID").toString();
- for(HashMap confMap : confList){
- if(spid.equals(confMap.get("SPID"))){
- invokeurl = confMap.get("INVOKEURL").toString();
- pwd = confMap.get("PWD").toString();
- method = confMap.get("METHOD").toString();
- timeout = Integer.parseInt(confMap.get("TIMEOUT").toString());
- break;
- }
- }
-
- if (StringUtils.isEmpty(invokeurl)
- || StringUtils.isEmpty(pwd)
- || StringUtils.isEmpty(method)) {
- throw new BusinessException("5001", "接口参数不完整(invokeurl、pwd、method、timeout)" , new String[0]);
- }
- String timestamp = (System.currentTimeMillis())/1000+"";
- userid = DESUtil.encode(userid, pwd);
- String signature = orderid+cpid+spid+userid+type+channel+timestamp+pwd;
- signature = MD5.MD5Encode(orderid+cpid+spid+userid+type+channel+timestamp+pwd);
- if("POST".equals(method)){
- JSONObject params = new JSONObject();
- params.put("orderid", orderid);
- params.put("type", type);
- params.put("userid", userid);
- params.put("channel", channel);
- params.put("cpid", cpid);
- params.put("spid", spid);
- params.put("timestamp", timestamp);
- params.put("signature", signature);
- params.put("canceltime", canceltime);
- params.put("ordertime", ordertime);
- params.put("endtime", endtime);
- log.info("调接口地址:"+invokeurl+",调接口参数:"+params.toJSONString());
- //调接口
- result = URLUtil.postJson(invokeurl,params.toJSONString(),timeout*1000);
- }else{
- String params = "?endtime="+endtime+"&ordertime="+ordertime+"&canceltime="+canceltime+"&orderid="+orderid+"&type="+type+"&userid="+URLEncoder.encode(userid,"utf-8")+"&channel="+channel+"&cpid="+cpid+"&spid="+spid+"×tamp="+timestamp+"&signature="+signature;
- invokeurl += params;
- //调接口
- result = URLUtil.get(invokeurl,timeout*1000);
- }
- } catch (Exception e) {
- e.printStackTrace();
- log.info("调接口出现异常,"+e.getMessage());
- throw e;
- }
- return result;
- }
- }
|