package com.chinacreator.process.job; import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import net.sf.json.JSONObject; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.common.util.DESUtil; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.UnicomSmsOrderDao; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.process.util.SHAUtil; import com.chinacreator.process.util.URLUtil; /** * 短信订购定时任务 * @author xu.zhou * @date 20201123 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class UnicomSmsOrderJob { private static Logger logger = Logger.getLogger("smsorder"); @Autowired private UnicomSmsOrderDao asyndao; @Autowired private DictionaryDao dictionaryDao; public void doProcess() throws Exception { logger.info(Thread.currentThread().getName()+"短信订购定时任务开始"); long beginTime = System.currentTimeMillis(); List list = asyndao.getInvokeShareData(); logger.info("待处理数据条数:"+(list == null ? "0" : list.size())); if(list != null && list.size() > 0){ CountDownLatch threadSignal = new CountDownLatch(list.size()); ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); List dataList = paraseData(list); logger.info("数据库有效订购的用户数:"+dataList.size()); for(HashMap vipmap : dataList){ UnicomSmsOrderService continueService = new UnicomSmsOrderService(list.size(),threadSignal,vipmap,asyndao,dictionaryDao); executorService.execute(continueService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); } /** * 去除重复数据 * @param dataList * @return */ private List paraseData(List dataList){ //去重复后的数据集 List reDataList = new ArrayList(); HashMap tmpMap = new HashMap(); for (HashMap dataMap : dataList) { if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){ logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID")); }else{ reDataList.add(dataMap); List tmpList = new ArrayList(); tmpList.add(dataMap.get("SPID")); tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList); } } return reDataList; } } class UnicomSmsOrderService implements Runnable { private static Logger log = Logger.getLogger("smsorder"); private int totalSize; private CountDownLatch threadSignal; private HashMap vipmap; private DictionaryDao dictionaryDao; private UnicomSmsOrderDao asyndao; public UnicomSmsOrderService(int totalSize,CountDownLatch threadSignal,HashMap vipmap,UnicomSmsOrderDao asyndao,DictionaryDao dictionaryDao){ this.totalSize = totalSize; this.threadSignal = threadSignal; this.vipmap = vipmap; this.asyndao = asyndao; this.dictionaryDao = dictionaryDao; } @Override public void run() { long startime = System.currentTimeMillis(); Map logMap = new HashMap(); String id = vipmap.get("ID").toString(); logMap.put("vipmap", vipmap); String resultcode = "2"; String errorinfo = "处理中"; try { //结果编码,-1待审核,1待处理,2处理中,0处理完成 asyndao.updShareStatus(id, resultcode, errorinfo, ""); //调短信订购接口 invokeSmsOrder(); resultcode = "0"; errorinfo = "成功"; } catch (Exception e) { if (e instanceof BusinessException) { errorinfo = ((BusinessException) e).getMessage(); resultcode = ((BusinessException) e).getCode(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "处理数据出现异常,"+e.getMessage(); } }finally { threadSignal.countDown(); String time = System.currentTimeMillis()-startime+""; try{ //更新数据处理状态 asyndao.updShareStatus(id, resultcode, errorinfo, time); }catch(Exception e){ log.error(vipmap.get("USERID")+"更新调能力平台结果出现异常,"+e.getMessage()); } //写日志 logMap.put("resultcode", resultcode); logMap.put("errorinfo", errorinfo); logMap.put("time", time); logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount())); log.info(JsonUtil.objectToJson(logMap)); } } /** * 调能力平台 * @throws BusinessException */ public void invokeSmsOrder() throws BusinessException{ String result =""; try{ //String url = "http://111.206.133.54/smsorder/channelSmsSend.do"; String url = dictionaryDao.getValue("unicomsmsorderurl"); String channel = vipmap.get("CHANNEL").toString(); String userid = vipmap.get("USERID").toString(); String key = ""; String unicomsmsorderpwd = dictionaryDao.getValue("unicomsmsorderpwd"); if(!StringUtils.isEmpty(unicomsmsorderpwd)){ String [] pwdArray = unicomsmsorderpwd.split("\\|"); for(String s : pwdArray){ if(!StringUtils.isEmpty(s) && channel.equals(s.split("###")[0])){ key = s.split("###")[1]; break; } } } SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS"); //yyyyMMddHHmmssSSS String timestamp = sdf.format(new Date()); String content = vipmap.get("CONTENT")+""; //sha1(channel+userid+timestamp+密钥) String sign = channel+userid+timestamp+key; sign = SHAUtil.shaEncode(sign).toLowerCase(); userid = DESUtil.encode(userid, key); url += "?channel="+channel+"&userid="+URLEncoder.encode(userid,"UTF-8")+"×tamp="+timestamp+"&sign="+sign+"&content="+URLEncoder.encode(content,"UTF-8"); log.info("调接口参数:"+url); result = URLUtil.get(url,30000); log.info("调接口返回结果:"+result); }catch (Exception e) { e.printStackTrace(); throw new BusinessException("9170", "调用短信订购接口异常,"+e.getMessage()); } JSONObject obj = JSONObject.fromObject(result); if(!obj.getString("resultCode").equals("0")){ throw new BusinessException(obj.getString("resultCode"), obj.getString("resultInfo")); } } }