package com.chinacreator.process.job; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.common.util.DESUtil; import com.chinacreator.process.bean.ContractProductPushBean; import com.chinacreator.process.dao.ContractProductDao; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.process.util.SHAUtil; import com.chinacreator.process.util.URLUtil; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import java.net.URLEncoder; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.*; /** * 合约产品合约期到期后短信通知(每月25号10点至22点) * ContractProductJob将TD_CONTRACT_PRODUCT表里面符合条件的数据添加到表TD_CONTRACT_PRODUCT_PUSH 并推送 * 每个月25号10点至20点期间每10秒钟执行一次 * @author can.he * @date 20210825 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class ContractProductTestJob { private static Logger logger = Logger.getLogger("contractpush"); @Autowired private ContractProductDao contractProductDao; @Autowired private DictionaryDao dictionaryDao; // private ContractProductDao contractProductDao = new ContractProductDao(); // private DictionaryDao dictionaryDao = new DictionaryDao(); // @Autowired // private KuaishouDao kuaishouDao; // = new ContractProductDao(); // public void doProcess() throws Exception { System.out.println(new Date()+"-----------------------------------------------"); logger.info(new Date()+"-----------------------------------------------"); //判断是否在当前时间范围(每个月的25号10点至20点之间) Date nowDate = new Date(); if(!getBetweenTime(nowDate)){ return; } logger.info(Thread.currentThread().getName()+"ContractProductJob合约产品定时任务开始"); long beginTime = System.currentTimeMillis(); int rows = 800; //每次取数据条数 String contractfmrows = dictionaryDao.getValue("contractfmrows"); if (!StringUtils.isEmpty(contractfmrows)) { try { rows = Integer.parseInt(contractfmrows); } catch (Exception e) { rows = 800; } } //当前时间往前推5个月的时间 String newDate = getStepMonth(nowDate, -5); logger.info("当前日期往前推5个月是:"+newDate); //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50 String partition = "T_HASH_P"; for(int i = 1; i <= 20; i++){ partition = "T_HASH_P"; try { if(i < 10){ partition = partition + "0" + i; }else{ partition = partition + i; } //按分区标识获取订购数据 // List list = contractProductDao.getContractProductByPart(partition,rows,newDate); List list = contractProductDao.getContractProductByPart(partition,rows,"202108"); logger.info(partition+",用户数:"+(list != null ? list.size() : "0")); if(list != null && list.size() > 0){ logger.info(partition+",去重复前用户数:"+list.size()); // list = paraseData(list); //hecan logger.info(partition+",去重复后用户数:"+list.size()); //把数据更新为正在处理状态 contractProductDao.batchUpdSyncStatus(getIds(list)); //推送月份 String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date()); CountDownLatch threadSignal = new CountDownLatch(list.size()); ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); // //把数据更新为正在处理状态 // for(HashMap hm : list){ // boolean res = contractProductDao.updContractProductExecing(hm.get("ID").toString()); // } for(HashMap hm : list){ ContractProductTestJobService continueService = new ContractProductTestJobService(list.size(),threadSignal,hm,pushmonth,dictionaryDao,contractProductDao); executorService.execute(continueService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { logger.info(partition+",执行出现异常,"+e.getMessage()); e.printStackTrace(); } Thread.sleep(100); } logger.info(Thread.currentThread().getName()+"ContractProductJob合约产品定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); } /** * 拼接ID,用于更新数据 * @param dataList * @return */ private String getIds(List dataList){ String ids = ""; if(dataList != null && dataList.size()>0){ for(HashMap tmphm : dataList){ ids += ",'"+tmphm.get("ID")+"'"; } } if(!"".equals(ids)){ ids = ids.substring(1); } logger.info("ids=>"+ids); return ids; } private List paraseData(List dataList){ //去重复后的数据集 List reDataList = new ArrayList(); HashMap tmpMap = new HashMap(); for (HashMap dataMap : dataList) { if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){ logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID")); }else{ reDataList.add(dataMap); List tmpList = new ArrayList(); tmpList.add(dataMap.get("SPID")); tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList); } } return reDataList; } /** * 在给定的日期加上或减去指定月份后的日期 * * @param sourceDate 原始时间 * @param month 要调整的月份,向前为负数,向后为正数 * @return */ public String getStepMonth(Date sourceDate, int month) { DateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMM"); Calendar c = Calendar.getInstance(); c.setTime(sourceDate); c.add(Calendar.MONTH, month); return DATE_FORMAT.format(c.getTime()); } //获取当前时间 public String getNowDate(Date date) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return format.format(date); } //获取当前日期 public static boolean getBetweenTime(Date nowTime) { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM"); try { Date startTime = format.parse(format1.format(nowTime)+"-25 08:00:00"); Date endTime = format.parse(format1.format(nowTime)+"-29 19:50:00"); return isEffectiveDate(nowTime, startTime, endTime); } catch (ParseException e) { e.printStackTrace(); } return false; } //判断是否在当前时间范围 public static boolean isEffectiveDate(Date nowTime, Date startTime, Date endTime) { if (nowTime.getTime() == startTime.getTime() || nowTime.getTime() == endTime.getTime()) { return true; } Calendar date = Calendar.getInstance(); date.setTime(nowTime); Calendar begin = Calendar.getInstance(); begin.setTime(startTime); Calendar end = Calendar.getInstance(); end.setTime(endTime); if (date.after(begin) && date.before(end)) { return true; } else { return false; } } } class ContractProductTestJobService implements Runnable { private static Logger logger = Logger.getLogger("contractpush"); private int totalSize; private CountDownLatch threadSignal; private List spidsList; private String pushmonth; private ContractProductDao contractProductDao; private DictionaryDao dictionaryDao; private HashMap hm; //用户数据,ID和USERID public ContractProductTestJobService(int totalSize,CountDownLatch threadSignal,HashMap hm,String pushmonth,DictionaryDao dictionaryDao,ContractProductDao contractProductDao){ this.totalSize = totalSize; this.threadSignal = threadSignal; this.hm = hm; this.pushmonth = pushmonth; this.contractProductDao = contractProductDao; this.dictionaryDao = dictionaryDao; } @Override public void run() { long startime = System.currentTimeMillis(); Map logMap = new HashMap(); logMap.put("userinfo", hm); logMap.put("pushmonth", pushmonth); String resultcode = "-1"; //0,成功、1采集推送数据、短信推送数据处理失败,2短信推送失败 String errorinfo = ""; String realflag = ""; //添加到推送表标识,空,未推送,1已经推送 1订购推送,2月未推送 String id = hm.get("ID").toString(); try { //处理业务逻辑 realflag = pushService(logMap); if(logMap.get("errorinfo") != null){ errorinfo = logMap.get("errorinfo").toString(); } //未出现异常,设置为成功 resultcode = "0"; if("".equals(errorinfo)){ errorinfo = "ok"; //目前是那种不是订购状态的数据,不需要推送的 } } catch (Exception e) { if (e instanceof BusinessException) { errorinfo = ((BusinessException) e).getMessage(); resultcode = ((BusinessException) e).getCode(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "处理数据出现异常,"+e.getMessage(); } }finally { threadSignal.countDown(); try{ //暂未同步或者处理出现异常,恢复初始状态,下次再处理 if("9056".equals(resultcode) || ("8000".equals(resultcode)&& !errorinfo.contains("唯一约束"))){ resultcode = "1"; //推送失败的继续推 }else{ resultcode = "0"; } if(realflag.equals("1")){ //短信推送失败的 resultcode = "3"; } contractProductDao.updContractProduct(id,resultcode,errorinfo); }catch(Exception e){ e.printStackTrace(); errorinfo = "更新数据出现异常,"+e.getMessage(); resultcode = "8001"; } //写日志 logMap.put("jobname", "ContractProductJob"); logMap.put("resultcode", resultcode); logMap.put("errorinfo", errorinfo); logMap.put("realflag", realflag); //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount())); logger.info(JsonUtil.objectToJson(logMap)); } } /** * 添加数据到推送表TD_CONTRACT_PRODUCT_PUSH且发送短信 * @param logMap * @throws Exception */ private String pushService(Map logMap) throws Exception{ String realflag = ""; //添加到推送表标识,空,未推送,1,已经推送 1订购推送,2月未推送 String userid = hm.get("USERID").toString(); String id = hm.get("ID").toString(); String spid = hm.get("SPID").toString(); String cpid = hm.get("CPID").toString(); //获取订购关系 (进行数据比对 确认是否为订购的) HashMap orderRealMap = contractProductDao.getOrderRealById(id); //hecan // if(orderRealMap == null){ //hecan // throw new BusinessException("9056","订购关系表无订购数据"); //hecan // } //hecan //失效时间(如果是null 则代表是订购) // String ENDTIME = orderRealMap.get("ENDTIME").toString(); //hecan // if(ENDTIME.equals("NULL")){ //为空代表是订购的 //hecan ContractProductPushBean pushBean = new ContractProductPushBean(); String pushid = contractProductDao.getNo(); pushBean.setUserid(userid); pushBean.setId(pushid); pushBean.setPushmonth(pushmonth); pushBean.setCpid(cpid); pushBean.setSpid(spid); pushBean.setPushid(id); //添加数据到推送表 contractProductDao.insertPush(pushBean); String spname ="产品"; List spList = contractProductDao.findSpInfo(spid); if (spList.get(0).get("spname") != null && !StringUtils.isEmpty(spList.get(0).get("spname").toString())){ spname = spList.get(0).get("spname").toString(); } String content = "【中国联通】您订购的"+spname+"合约已到期,到期后退订不再扣取违约金,如不退订次月将自动续订,感谢您的支持!"; //发送短信 String smsInfo= "失败"; String smsCode = "-1"; // smsCode= send(pushid,userid,content); //hecan if(smsCode!= null && smsCode.equals("0")){ //短信发送成功 logMap.put("errorinfo", "短信发送成功"); smsInfo="成功"; }else{ logMap.put("errorinfo", "短信发送失败"); realflag = "1"; //短信推送失败 } contractProductDao.updatePush(pushid,smsCode,smsInfo); // } //hecan return realflag; } //发送短信 public String send(String pushid,String userid,String content) throws Exception { String url = dictionaryDao.getValue("contractpushurl"); // String url = "http://111.206.133.54/smsbusi/sms/send"; String smsid = "10655117"; String pwd = "wo6bslq2"; userid = DESUtil.encode(userid,pwd); String timestamp = System.currentTimeMillis()/1000+""; String sign = SHAUtil.shaEncode(smsid+userid+timestamp+content+pwd).toLowerCase(); userid = URLEncoder.encode(userid,"utf-8"); content = URLEncoder.encode(content,"utf-8"); url = url+"?smsid="+smsid+"&userid="+userid+"×tamp="+timestamp+"&sign="+sign+"&content="+content; String result = ""; try { result = URLUtil.get(url); } catch (Exception e) { e.printStackTrace(); } logger.info(pushid+", "+userid+", 调短信接口地址返回结果:"+result); if(result!="" && result!=null){ JSONObject obj = JSON.parseObject(result); String resultcode = obj.getString("resultcode"); // System.out.println(resultcode); return resultcode; } return null; } /** * 获取下次推送的默认时间(下个月1号零点三十分零一秒) * @return */ private String getCalculatedate() { SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MONTH, 1); calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH)); return dft.format(calendar.getTime())+"003001"; } public static void main(String[] args) { ContractProductJob job = new ContractProductJob(); try { job.doProcess(); } catch (Exception e) { e.printStackTrace(); } } }