cbead0b12498e6306073d2ce9462004890a48532.svn-base 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package com.chinacreator.process.job;
  2. import java.net.URLEncoder;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.HashMap;
  7. import java.util.Iterator;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.Set;
  11. import java.util.concurrent.CountDownLatch;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.LinkedBlockingQueue;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15. import java.util.concurrent.TimeUnit;
  16. import net.sf.json.JSONObject;
  17. import org.apache.log4j.Logger;
  18. import org.quartz.DisallowConcurrentExecution;
  19. import org.quartz.PersistJobDataAfterExecution;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import org.springframework.util.StringUtils;
  22. import com.chinacreator.common.exception.BusinessException;
  23. import com.chinacreator.common.util.DESUtil;
  24. import com.chinacreator.process.dao.DictionaryDao;
  25. import com.chinacreator.process.dao.UnicomSmsOrderDao;
  26. import com.chinacreator.process.util.JsonUtil;
  27. import com.chinacreator.process.util.SHAUtil;
  28. import com.chinacreator.process.util.URLUtil;
  29. /**
  30. * 短信订购定时任务
  31. * @author xu.zhou
  32. * @date 20201123
  33. */
  34. @PersistJobDataAfterExecution
  35. @DisallowConcurrentExecution
  36. public class UnicomSmsOrderJob {
  37. private static Logger logger = Logger.getLogger("smsorder");
  38. @Autowired
  39. private UnicomSmsOrderDao asyndao;
  40. @Autowired
  41. private DictionaryDao dictionaryDao;
  42. public void doProcess() throws Exception {
  43. logger.info(Thread.currentThread().getName()+"短信订购定时任务开始");
  44. long beginTime = System.currentTimeMillis();
  45. List<HashMap> list = asyndao.getInvokeShareData();
  46. logger.info("待处理数据条数:"+(list == null ? "0" : list.size()));
  47. if(list != null && list.size() > 0){
  48. CountDownLatch threadSignal = new CountDownLatch(list.size());
  49. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  50. List<HashMap> dataList = paraseData(list);
  51. logger.info("数据库有效订购的用户数:"+dataList.size());
  52. for(HashMap vipmap : dataList){
  53. UnicomSmsOrderService continueService = new UnicomSmsOrderService(list.size(),threadSignal,vipmap,asyndao,dictionaryDao);
  54. executorService.execute(continueService);
  55. }
  56. executorService.shutdown();
  57. try {
  58. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  64. }
  65. /**
  66. * 去除重复数据
  67. * @param dataList
  68. * @return
  69. */
  70. private List<HashMap> paraseData(List<HashMap> dataList){
  71. //去重复后的数据集
  72. List<HashMap> reDataList = new ArrayList<HashMap>();
  73. HashMap<String, List> tmpMap = new HashMap<String, List>();
  74. for (HashMap dataMap : dataList) {
  75. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
  76. logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
  77. }else{
  78. reDataList.add(dataMap);
  79. List tmpList = new ArrayList();
  80. tmpList.add(dataMap.get("SPID"));
  81. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
  82. }
  83. }
  84. return reDataList;
  85. }
  86. }
  87. class UnicomSmsOrderService implements Runnable {
  88. private static Logger log = Logger.getLogger("smsorder");
  89. private int totalSize;
  90. private CountDownLatch threadSignal;
  91. private HashMap vipmap;
  92. private DictionaryDao dictionaryDao;
  93. private UnicomSmsOrderDao asyndao;
  94. public UnicomSmsOrderService(int totalSize,CountDownLatch threadSignal,HashMap vipmap,UnicomSmsOrderDao asyndao,DictionaryDao dictionaryDao){
  95. this.totalSize = totalSize;
  96. this.threadSignal = threadSignal;
  97. this.vipmap = vipmap;
  98. this.asyndao = asyndao;
  99. this.dictionaryDao = dictionaryDao;
  100. }
  101. @Override
  102. public void run() {
  103. long startime = System.currentTimeMillis();
  104. Map logMap = new HashMap();
  105. String id = vipmap.get("ID").toString();
  106. logMap.put("vipmap", vipmap);
  107. String resultcode = "2";
  108. String errorinfo = "处理中";
  109. try {
  110. //结果编码,-1待审核,1待处理,2处理中,0处理完成
  111. asyndao.updShareStatus(id, resultcode, errorinfo, "");
  112. //调短信订购接口
  113. invokeSmsOrder();
  114. resultcode = "0";
  115. errorinfo = "成功";
  116. } catch (Exception e) {
  117. if (e instanceof BusinessException) {
  118. errorinfo = ((BusinessException) e).getMessage();
  119. resultcode = ((BusinessException) e).getCode();
  120. }else{
  121. e.printStackTrace();
  122. resultcode = "8000";
  123. errorinfo = "处理数据出现异常,"+e.getMessage();
  124. }
  125. }finally {
  126. threadSignal.countDown();
  127. String time = System.currentTimeMillis()-startime+"";
  128. try{
  129. //更新数据处理状态
  130. asyndao.updShareStatus(id, resultcode, errorinfo, time);
  131. }catch(Exception e){
  132. log.error(vipmap.get("USERID")+"更新调能力平台结果出现异常,"+e.getMessage());
  133. }
  134. //写日志
  135. logMap.put("resultcode", resultcode);
  136. logMap.put("errorinfo", errorinfo);
  137. logMap.put("time", time);
  138. logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  139. log.info(JsonUtil.objectToJson(logMap));
  140. }
  141. }
  142. /**
  143. * 调能力平台
  144. * @throws BusinessException
  145. */
  146. public void invokeSmsOrder() throws BusinessException{
  147. String result ="";
  148. try{
  149. //String url = "http://111.206.133.54/smsorder/channelSmsSend.do";
  150. String url = dictionaryDao.getValue("unicomsmsorderurl");
  151. String channel = vipmap.get("CHANNEL").toString();
  152. String userid = vipmap.get("USERID").toString();
  153. String key = "";
  154. String unicomsmsorderpwd = dictionaryDao.getValue("unicomsmsorderpwd");
  155. if(!StringUtils.isEmpty(unicomsmsorderpwd)){
  156. String [] pwdArray = unicomsmsorderpwd.split("\\|");
  157. for(String s : pwdArray){
  158. if(!StringUtils.isEmpty(s) && channel.equals(s.split("###")[0])){
  159. key = s.split("###")[1];
  160. break;
  161. }
  162. }
  163. }
  164. SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
  165. //yyyyMMddHHmmssSSS
  166. String timestamp = sdf.format(new Date());
  167. String content = vipmap.get("CONTENT")+"";
  168. //sha1(channel+userid+timestamp+密钥)
  169. String sign = channel+userid+timestamp+key;
  170. sign = SHAUtil.shaEncode(sign).toLowerCase();
  171. userid = DESUtil.encode(userid, key);
  172. url += "?channel="+channel+"&userid="+URLEncoder.encode(userid,"UTF-8")+"&timestamp="+timestamp+"&sign="+sign+"&content="+URLEncoder.encode(content,"UTF-8");
  173. log.info("调接口参数:"+url);
  174. result = URLUtil.get(url,30000);
  175. log.info("调接口返回结果:"+result);
  176. }catch (Exception e) {
  177. e.printStackTrace();
  178. throw new BusinessException("9170", "调用短信订购接口异常,"+e.getMessage());
  179. }
  180. JSONObject obj = JSONObject.fromObject(result);
  181. if(!obj.getString("resultCode").equals("0")){
  182. throw new BusinessException(obj.getString("resultCode"), obj.getString("resultInfo"));
  183. }
  184. }
  185. }