123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- package com.chinacreator.process.job;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.StringUtils;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.process.bean.KuaishouPushBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KuaishouDao;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.WriteLogUtil;
- /**
- * 快手月初推送
- * 替换KuaishouPushMonthJob
- * 月初推送定时任务(KuaishouPushMonthNewJob)只负责把表(TD_KUAISHOU_FIRSTMONTH)添加到表TD_KAFKA_KUAISHOU_PUSH,状态为待处理(resultcode=1)
- * 每2分钟执行一次
- * @author xu.zhou
- * @date 20210323
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KuaishouPushMonthNewJob {
- private static Logger logger = Logger.getLogger("kuaishoupushmonth");
-
- @Autowired
- private KuaishouDao kuaishouDao; // = new KuaishouDao();
-
- @Autowired
- private DictionaryDao dictionaryDao; // = new DictionaryDao();
-
- public void doProcess() throws Exception {
- //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务开始");
- WriteLogUtil.writeLong("KuaishouPushMonthNewJob月初快手流量未耗尽定时任务开始", logger, "KuaishouPushMonthNewJob");
- int count = 0;
- long beginTime = System.currentTimeMillis();
- int rows = 800; //每次取数据条数
- String kuaishoufmrows = dictionaryDao.getValue("kuaishoufmrows");
- if (!StringUtils.isEmpty(kuaishoufmrows)) {
- try {
- rows = Integer.parseInt(kuaishoufmrows);
- } catch (Exception e) {
- rows = 800;
- }
- }
- //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
- String partition = "T_HASH_P";
- for(int i = 1; i <= 50; i++){
- count = 0;
- partition = "T_HASH_P";
- try {
- if(i < 10){
- partition = partition + "0" + i;
- }else{
- partition = partition + i;
- }
-
- //按分区标识获取订购数据
- List<HashMap> dataList = kuaishouDao.getFirstMonthByPart(partition,rows);
- count = (dataList != null ? dataList.size() : 0);
- //logger.info(partition+",用户数:"+(list != null ? list.size() : "0"));
- if(dataList != null && dataList.size() > 0){
- //logger.info(partition+",去重复前用户数:"+list.size());
- dataList = paraseData(dataList); //去重复数据
- logger.info(partition+",去重复前用户数:"+count+",去重复后用户数:"+dataList.size());
- //推送月份
- String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date());
-
- CountDownLatch threadSignal = new CountDownLatch(dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- //把数据更新为正在处理状态
- for(HashMap hm : dataList){
- boolean res = kuaishouDao.updFirstMonthExecing(hm.get("ID").toString());
- }
- for(HashMap hm : dataList){
- KuaishouPushMonthNewService continueService = new KuaishouPushMonthNewService(dataList.size(),threadSignal,hm,pushmonth,kuaishouDao);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- logger.info(partition+",执行出现异常,"+e.getMessage());
- e.printStackTrace();
- }
-
- Thread.sleep(100);
- }
- //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
- WriteLogUtil.writeLong("KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, "KuaishouPushMonthNewJob");
- }
-
-
- /**
- * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
- * @param dataList
- * @return
- */
- private List<HashMap> paraseData(List<HashMap> dataList){
- //去重复后的数据集
- List<HashMap> reDataList = new ArrayList<HashMap>();
- HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
- for (HashMap dataMap : dataList) {
- if(tmpMap.containsKey(dataMap.get("USERID"))){
- logger.info("重复数据,"+dataMap);
- }else{
- tmpMap.put(dataMap.get("USERID"), dataMap);
- reDataList.add(dataMap);
- }
- }
- return reDataList;
- }
-
- public static void main(String[] args) {
- KuaishouPushMonthNewJob job = new KuaishouPushMonthNewJob();
- try {
- job.doProcess();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- class KuaishouPushMonthNewService implements Runnable {
- private static Logger logger = Logger.getLogger("kuaishoupushmonth");
- private int totalSize;
- private CountDownLatch threadSignal;
- private List<String> spidsList;
- private String pushmonth;
- private KuaishouDao kuaishouDao;
- private HashMap hm; //用户数据,ID和USERID
-
- public KuaishouPushMonthNewService(int totalSize,CountDownLatch threadSignal,HashMap hm,String pushmonth,KuaishouDao kuaishouDao){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.hm = hm;
- this.pushmonth = pushmonth;
- this.kuaishouDao = kuaishouDao;
- }
-
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map logMap = new HashMap();
- logMap.put("userinfo", hm);
- logMap.put("pushmonth", pushmonth);
- String resultcode = "-1";
- String errorinfo = "";
- String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送
- String id = hm.get("ID").toString();
- String calculatedate = getCalculatedate(); //下次推送时间,一般为下月的第一天0点30分01秒
- try {
- //处理业务逻辑
- realflag = pushService(logMap);
- if(logMap.get("errorinfo") != null){
- errorinfo = logMap.get("errorinfo").toString();
- }
- //未出现异常,设置为成功
- resultcode = "0";
- if("".equals(errorinfo)){
- errorinfo = "ok";
- }
- } catch (Exception e) {
- if (e instanceof BusinessException) {
- errorinfo = ((BusinessException) e).getMessage();
- resultcode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultcode = "8000";
- errorinfo = "处理数据出现异常,"+e.getMessage();
- }
- }finally {
- threadSignal.countDown();
- try{
- //暂未同步或者处理出现异常,恢复初始状态,下次再处理
- if("9055".equals(resultcode) || "8000".equals(resultcode)){
- kuaishouDao.updFirstMonthRc(id,"1", null, null, null);
- }else{
- resultcode = "0";
- kuaishouDao.updFirstMonthRc(id,resultcode,errorinfo, realflag, calculatedate);
- }
- }catch(Exception e){
- e.printStackTrace();
- errorinfo = "更新数据出现异常,"+e.getMessage();
- resultcode = "8001";
- }
- //写日志
- logMap.put("jobname", "KuaishouPushMonthNewJob");
- logMap.put("resultcode", resultcode);
- logMap.put("errorinfo", errorinfo);
- logMap.put("realflag", realflag);
- //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
- logger.info(JsonUtil.objectToJson(logMap));
- }
- }
-
- /**
- * 添加数据到推送表TD_KAFKA_KUAISHOU_PUSH
- * calculatedate为空,代表是新增的订购数据,处理订购推送
- * calculatedate不为空,代表是月初要推送的数据,处理月初推送
- * @param logMap
- * @throws Exception
- */
- private String pushService(Map logMap) throws Exception{
- String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送
- String userid = hm.get("USERID").toString();
- String id = hm.get("ID").toString();
- String spid = hm.get("SPID").toString();
-
- //查询SPID是否在TB_SP_AOP_CONFIG配置,因为快手流量消耗数据是根据元素ID与此表匹配得到SPID,如果SPID没有此表内,代表流量数据是不会从KAFKA推送过来,也就是这种快手订购不要触发流量耗尽或有流量的推送
- if(!kuaishouDao.hasAopConf(spid)){
- throw new BusinessException("9057",spid+"在TB_SP_AOP_CONFIG表无配置");
- }
-
- //获取订购关系
- HashMap orderRealMap = kuaishouDao.getOrderRealById(id);
- if(orderRealMap == null){
- throw new BusinessException("9056","订购关系表无订购数据");
- }
- //查询用户当前有效的快手订购关系
- List<HashMap> realList = kuaishouDao.getRealByUserid(hm.get("USERID").toString());
- if(realList == null || realList.size() == 0){
- throw new BusinessException("9056","当月无有效订购关系");
- }
- List<String> spidsList = new ArrayList<String>();
- for(HashMap realtmp : realList){
- if(!spidsList.contains(realtmp.get("SPID").toString())){
- spidsList.add(realtmp.get("SPID").toString());
- }
- }
- //拼接所有有效订购的SPID
- String spids = "";
- for(String tmp : spidsList){
- if("".equals(spids)){
- spids += tmp;
- }else{
- spids += "#"+tmp;
- }
- }
- logMap.put("spids", spids);
- //查询当月是否有推送过同类型的数据
- List<HashMap> dataList = kuaishouDao.queryPush(userid, pushmonth, "2");
- if(dataList == null || dataList.size() == 0){
- KuaishouPushBean pushBean = new KuaishouPushBean();
- String pushid = kuaishouDao.getNo();
- pushBean.setSerial_number(userid);
- pushBean.setId(pushid);
- pushBean.setPushmonth(pushmonth);
- pushBean.setPushtype("2"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
- pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败
- pushBean.setResultinfo("待处理");
- pushBean.setSpids(spids);
- //添加数据到推送表
- kuaishouDao.insertPush(pushBean);
- //添加到测试表
- //kuaishouDao.insertPushZx(pushBean);
- realflag = "2";
- //logger.info(id+",添加月初推送到推送表");
- logMap.put("errorinfo", "添加月初推送到推送表");
- }else{
- logMap.put("errorinfo", "当月已推送");
- //logger.info(id+",当月已推送,不处理");
- }
-
- return realflag;
- }
-
- /**
- * 获取下次推送的默认时间(下个月1号零点三十分零一秒)
- * @return
- */
- private String getCalculatedate() {
- SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd");
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.MONTH, 1);
- calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
- return dft.format(calendar.getTime())+"003001";
- }
- }
|