package com.chinacreator.process.job;

import com.chinacreator.common.exception.BusinessException;
import com.chinacreator.process.dao.DictionaryDao;
import com.chinacreator.process.service.QueryPrmDataService;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.beans.factory.annotation.Autowired;

import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * Scheduled job that collects C+PRM forward-product user and order figures
 * and stores them through {@link QueryPrmDataService}.
 *
 * <p>The job reads two comma-separated spid lists from the dictionary
 * ({@code cPrmParam} and {@code prmSpid}), queries a set of counters for each
 * spid and writes every counter with {@code insertPrm}. Finally it fans out one
 * task per log-table partition to record the cumulative order count.
 *
 * @author yu.su
 * @date 20201102
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class QueryPrmDataJob {

    private static final Logger logger = Logger.getLogger("queryPrm");

    @Autowired
    private QueryPrmDataService queryPrmDataService;

    @Autowired
    private DictionaryDao dictionaryDao;

    /**
     * Entry point of the job.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>load the {@code cPrmParam} spid list and store the user counters for each spid;</li>
     *   <li>load the {@code prmSpid} spid list and store the daily/monthly order counts for each spid;</li>
     *   <li>submit the cumulative order count tasks via {@link #totalPrmOrder(String, String)}.</li>
     * </ol>
     *
     * @throws BusinessException with code {@code 9001} when either dictionary entry is empty
     * @throws Exception         anything raised by the underlying service or DAO calls
     */
    public void doProcess() throws Exception {
        System.out.println("查询C+PRM前向产品用户量数据开始启动");
        SimpleDateFormat sdfDay = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat sdfMonth = new SimpleDateFormat("yyyy-MM");

        // Read the clock once so that "yesterday", the first-of-month check and the
        // last-month label all refer to the same instant even if the run crosses midnight.
        Date now = new Date();
        String yesterday = sdfDay.format(DateUtils.addDays(now, -1));
        // Non-null only on the first day of a month; triggers the last-month net-increase record.
        String lastMonthLabel = sdfDay.format(now).endsWith("01")
                ? sdfMonth.format(DateUtils.addMonths(now, -1))
                : null;

        String spidStr = dictionaryDao.getValue("cPrmParam");
        logger.info("spid====" + spidStr);
        if (StringUtils.isEmpty(spidStr)) {
            logger.info("字典表未配置spid");
            throw new BusinessException("9001", "字典表未配置SPID");
        }
        for (String spid : spidStr.trim().split(",")) {
            recordUserCounters(spid, yesterday, lastMonthLabel);
        }

        // Spid list of the "2 yuan package" products.
        String prmSpidStr = dictionaryDao.getValue("prmSpid");
        logger.info("prmSpid=========" + prmSpidStr);
        if (StringUtils.isEmpty(prmSpidStr)) {
            logger.info("字典表未配置2元包产品Spid");
            throw new BusinessException("9001", "字典表未配置2元包产品Spid");
        }
        // Log-table partition for the current time.
        String parName = queryPrmDataService.getParName();
        for (String prmSpid : prmSpidStr.trim().split(",")) {
            recordOrderCounters(prmSpid, parName, yesterday);
        }

        // Cumulative order count across all partitions.
        this.totalPrmOrder(prmSpidStr, yesterday);
    }

    /**
     * Queries and stores the user counters of a single spid.
     *
     * @param spid           spid exactly as configured; single quotes are stripped before it is stored
     * @param yesterday      {@code yyyy-MM-dd} label used for the daily/monthly/total records
     * @param lastMonthLabel {@code yyyy-MM} label of the previous month, or {@code null} to skip
     *                       the last-month net-increase record
     */
    private void recordUserCounters(String spid, String yesterday, String lastMonthLabel) throws Exception {
        // The service receives the spid as configured; the stored value has its quotes removed.
        String plainSpid = spid.replaceAll("'", "");
        logger.info("=======开始查询");

        // New users yesterday.
        Integer dayNum = queryPrmDataService.getDayNub(spid);
        logger.info("昨天新增数据量:======" + dayNum);
        queryPrmDataService.insertPrm(dayNum, yesterday, "0", "0", plainSpid);

        // New users in the current month.
        Integer monthNub = queryPrmDataService.geMonthNub(spid);
        logger.info("获取当月新增用户数据量:======" + monthNub);
        queryPrmDataService.insertPrm(monthNub, yesterday, "1", "0", plainSpid);

        // Current total number of users.
        Integer total = queryPrmDataService.geTotal(spid);
        logger.info("当前用户存量:======" + total);
        queryPrmDataService.insertPrm(total, yesterday, "2", "0", plainSpid);

        // Net increase yesterday = new users - cancellations.
        Integer cancelNub = queryPrmDataService.getDayCancel(spid);
        Integer addNub = dayNum - cancelNub;
        logger.info("获取昨天日净增:===" + addNub);
        queryPrmDataService.insertPrm(addNub, yesterday, "0", "1", plainSpid);

        // Amount for the current month.
        Integer monthMoney = queryPrmDataService.getmonthMoney(spid);
        queryPrmDataService.insertPrm(monthMoney, yesterday, "1", "4", plainSpid);

        // Net increase of the previous month, recorded only on the first day of a month.
        if (lastMonthLabel != null) {
            Integer lastMonthNub = queryPrmDataService.getLastMonthNub(spid);
            Integer lastMonthCancel = queryPrmDataService.getLastMonthCancel(spid);
            Integer lastMonth = lastMonthNub - lastMonthCancel;
            logger.info("获取上月净增:===" + lastMonth);
            queryPrmDataService.insertPrm(lastMonth, lastMonthLabel, "1", "1", plainSpid);
        }
    }

    /**
     * Queries and stores yesterday's and the current month's order counts of a single spid.
     *
     * @param prmSpid   spid exactly as configured; single quotes are stripped before it is stored
     * @param parName   log-table partition to query
     * @param yesterday {@code yyyy-MM-dd} label used for both records
     */
    private void recordOrderCounters(String prmSpid, String parName, String yesterday) throws Exception {
        String plainSpid = prmSpid.replaceAll("'", "");

        // Orders placed yesterday.
        Integer orderNubDay = queryPrmDataService.getOrderNubDay(prmSpid, parName);
        queryPrmDataService.insertPrm(orderNubDay, yesterday, "0", "3", plainSpid);
        logger.info("昨日订购量===========" + orderNubDay);

        // Orders placed in the current month.
        Integer orderNubMonth = queryPrmDataService.getorderNubMonth(prmSpid, parName);
        queryPrmDataService.insertPrm(orderNubMonth, yesterday, "1", "3", plainSpid);
        // Log the monthly figure (the daily one was logged here by mistake before).
        logger.info("当月订购量===========" + orderNubMonth);
    }

    /**
     * Records the cumulative order count by submitting one {@link PrmOrderBean} task per
     * log-table partition to a dedicated thread pool.
     *
     * <p>The pool is shut down right after submission and this method does not wait for the
     * tasks, so it may return while partition queries are still running.
     * NOTE(review): the latch is only read for progress logging and is never awaited —
     * confirm that fire-and-forget is the intended behaviour.
     *
     * @param prmSpid   comma-separated spid list; every task processes all of them
     * @param yesterday date label stored with each record
     * @throws SQLException declared for callers; failures inside the tasks are logged, not rethrown
     */
    public void totalPrmOrder(String prmSpid, String yesterday) throws SQLException {
        // All partitions of the log table.
        List<String> parNameList = queryPrmDataService.getParNameList();
        int partitionCount = parNameList.size();
        CountDownLatch threadSignal = new CountDownLatch(partitionCount);
        ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (String parName : parNameList) {
                PrmOrderBean prmOrderBean =
                        new PrmOrderBean(partitionCount, threadSignal, parName, prmSpid, yesterday);
                executorService.execute(prmOrderBean);
            }
        } finally {
            // Always release the pool, even if submitting a task fails.
            executorService.shutdown();
        }
    }

    /**
     * Task that queries the order total of every configured spid within one log-table
     * partition and stores each result with {@code insertOrderPrm}.
     */
    class PrmOrderBean implements Runnable {

        private final Integer totalSize;
        private final CountDownLatch threadSignal;
        private final String parName;
        private final String spid;
        private final String yesterday;

        /**
         * @param totalSize    total number of partitions, used for progress logging
         * @param threadSignal latch counted down once when this task finishes
         * @param parName      partition this task works on
         * @param spid         comma-separated spid list
         * @param yesterday    date label stored with each record
         */
        public PrmOrderBean(Integer totalSize, CountDownLatch threadSignal, String parName,
                            String spid, String yesterday) {
            this.totalSize = totalSize;
            this.threadSignal = threadSignal;
            this.parName = parName;
            this.spid = spid;
            this.yesterday = yesterday;
        }

        @Override
        public void run() {
            try {
                String[] spidList = spid.trim().split(",");
                for (String spidPrm : spidList) {
                    Integer orderTotal = queryPrmDataService.getOrderTotal(spidPrm, parName);
                    // Progress is printed as <total partitions>/<partitions already finished>.
                    logger.info(parName + "===" + totalSize + "/" + (totalSize - threadSignal.getCount())
                            + "===" + spidPrm + "===查询出的数据量==" + orderTotal);
                    queryPrmDataService.insertOrderPrm(orderTotal, yesterday, "2", "3",
                            spidPrm.replaceAll("'", ""));
                }
            } catch (SQLException e) {
                // Keep the stack trace in the job's own log instead of stderr.
                logger.error(parName + "====出错===" + e.getMessage(), e);
            } catch (RuntimeException e) {
                // Unchecked failures would otherwise leave the worker thread without reaching this logger.
                logger.error(parName + "====出错===" + e.getMessage(), e);
            } finally {
                threadSignal.countDown();
            }
        }
    }
}