567a482bc3f3dd76b5b3ae2865a2f5a1fd83ca46.svn-base 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package com.chinacreator.process.job;
  2. import com.alibaba.druid.util.StringUtils;
  3. import com.alibaba.fastjson.JSON;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.chinacreator.process.dao.IntensiveSyncOrderDao;
  6. import com.chinacreator.process.util.AesUtilByIntensive;
  7. import com.chinacreator.process.util.URLUtil;
  8. import org.apache.log4j.Logger;
  9. import org.quartz.DisallowConcurrentExecution;
  10. import org.quartz.PersistJobDataAfterExecution;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import java.util.ArrayList;
  13. import java.util.HashMap;
  14. import java.util.List;
  15. import java.util.concurrent.ExecutorService;
  16. import java.util.concurrent.LinkedBlockingQueue;
  17. import java.util.concurrent.ThreadPoolExecutor;
  18. import java.util.concurrent.TimeUnit;
  19. @PersistJobDataAfterExecution
  20. @DisallowConcurrentExecution
  21. public class IntensiveSyncOrderJob {
  22. private Logger logger = Logger.getLogger("intensiveSyncOrder");
  23. @Autowired
  24. private IntensiveSyncOrderDao intensiveSyncOrderDao;
  25. public void doProcess() throws Exception {
  26. logger.info("开始处理9楼集约化平台订购关系同步job");
  27. try {
  28. //获取表中需要订购的数据
  29. List<HashMap> dataList = intensiveSyncOrderDao.queryList();
  30. logger.info("获取数据条数" + dataList.size());
  31. if (dataList != null && dataList.size() > 0) {
  32. dataList = paraseData(dataList); //去重复数据
  33. logger.info("去重复后用户数:" + dataList.size());
  34. ExecutorService executorService = new ThreadPoolExecutor(10, 20, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  35. for (HashMap<String, String> data : dataList) {
  36. intensiveSyncOrderDao.updataQueryList(data.get("ID"), "2");
  37. }
  38. for (HashMap<String, String> data : dataList) {
  39. IntensiveSyncOrderService intensiveSyncOrderService = new IntensiveSyncOrderService(intensiveSyncOrderDao,data);
  40. executorService.execute(intensiveSyncOrderService);
  41. }
  42. executorService.shutdown();
  43. try {
  44. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. Thread.sleep(100);
  49. //获取执行状态为2且执行时间超过10分钟的数据为待处理
  50. int i = intensiveSyncOrderDao.updExecTimeout();
  51. if (i > 0) {
  52. logger.info("更新状态为0处理中且执行时间超过10分钟的数据为待处理的异常数据条数【" + i + "】");
  53. }
  54. }
  55. }catch (Exception ee){
  56. ee.printStackTrace();
  57. logger.info("9楼集约化平台订购关系同步job执行出现异常,"+ee.getMessage());
  58. }
  59. logger.info("结束9楼集约化平台订购关系同步job");
  60. }
  61. /**
  62. * 按手机号码与spid去除重复数据,确保获取的同一组数据里手机号码与spid不相同
  63. * @param dataList
  64. * @return
  65. */
  66. private List<HashMap> paraseData(List<HashMap> dataList){
  67. //去重复后的数据集
  68. List<HashMap> reDataList = new ArrayList<HashMap>();
  69. HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
  70. for (HashMap dataMap : dataList) {
  71. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("SPID").toString())){
  72. logger.info("重复数据,"+dataMap);
  73. }else{
  74. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("SPID").toString(), dataMap);
  75. reDataList.add(dataMap);
  76. }
  77. }
  78. return reDataList;
  79. }
  80. }
  81. class IntensiveSyncOrderService implements Runnable{
  82. private static Logger logger = Logger.getLogger("intensiveSyncOrder");
  83. private IntensiveSyncOrderDao intensiveSyncOrderDao;
  84. private HashMap<String,String> data;
  85. public IntensiveSyncOrderService(IntensiveSyncOrderDao intensiveSyncOrderDao, HashMap<String, String> data) {
  86. this.intensiveSyncOrderDao = intensiveSyncOrderDao;
  87. this.data = data;
  88. }
  89. @Override
  90. public void run() {
  91. String resultcode = "3000";
  92. String resultinfo = "无返回";
  93. String resultdata = "";
  94. String id = data.get("ID");
  95. String syncount = data.get("SYNCOUNT");
  96. HashMap<String,Object> logmap = new HashMap<String, Object>();
  97. logmap.put("data",data);
  98. try {
  99. List<HashMap> invokeUrl = intensiveSyncOrderDao.getInvokeUrl("actvieprocess", "intensiveSyncOrder");
  100. logmap.put("invokeUrl",invokeUrl);
  101. String cpid = "";
  102. String busiid = "";
  103. String aespwd = "";
  104. String channel = "";
  105. String url = "";
  106. if (invokeUrl!=null&&invokeUrl.size()>0){
  107. HashMap invokeUrlConf = invokeUrl.get(0);
  108. String param1 = invokeUrlConf.get("PARAM1").toString();
  109. JSONObject object = JSON.parseObject(param1);
  110. cpid = object.getString("cpid");
  111. busiid = object.getString("busiid");
  112. aespwd = object.getString("aespwd");
  113. channel = object.getString("channel");
  114. url = invokeUrlConf.get("INVOKEURL").toString();
  115. }
  116. String jiyueChannel = intensiveSyncOrderDao.getChannel(data.get("SPID"));
  117. if (!StringUtils.isEmpty(jiyueChannel)){
  118. channel = jiyueChannel;
  119. }
  120. String spid = data.get("SPID");
  121. String userid = data.get("USERID");
  122. String ordertime = data.get("ORDERTIME");
  123. String canceltime = data.get("CANCELTIME");
  124. String productid = intensiveSyncOrderDao.getProductid(spid);
  125. HashMap<String,String> map = new HashMap<String,String>();
  126. map.put("busiid",busiid);
  127. map.put("serialid",id);
  128. map.put("usermob",userid);
  129. map.put("productid",productid);
  130. map.put("ordertime",ordertime);
  131. map.put("canceltime",canceltime);
  132. map.put("channel",channel);
  133. logmap.put("reqDate",map);
  134. HashMap<String,Object> hashMap = new HashMap<String,Object>();
  135. hashMap.put("cpid",cpid);
  136. hashMap.put("data", AesUtilByIntensive.encrypt(JSON.toJSONString(map),aespwd));
  137. String resp = URLUtil.postJson(url, JSON.toJSONString(hashMap));
  138. logmap.put("resp",resp);
  139. if (!StringUtils.isEmpty(resp)){
  140. JSONObject object = JSON.parseObject(resp);
  141. resultcode = object.getString("resultCode");
  142. resultinfo = object.getString("resultInfo");
  143. resultdata = object.getString("data");
  144. }
  145. if (!StringUtils.isEmpty(resultdata)){
  146. try {
  147. resultdata = AesUtilByIntensive.decrypt(resultdata,aespwd);
  148. }catch (Exception e){
  149. logger.error("解密data出错"+e);
  150. }
  151. }else {
  152. resultdata = resp;
  153. }
  154. }catch (Exception e){
  155. e.printStackTrace();
  156. resultcode = "8000";
  157. resultinfo = e.getMessage();
  158. resultdata = "";
  159. }finally {
  160. logmap.put("resultcode",resultcode);
  161. logmap.put("resultinfo",resultinfo);
  162. logmap.put("resultdata",resultdata);
  163. try {
  164. intensiveSyncOrderDao.updata(id,resultcode,resultinfo,resultdata,Integer.parseInt(syncount)+1);
  165. }catch (Exception e){
  166. e.printStackTrace();
  167. logger.error("id: "+id+"修改失败:"+e.getMessage());
  168. }
  169. logger.info(logmap);
  170. }
  171. }
  172. }