package com.chinacreator.process.job; import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import net.sf.json.JSONObject; import org.apache.commons.lang.time.DateUtils; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.common.util.DESUtil; import com.chinacreator.common.util.MD5; import com.chinacreator.common.util.URLUtil; import com.chinacreator.process.bean.OrderLog; import com.chinacreator.process.bean.PointShopMqBean; import com.chinacreator.process.bean.PointShopOrderBean; import com.chinacreator.process.dao.BackBusiVipAsynDao; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.PointShopDao; import com.chinacreator.process.util.HttpInvoke; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.video.queue.MessageService; import com.chinacreator.video.queue.bean.MessagePipe; /** * 积分商城业务异步处理 * 取消队列方式 * @author xu.zhou * @date 20210118 * PointShopMQJob和VipRetryMQJob两个定时任务不再启用 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class PointShopSyncJob { private Logger log = Logger.getLogger("pointshop"); @Autowired private DictionaryDao dictionaryDao; @Autowired private PointShopDao pointShopDao; @Autowired private BackBusiVipAsynDao asyndao; public void doProcess() throws Exception { //dictionaryDao = new DictionaryDao(); //pointShopDao = new PointShopDao(); //asyndao = new BackBusiVipAsynDao(); String trycount = dictionaryDao.getValue("vipasyntrycount");//后向产品赠送会员异步处理重试次数 if(trycount == null || "".equals(trycount)) trycount = "3"; log.info(Thread.currentThread().getName()+"定时任务开始"); long beginTime = System.currentTimeMillis(); List list = pointShopDao.getVipAsynData(trycount); if(list != null && list.size() > 0){ //推送月份 String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date()); CountDownLatch threadSignal = new CountDownLatch(list.size()); ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); //去除重复数据 List dataList = paraseData(list); log.info("数据库有效订购的用户数:"+dataList.size()); for(HashMap vipmap : dataList){ PointShopSynService continueService = new PointShopSynService(list.size(),threadSignal,vipmap,asyndao,dictionaryDao, pointShopDao); executorService.execute(continueService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } log.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); } /** * 去除重复数据,防止赠送会员接口被并发限制 * @param dataList * @return */ private List paraseData(List dataList){ //去重复后的数据集 List reDataList = new ArrayList(); HashMap tmpMap = new HashMap(); for (HashMap dataMap : dataList) { if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){ log.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID")); }else{ reDataList.add(dataMap); List tmpList = new ArrayList(); tmpList.add(dataMap.get("SPID")); tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList); } } return reDataList; } public static void main(String[] args) { PointShopSyncJob job = new PointShopSyncJob(); try { //job.doProcess(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } class PointShopSynService implements Runnable { private static Logger log = Logger.getLogger("pointshop"); private int totalSize; private CountDownLatch threadSignal; private HashMap busimap; private DictionaryDao dictionaryDao; private BackBusiVipAsynDao asyndao; private PointShopDao pointShopDao; public PointShopSynService(int totalSize,CountDownLatch threadSignal,HashMap busimap,BackBusiVipAsynDao asyndao,DictionaryDao dictionaryDao,PointShopDao pointShopDao){ this.totalSize = totalSize; this.threadSignal = threadSignal; this.busimap = busimap; this.asyndao = asyndao; this.dictionaryDao = dictionaryDao; this.pointShopDao = pointShopDao; } /** * 业务处理 * @param mqBean */ public void run(){ long startime = System.currentTimeMillis(); Map logMap = new HashMap(); logMap.put("data", busimap); PointShopOrderBean psoBean = null; String resultCode = "-1"; String resultInfo = ""; String huchiSpid = ""; //互斥业务ID,如果不为空,则业务互斥 try { //查询积分商城订购记录表 psoBean = pointShopDao.getOrderRec(busimap.get("ID").toString()); if(psoBean == null){ throw new BusinessException("9071", "积分商城订购记录表无对应订单信息,订单流水号:"+busimap.get("ID")); } //判断业务互斥 huchiSpid = checkMutual(psoBean); //判断业务互斥 if(!"".equals(huchiSpid)){ psoBean.setVipstatus("4"); throw new BusinessException("9003","已办理互斥业务"); } //业务类型,1会员,2纯免流,3会员加免流 String busiType = busimap.get("BUSITYPE").toString(); //获取业务结束时间 String newEndtime = this.getEndTime(psoBean); //赠送会员状态,0成功,1未赠送,2赠送中,3失败,4不赠送,赠送成功后小毛会改为成功 psoBean.setVipstatus("2"); //办理结果编码,0成功,1待处理,2处理中,其他为异常 psoBean.setResultCode("2"); psoBean.setResultInfo("处理中"); if("1".equals(busiType)){ //会员 //更新订购状态,调会员接口 String sendVipRes = invokeSendVip(psoBean); psoBean.setVipstatus("0"); }else if("2".equals(busiType)){ //免流 psoBean.setVipstatus("4"); //不赠送 //后向订购记录表、积分商城订购记录表的ID是相同的 pointShopDao.updSendVipStatus(psoBean,"TD_BACKBUSI_ORDER_REC"); pointShopDao.updSendVipStatus(psoBean,"TD_POINTS_ORDER_REC"); //设置订购结束时间和订购渠道 psoBean.setEndtime(newEndtime); psoBean.setOrderchannel(psoBean.getAppId()); //生成订购关系 pointShopDao.order(psoBean); //saveOrderLog(psoBean,"0","成功"); //写订购日志 }else if("3".equals(busiType)){ //会员+免流 //更新订购状态,调会员接口 String sendVipRes = invokeSendVip(psoBean); //赠送成功,更新或新增订购关系 psoBean.setVipstatus("0"); //设置订购结束时间和订购渠道 psoBean.setEndtime(newEndtime); psoBean.setOrderchannel(psoBean.getAppId()); //生成订购关系 pointShopDao.order(psoBean); }else{ psoBean.setVipstatus("5"); //赠送超时 throw new BusinessException("8000","业务类型有误", new String[0]); } resultCode = "0"; resultInfo = "成功"; } catch (Exception e) { e.printStackTrace(); if (e instanceof BusinessException) { resultInfo = ((BusinessException) e).getMessage(); resultCode = ((BusinessException) e).getCode(); }else{ resultCode = "8000"; resultInfo = "系统错误,"+e.getMessage(); } } finally{ threadSignal.countDown(); String time = System.currentTimeMillis()-startime+""; //更新订购操作表信息, try { if(resultInfo != null && resultInfo.length()>250){ resultInfo = resultInfo.substring(0,250); } if(psoBean != null){ psoBean.setResultCode(resultCode); psoBean.setResultInfo(resultInfo); //更新后向产品订购记录表处理结果 pointShopDao.updSendVipRes(psoBean.getVipstatus(), psoBean.getResultCode(), psoBean.getResultInfo(), psoBean.getId(), "TD_BACKBUSI_ORDER_REC", time); //更新积分商城订购记录表处理结果 pointShopDao.updSendVipRes(psoBean.getVipstatus(), psoBean.getResultCode(), psoBean.getResultInfo(), psoBean.getId(), "TD_POINTS_ORDER_REC",null); saveOrderLog(psoBean); //写订购日志 }else{ //积分商城订购表无记录 pointShopDao.updSendVipRes("5", resultCode, resultInfo, (String)busimap.get("ID"), "TD_BACKBUSI_ORDER_REC", time); } } catch (Exception e) { e.printStackTrace(); log.error("更新数据出现异常,"+psoBean.getUserid()+", resultCode:"+resultCode + ", resultInfo:"+resultInfo); } //充值成功发送短信 if("0".equals(resultCode)){ inserSmstMq(psoBean); //发送短信 } //回调通知,处理成功或者重试次数已满 String bcstatus = ""; try { String trycountconf = dictionaryDao.getValue("vipasyntrycount"); String trycount = busimap.get("RETRYCOUNT").toString(); if (StringUtils.isEmpty(trycountconf)) { trycountconf = "3"; } if("0".equals(resultCode) || Integer.parseInt(trycount)+1 >= Integer.parseInt(trycountconf)){ //回调积分商城通知接口 this.callBack(busimap.get("ID").toString()); } } catch (Exception e2) { e2.printStackTrace(); } logMap.put("data2", psoBean); logMap.put("reusltCode", resultCode); logMap.put("resultInfo", resultInfo); logMap.put("bcstatus", bcstatus); log.info(JsonUtil.objectToJson(logMap)); } } /** * 更新执行前状态,调接口送会员 * @param psoBean * @return * @throws Exception */ private String invokeSendVip(PointShopOrderBean psoBean) throws Exception{ //后向订购记录表、积分商城订购记录表的ID是相同的 pointShopDao.updSendVipStatus(psoBean,"TD_BACKBUSI_ORDER_REC"); pointShopDao.updSendVipStatus(psoBean,"TD_POINTS_ORDER_REC"); String sendVipRes = sendVip(psoBean); if("9070".equals(sendVipRes)){ psoBean.setVipstatus("5"); //赠送超时 throw new BusinessException("9070","赠送会员超时", new String[0]); } if(!"0".equals(sendVipRes)){ //赠送失败,再查记录表状态,(成功后小毛会改为0) //获取办理编码,如果赠送会员成功,小毛会更新vipstatus为0 sendVipRes = pointShopDao.getBackVipstatus(psoBean.getId()); } if(!"0".equals(sendVipRes)){//赠送未成功 //未成功,当作超时处理启动重试 psoBean.setVipstatus("5"); //赠送超时 throw new BusinessException("9070","赠送会员超时", new String[0]); } return sendVipRes; } /** * 写订购日志 * @param noBean PointShopOrderBean */ private void saveOrderLog(PointShopOrderBean noBean){ OrderLog orderLog = new OrderLog(); orderLog.setApptype("2"); orderLog.setArea(noBean.getArea()); orderLog.setChannel(noBean.getOrderchannel()); orderLog.setOrderstatus(0); orderLog.setStatus(0); orderLog.setCpid(noBean.getCpid()); orderLog.setIsexperience(0); orderLog.setOrdertype("0"); orderLog.setProvince(noBean.getProvince()); orderLog.setSpid(noBean.getSpid()); orderLog.setErrorcode(noBean.getResultCode()); orderLog.setErrorinfo(noBean.getResultInfo()); //手机号码不为空,且有办理免流业务的,记录订购日志,单独只送会员的不记订购日志 if(noBean.getUserid() != null && !"null".equals(noBean.getUserid().trim()) && noBean.getUserid().trim().length() == 11 && noBean.getEndtime() != null){ orderLog.setUserid(noBean.getUserid().trim()); try { this.pointShopDao.addOrderLog(orderLog); } catch (Exception e) { log.error("orderId: "+noBean.getId()+", 添加日志出现异常,"+e.getMessage()); e.printStackTrace(); } } } /** * 查询本地订购关系表当前是否有已生效的订购关系 * @param orderInfo * @return * @throws Exception */ private Map hasEffect(PointShopOrderBean psoBean) throws Exception{ Map reMap = new HashMap(); boolean hasEffect = false; Map currOrderInfo = null; String currentTime = pointShopDao.currTime(); //查询用户本地订购关系表未失效的订购数据 currOrderInfo = pointShopDao.findByUserAndSpid(psoBean.getUserid(),psoBean.getCpid(),psoBean.getSpid()); if(currOrderInfo != null){//本地有订购关系 if(Long.parseLong(currOrderInfo.get("ENDTIME").toString()) >= Long.parseLong(currentTime)){ //结束时间大于或等于当前时间,订购关系有效 hasEffect = true; } } reMap.put("hasEffect", hasEffect); reMap.put("currOrderInfo", currOrderInfo); return reMap; } /** * 判断业务是否互斥 * @param orderInfo * @throws Exception */ private String checkMutual(PointShopOrderBean psoBean) throws Exception{ String huchiSpid = ""; boolean result = false; String spid = psoBean.getSpid(); List spList = pointShopDao.findSpInfo(spid); if(spList != null && spList.size()>0){ if (spList.get(0).get("MUTEX") != null && !StringUtils.isEmpty(spList.get(0).get("MUTEX").toString())) { String[] mutexSpids = spList.get(0).get("MUTEX").toString().trim().split(","); //[{ORDERIDA=2019052, AREA=长沙, SPID=1167, PROVINCE=湖南, ID=201905241107578658666, ORDERCHANNEL=t, STATUS=1, ORDERTIME=20190524104129, USERID=18673197465, CPID=youtu}] //查询用户本地订购关系表未失效的订购数据 List list = pointShopDao.findOrderRelaAll(psoBean.getUserid()); if (list != null && list.size() > 0) { for (String mutexSpid : mutexSpids) { for (HashMap hm : list) { if (hm.get("SPID") != null && !hm.get("SPID").equals(spid) && hm.get("SPID").equals(mutexSpid) && !"2".equals(hm.get("STATUS"))) { huchiSpid = hm.get("SPID")+""; result = true; break; } } } } } } return huchiSpid; } /** * 赠送会员 * @param orderInfo * @return * @throws Exception */ private String sendVip(PointShopOrderBean orderBean)throws Exception{ String resultcode = "3"; //失败 try { //http://114.255.201.238:8090/video-activity/eshop/vip String vipurl = this.dictionaryDao.getValue("backBusiVipUrl"); String timestamp = (System.currentTimeMillis() / 1000) + ""; String id = orderBean.getId(); //TD_POINTS_ORDER_REC的ID,也是TD_BACKBUSI_ORDER_REC的ID String userid = orderBean.getUserid(); String orderid = orderBean.getOrderNo(); String cpid = orderBean.getCpid(); String spid = orderBean.getSpid(); //String goodscode = orderBean.getGoodsCode(); String pwd = ""; List confList = pointShopDao.getBackBusiConf(cpid, spid); pwd = confList.get(0).get("PWD").toString(); userid = DESUtil.encode(userid, pwd); //MD5(orderid+userid+goodscode+pwd+timestamp)转换为十六进制ASCII 码字符串,共32 个字符,全小写 userid= Des(手机号码,pwd) //MD5(orderid+userid+timestamp+pwd)转换为十六进制ASCII 码字符串,共32 个字符,全小写 String signature = MD5.MD5Encode(orderid + userid + timestamp + pwd); signature = signature.toLowerCase(); vipurl = vipurl + "?userid=" + URLEncoder.encode(userid, "utf-8")+ "&id=" +id+ "&orderid="+ orderid + "&cpid=" + cpid + "&spid=" + spid + "×tamp=" + timestamp + "&signature=" + signature+ "&apptype=2"; log.info("vipurl: "+vipurl); //http://114.255.201.228:86/activity/eshop/vip?userid=iafPbU9aRLghY%2FEVMXFeag%3D%3D&orderid=201906231206498662914&goodscode=pointshop130×tamp=1561445765&signature=47fe0e3900b29ef88fd0889b7c0e4cc6&apptype=5 String result = URLUtil.get(vipurl,30*1000); //调赠送会员接口,超时时间设置为10秒 log.info("赠送会员结果=> userid: " +userid+", orderid: "+orderBean.getOrderNo()+" , result: "+result); Map map = JsonUtil.jsonToMap(result); resultcode = (String)map.get("resultcode"); if(resultcode.equals("0")){ log.info("赠送成功"); } } catch (Exception e) { e.printStackTrace(); log.error("id=>"+orderBean.getId()+"=>userid: "+orderBean.getUserid()+"赠送会员失败,"+e); } return resultcode; } /** * 回调积分商城 * @param outputObj * @param id 我方生成的订单流水号 * @param requestId 我方生成的请求ID */ private String callBack(String id){ String bcstatus = "1"; //回调状态,1未回调,0回调完成,2回调异常 String cbReqParams = ""; //回调请求报文 String cbRspParams = ""; //回调响应报文 String requestId = ""; //我方生成的请求ID try { PointShopOrderBean bean = pointShopDao.getOrderRec(id); TreeMap orderOutObj = null; if(bean != null){ requestId = bean.getRequestId(); orderOutObj = new TreeMap(); orderOutObj.put("account",bean.getUserid()); orderOutObj.put("orderId",bean.getId()); orderOutObj.put("finishTime",bean.getFinishTime()); orderOutObj.put("goodsCode",bean.getSpid()); orderOutObj.put("orderNo",bean.getOrderNo()); orderOutObj.put("orderTime",bean.getOrderTime()); //orderOutObj.put("failDesc",bean.getResultInfo()); String orderStatus = "3"; //充值状态:1充值中 2充值成功3充值失败 String reusltCode = bean.getResultCode(); //办理结果编码,0成功,1待处理,2处理中,其他为异常 //当会员赠送失败时,反查一下会员赠送日志表,看是否真的赠送失败,防止调接口超时获取不到真实赠送结果 if(!"0".equals(reusltCode)){ if(asyndao.getVipSendRes(bean.getOrderNo())){ reusltCode = "0"; } } if("1".equals(reusltCode) || "2".equals(reusltCode)){ orderStatus = "1"; }else if("0".equals(reusltCode)){ orderStatus = "2"; }else{ //充值失败,设置失败原因 orderOutObj.put("failDesc",bean.getResultInfo()); } orderOutObj.put("orderStatus",orderStatus); //充值状态:1充值中 2充值成功3充值失败 } if(orderOutObj != null){ String callBackUrl = this.dictionaryDao.getValue("pointShopCallBackUrl");//"http://demo.mall.10010.com:8104/jf-service/zcvcard/notify"; cbReqParams = JsonUtil.objectToJson(orderOutObj); log.info("===========回调地址:"+callBackUrl); log.info("===========回调请求参数:"+cbReqParams); cbRspParams = HttpInvoke.sendHttpByPost("POST", callBackUrl, cbReqParams, getProperty()); log.info("============回调响应参数:"+cbRspParams); if("0000".equals(cbRspParams)){ bcstatus = "0"; } }else{ cbRspParams = "回调接口时未找到订购信息"; log.error("订单流水号=>"+id+"=>requestId=>"+requestId+",回调接口时未找到订购信息"); } } catch (Exception e) { bcstatus = "2"; e.printStackTrace(); log.error("订单流水号=>"+id+"=>requestId=>"+requestId+",回调接口出现异常,"+e.getMessage()); cbRspParams = "返回报文: "+cbRspParams+",回调异常,"+e.getMessage(); } finally{ if(cbRspParams != null && cbRspParams.length()> 250){ cbRspParams = cbRspParams.substring(0,250); } log.info("订单流水号=>"+id+"=>requestId=>"+requestId+"=>cbReqParams: "+cbReqParams+", cbRspParams=>"+cbRspParams); addParamsByCb(cbReqParams, cbRspParams, requestId); } return bcstatus; } /** * 添加回调报文 * @param cbReqParams * @param cbRspParams * @param requestId */ private void addParamsByCb(String cbReqParams,String cbRspParams, String requestId){ try { //一个中文占两个 字节 if(cbRspParams != null && cbRspParams.length()> 100){ cbRspParams = cbRspParams.substring(0,100); } pointShopDao.updParamsByCb(cbReqParams, cbRspParams, requestId); } catch (Exception e) { e.printStackTrace(); log.error("requestId: "+requestId+", 更新callBack状态出现异常,"+e.getMessage()); } } /** * 解析数据 * @param body * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号 * orderNo 积分商城订单号 * requestId 返回给客户的请求ID * @return */ public PointShopMqBean transBean(Map body) { String jsonStr = JsonUtil.objectToJson(body); return (PointShopMqBean) JsonUtil.jsonToBean(jsonStr, PointShopMqBean.class); } /** * 发送短信 * @param psoBean * @param resultCode */ public void inserSmstMq(PointShopOrderBean psoBean){ try{ if(psoBean != null){ Map map = new HashMap(); map.put("userid", psoBean.getUserid()); map.put("cpid", psoBean.getCpid()); map.put("spid", psoBean.getSpid()); map.put("result", "0"); map.put("channel", ""); map.put("style","0000"); map.put("times", ""); map.put("orderType", ""); map.put("type", "cssms"); map.put("busiType", "tran_succ"); log.info(JsonUtil.objectToJson(map)); String mqReciveUrl = dictionaryDao.getValue("mqReciveUrl"); URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map)); } }catch (Exception e){ e.printStackTrace(); } } /** * 获取请求属性性 * @return */ private static Map getProperty(){ Map reqProperty = new HashMap(); reqProperty.put("Content-type", "application/json;charset=UTF-8"); return reqProperty; } /** * 获取endtime * @param bean PointShopOrderBean * @return */ private String getEndTime(PointShopOrderBean bean)throws Exception{ String endtime = pointShopDao.getBackEndtime(bean.getId()); if(endtime == null || "null".equals(endtime) || endtime.length() != 14){ endtime = ""; } if("".equals(endtime)){ String currEndtime = ""; Map reMap = this.hasEffect(bean); if((Boolean)reMap.get("hasEffect")){ //存在有效订购关系 Map currOrderInfo = (Map)reMap.get("currOrderInfo"); currEndtime = (String)currOrderInfo.get("ENDTIME"); } //[{PWD=kijkfds, NETDAYS=31, CHANNEL=test_01, NETDAYS=2, SPID=1168, BUSITYPE=3, CPID=youtu}] HashMap confHm = pointShopDao.getBackBusiConf(bean.getCpid(), bean.getSpid()).get(0); //String busiType = confHm.get("BUSITYPE")+""; if("2".equals(confHm.get("BUSITYPE")) || "3".equals(confHm.get("BUSITYPE"))){ if(currEndtime != null && !"".equals(currEndtime)){ endtime = pointShopDao.endtimeParamDay(confHm.get("NETDAYS")+"", currEndtime); }else{ endtime = pointShopDao.currParamDay(confHm.get("NETDAYS")+""); } } } return endtime; } }