0e857f83e7fcf06a9914db3dd495bfa697917270.svn-base 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.chinacreator.process.bean.OrderPushBean;
  4. import com.chinacreator.process.dao.DictionaryDao;
  5. import com.chinacreator.process.dao.OrderPushDao;
  6. import com.chinacreator.process.util.URLUtil;
  7. import org.apache.log4j.Logger;
  8. import org.quartz.DisallowConcurrentExecution;
  9. import org.quartz.PersistJobDataAfterExecution;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.util.StringUtils;
  12. import java.net.URLEncoder;
  13. import java.util.HashMap;
  14. import java.util.List;
  15. import java.util.Map;
  16. import java.util.concurrent.*;
  17. /**
  18. * @author 黑猫警长
  19. * @create 2021/3/30 9:33
  20. */
  21. @PersistJobDataAfterExecution
  22. @DisallowConcurrentExecution
  23. public class OrderPushJob {
  24. private static Logger logger = Logger.getLogger("OrderPush");
  25. @Autowired
  26. private OrderPushDao orderPushDao;
  27. @Autowired
  28. private DictionaryDao dictionaryDao;
  29. public void doProcess() throws Exception {
  30. logger.info(Thread.currentThread().getName() + "会员匹配数据定时任务开始");
  31. long beginTime = System.currentTimeMillis();//开始时间
  32. int rows = 800; //每次取数据条数
  33. String OrderPushfmrows = dictionaryDao.getValue("OrderPushfmrows");
  34. if (!StringUtils.isEmpty(OrderPushfmrows)) {
  35. try {
  36. rows = Integer.parseInt(OrderPushfmrows);
  37. } catch (Exception e) {
  38. rows = 800;
  39. }
  40. }
  41. //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
  42. String partition = "T_HASH_P";
  43. for (int i = 1; i <= 50; i++) {
  44. partition = "T_HASH_P";
  45. try {
  46. if (i < 10) {
  47. partition = partition + "0" + i;
  48. } else {
  49. partition = partition + i;
  50. }
  51. //查询会员订购关系,根据分区查询
  52. List<HashMap> list = orderPushDao.getOrderData(partition, rows);
  53. logger.info(partition + ",用户数:" + (list != null ? list.size() : "0"));
  54. if (list != null && list.size() > 0) {
  55. paraseData(list);
  56. logger.info(partition + ",去重复后用户数:" + list.size());
  57. CountDownLatch threadSignal = new CountDownLatch(list.size());//一个线程等待其他线程各自执行完毕后再执行。
  58. ExecutorService executorService = new ThreadPoolExecutor(10, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  59. //把数据更新为正在处理状态
  60. for (HashMap hm : list) {
  61. orderPushDao.updatePush(hm.get("ID").toString(), "2", null);
  62. }
  63. //处理数据
  64. for (HashMap hm : list) {
  65. OrderPushService continueService = new OrderPushService(list.size(),threadSignal, orderPushDao, dictionaryDao,hm);
  66. executorService.execute(continueService);
  67. }
  68. logger.info("ok,"+partition+"处理完成");
  69. executorService.shutdown();
  70. try {
  71. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. } catch (Exception e) {
  77. logger.info(partition + ",执行出现异常," + e.getMessage());
  78. e.printStackTrace();//Cannot get a connection, pool error Timeout waiting for idle object
  79. }
  80. Thread.sleep(100);
  81. }
  82. logger.info(Thread.currentThread().getName() + "定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒");
  83. }
  84. /**
  85. * 去重复数据(userid+spid)
  86. *
  87. * @param dataList
  88. * @return
  89. */
  90. private void paraseData(List<HashMap> dataList) {
  91. if (dataList == null || dataList.size() == 0) {
  92. return;
  93. }
  94. HashMap<String, Object> reData = new HashMap<String, Object>();
  95. String userid = "";
  96. String spid = "";
  97. HashMap dataMap = null;
  98. for (int i = 0; i < dataList.size(); i++) {
  99. dataMap = dataList.get(i);
  100. userid = dataMap.get("USERID").toString();
  101. spid = dataMap.get("SPID").toString();
  102. if (reData.containsKey(userid + spid)) {
  103. dataList.remove(i);
  104. i--;
  105. } else {
  106. reData.put(userid, dataMap);
  107. }
  108. }
  109. }
  110. /* public static void main(String[] args) {
  111. OrderPushJob job = new OrderPushJob();
  112. try {
  113. job.doProcess();
  114. } catch (Exception e) {
  115. e.printStackTrace();
  116. }
  117. }*/
  118. }
  119. class OrderPushService implements Runnable {
  120. private Logger logger = Logger.getLogger("OrderPush");
  121. private int totalSize;
  122. private CountDownLatch threadSignal;
  123. private OrderPushDao orderPushDao;
  124. private DictionaryDao dictionaryDao;
  125. private HashMap hm; //用户数据,ID和USERID
  126. public OrderPushService(int totalSize, CountDownLatch threadSignal, OrderPushDao orderPushDao, DictionaryDao dictionaryDao, HashMap hm) {
  127. this.totalSize = totalSize;
  128. this.threadSignal = threadSignal;
  129. this.orderPushDao = orderPushDao;
  130. this.dictionaryDao = dictionaryDao;
  131. this.hm = hm;
  132. }
  133. @Override
  134. public void run() {
  135. String resultcode = "";//处理结果编码,0处理成功,1待处理,2处理中
  136. String errorinfo = "";
  137. String realflag = "";//是否有订购关系标识,0有,1无,其他失败
  138. String id = hm.get("ID").toString();
  139. String url = "";
  140. OrderPushBean orderPushBean = new OrderPushBean();
  141. orderPushBean.setId(hm.get("ID").toString());
  142. orderPushBean.setUserid(hm.get("USERID").toString());
  143. orderPushBean.setSpid(hm.get("SPID").toString());
  144. orderPushBean.setResultcode("2");//处理结果编码,0处理成功,1待处理,2处理中
  145. orderPushBean.setResultinfo("正在处理中");
  146. String status = "";
  147. try {
  148. List<HashMap> OrderRela = orderPushDao.findOrderRelaBySpid(orderPushBean.getUserid(), orderPushBean.getSpid());
  149. for (HashMap hashMap : OrderRela) {
  150. status = hashMap.get("STATUS").toString();
  151. }
  152. if("0".equals(status) || "1".equals(status)){
  153. orderPushDao.updatePush(orderPushBean.getId(), orderPushBean.getResultcode(), orderPushBean.getResultinfo());
  154. String phone = URLEncoder.encode(orderPushBean.getUserid(), "utf-8");
  155. String orderPushfmrowsURL = dictionaryDao.getValue("OrderPushfmrowsURL");
  156. //url = "http://111.206.134.43:809/caporder/queryCustom?phone=15541405220"
  157. //http://172.16.1.81:809/caporder/queryCustom?phone=15541405220
  158. url = orderPushfmrowsURL+"?phone="+ phone;
  159. String spid = orderPushBean.getSpid();
  160. String vaccproductId = orderPushDao.getVaccproductId(spid);
  161. if(vaccproductId != null){
  162. boolean flag = isProductId(url, vaccproductId);
  163. if(flag){
  164. logger.info("有订购关系的SPID,"+spid+",phone,"+phone);
  165. orderPushBean.setResultinfo("处理完成,有订购关系!");
  166. orderPushBean.setResultcode("1");
  167. orderPushBean.setRealflag("0");
  168. orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
  169. }else{
  170. logger.info("无订购关系的SPID,"+spid+",phone,"+phone);
  171. orderPushBean.setResultinfo("处理完成,无订购关系!");
  172. orderPushBean.setResultcode("1");
  173. orderPushBean.setRealflag("1");
  174. orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
  175. }
  176. }else {
  177. logger.info("无订购关系的SPID,"+spid+",phone,"+phone+",TB_SP_INFO无配置vacproductId");
  178. orderPushBean.setResultinfo("处理完成,TB_SP_INFO无配置vacproductId!");
  179. orderPushBean.setResultcode("1");
  180. orderPushBean.setRealflag("2");
  181. orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
  182. }}else {
  183. logger.info("无订购关系的SPID,"+orderPushBean.getSpid()+",phone,"+orderPushBean.getUserid()+",退订已失效");
  184. orderPushBean.setResultinfo("处理完成,失效已退订");
  185. orderPushBean.setResultcode("1");
  186. orderPushBean.setRealflag("2");
  187. orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
  188. }
  189. } catch (Exception e) {
  190. logger.info("发生异常,"+e.getMessage()+",异常的数据id,"+id);
  191. logger.error(e);
  192. try {
  193. resultcode = "3";
  194. errorinfo = "处理失败"+e.getMessage();
  195. realflag = "2";
  196. orderPushDao.updatePushAll(id,resultcode,errorinfo,realflag,null);
  197. } catch (Exception ee) {
  198. ee.printStackTrace();
  199. }
  200. e.printStackTrace();
  201. }finally {
  202. threadSignal.countDown();
  203. }
  204. }
  205. /**
  206. * 调接口看有无返回的productId 与 传入的vacproductId 比对,有则有订购关系,无则没有
  207. * @param url 调接口的连接
  208. * @param vacproductId 根据spid 查询TB_SP_INFO表出来的vacproductId
  209. * @return
  210. */
  211. private static boolean isProductId(String url,String vacproductId ){
  212. Logger logger = Logger.getLogger("OrderPush");
  213. boolean flag = false;
  214. String data = "";
  215. try {
  216. data = URLUtil.get(url);
  217. Map map = JSONObject.parseObject(data, Map.class);
  218. Map infoMessage = (Map) map.get("data");
  219. if( !infoMessage.isEmpty()){
  220. List message = (List) infoMessage.get("DATA");
  221. if(message != null && message.size() > 0) {
  222. for (Object info : message) {
  223. Map infoMap = JSONObject.parseObject(String.valueOf(info), Map.class);
  224. String productId = infoMap.get("PRODUCT_ID").toString();
  225. logger.info("vacproductId,"+vacproductId+" productId,"+productId);
  226. if(productId.equals(vacproductId)){
  227. flag = true;
  228. break;
  229. }else {
  230. flag = false;
  231. }
  232. }
  233. }}else {
  234. flag = false;
  235. }
  236. } catch (Exception e) {
  237. flag = false;
  238. logger.info("异常发生在isProductId方法中,"+e.getMessage());
  239. e.printStackTrace();
  240. }
  241. return flag;
  242. }
  243. }