0f86e6ab521e0190cb559f0b1c5473e47988daad.svn-base 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package com.chinacreator.process.job;
  2. import com.chinacreator.common.exception.BusinessException;
  3. import com.chinacreator.process.dao.DictionaryDao;
  4. import com.chinacreator.process.service.QueryPrmDataService;
  5. import org.apache.commons.lang.StringUtils;
  6. import org.apache.commons.lang.time.DateUtils;
  7. import org.apache.log4j.Logger;
  8. import org.quartz.DisallowConcurrentExecution;
  9. import org.quartz.PersistJobDataAfterExecution;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import java.sql.SQLException;
  12. import java.text.SimpleDateFormat;
  13. import java.util.Date;
  14. import java.util.List;
  15. import java.util.concurrent.*;
  16. /**
  17. * @author yu.su
  18. * C+PRM前向产品用户量数据传输接口
  19. * @date 20201102
  20. */
  21. @PersistJobDataAfterExecution
  22. @DisallowConcurrentExecution
  23. public class QueryPrmDataJob {
  24. private static Logger logger = Logger.getLogger("queryPrm");
  25. @Autowired
  26. private QueryPrmDataService queryPrmDataService;
  27. @Autowired
  28. private DictionaryDao dictionaryDao;
  29. public void doProcess() throws Exception {
  30. System.out.println("查询C+PRM前向产品用户量数据开始启动");
  31. SimpleDateFormat sdfDay = new SimpleDateFormat("yyyy-MM-dd");
  32. SimpleDateFormat sdfMonth = new SimpleDateFormat("yyyy-MM");
  33. String yesterday = sdfDay.format(DateUtils.addDays(new Date(), -1));
  34. String spidStr = dictionaryDao.getValue("cPrmParam");
  35. logger.info("spid===="+spidStr);
  36. if (StringUtils.isEmpty(spidStr)){
  37. logger.info("字典表未配置spid");
  38. throw new BusinessException("9001","字典表未配置SPID");
  39. }
  40. String[] spidList = spidStr.trim().split(",");
  41. for (String spid : spidList) {
  42. logger.info("=======开始查询");
  43. //获取昨天新增数据量
  44. Integer dayNum = queryPrmDataService.getDayNub(spid);
  45. logger.info("昨天新增数据量:======" + dayNum);
  46. queryPrmDataService.insertPrm(dayNum, yesterday, "0", "0",spid.replaceAll("'",""));
  47. //获取当月新增用户
  48. Integer monthNub = queryPrmDataService.geMonthNub(spid);
  49. logger.info("获取当月新增用户数据量:======" + monthNub);
  50. queryPrmDataService.insertPrm(monthNub, yesterday, "1", "0",spid.replaceAll("'",""));
  51. // 当前用户存量
  52. Integer total = queryPrmDataService.geTotal(spid);
  53. logger.info("当前用户存量:======" + total);
  54. queryPrmDataService.insertPrm(total, yesterday, "2", "0",spid.replaceAll("'",""));
  55. //获取昨天日净增
  56. Integer cancelNub = queryPrmDataService.getDayCancel(spid);
  57. Integer addNub = dayNum - cancelNub;
  58. logger.info("获取昨天日净增:===" + addNub);
  59. queryPrmDataService.insertPrm(addNub, yesterday, "0", "1",spid.replaceAll("'",""));
  60. //获取当月金额
  61. Integer monthMoney=queryPrmDataService.getmonthMoney(spid);
  62. queryPrmDataService.insertPrm(monthMoney, yesterday, "1", "4",spid.replaceAll("'",""));
  63. //获取上月净增
  64. if (sdfDay.format(new Date()).endsWith("01")) {
  65. Integer lastMonthNub = queryPrmDataService.getLastMonthNub(spid);
  66. Integer lastMonthCancel = queryPrmDataService.getLastMonthCancel(spid);
  67. Integer lastMonth = lastMonthNub - lastMonthCancel;
  68. logger.info("获取上月净增:===" + lastMonth);
  69. queryPrmDataService.insertPrm(lastMonth, sdfMonth.format(DateUtils.addMonths(new Date(), -1)), "1", "1",spid.replaceAll("'",""));
  70. }
  71. }
  72. //获取2元包产品的spid
  73. String prmSpidStr = dictionaryDao.getValue("prmSpid");
  74. logger.info("prmSpid========="+prmSpidStr);
  75. if (StringUtils.isEmpty(prmSpidStr)){
  76. logger.info("字典表未配置2元包产品Spid");
  77. throw new BusinessException("9001","字典表未配置2元包产品Spid");
  78. }
  79. String[] parSpidList = prmSpidStr.trim().split(",");
  80. //获取当前时间的日志分区
  81. String parName = queryPrmDataService.getParName();
  82. for (String prmSpid : parSpidList) {
  83. //获取昨日订购量
  84. Integer orderNubDay=queryPrmDataService.getOrderNubDay(prmSpid,parName);
  85. queryPrmDataService.insertPrm(orderNubDay,yesterday,"0","3",prmSpid.replaceAll("'",""));
  86. logger.info("昨日订购量==========="+orderNubDay);
  87. //获取当月订购量
  88. Integer orderNubMonth=queryPrmDataService.getorderNubMonth(prmSpid,parName);
  89. queryPrmDataService.insertPrm(orderNubMonth,yesterday,"1","3",prmSpid.replaceAll("'",""));
  90. logger.info("当月订购量==========="+orderNubDay);
  91. }
  92. //获取当前累计订购量
  93. this.totalPrmOrder(prmSpidStr,yesterday);
  94. }
  95. /**
  96. * 插入当前累计订购量
  97. * @param prmSpid
  98. */
  99. public void totalPrmOrder(String prmSpid,String yesterday) throws SQLException {
  100. //获取日志表所有分区
  101. List<String> parNameList=queryPrmDataService.getParNameList();
  102. CountDownLatch threadSignal=new CountDownLatch(parNameList.size());
  103. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  104. for (String parName : parNameList) {
  105. PrmOrderBean prmOrderBean = new PrmOrderBean(parNameList.size(), threadSignal, parName, prmSpid, yesterday);
  106. executorService.execute(prmOrderBean);
  107. }
  108. executorService.shutdown();
  109. }
  110. class PrmOrderBean implements Runnable{
  111. private Integer totalSize;
  112. private CountDownLatch threadSignal;
  113. private String parName;
  114. private String spid;
  115. private String yesterday;
  116. public PrmOrderBean(Integer totalSize, CountDownLatch threadSignal, String parName, String spid, String yesterday) {
  117. this.totalSize = totalSize;
  118. this.threadSignal = threadSignal;
  119. this.parName = parName;
  120. this.spid = spid;
  121. this.yesterday = yesterday;
  122. }
  123. @Override
  124. public void run() {
  125. try {
  126. String[] spidList = spid.trim().split(",");
  127. for (String spidPrm : spidList) {
  128. Integer orderTotal = queryPrmDataService.getOrderTotal(spidPrm,parName);
  129. logger.info(parName+"==="+totalSize+"/"+(totalSize-threadSignal.getCount())+"==="+spidPrm+"===查询出的数据量=="+orderTotal);
  130. queryPrmDataService.insertOrderPrm(orderTotal,yesterday,"2","3",spidPrm.replaceAll("'",""));
  131. }
  132. } catch (SQLException e) {
  133. logger.info(parName+"====出错==="+e.getMessage());
  134. e.printStackTrace();
  135. }finally {
  136. threadSignal.countDown();
  137. }
  138. }
  139. }
  140. }