|
- package com.chinacreator.process.job;
- import java.net.URLEncoder;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import net.sf.json.JSONObject;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.common.util.DESUtil;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.bean.BackShareOrderBean;
- import com.chinacreator.process.dao.BackBusiVipAsynDao;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.URLUtil;
- /**
- * 后向调能力平台异步处理
- * @author xu.zhou
- * @date 20201022
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class BackBusiShareAsynJob {
- private static Logger logger = Logger.getLogger("backbusisharesyn");
-
- @Autowired
- private BackBusiVipAsynDao asyndao;
-
- @Autowired
- private DictionaryDao dictionaryDao;
-
- public void doProcess() throws Exception {
- logger.info(Thread.currentThread().getName()+"定时任务开始");
- long beginTime = System.currentTimeMillis();
- List<HashMap> list = asyndao.getInvokeShareData();
- if(list != null && list.size() > 0){
- CountDownLatch threadSignal = new CountDownLatch(list.size());
- ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- List<HashMap> dataList = paraseData(list);
- logger.info("数据库有效订购的用户数:"+dataList.size());
-
- for(HashMap vipmap : dataList){
- BackBusiShareSynService continueService = new BackBusiShareSynService(list.size(),threadSignal,vipmap,asyndao,dictionaryDao);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
- }
-
- /**
- * 去除重复数据,防止赠送会员接口被并发限制
- * @param dataList
- * @return
- */
- private List<HashMap> paraseData(List<HashMap> dataList){
- //去重复后的数据集
- List<HashMap> reDataList = new ArrayList<HashMap>();
- HashMap<String, List> tmpMap = new HashMap<String, List>();
- for (HashMap dataMap : dataList) {
- if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
- logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
- }else{
- reDataList.add(dataMap);
- List tmpList = new ArrayList();
- tmpList.add(dataMap.get("SPID"));
- tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
- }
- }
- return reDataList;
- }
- }
- class BackBusiShareSynService implements Runnable {
- private static Logger log = Logger.getLogger("backbusisharesyn");
- private int totalSize;
- private CountDownLatch threadSignal;
- private HashMap vipmap;
- private DictionaryDao dictionaryDao;
- private BackBusiVipAsynDao asyndao;
-
- public BackBusiShareSynService(int totalSize,CountDownLatch threadSignal,HashMap vipmap,BackBusiVipAsynDao asyndao,DictionaryDao dictionaryDao){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.vipmap = vipmap;
- this.asyndao = asyndao;
- this.dictionaryDao = dictionaryDao;
- }
-
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map logMap = new HashMap();
- String id = vipmap.get("ID").toString();
- logMap.put("vipmap", vipmap);
- String resultcode = "-1";
- String errorinfo = "";
- String hasshare = "";
- try {
- List<HashMap> confList = asyndao.getBackBusiConf(vipmap.get("CPID").toString(), vipmap.get("SPID").toString());
- vipmap.put("confInfo", confList.get(0));
- hasshare = (confList.get(0).get("HASSHARE") == null ? "" : confList.get(0).get("HASSHARE").toString());
- //更新为正在赠送状态
- asyndao.updShareStatus(id,"4","纯免流和免流+会员产品异步处理中");
- //调能力平台
- shareOrder(hasshare);
- resultcode = "0";
- errorinfo = "成功";
- } catch (Exception e) {
- if (e instanceof BusinessException) {
- errorinfo = ((BusinessException) e).getMessage();
- resultcode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultcode = "8000";
- errorinfo = "处理数据出现异常,"+e.getMessage();
- }
- }finally {
- threadSignal.countDown();
- String time = System.currentTimeMillis()-startime+"";
- try{
- //出现异常或调能力不成功,如果是不以接口结果为依据落订购关系的,resultcode设置为0
- if(!"0".equals(resultcode) && "1".equals(hasshare)){
- resultcode = "0";
- }
- //更新赠送会员结果出现异常
- asyndao.updShareStatus(id, resultcode, errorinfo);
- }catch(Exception e){
- log.error(vipmap.get("USERID")+"更新调能力平台结果出现异常,"+e.getMessage());
- }
- //写日志
- logMap.put("resultcode", resultcode);
- logMap.put("errorinfo", errorinfo);
- logMap.put("time", time);
- logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
- log.info(JsonUtil.objectToJson(logMap));
- }
- }
-
- /**
- * 调能力平台
- * @param orderInfo
- * @param hasshare //调能力平台标识:0调能力平台并以结果落订购关系,1调能力平台其结果不影响订购关系,为空时不调能力平台,
- * @throws Exception
- */
- private void shareOrder(String hasshare) throws Exception{
- if("0".equals(hasshare) || "1".equals(hasshare)){
- String shareErcode = "0";
- String shareErnfo = "成功";
- try {
- this.invokeShare();
- } catch (BusinessException e) {
- e.printStackTrace();
- shareErcode = e.getCode();
- shareErnfo = e.getMessage();
- if("0".equals(hasshare)){
- throw new BusinessException("8888","调用能力共享平台订购接口失败");
- }
- } finally {
- saveBackShareLog(shareErcode,shareErnfo);
- }
- }
- }
-
- /**
- * 写后向订购能力平台日志
- * @param orderInfo
- * @param errorcode
- * @param errorinfo
- */
- private void saveBackShareLog(String errorcode,String errorinfo){
- BackShareOrderBean bso = new BackShareOrderBean();
- bso.setUserid(vipmap.get("USERID").toString());
- bso.setErrorcode(errorcode);
- bso.setErrorinfo(errorinfo);
- bso.setCpid(vipmap.get("CPID").toString());
- bso.setSpid(vipmap.get("SPID").toString());
- try {
- asyndao.addShareOrderLog(bso);
- } catch (Exception e) {
- e.printStackTrace();
- log.error("userid:"+vipmap.get("USERID")+",写后向订购能力平台日志出现异常,"+e.getMessage());
- }
- }
-
- /**
- * 调能力平台
- * @throws BusinessException
- */
- public void invokeShare() throws BusinessException{
- String result = "";
- String userid = "";
- String cpid = "";
- String spid = "";
- String url = "";
- try{
- userid = vipmap.get("USERID").toString();
- cpid = vipmap.get("CPID").toString();
- spid = vipmap.get("SPID").toString();
- url = dictionaryDao.getValue("shareOrderUrl");
- result = URLUtil.get(url+"?cpid="+cpid+"&spid="+spid+"&userid="+userid,15000);
- }catch (Exception e) {
- log.error("****:"+userid+cpid+spid+":"+e.getMessage());
- if(e.getMessage().indexOf("connect timed out")!= -1) { //如果连接超时就再重试一次
- try {
- result = URLUtil.get(url+"?cpid="+cpid+"&spid="+spid+"&userid="+userid,15000);
- log.info(userid+cpid+spid+"重试:"+result);
- if(result.indexOf("您已订购,请勿重复")!= -1){ //包含您已订购,请勿重复订购
- result = "{\"resultCode\":0,\"resultInfo\":\"成功!\",\"data\":null}";
- }
- }catch (Exception e1){
- throw new BusinessException("9170", "调用能力共享平台订购接口异常!");
- }
- }else{
- throw new BusinessException("9170", "调用能力共享平台订购接口异常");
- }
- }
- JSONObject obj = JSONObject.fromObject(result);
- if(!obj.getString("resultCode").equals("0")){
- throw new BusinessException(obj.getString("resultCode"), obj.getString("resultInfo"));
- }
- }
- }
|