123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package com.chinacreator.process.job;
- import java.math.BigDecimal;
- import java.math.RoundingMode;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.StringUtils;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.process.bean.KuaishouPushBean;
- import com.chinacreator.process.bean.TdActiveclientRecBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KuaishouDao;
- import com.chinacreator.process.dao.KuaishouFlowMonthDao;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.WriteLogUtil;
- /**
- * 快手月末查流量
- * @author xu.zhou
- * @date 20220708
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KuaishouFlowMonthPart2Job {
- private static Logger logger = Logger.getLogger("kuaishouFlowMonth");
-
- @Autowired
- private KuaishouFlowMonthDao kuaishouDao; // = new KuaishouFlowMonthDao();
-
- @Autowired
- private DictionaryDao dictionaryDao; // = new DictionaryDao();
-
- public void doProcess() throws Exception {
- //执行开始时间
- long beginTime = System.currentTimeMillis();
- int startpart = 26;
- int endpart = 50;
- String jobname = "KuaishouFlowMonthPart2Job";
- try {
- WriteLogUtil.writeLong(jobname+"定时任务开始", logger, jobname);
- int count = 0;
- int rows = 800; //每次取数据条数
- String confrows = dictionaryDao.getValue("kuaishouflowrows");
- if (!StringUtils.isEmpty(confrows)) {
- try {
- rows = Integer.parseInt(confrows);
- } catch (Exception e) {
- rows = 800;
- }
- }
- //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
- String partition = "T_HASH_P";
- for(int i = startpart; i <= endpart; i++){
- count = 0;
- partition = "T_HASH_P";
- try {
- if(i < 10){
- partition = partition + "0" + i;
- }else{
- partition = partition + i;
- }
-
- //按分区标识获取订购数据
- List<HashMap> dataList = kuaishouDao.getFlowMonthByPart(partition,rows);
- count = (dataList != null ? dataList.size() : 0);
- if(dataList != null && dataList.size() > 0){
- dataList = paraseData(dataList); //去重复数据
- logger.info(partition+",去重复前用户数:"+count+",去重复后用户数:"+dataList.size());
-
- CountDownLatch threadSignal = new CountDownLatch(dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- //把数据更新为正在处理状态,执行次数加1
- for(HashMap hm : dataList){
- boolean res = kuaishouDao.updFlowMonthExecing(hm.get("ID").toString());
- }
- for(HashMap hm : dataList){
- KuaishouFlowMonthService continueService = new KuaishouFlowMonthService(dataList.size(), threadSignal, hm, kuaishouDao, jobname);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- logger.info(partition+",执行出现异常,"+e.getMessage());
- e.printStackTrace();
- }
-
- //每取一轮数据,休眠100毫秒
- Thread.sleep(100);
-
- //更新执行状态为2且执行时间超过50分钟的数据为待处理
- int res = kuaishouDao.updExecTimeout(partition);
- if(res > 0){
- logger.info("更新状态为2处理中且执行时间超过50分钟的数据为待处理的异常数据条数【"+res+"】");
- }
- //初始化新增数据的FLOWCALCULATEDATE值
- res = kuaishouDao.updFlowMonthInit(partition);
- if(res > 0){
- logger.info("初始化新增数据的FLOWCALCULATEDATE值的数据条数【"+res+"】");
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- logger.info(jobname+"定时任务分区2执行出现异常,"+e.getMessage());
- } finally {
- WriteLogUtil.writeLong(jobname+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, jobname);
- }
- }
-
-
- /**
- * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
- * @param dataList
- * @return
- */
- private List<HashMap> paraseData(List<HashMap> dataList){
- //去重复后的数据集
- List<HashMap> reDataList = new ArrayList<HashMap>();
- HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
- for (HashMap dataMap : dataList) {
- if(tmpMap.containsKey(dataMap.get("USERID"))){
- logger.info("重复数据,"+dataMap);
- }else{
- tmpMap.put(dataMap.get("USERID"), dataMap);
- reDataList.add(dataMap);
- }
- }
- return reDataList;
- }
-
- public static void main(String[] args) {
- KuaishouFlowMonthPart2Job job = new KuaishouFlowMonthPart2Job();
- try {
- job.doProcess();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
|