package com.chinacreator.process.job; import com.alibaba.fastjson.JSONObject; import com.chinacreator.process.bean.OrderPushBean; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.OrderPushDao; import com.chinacreator.process.util.URLUtil; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import java.net.URLEncoder; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; /** * @author 黑猫警长 * @create 2021/3/30 9:33 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class OrderPushJob { private static Logger logger = Logger.getLogger("OrderPush"); @Autowired private OrderPushDao orderPushDao; @Autowired private DictionaryDao dictionaryDao; public void doProcess() throws Exception { logger.info(Thread.currentThread().getName() + "会员匹配数据定时任务开始"); long beginTime = System.currentTimeMillis();//开始时间 int rows = 800; //每次取数据条数 String OrderPushfmrows = dictionaryDao.getValue("OrderPushfmrows"); if (!StringUtils.isEmpty(OrderPushfmrows)) { try { rows = Integer.parseInt(OrderPushfmrows); } catch (Exception e) { rows = 800; } } //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50 String partition = "T_HASH_P"; for (int i = 1; i <= 50; i++) { partition = "T_HASH_P"; try { if (i < 10) { partition = partition + "0" + i; } else { partition = partition + i; } //查询会员订购关系,根据分区查询 List list = orderPushDao.getOrderData(partition, rows); logger.info(partition + ",用户数:" + (list != null ? list.size() : "0")); if (list != null && list.size() > 0) { paraseData(list); logger.info(partition + ",去重复后用户数:" + list.size()); CountDownLatch threadSignal = new CountDownLatch(list.size());//一个线程等待其他线程各自执行完毕后再执行。 ExecutorService executorService = new ThreadPoolExecutor(10, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); //把数据更新为正在处理状态 for (HashMap hm : list) { orderPushDao.updatePush(hm.get("ID").toString(), "2", null); } //处理数据 for (HashMap hm : list) { OrderPushService continueService = new OrderPushService(list.size(),threadSignal, orderPushDao, dictionaryDao,hm); executorService.execute(continueService); } logger.info("ok,"+partition+"处理完成"); executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { logger.info(partition + ",执行出现异常," + e.getMessage()); e.printStackTrace();//Cannot get a connection, pool error Timeout waiting for idle object } Thread.sleep(100); } logger.info(Thread.currentThread().getName() + "定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒"); } /** * 去重复数据(userid+spid) * * @param dataList * @return */ private void paraseData(List dataList) { if (dataList == null || dataList.size() == 0) { return; } HashMap reData = new HashMap(); String userid = ""; String spid = ""; HashMap dataMap = null; for (int i = 0; i < dataList.size(); i++) { dataMap = dataList.get(i); userid = dataMap.get("USERID").toString(); spid = dataMap.get("SPID").toString(); if (reData.containsKey(userid + spid)) { dataList.remove(i); i--; } else { reData.put(userid, dataMap); } } } /* public static void main(String[] args) { OrderPushJob job = new OrderPushJob(); try { job.doProcess(); } catch (Exception e) { e.printStackTrace(); } }*/ } class OrderPushService implements Runnable { private Logger logger = Logger.getLogger("OrderPush"); private int totalSize; private CountDownLatch threadSignal; private OrderPushDao orderPushDao; private DictionaryDao dictionaryDao; private HashMap hm; //用户数据,ID和USERID public OrderPushService(int totalSize, CountDownLatch threadSignal, OrderPushDao orderPushDao, DictionaryDao dictionaryDao, HashMap hm) { this.totalSize = totalSize; this.threadSignal = threadSignal; this.orderPushDao = orderPushDao; this.dictionaryDao = dictionaryDao; this.hm = hm; } @Override public void run() { String resultcode = "";//处理结果编码,0处理成功,1待处理,2处理中 String errorinfo = ""; String realflag = "";//是否有订购关系标识,0有,1无,其他失败 String id = hm.get("ID").toString(); String url = ""; OrderPushBean orderPushBean = new OrderPushBean(); orderPushBean.setId(hm.get("ID").toString()); orderPushBean.setUserid(hm.get("USERID").toString()); orderPushBean.setSpid(hm.get("SPID").toString()); orderPushBean.setResultcode("2");//处理结果编码,0处理成功,1待处理,2处理中 orderPushBean.setResultinfo("正在处理中"); String status = ""; try { List OrderRela = orderPushDao.findOrderRelaBySpid(orderPushBean.getUserid(), orderPushBean.getSpid()); for (HashMap hashMap : OrderRela) { status = hashMap.get("STATUS").toString(); } if("0".equals(status) || "1".equals(status)){ orderPushDao.updatePush(orderPushBean.getId(), orderPushBean.getResultcode(), orderPushBean.getResultinfo()); String phone = URLEncoder.encode(orderPushBean.getUserid(), "utf-8"); String orderPushfmrowsURL = dictionaryDao.getValue("OrderPushfmrowsURL"); //url = "http://111.206.134.43:809/caporder/queryCustom?phone=15541405220" //http://172.16.1.81:809/caporder/queryCustom?phone=15541405220 url = orderPushfmrowsURL+"?phone="+ phone; String spid = orderPushBean.getSpid(); String vaccproductId = orderPushDao.getVaccproductId(spid); if(vaccproductId != null){ boolean flag = isProductId(url, vaccproductId); if(flag){ logger.info("有订购关系的SPID,"+spid+",phone,"+phone); orderPushBean.setResultinfo("处理完成,有订购关系!"); orderPushBean.setResultcode("1"); orderPushBean.setRealflag("0"); orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status); }else{ logger.info("无订购关系的SPID,"+spid+",phone,"+phone); orderPushBean.setResultinfo("处理完成,无订购关系!"); orderPushBean.setResultcode("1"); orderPushBean.setRealflag("1"); orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status); } }else { logger.info("无订购关系的SPID,"+spid+",phone,"+phone+",TB_SP_INFO无配置vacproductId"); orderPushBean.setResultinfo("处理完成,TB_SP_INFO无配置vacproductId!"); orderPushBean.setResultcode("1"); orderPushBean.setRealflag("2"); orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status); }}else { logger.info("无订购关系的SPID,"+orderPushBean.getSpid()+",phone,"+orderPushBean.getUserid()+",退订已失效"); orderPushBean.setResultinfo("处理完成,失效已退订"); orderPushBean.setResultcode("1"); orderPushBean.setRealflag("2"); orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status); } } catch (Exception e) { logger.info("发生异常,"+e.getMessage()+",异常的数据id,"+id); logger.error(e); try { resultcode = "3"; errorinfo = "处理失败"+e.getMessage(); realflag = "2"; orderPushDao.updatePushAll(id,resultcode,errorinfo,realflag,null); } catch (Exception ee) { ee.printStackTrace(); } e.printStackTrace(); }finally { threadSignal.countDown(); } } /** * 调接口看有无返回的productId 与 传入的vacproductId 比对,有则有订购关系,无则没有 * @param url 调接口的连接 * @param vacproductId 根据spid 查询TB_SP_INFO表出来的vacproductId * @return */ private static boolean isProductId(String url,String vacproductId ){ Logger logger = Logger.getLogger("OrderPush"); boolean flag = false; String data = ""; try { data = URLUtil.get(url); Map map = JSONObject.parseObject(data, Map.class); Map infoMessage = (Map) map.get("data"); if( !infoMessage.isEmpty()){ List message = (List) infoMessage.get("DATA"); if(message != null && message.size() > 0) { for (Object info : message) { Map infoMap = JSONObject.parseObject(String.valueOf(info), Map.class); String productId = infoMap.get("PRODUCT_ID").toString(); logger.info("vacproductId,"+vacproductId+" productId,"+productId); if(productId.equals(vacproductId)){ flag = true; break; }else { flag = false; } } }}else { flag = false; } } catch (Exception e) { flag = false; logger.info("异常发生在isProductId方法中,"+e.getMessage()); e.printStackTrace(); } return flag; } }