123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package com.chinacreator.process.job;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.service.QueryPrmDataService;
- import org.apache.commons.lang.StringUtils;
- import org.apache.commons.lang.time.DateUtils;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- import java.util.List;
- import java.util.concurrent.*;
- /**
- * @author yu.su
- * C+PRM前向产品用户量数据传输接口
- * @date 20201102
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class QueryPrmDataJob {
-
- private static Logger logger = Logger.getLogger("queryPrm");
- @Autowired
- private QueryPrmDataService queryPrmDataService;
- @Autowired
- private DictionaryDao dictionaryDao;
- public void doProcess() throws Exception {
- System.out.println("查询C+PRM前向产品用户量数据开始启动");
- SimpleDateFormat sdfDay = new SimpleDateFormat("yyyy-MM-dd");
- SimpleDateFormat sdfMonth = new SimpleDateFormat("yyyy-MM");
- String yesterday = sdfDay.format(DateUtils.addDays(new Date(), -1));
- String spidStr = dictionaryDao.getValue("cPrmParam");
- logger.info("spid===="+spidStr);
- if (StringUtils.isEmpty(spidStr)){
- logger.info("字典表未配置spid");
- throw new BusinessException("9001","字典表未配置SPID");
- }
- String[] spidList = spidStr.trim().split(",");
- for (String spid : spidList) {
- logger.info("=======开始查询");
- //获取昨天新增数据量
- Integer dayNum = queryPrmDataService.getDayNub(spid);
- logger.info("昨天新增数据量:======" + dayNum);
- queryPrmDataService.insertPrm(dayNum, yesterday, "0", "0",spid.replaceAll("'",""));
- //获取当月新增用户
- Integer monthNub = queryPrmDataService.geMonthNub(spid);
- logger.info("获取当月新增用户数据量:======" + monthNub);
- queryPrmDataService.insertPrm(monthNub, yesterday, "1", "0",spid.replaceAll("'",""));
- // 当前用户存量
- Integer total = queryPrmDataService.geTotal(spid);
- logger.info("当前用户存量:======" + total);
- queryPrmDataService.insertPrm(total, yesterday, "2", "0",spid.replaceAll("'",""));
- //获取昨天日净增
- Integer cancelNub = queryPrmDataService.getDayCancel(spid);
- Integer addNub = dayNum - cancelNub;
- logger.info("获取昨天日净增:===" + addNub);
- queryPrmDataService.insertPrm(addNub, yesterday, "0", "1",spid.replaceAll("'",""));
- //获取当月金额
- Integer monthMoney=queryPrmDataService.getmonthMoney(spid);
- queryPrmDataService.insertPrm(monthMoney, yesterday, "1", "4",spid.replaceAll("'",""));
- //获取上月净增
- if (sdfDay.format(new Date()).endsWith("01")) {
- Integer lastMonthNub = queryPrmDataService.getLastMonthNub(spid);
- Integer lastMonthCancel = queryPrmDataService.getLastMonthCancel(spid);
- Integer lastMonth = lastMonthNub - lastMonthCancel;
- logger.info("获取上月净增:===" + lastMonth);
- queryPrmDataService.insertPrm(lastMonth, sdfMonth.format(DateUtils.addMonths(new Date(), -1)), "1", "1",spid.replaceAll("'",""));
- }
- }
- //获取2元包产品的spid
- String prmSpidStr = dictionaryDao.getValue("prmSpid");
- logger.info("prmSpid========="+prmSpidStr);
- if (StringUtils.isEmpty(prmSpidStr)){
- logger.info("字典表未配置2元包产品Spid");
- throw new BusinessException("9001","字典表未配置2元包产品Spid");
- }
- String[] parSpidList = prmSpidStr.trim().split(",");
- //获取当前时间的日志分区
- String parName = queryPrmDataService.getParName();
- for (String prmSpid : parSpidList) {
- //获取昨日订购量
- Integer orderNubDay=queryPrmDataService.getOrderNubDay(prmSpid,parName);
- queryPrmDataService.insertPrm(orderNubDay,yesterday,"0","3",prmSpid.replaceAll("'",""));
- logger.info("昨日订购量==========="+orderNubDay);
- //获取当月订购量
- Integer orderNubMonth=queryPrmDataService.getorderNubMonth(prmSpid,parName);
- queryPrmDataService.insertPrm(orderNubMonth,yesterday,"1","3",prmSpid.replaceAll("'",""));
- logger.info("当月订购量==========="+orderNubDay);
- }
- //获取当前累计订购量
- this.totalPrmOrder(prmSpidStr,yesterday);
- }
- /**
- * 插入当前累计订购量
- * @param prmSpid
- */
- public void totalPrmOrder(String prmSpid,String yesterday) throws SQLException {
- //获取日志表所有分区
- List<String> parNameList=queryPrmDataService.getParNameList();
- CountDownLatch threadSignal=new CountDownLatch(parNameList.size());
- ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (String parName : parNameList) {
- PrmOrderBean prmOrderBean = new PrmOrderBean(parNameList.size(), threadSignal, parName, prmSpid, yesterday);
- executorService.execute(prmOrderBean);
- }
- executorService.shutdown();
- }
- class PrmOrderBean implements Runnable{
- private Integer totalSize;
- private CountDownLatch threadSignal;
- private String parName;
- private String spid;
- private String yesterday;
- public PrmOrderBean(Integer totalSize, CountDownLatch threadSignal, String parName, String spid, String yesterday) {
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.parName = parName;
- this.spid = spid;
- this.yesterday = yesterday;
- }
- @Override
- public void run() {
- try {
- String[] spidList = spid.trim().split(",");
- for (String spidPrm : spidList) {
- Integer orderTotal = queryPrmDataService.getOrderTotal(spidPrm,parName);
- logger.info(parName+"==="+totalSize+"/"+(totalSize-threadSignal.getCount())+"==="+spidPrm+"===查询出的数据量=="+orderTotal);
- queryPrmDataService.insertOrderPrm(orderTotal,yesterday,"2","3",spidPrm.replaceAll("'",""));
- }
- } catch (SQLException e) {
- logger.info(parName+"====出错==="+e.getMessage());
- e.printStackTrace();
- }finally {
- threadSignal.countDown();
- }
- }
- }
- }
|