d7810b305c915fe90201877dcaf0155206c6f15a.svn-base 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSON;
  3. import com.chinacreator.common.util.MD5;
  4. import com.chinacreator.common.util.URLUtil;
  5. import com.chinacreator.process.dao.HuaweiWalletOrderDao;
  6. import com.chinacreator.process.util.DesUtil;
  7. import net.sf.json.JSONObject;
  8. import org.apache.log4j.Logger;
  9. import org.quartz.DisallowConcurrentExecution;
  10. import org.quartz.PersistJobDataAfterExecution;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.scheduling.annotation.Scheduled;
  13. import java.net.URLEncoder;
  14. import java.util.ArrayList;
  15. import java.util.HashMap;
  16. import java.util.List;
  17. import java.util.concurrent.*;
  18. @PersistJobDataAfterExecution
  19. @DisallowConcurrentExecution
  20. public class HuaweiWalletOrderJob {
  21. private static Logger logger = Logger.getLogger("HuaweiWalletOrder");
  22. @Autowired
  23. private HuaweiWalletOrderDao huaweiWalletOrderDao;
  24. /**
  25. * 华为钱包订购
  26. * @throws Exception
  27. */
  28. @Scheduled(cron = "0/10 * * * * *")
  29. public void doProcess() throws Exception{
  30. logger.info("开始处理华为钱包订购订单job");
  31. try {
  32. //获取表中需要订购的数据
  33. List<HashMap> dataList = huaweiWalletOrderDao.queryList();
  34. logger.info("获取数据条数" + dataList.size());
  35. if (dataList != null && dataList.size() > 0) {
  36. dataList = paraseData(dataList); //去重复数据
  37. logger.info("去重复后用户数:" + dataList.size());
  38. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  39. ExecutorService executorService = new ThreadPoolExecutor(10, 20, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  40. for (HashMap<String, String> data : dataList) {
  41. huaweiWalletOrderDao.updataQueryList(data.get("ID"), "0");
  42. }
  43. for (HashMap<String, String> data : dataList) {
  44. HuaweiWalletOrderService continueService = new HuaweiWalletOrderService(dataList.size(), threadSignal, data, huaweiWalletOrderDao);
  45. executorService.execute(continueService);
  46. }
  47. executorService.shutdown();
  48. try {
  49. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  50. } catch (InterruptedException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. Thread.sleep(100);
  55. //获取执行状态为0且执行时间超过2分钟的数据为待处理
  56. List<HashMap> queryListlong = huaweiWalletOrderDao.getQueryListlong();
  57. for (HashMap<String,String> map:queryListlong ){
  58. //查订购关系表是否已有订购关系
  59. String count = huaweiWalletOrderDao.queryOrder(map.get("USERID"),map.get("SPID"));
  60. logger.info("count"+count);
  61. if ("0".equals(count)){
  62. huaweiWalletOrderDao.updataQueryNotifydata(map.get("ID"), "2","超时");
  63. }else {
  64. huaweiWalletOrderDao.updataQueryNotifydata(map.get("ID"), "1","超时");
  65. }
  66. }
  67. if (queryListlong.size() > 0) {
  68. logger.info("更新状态为0处理中且执行时间超过2分钟的数据为待处理的异常数据条数【" + queryListlong.size() + "】");
  69. }
  70. }catch (Exception ee){
  71. ee.printStackTrace();
  72. logger.info("华为钱包订购订单jo执行出现异常,"+ee.getMessage());
  73. }
  74. logger.info("结束华为钱包订购订单job");
  75. }
  76. /**
  77. * 按手机号码与spid去除重复数据,确保获取的同一组数据里手机号码与spid不相同,防止同一手机号码订购同一产品并发而造成处理异常
  78. * @param dataList
  79. * @return
  80. */
  81. private List<HashMap> paraseData(List<HashMap> dataList){
  82. //去重复后的数据集
  83. List<HashMap> reDataList = new ArrayList<HashMap>();
  84. HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
  85. for (HashMap dataMap : dataList) {
  86. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("SPID").toString())){
  87. logger.info("重复数据,"+dataMap);
  88. }else{
  89. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("SPID").toString(), dataMap);
  90. reDataList.add(dataMap);
  91. }
  92. }
  93. return reDataList;
  94. }
  95. }
  96. /**
  97. * 处理类
  98. * @author xinlong.liu
  99. * @date 20220721
  100. */
  101. class HuaweiWalletOrderService implements Runnable {
  102. private static Logger logger = Logger.getLogger("HuaweiWalletOrder");
  103. private int totalSize;
  104. private CountDownLatch threadSignal;
  105. private HuaweiWalletOrderDao huaweiWalletOrderDao;
  106. private HashMap<String,String> data; //订购数据
  107. public HuaweiWalletOrderService(int totalSize,CountDownLatch threadSignal,HashMap<String,String> data,HuaweiWalletOrderDao huaweiWalletOrderDao){
  108. this.totalSize = totalSize;
  109. this.threadSignal = threadSignal;
  110. this.data = data;
  111. this.huaweiWalletOrderDao = huaweiWalletOrderDao;
  112. }
  113. @Override
  114. public void run() {
  115. String result = null;
  116. String status = "2";
  117. HashMap<String,Object> logmap = new HashMap<String, Object>();
  118. //组件订购参数
  119. logmap.put("data",data);
  120. String cpid = data.get("CPID");
  121. String spid = data.get("SPID");
  122. String channel = data.get("CHANNEL");
  123. String type = data.get("TYPE");
  124. String userid = data.get("USERID");
  125. String token = data.get("TOKEN");
  126. String access = "1";
  127. String timestamp = Long.toString(System.currentTimeMillis()/1000);
  128. try{
  129. String pwd = huaweiWalletOrderDao.getPwd(cpid);
  130. String signStr = cpid+spid+userid+type+channel+timestamp+pwd;
  131. String signature = MD5.MD5Encode(signStr);
  132. // String url = "http://10.0.17.151:6060/videoif/order.do";
  133. String url = huaweiWalletOrderDao.getValue("commonOrderUrl");
  134. String urlStr = url+"order.do?userid="+ URLEncoder.encode(DesUtil.encode(userid, pwd), "UTF-8")+"&cpid="+cpid+"&token="+token+"&spid="+spid+"&channel="+channel+"&apptype=2";;
  135. logmap.put("url",urlStr);
  136. result = URLUtil.get(urlStr,60000);
  137. logmap.put("result",result);
  138. JSONObject obj = JSONObject.fromObject(result);
  139. if("0".equals(obj.getString("resultcode"))){
  140. status = "1";
  141. }
  142. }catch (Exception e){
  143. e.printStackTrace();
  144. logger.info("调videoif订购接口出错,"+"userid: "+userid+",cpid: "+cpid+",spid: "+spid+"错误信息: "+e);
  145. }finally {
  146. try {
  147. huaweiWalletOrderDao.updataQueryNotifydata(data.get("ID"),status,result);
  148. } catch (Exception throwables) {
  149. throwables.printStackTrace();
  150. logger.info("订购结果入库失败"+throwables.getMessage());
  151. }
  152. logger.info(JSON.toJSONString(logmap));
  153. }
  154. }
  155. }