package com.chinacreator.process.job; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.common.util.DESUtil; import com.chinacreator.common.util.MD5; import com.chinacreator.common.util.URLUtil; import com.chinacreator.process.bean.*; import com.chinacreator.process.dao.*; import com.chinacreator.process.service.*; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.video.queue.MessageService; import com.chinacreator.video.queue.bean.MessagePipe; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 生成活动关系后实时赠送会员JOB * @author xu.zhou 20210610 * 队列名称:order */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class ReciveSendVipMQJob { private Logger logger = Logger.getLogger(ReciveSendVipMQJob.class); @Autowired private MessageService messageService; @Autowired private DictionaryDao dictionaryDao; @Autowired private OrderSendVipDao orderSendVipDao; @Autowired private CPDao cpDao; public void doProcess() throws Exception { String mqname = "ordersendvip"; logger.info("接收"+mqname+"订购队列JOB启动"); if (dictionaryDao.getValue("recivemq").equals("0")) { long beginTime = System.currentTimeMillis(); //获取队列数据 List list = messageService.reciveBatchMessage("ordersendvip", 500); logger.info("接收"+mqname+"订购队列花费时间:" + (System.currentTimeMillis() - beginTime)+",获取数据条数:"+(list == null ? "0" : list.size())); if(list != null && list.size()>0){ CountDownLatch threadSignal = new CountDownLatch(list.size()); ExecutorService executorService = new ThreadPoolExecutor(15, 20, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); for (MessagePipe messagePipe : list) { OrderBean orderBean = null; String srcmqname = ""; try { orderBean = transBean(messagePipe.getBody().get("data").toString()); srcmqname = messagePipe.getBody().get("mqname").toString(); ReciveSendVipService service = new ReciveSendVipService(orderBean,dictionaryDao, cpDao, orderSendVipDao,threadSignal, srcmqname); executorService.execute(service); } catch (Exception e) { e.printStackTrace(); logger.error("ReciveSendVipMQJob执行出错:" + (orderBean == null ? "解析失败" : orderBean.getUserid()) + "=>" + e); } } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName()+","+mqname+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); } else { logger.info("停止接收队列消息"); } } private OrderBean transBean(String jsonStr) { //String jsonStr = JsonUtil.objectToJson(body); return (OrderBean) JsonUtil.jsonToBean(jsonStr, OrderBean.class); } } class ReciveSendVipService implements Runnable { private Logger sendviplog = Logger.getLogger("ordersendvip"); private OrderBean orderBean; private DictionaryDao dictionaryDao; private CPDao cpDao; private OrderSendVipDao orderSendVipDao; private CountDownLatch threadSignal; private String mqname; public ReciveSendVipService(OrderBean _orderBean,DictionaryDao _dictionaryDao, CPDao _cpDao, OrderSendVipDao _orderSendVipDao, CountDownLatch _threadSignal, String _mqname){ this.orderBean = _orderBean; this.dictionaryDao = _dictionaryDao; this.cpDao = _cpDao; this.orderSendVipDao = _orderSendVipDao; this.threadSignal = _threadSignal; //this.totalSize = _totalSize; this.mqname = _mqname; } public void run() { Map logMap = new HashMap(); logMap.put("data", JsonUtil.objectToJson(orderBean)); logMap.put("mqname", mqname); long startime = System.currentTimeMillis(); //LogBean logBean = new LogBean(); try { //赠送多个会员改造 String userid = orderBean.getUserid(); String cpid = orderBean.getCpid(); String spid = orderBean.getSpid(); //获取所有赠送配置 List confList = orderSendVipDao.qryOrderSendvipAllConf(cpid, spid); if(confList != null && confList.size() > 0){ sendviplog.info("confList=>"+confList); List orderList = orderSendVipDao.findOrderRel(userid, spid); if(orderList != null && orderList.size()>0){ HashMap orderMap = orderList.get(0); if ("2".equals(orderMap.get("STATUS"))){ throw new BusinessException("5005", "订购关系已失效"); } //可能一次要赠送多个会员 for(HashMap confMap : confList){ //验证是否可以赠送会员 if(valisendvip(orderBean, confMap, orderMap)){ //调接口赠送会员 sendVipByConf( orderBean, confMap, orderMap, logMap); } } //调置执行无异常 logMap.put("resultcode", "0"); logMap.put("errorinfo", "ok"); }else{ throw new BusinessException("5007", "赠送会员查无订购关系"); } }else{ throw new BusinessException("5008", "无有效赠送会员配置信息"); } } catch (Exception e) { if (e instanceof BusinessException) { logMap.put("resultcode", ((BusinessException) e).getCode()); logMap.put("errorinfo", e.getMessage()); }else{ logMap.put("errorinfo","系统错误,"+e.getMessage()); logMap.put("resultcode", "8000"); e.printStackTrace(); sendviplog.error("执行出错:" + orderBean.getUserid() + "," + mqname + "=>" + e); } } finally { threadSignal.countDown(); //设置调用时长 logMap.put("times", (System.currentTimeMillis()-startime)); //输出日志 sendviplog.info(logMap); } } /** * 调接口送会员 * * @param * @return * @throws Exception */ private String invokeVip(OrderBean orderBean) throws Exception { String result = ""; try { String vipurl = this.dictionaryDao.getValue("joinActivityUrl"); String timestamp = (System.currentTimeMillis() / 1000) + ""; String userid = orderBean.getUserid(); String channel = orderBean.getChannel(); String usertype = "4"; //会员账号类型 4:手机号 String userval = userid; String pwd = ""; CPInfo cpinfo = cpDao.findById(orderBean.getCpid()); if (cpinfo != null) { pwd = cpinfo.getNetpwd(); } userid = DESUtil.encode(userid, pwd); String signature = MD5.MD5Encode(userid + userval + pwd + timestamp); signature = signature.toLowerCase(); vipurl = vipurl + "?userid=" + URLEncoder.encode(userid, "utf-8") + "&usertype=" + usertype + "&userval=" + userval + "&channel=" + channel + "×tamp=" + timestamp + "&signature=" + signature + "&activetype=" + orderBean.getActiveType(); sendviplog.info("vipurl: " + vipurl); result = URLUtil.get(vipurl, 30 * 1000); //调赠送会员接口,超时时间设置为10秒 sendviplog.info("赠送会员结果=> userid: " + userval + ", spid: " + orderBean.getSpid() + " , result: " + result); } catch (Exception e) { e.printStackTrace(); sendviplog.error("userid: " + orderBean.getUserid() + "领取方式赠送会员出现异常," + e,e); if (e.getMessage() != null && e.getMessage().indexOf("TimeoutException") != -1) {//超时异常 throw new BusinessException("9070", "赠送会员超时", new String[0]); } else { throw new BusinessException("9002", "赠送会员未成功," + e.getMessage(), new String[0]); } } return result; } /** * 调直充接口送会员 * @param * @return * @throws Exception */ private String invokeVipByZc(OrderBean orderBean, HashMap confmp) throws Exception { String result = ""; try { String basevipurl = this.dictionaryDao.getValue("giveVipUrl"); String vipurl = basevipurl; String apptype = "2"; String orderid = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 16); String timestamp = System.currentTimeMillis() + ""; String userid = orderBean.getUserid(); String channel = orderBean.getSpid(); //渠道传SPID String usertype = "4"; //会员账号类型 4:手机号 String userval = userid; String vipid = confmp.get("VIPID").toString(); String viptype = confmp.get("VIPTYPE").toString(); String pwd = confmp.get("VIPPWD").toString(); String vipval = "1"; String signature = MD5.MD5Encode(orderid+channel+vipid+userval+pwd+timestamp); signature = signature.toLowerCase(); //直充接口新参数 //产品名称 String productName = URLEncoder.encode(confmp.get("PRODUCTNAME") == null ? "" : confmp.get("PRODUCTNAME").toString(),"utf-8"); //产品ID String productId = confmp.get("PRODUCTID") == null ? "" : confmp.get("PRODUCTID").toString(); //订购状态,0订购,1退订 String orderType = orderBean.getOrderType(); String province = orderBean.getProvince(); String area = orderBean.getArea(); vipurl = vipurl + "?userval=" + URLEncoder.encode(userval,"utf-8") + "&usertype=" + usertype + "&channel=" + channel + "×tamp=" + timestamp + "&signature=" + signature + "&orderid=" + orderid + "&viptype=" + viptype + "&vipid=" + vipid + "&vipval="+vipval + "&apptype=" + apptype + "&productName="+productName + "&productId=" + productId + "&orderType=" + orderType + "&province=" + URLEncoder.encode(province,"utf-8") + "&area=" + URLEncoder.encode(area,"utf-8"); sendviplog.info("vipurl: " + vipurl); //第一次直充会员 result = URLUtil.get(vipurl, 30 * 1000); //调赠送会员接口,超时时间设置为10秒 sendviplog.info("直充会员结果=> userid: " + userval + ", spid: " + orderBean.getSpid() + " , result: " + result); } catch (Exception e) { e.printStackTrace(); sendviplog.error("userid: " + orderBean.getUserid() + "直充方式赠送会员出现异常," + e, e); if (e.getMessage() != null && e.getMessage().indexOf("TimeoutException") != -1) {//超时异常 throw new BusinessException("9070", "赠送会员超时", new String[0]); } else { throw new BusinessException("9002", "赠送会员未成功," + e.getMessage(), new String[0]); } } return result; } /** * 判断是否要赠送会员 * @param orderBean * @param confList * @param orderMap * @return */ private boolean valisendvip(OrderBean orderBean, HashMap confMap, HashMap orderMap ) throws Exception{ //是否可以赠送会员 boolean hassend = false; try { //复合产品,生成活动关系时是每个子业务一条活动关系,验证的是子产品的SPID,不处理 if (!StringUtils.isEmpty(orderBean.getFhcpid()) && !StringUtils.isEmpty(orderBean.getFhspid())) { throw new BusinessException("5005", "复合产品不参与赠送会员"); } //验证开始 String userid = orderBean.getUserid(); String cpid = orderBean.getCpid(); String spid = orderBean.getSpid(); //List confList = orderSendVipDao.qryOrderSendvipAllConf(cpid, spid); //当前时间 SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); String currtime = sdf.format(new Date()); //当前时间 String orderchannel = orderBean.getOrderchannel(); //手厅传的渠道标识与订购疯关系表不相同 String channel = orderBean.getChannel(); //子渠道标识 String subchannel = orderBean.getSubchannel(); //订购时间 String ordertime = orderMap.get("ORDERTIME").toString(); if(!StringUtils.isEmpty(orderchannel) && !StringUtils.isEmpty(channel) && !orderchannel.equals(orderMap.get("ORDERCHANNEL")) && !channel.equals(orderMap.get("ORDERCHANNEL"))){ throw new BusinessException("5005", "渠道与订购关系表的渠道不一致,orderchannel=>"+orderchannel); } //赠送允许渠道 String confChannel = confMap.get("CHANNEL").toString().trim(); //活动开始时间 String starttime = (String)confMap.get("STARTTIME"); //活动结束时间 String endtime = (String)confMap.get("ENDTIME"); //会员类型,1走会员领取接口,0走会员直充接口 String sendtype = confMap.get("SENDTYPE").toString().trim(); //配置的子渠道 String subchannelconf = (String)confMap.get("SUBCHANNEL"); //订购渠道与配置渠道相同,或者配置渠道不限制 if (confChannel.equals(orderchannel) || confChannel.equals(channel) || "*".equals(confChannel) //配置渠道以指定字符开头且订购渠道是以指定渠道开头,芒果16元 || (confChannel.indexOf("*") != -1 && orderchannel != null && orderchannel.indexOf(confChannel.split("\\*")[0]) == 0) || (confChannel.indexOf("*") != -1 && channel != null && channel.indexOf(confChannel.split("\\*")[0]) == 0)) { /**** * SENDTYPE会员类型,1走会员领取接口,0走会员直充接口 * 走会员领取接口,有子渠道配置,且SENDTYPE不为1或者主渠道配置为*或者配置的子渠道与订购关系的子渠道不匹配 * 20220325,xu.zhou,赠送配置到子渠道级别 */ if(!StringUtils.isEmpty(subchannelconf) && //有子渠道配置 ( !"1".equals(confMap.get("SENDTYPE")) || //不是领取会员 confChannel.indexOf("*") != -1 || //渠道配置有模糊匹配 !subchannelconf.equals(subchannel) //与订购关系的子渠道不相同 ) ){ //走会员领取接口,有配置限制子渠道,且订购关系表子渠道为空或者子渠道标识与配置的子渠道标识不相同,不自动送会员 throw new BusinessException("5005", "子渠道标识验证未通过,subchannel=>"+subchannel); } /**************************判断时间是否合法开始************************/ //开始时间和结束时间都不为空且当前时间包含在两者之间 if (!StringUtils.isEmpty(starttime) && !StringUtils.isEmpty(endtime) && Long.parseLong(currtime) >= Long.parseLong(starttime.trim()) && Long.parseLong(currtime) <= Long.parseLong(endtime.trim())){ hassend = true; //开始时间不为空,结束时间为空,当前时间大于或等于开始时间 }else if (!StringUtils.isEmpty(starttime) && StringUtils.isEmpty(endtime) && Long.parseLong(currtime) >= Long.parseLong(starttime.trim())){ hassend = true; //开始时间为空,结束时间不为空,当前时间小于或等于结束时间 }else if (StringUtils.isEmpty(starttime) && !StringUtils.isEmpty(endtime) && Long.parseLong(currtime) <= Long.parseLong(endtime.trim())){ hassend = true; //开始时间为空且结束时间也为空 }else if (StringUtils.isEmpty(starttime) && StringUtils.isEmpty(endtime)){ hassend = true; } if(!hassend){ throw new BusinessException("5005", "时间验证未通过"); } /**************************判断时间是否合法结束************************/ //如果上面的条件都匹配,再判断是否有订购类型限制,只有走直充的才要判断订购退订限制 if(hassend && "0".equals(sendtype)){ //还原标识为false hassend = false; //订购状态,0订购,1退订,*不限制,为空时只允许订购, SENDTYPE为0时有效 String ordertype = (String)confMap.get("ORDERTYPE"); if (StringUtils.isEmpty(ordertype)){//配置为空,只允许订购赠送会员 if("0".equals(orderBean.getOrderType())){ //订购 hassend = true; } } else if("*".equals(ordertype)){ //订购退订都允许赠送会员 hassend = true; } else {//有限制订购或退订赠送会员 if(ordertype.equals(orderBean.getOrderType())){//订购状态与配置相同 hassend = true; } } }if(hassend && "1".equals(sendtype) && !"0".equals(orderBean.getOrderType())){//领取会员、非订购操作 hassend = false; throw new BusinessException("5005", "只有订购操作的才能领取会员"); } }else{ throw new BusinessException("5005", "渠道验证未通过"); } } catch (Exception e) { if (e instanceof BusinessException) { sendviplog.info("判断是否赠送会员未通过验证,"+orderBean+", "+confMap+", 原因:"+((BusinessException)e).getMessage()); }else{ e.printStackTrace(); sendviplog.error(orderBean+",判断是否赠送会员出现异常,"+e.getMessage(), e); throw new BusinessException("8000", "判断是否赠送会员出现异常,"+e.getMessage(), new String[0]); } } return hassend; } /** * 根据赠送配置进行会员赠送 * @param orderBean * @param confMap //赠送配置 * @param orderMap //订购关系 * @param logMap //日志记录 */ private void sendVipByConf(OrderBean orderBean, HashMap confMap, HashMap orderMap, Map logMap) throws Exception{ HashMap sendResMap = new HashMap(); String resultcode = "-1"; String errorinfo = ""; String sendno = ""; //赠送记录表的ID,用于更新数据 try { String confid = confMap.get("ID").toString();//配置表ID sendResMap.put("confid", confid); String orderchannel = orderBean.getOrderchannel(); //手厅传的渠道标识与订购疯关系表不相同 String channel = orderBean.getChannel(); String subchannel = orderBean.getSubchannel(); long startime = System.currentTimeMillis(); //logMap.put("data", orderBean); sendno = orderSendVipDao.getNo(); //logMap.put("sendno", sendno); HashMap params = new HashMap(); params.put("ID", sendno); params.put("USERID", orderBean.getUserid()); params.put("CPID", orderBean.getCpid()); params.put("SPID", orderBean.getSpid()); //params.put("SENDMONTH", sendmonth); params.put("RESULTCODE", "2"); params.put("RESULTINFO", "赠送中"); params.put("ORDERTIME", orderMap.get("ORDERTIME").toString()); //新增配置表ID params.put("CONFID", confid); //如果orderchannel为空,取channel params.put("ORDERCHANNEL", ((orderchannel != null && !"".equals(orderchannel)) ? orderchannel : channel)); params.put("SUBCHANNEL", (!StringUtils.isEmpty(subchannel)) ? subchannel : ""); sendviplog.info("sendVip入库参数=>"+params); //添加赠送数据到记录表 orderSendVipDao.addSendvipRec(params); //会员类型,1走会员领取接口,0走会员直充接口 String sendtype = confMap.get("SENDTYPE").toString(); String result = ""; if("0".equals(sendtype)){ //0走会员直充接口 result = invokeVipByZc(orderBean, confMap); }else if("1".equals(sendtype)){ //1走会员领取接口 result = invokeVip(orderBean); } sendResMap.put("result", result); if (result != null && !"".equals(result)) { Map map = JsonUtil.jsonToMap(result); resultcode = (String) map.get("resultcode"); errorinfo = (String) map.get("errorinfo"); } } catch (Exception e) { if (e instanceof BusinessException) { resultcode = ((BusinessException) e).getCode(); errorinfo = e.getMessage(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "赠送会员出现异常,"+e.getMessage(); } } finally{ try { //更新赠送会员结果 if (errorinfo != null && errorinfo.length() > 500) { errorinfo = errorinfo.substring(0, 500); } boolean updRes = orderSendVipDao.updatePush(sendno, resultcode, errorinfo); sendviplog.info("updRes=>" + updRes); sendResMap.put("updRes", updRes); } catch (Exception e2) { // e2.printStackTrace(); errorinfo = "更新领取结果出现异常," + e2.getMessage(); sendviplog.error(sendno+errorinfo,e2); sendResMap.put("updRes", "更新领取结果出现异常," + e2.getMessage()); } sendResMap.put("resultcode", resultcode); sendResMap.put("errorinfo", errorinfo); if(logMap.get("sendResList") == null){ List sendResList = new ArrayList(); sendResList.add(sendResMap); logMap.put("sendResList", sendResList); }else{ List sendResList = (List)logMap.get("sendResList"); sendResList.add(sendResMap); logMap.put("sendResList", sendResList); } } } }