5bca663e0028b0e0438d11d84f4b8211a35f0dda.svn-base 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package com.chinacreator.process.job;
  2. import com.chinacreator.common.exception.BusinessException;
  3. import com.chinacreator.common.util.DESUtil;
  4. import com.chinacreator.common.util.MD5;
  5. import com.chinacreator.process.dao.BackBusiVipAsynDao;
  6. import com.chinacreator.process.dao.DictionaryDao;
  7. import com.chinacreator.process.util.HttpInvoke;
  8. import com.chinacreator.process.util.JsonUtil;
  9. import com.chinacreator.process.util.URLUtil;
  10. import org.apache.log4j.Logger;
  11. import org.quartz.DisallowConcurrentExecution;
  12. import org.quartz.PersistJobDataAfterExecution;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import java.net.URLEncoder;
  15. import java.text.SimpleDateFormat;
  16. import java.util.*;
  17. import java.util.concurrent.*;
  18. /**
  19. * 后向送会员异步处理
  20. * @author xu.zhou
  21. * @date 20200818
  22. */
  23. @PersistJobDataAfterExecution
  24. @DisallowConcurrentExecution
  25. public class BackBusiVipAsynJob {
  26. private static Logger logger = Logger.getLogger("backbusivipsyn");
  27. @Autowired
  28. private BackBusiVipAsynDao asyndao;
  29. @Autowired
  30. private DictionaryDao dictionaryDao;
  31. public void doProcess() throws Exception {
  32. String trycount = dictionaryDao.getValue("vipasyntrycount");//后向产品赠送会员异步处理重试次数
  33. if(trycount == null || "".equals(trycount)) trycount = "3";
  34. logger.info(Thread.currentThread().getName()+"定时任务开始");
  35. long beginTime = System.currentTimeMillis();
  36. List<HashMap> list = asyndao.getVipAsynData(trycount);
  37. if(list != null && list.size() > 0){
  38. //推送月份
  39. String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date());
  40. CountDownLatch threadSignal = new CountDownLatch(list.size());
  41. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  42. List<HashMap> dataList = paraseData(list);
  43. logger.info("数据库有效订购的用户数:"+dataList.size());
  44. for(HashMap vipmap : dataList){
  45. BackBusiVipSynService continueService = new BackBusiVipSynService(list.size(),threadSignal,vipmap,asyndao,dictionaryDao);
  46. executorService.execute(continueService);
  47. }
  48. executorService.shutdown();
  49. try {
  50. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  56. }
  57. /**
  58. * 去除重复数据,防止赠送会员接口被并发限制
  59. * @param dataList
  60. * @return
  61. */
  62. private List<HashMap> paraseData(List<HashMap> dataList){
  63. //去重复后的数据集
  64. List<HashMap> reDataList = new ArrayList<HashMap>();
  65. HashMap<String, List> tmpMap = new HashMap<String, List>();
  66. for (HashMap dataMap : dataList) {
  67. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
  68. logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
  69. }else{
  70. reDataList.add(dataMap);
  71. List tmpList = new ArrayList();
  72. tmpList.add(dataMap.get("SPID"));
  73. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
  74. }
  75. }
  76. return reDataList;
  77. }
  78. }
  79. class BackBusiVipSynService implements Runnable {
  80. private static Logger log = Logger.getLogger("backbusivipsyn");
  81. private int totalSize;
  82. private CountDownLatch threadSignal;
  83. private HashMap vipmap;
  84. private DictionaryDao dictionaryDao;
  85. private BackBusiVipAsynDao asyndao;
  86. public BackBusiVipSynService(int totalSize,CountDownLatch threadSignal,HashMap vipmap,BackBusiVipAsynDao asyndao,DictionaryDao dictionaryDao){
  87. this.totalSize = totalSize;
  88. this.threadSignal = threadSignal;
  89. this.vipmap = vipmap;
  90. this.asyndao = asyndao;
  91. this.dictionaryDao = dictionaryDao;
  92. }
  93. @Override
  94. public void run() {
  95. long startime = System.currentTimeMillis();
  96. Map logMap = new HashMap();
  97. String id = vipmap.get("ID").toString();
  98. logMap.put("vipmap", vipmap);
  99. String resultcode = "-1";
  100. String errorinfo = "";
  101. String vipsendcode = "3"; //赠送会员结果标识,默认为失败
  102. try {
  103. //更新为正在赠送状态
  104. asyndao.updVipSendStatus(id);
  105. HashMap<String, String> remap = sendVip();
  106. if("0".equals(remap.get("vipsendcode"))){
  107. resultcode = "0";
  108. vipsendcode = "0";
  109. errorinfo = "赠送成功";
  110. }else{
  111. resultcode = remap.get("vipsendcode");
  112. errorinfo = remap.get("vipsendinfo");
  113. }
  114. } catch (Exception e) {
  115. if (e instanceof BusinessException) {
  116. errorinfo = ((BusinessException) e).getMessage();
  117. resultcode = ((BusinessException) e).getCode();
  118. }else{
  119. e.printStackTrace();
  120. resultcode = "8000";
  121. errorinfo = "处理数据出现异常,"+e.getMessage();
  122. }
  123. }finally {
  124. threadSignal.countDown();
  125. String time = System.currentTimeMillis()-startime+"";
  126. try{
  127. //更新赠送会员结果出现异常
  128. asyndao.updVipSendStatus(id, vipsendcode, time);
  129. }catch(Exception e){
  130. log.error(vipmap.get("USERID")+"更新赠送会员结果出现异常,"+e.getMessage());
  131. }
  132. //回调
  133. callBack(resultcode, errorinfo);
  134. //发送成功短信
  135. inserSmstMq(resultcode);
  136. //写日志
  137. logMap.put("resultcode", resultcode);
  138. logMap.put("errorinfo", errorinfo);
  139. logMap.put("time", time);
  140. logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  141. log.info(JsonUtil.objectToJson(logMap));
  142. }
  143. }
  144. /**
  145. * 推送办理成功的短信到队列
  146. * @param orderInfo
  147. */
  148. private void inserSmstMq(String resultcode){
  149. try{
  150. //赠送成功才发短信
  151. if(!"0".equals(resultcode)) return;
  152. String userid = vipmap.get("USERID").toString();
  153. if(userid == null || userid.length() != 11) return;
  154. String channel = vipmap.get("CHANNEL").toString();
  155. String cpid = vipmap.get("CPID").toString();
  156. String spid = vipmap.get("SPID").toString();
  157. Map<String, String> map = new HashMap<String, String>();
  158. map.put("userid", userid);
  159. map.put("cpid", cpid);
  160. map.put("spid", spid);
  161. map.put("result", "0");
  162. map.put("channel", "");
  163. map.put("style","0000");
  164. map.put("times", "");
  165. map.put("orderType", "");
  166. map.put("type", "cssms");
  167. map.put("busiType", "tran_succ"); //订购成功短信
  168. if("TX20_twback_qc".equals(channel)){//女王卡领取会员发短信,同样的SPID短信内容不同
  169. map.put("busiType", "qc_tran_succ");
  170. }
  171. String mqReciveUrl = dictionaryDao.getValue("mqReciveUrl");
  172. URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map));
  173. //部分业务要发三条短信
  174. if(!"TX20_twback_qc".equals(channel)){
  175. map.put("busiType", "tran_succ2"); //订购成功短信
  176. }
  177. URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map));
  178. if(!"TX20_twback_qc".equals(channel)){
  179. map.put("busiType", "tran_succ3"); //订购成功短信
  180. }
  181. URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map));
  182. }catch (Exception e){
  183. e.printStackTrace();
  184. log.info(vipmap.get("ID")+",发送成功短信失败");
  185. }
  186. }
  187. /**
  188. * 调接口送会员
  189. * @param orderInfo
  190. * @return
  191. * @throws Exception
  192. */
  193. private HashMap<String, String> sendVip() throws Exception{
  194. HashMap<String, String> remap = new HashMap<String, String>();
  195. String vipsendcode = "3"; //默认失败
  196. String vipsendinfo = "赠送失败";
  197. try {
  198. //http://114.255.201.228:86/activity/youkuHX
  199. String vipurl = this.dictionaryDao.getValue("backBusiVipUrl");
  200. String timestamp = (System.currentTimeMillis() / 1000) + "";
  201. String userid = vipmap.get("USERID").toString();
  202. String orderid = vipmap.get("ORDERID").toString();
  203. String channel = vipmap.get("CHANNEL").toString();
  204. String cpid = vipmap.get("CPID").toString();
  205. String spid = vipmap.get("SPID").toString();
  206. String pwd = "";
  207. List<HashMap> confList = asyndao.getBackBusiConf(cpid, spid);
  208. pwd = confList.get(0).get("PWD").toString();
  209. if("0".equals(confList.get(0).get("HASFH"))){ //是复合产品
  210. vipurl = this.dictionaryDao.getValue("backBusiGroupVipUrl");
  211. }
  212. userid = DESUtil.encode(userid, pwd);
  213. //MD5(orderid+userid+goodscode+pwd+timestamp)转换为十六进制ASCII 码字符串,共32 个字符,全小写 userid= Des(手机号码,pwd)
  214. //MD5(orderid+userid+timestamp+pwd)转换为十六进制ASCII 码字符串,共32 个字符,全小写
  215. String signature = MD5.MD5Encode(orderid + userid + timestamp + pwd);
  216. signature = signature.toLowerCase();
  217. vipurl = vipurl + "?userid=" + URLEncoder.encode(userid, "utf-8")+ "&orderid="+ orderid + "&cpid=" + cpid + "&spid=" + spid + "&timestamp="
  218. + timestamp + "&signature=" + signature+ "&apptype=2";
  219. log.info("vipurl: "+vipurl);
  220. //http://114.255.201.228:86/activity/eshop/vip?userid=iafPbU9aRLghY%2FEVMXFeag%3D%3D&orderid=201906231206498662914&goodscode=pointshop130&timestamp=1561445765&signature=47fe0e3900b29ef88fd0889b7c0e4cc6&apptype=5
  221. String result = URLUtil.get(vipurl,30*1000); //调赠送会员接口,超时时间设置为10秒
  222. log.info("赠送会员重试结果=> userid: " +userid+", orderid: "+orderid+" , result: "+result);
  223. Map<?,?> map = JsonUtil.jsonToMap(result);
  224. //resultcode = (String)map.get("resultcode");
  225. // if("0".equals(map.get("resultcode"))){
  226. // vipsendcode = "0";
  227. // }
  228. vipsendcode = map.get("resultcode")+"";
  229. vipsendinfo = map.get("errorinfo")+"";
  230. } catch (Exception e) {
  231. e.printStackTrace();
  232. log.error("userid: "+vipmap.get("USERID")+"赠送会员失败,"+e);
  233. if(e.getMessage() != null && e.getMessage().indexOf("TimeoutException") != -1){//超时异常
  234. throw new BusinessException("9070","赠送会员超时", new String[0]);
  235. }else{
  236. throw new BusinessException("9002","赠送会员未成功", new String[0]);
  237. }
  238. }
  239. remap.put("vipsendcode", vipsendcode);
  240. remap.put("vipsendinfo", vipsendinfo);
  241. return remap;
  242. }
  243. /**
  244. * 回调通知
  245. * @param orderBean
  246. * @return 0成功,1未回调,2出现异常
  247. */
  248. private String callBack(String resultcode, String resultinfo){
  249. String res = "1";
  250. try {
  251. String trycountconf = dictionaryDao.getValue("vipasyntrycount");
  252. String trycount = vipmap.get("RETRYCOUNT").toString();
  253. String userid = vipmap.get("USERID").toString();
  254. String orderid = vipmap.get("ORDERID").toString();
  255. String channel = vipmap.get("CHANNEL").toString();
  256. String id = vipmap.get("ID").toString();
  257. //判断重试是否已是最大次数
  258. if(!"0".equals(resultcode) && Integer.parseInt(trycount)+1 < Integer.parseInt(trycountconf)){
  259. return null;
  260. }
  261. //当会员赠送失败时,反查一下会员赠送日志表,看是否真的赠送失败,防止调接口超时获取不到真实赠送结果
  262. if(!"0".equals(resultcode)){
  263. if(asyndao.getVipSendRes(orderid)){
  264. resultcode = "0";
  265. resultinfo = "OK";
  266. }
  267. }
  268. HashMap channelInfo = asyndao.getChannelPwdByChannel(channel);
  269. String pwd = channelInfo.get("PASSWORD")+"";
  270. userid = DESUtil.encode(userid, pwd);
  271. userid = URLEncoder.encode(userid, "utf-8");
  272. String vipRetryCallBackUrl = this.dictionaryDao.getValue("vipRetryCallBackUrl");//回调接口地址
  273. vipRetryCallBackUrl += "?orderid="+orderid+"&resultcode="+resultcode+"&resultinfo="+URLEncoder.encode(resultinfo,"utf-8")+"&userid="+userid;
  274. log.info("===========回调地址:"+vipRetryCallBackUrl);
  275. String result = HttpInvoke.sendhttpsReq("GET", vipRetryCallBackUrl, "", null, 60*1000, "TLSv1.2");
  276. //String result = URLUtil.get(vipRetryCallBackUrl,60*1000); //调赠送会员接口,超时时间设置为10秒
  277. log.info("回调通知接口返回信息=>userid: " +userid+", orderid: "+orderid+", result: "+result);
  278. Map<?,?> map = JsonUtil.jsonToMap(result);
  279. res = (String)map.get("result");
  280. } catch (Exception e) {
  281. res = "2";
  282. e.printStackTrace();
  283. log.error("id=>"+vipmap.get("ID")+",回调接口出现异常,"+e.getMessage());
  284. }
  285. return res;
  286. }
  287. }