123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- package com.chinacreator.process.job;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.process.bean.OrderPushBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.OrderPushDao;
- import com.chinacreator.process.util.URLUtil;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.StringUtils;
- import java.net.URLEncoder;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.*;
- /**
- * @author 黑猫警长
- * @create 2021/3/30 9:33
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class OrderPushJob {
- private static Logger logger = Logger.getLogger("OrderPush");
- @Autowired
- private OrderPushDao orderPushDao;
- @Autowired
- private DictionaryDao dictionaryDao;
- public void doProcess() throws Exception {
- logger.info(Thread.currentThread().getName() + "会员匹配数据定时任务开始");
- long beginTime = System.currentTimeMillis();//开始时间
- int rows = 800; //每次取数据条数
- String OrderPushfmrows = dictionaryDao.getValue("OrderPushfmrows");
- if (!StringUtils.isEmpty(OrderPushfmrows)) {
- try {
- rows = Integer.parseInt(OrderPushfmrows);
- } catch (Exception e) {
- rows = 800;
- }
- }
- //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
- String partition = "T_HASH_P";
- for (int i = 1; i <= 50; i++) {
- partition = "T_HASH_P";
- try {
- if (i < 10) {
- partition = partition + "0" + i;
- } else {
- partition = partition + i;
- }
- //查询会员订购关系,根据分区查询
- List<HashMap> list = orderPushDao.getOrderData(partition, rows);
- logger.info(partition + ",用户数:" + (list != null ? list.size() : "0"));
- if (list != null && list.size() > 0) {
- paraseData(list);
- logger.info(partition + ",去重复后用户数:" + list.size());
- CountDownLatch threadSignal = new CountDownLatch(list.size());//一个线程等待其他线程各自执行完毕后再执行。
- ExecutorService executorService = new ThreadPoolExecutor(10, 15, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- //把数据更新为正在处理状态
- for (HashMap hm : list) {
- orderPushDao.updatePush(hm.get("ID").toString(), "2", null);
- }
- //处理数据
- for (HashMap hm : list) {
- OrderPushService continueService = new OrderPushService(list.size(),threadSignal, orderPushDao, dictionaryDao,hm);
- executorService.execute(continueService);
- }
- logger.info("ok,"+partition+"处理完成");
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- logger.info(partition + ",执行出现异常," + e.getMessage());
- e.printStackTrace();//Cannot get a connection, pool error Timeout waiting for idle object
- }
- Thread.sleep(100);
- }
- logger.info(Thread.currentThread().getName() + "定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒");
- }
- /**
- * 去重复数据(userid+spid)
- *
- * @param dataList
- * @return
- */
- private void paraseData(List<HashMap> dataList) {
- if (dataList == null || dataList.size() == 0) {
- return;
- }
- HashMap<String, Object> reData = new HashMap<String, Object>();
- String userid = "";
- String spid = "";
- HashMap dataMap = null;
- for (int i = 0; i < dataList.size(); i++) {
- dataMap = dataList.get(i);
- userid = dataMap.get("USERID").toString();
- spid = dataMap.get("SPID").toString();
- if (reData.containsKey(userid + spid)) {
- dataList.remove(i);
- i--;
- } else {
- reData.put(userid, dataMap);
- }
- }
- }
- /* public static void main(String[] args) {
- OrderPushJob job = new OrderPushJob();
- try {
- job.doProcess();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }*/
- }
- class OrderPushService implements Runnable {
- private Logger logger = Logger.getLogger("OrderPush");
- private int totalSize;
- private CountDownLatch threadSignal;
- private OrderPushDao orderPushDao;
- private DictionaryDao dictionaryDao;
- private HashMap hm; //用户数据,ID和USERID
- public OrderPushService(int totalSize, CountDownLatch threadSignal, OrderPushDao orderPushDao, DictionaryDao dictionaryDao, HashMap hm) {
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.orderPushDao = orderPushDao;
- this.dictionaryDao = dictionaryDao;
- this.hm = hm;
- }
- @Override
- public void run() {
- String resultcode = "";//处理结果编码,0处理成功,1待处理,2处理中
- String errorinfo = "";
- String realflag = "";//是否有订购关系标识,0有,1无,其他失败
- String id = hm.get("ID").toString();
- String url = "";
- OrderPushBean orderPushBean = new OrderPushBean();
- orderPushBean.setId(hm.get("ID").toString());
- orderPushBean.setUserid(hm.get("USERID").toString());
- orderPushBean.setSpid(hm.get("SPID").toString());
- orderPushBean.setResultcode("2");//处理结果编码,0处理成功,1待处理,2处理中
- orderPushBean.setResultinfo("正在处理中");
- String status = "";
- try {
- List<HashMap> OrderRela = orderPushDao.findOrderRelaBySpid(orderPushBean.getUserid(), orderPushBean.getSpid());
- for (HashMap hashMap : OrderRela) {
- status = hashMap.get("STATUS").toString();
- }
- if("0".equals(status) || "1".equals(status)){
- orderPushDao.updatePush(orderPushBean.getId(), orderPushBean.getResultcode(), orderPushBean.getResultinfo());
- String phone = URLEncoder.encode(orderPushBean.getUserid(), "utf-8");
- String orderPushfmrowsURL = dictionaryDao.getValue("OrderPushfmrowsURL");
- //url = "http://111.206.134.43:809/caporder/queryCustom?phone=15541405220"
- //http://172.16.1.81:809/caporder/queryCustom?phone=15541405220
- url = orderPushfmrowsURL+"?phone="+ phone;
- String spid = orderPushBean.getSpid();
- String vaccproductId = orderPushDao.getVaccproductId(spid);
- if(vaccproductId != null){
- boolean flag = isProductId(url, vaccproductId);
- if(flag){
- logger.info("有订购关系的SPID,"+spid+",phone,"+phone);
- orderPushBean.setResultinfo("处理完成,有订购关系!");
- orderPushBean.setResultcode("1");
- orderPushBean.setRealflag("0");
- orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
- }else{
- logger.info("无订购关系的SPID,"+spid+",phone,"+phone);
- orderPushBean.setResultinfo("处理完成,无订购关系!");
- orderPushBean.setResultcode("1");
- orderPushBean.setRealflag("1");
- orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
- }
- }else {
- logger.info("无订购关系的SPID,"+spid+",phone,"+phone+",TB_SP_INFO无配置vacproductId");
- orderPushBean.setResultinfo("处理完成,TB_SP_INFO无配置vacproductId!");
- orderPushBean.setResultcode("1");
- orderPushBean.setRealflag("2");
- orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
- }}else {
- logger.info("无订购关系的SPID,"+orderPushBean.getSpid()+",phone,"+orderPushBean.getUserid()+",退订已失效");
- orderPushBean.setResultinfo("处理完成,失效已退订");
- orderPushBean.setResultcode("1");
- orderPushBean.setRealflag("2");
- orderPushDao.updatePushAll(id,orderPushBean.getResultcode(),orderPushBean.getResultinfo(),orderPushBean.getRealflag(),status);
- }
- } catch (Exception e) {
- logger.info("发生异常,"+e.getMessage()+",异常的数据id,"+id);
- logger.error(e);
- try {
- resultcode = "3";
- errorinfo = "处理失败"+e.getMessage();
- realflag = "2";
- orderPushDao.updatePushAll(id,resultcode,errorinfo,realflag,null);
- } catch (Exception ee) {
- ee.printStackTrace();
- }
- e.printStackTrace();
- }finally {
- threadSignal.countDown();
- }
- }
- /**
- * 调接口看有无返回的productId 与 传入的vacproductId 比对,有则有订购关系,无则没有
- * @param url 调接口的连接
- * @param vacproductId 根据spid 查询TB_SP_INFO表出来的vacproductId
- * @return
- */
- private static boolean isProductId(String url,String vacproductId ){
- Logger logger = Logger.getLogger("OrderPush");
- boolean flag = false;
- String data = "";
- try {
- data = URLUtil.get(url);
- Map map = JSONObject.parseObject(data, Map.class);
- Map infoMessage = (Map) map.get("data");
- if( !infoMessage.isEmpty()){
- List message = (List) infoMessage.get("DATA");
- if(message != null && message.size() > 0) {
- for (Object info : message) {
- Map infoMap = JSONObject.parseObject(String.valueOf(info), Map.class);
- String productId = infoMap.get("PRODUCT_ID").toString();
- logger.info("vacproductId,"+vacproductId+" productId,"+productId);
- if(productId.equals(vacproductId)){
- flag = true;
- break;
- }else {
- flag = false;
- }
- }
- }}else {
- flag = false;
- }
- } catch (Exception e) {
- flag = false;
- logger.info("异常发生在isProductId方法中,"+e.getMessage());
- e.printStackTrace();
- }
- return flag;
- }
- }
|