package com.chinacreator.process.job; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.process.bean.KuaishouPushBean; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.KuaishouDao; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.process.util.WriteLogUtil; /** * 快手月初推送 * 替换KuaishouPushMonthJob * 月初推送定时任务(KuaishouPushMonthNewJob)只负责把表(TD_KUAISHOU_FIRSTMONTH)添加到表TD_KAFKA_KUAISHOU_PUSH,状态为待处理(resultcode=1) * 每2分钟执行一次 * @author xu.zhou * @date 20210323 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class KuaishouPushMonthNewJob { private static Logger logger = Logger.getLogger("kuaishoupushmonth"); @Autowired private KuaishouDao kuaishouDao; // = new KuaishouDao(); @Autowired private DictionaryDao dictionaryDao; // = new DictionaryDao(); public void doProcess() throws Exception { //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务开始"); WriteLogUtil.writeLong("KuaishouPushMonthNewJob月初快手流量未耗尽定时任务开始", logger, "KuaishouPushMonthNewJob"); int count = 0; long beginTime = System.currentTimeMillis(); int rows = 800; //每次取数据条数 String kuaishoufmrows = dictionaryDao.getValue("kuaishoufmrows"); if (!StringUtils.isEmpty(kuaishoufmrows)) { try { rows = Integer.parseInt(kuaishoufmrows); } catch (Exception e) { rows = 800; } } //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50 String partition = "T_HASH_P"; for(int i = 1; i <= 50; i++){ count = 0; partition = "T_HASH_P"; try { if(i < 10){ partition = partition + "0" + i; }else{ partition = partition + i; } //按分区标识获取订购数据 List dataList = kuaishouDao.getFirstMonthByPart(partition,rows); count = (dataList != null ? dataList.size() : 0); //logger.info(partition+",用户数:"+(list != null ? list.size() : "0")); if(dataList != null && dataList.size() > 0){ //logger.info(partition+",去重复前用户数:"+list.size()); dataList = paraseData(dataList); //去重复数据 logger.info(partition+",去重复前用户数:"+count+",去重复后用户数:"+dataList.size()); //推送月份 String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date()); CountDownLatch threadSignal = new CountDownLatch(dataList.size()); ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); //把数据更新为正在处理状态 for(HashMap hm : dataList){ boolean res = kuaishouDao.updFirstMonthExecing(hm.get("ID").toString()); } for(HashMap hm : dataList){ KuaishouPushMonthNewService continueService = new KuaishouPushMonthNewService(dataList.size(),threadSignal,hm,pushmonth,kuaishouDao); executorService.execute(continueService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { logger.info(partition+",执行出现异常,"+e.getMessage()); e.printStackTrace(); } Thread.sleep(100); } //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); WriteLogUtil.writeLong("KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, "KuaishouPushMonthNewJob"); } /** * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常 * @param dataList * @return */ private List paraseData(List dataList){ //去重复后的数据集 List reDataList = new ArrayList(); HashMap tmpMap = new HashMap(); for (HashMap dataMap : dataList) { if(tmpMap.containsKey(dataMap.get("USERID"))){ logger.info("重复数据,"+dataMap); }else{ tmpMap.put(dataMap.get("USERID"), dataMap); reDataList.add(dataMap); } } return reDataList; } public static void main(String[] args) { KuaishouPushMonthNewJob job = new KuaishouPushMonthNewJob(); try { job.doProcess(); } catch (Exception e) { e.printStackTrace(); } } } class KuaishouPushMonthNewService implements Runnable { private static Logger logger = Logger.getLogger("kuaishoupushmonth"); private int totalSize; private CountDownLatch threadSignal; private List spidsList; private String pushmonth; private KuaishouDao kuaishouDao; private HashMap hm; //用户数据,ID和USERID public KuaishouPushMonthNewService(int totalSize,CountDownLatch threadSignal,HashMap hm,String pushmonth,KuaishouDao kuaishouDao){ this.totalSize = totalSize; this.threadSignal = threadSignal; this.hm = hm; this.pushmonth = pushmonth; this.kuaishouDao = kuaishouDao; } @Override public void run() { long startime = System.currentTimeMillis(); Map logMap = new HashMap(); logMap.put("userinfo", hm); logMap.put("pushmonth", pushmonth); String resultcode = "-1"; String errorinfo = ""; String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送 String id = hm.get("ID").toString(); String calculatedate = getCalculatedate(); //下次推送时间,一般为下月的第一天0点30分01秒 try { //处理业务逻辑 realflag = pushService(logMap); if(logMap.get("errorinfo") != null){ errorinfo = logMap.get("errorinfo").toString(); } //未出现异常,设置为成功 resultcode = "0"; if("".equals(errorinfo)){ errorinfo = "ok"; } } catch (Exception e) { if (e instanceof BusinessException) { errorinfo = ((BusinessException) e).getMessage(); resultcode = ((BusinessException) e).getCode(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "处理数据出现异常,"+e.getMessage(); } }finally { threadSignal.countDown(); try{ //暂未同步或者处理出现异常,恢复初始状态,下次再处理 if("9055".equals(resultcode) || "8000".equals(resultcode)){ kuaishouDao.updFirstMonthRc(id,"1", null, null, null); }else{ resultcode = "0"; kuaishouDao.updFirstMonthRc(id,resultcode,errorinfo, realflag, calculatedate); } }catch(Exception e){ e.printStackTrace(); errorinfo = "更新数据出现异常,"+e.getMessage(); resultcode = "8001"; } //写日志 logMap.put("jobname", "KuaishouPushMonthNewJob"); logMap.put("resultcode", resultcode); logMap.put("errorinfo", errorinfo); logMap.put("realflag", realflag); //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount())); logger.info(JsonUtil.objectToJson(logMap)); } } /** * 添加数据到推送表TD_KAFKA_KUAISHOU_PUSH * calculatedate为空,代表是新增的订购数据,处理订购推送 * calculatedate不为空,代表是月初要推送的数据,处理月初推送 * @param logMap * @throws Exception */ private String pushService(Map logMap) throws Exception{ String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送 String userid = hm.get("USERID").toString(); String id = hm.get("ID").toString(); String spid = hm.get("SPID").toString(); //查询SPID是否在TB_SP_AOP_CONFIG配置,因为快手流量消耗数据是根据元素ID与此表匹配得到SPID,如果SPID没有此表内,代表流量数据是不会从KAFKA推送过来,也就是这种快手订购不要触发流量耗尽或有流量的推送 if(!kuaishouDao.hasAopConf(spid)){ throw new BusinessException("9057",spid+"在TB_SP_AOP_CONFIG表无配置"); } //获取订购关系 HashMap orderRealMap = kuaishouDao.getOrderRealById(id); if(orderRealMap == null){ throw new BusinessException("9056","订购关系表无订购数据"); } //查询用户当前有效的快手订购关系 List realList = kuaishouDao.getRealByUserid(hm.get("USERID").toString()); if(realList == null || realList.size() == 0){ throw new BusinessException("9056","当月无有效订购关系"); } List spidsList = new ArrayList(); for(HashMap realtmp : realList){ if(!spidsList.contains(realtmp.get("SPID").toString())){ spidsList.add(realtmp.get("SPID").toString()); } } //拼接所有有效订购的SPID String spids = ""; for(String tmp : spidsList){ if("".equals(spids)){ spids += tmp; }else{ spids += "#"+tmp; } } logMap.put("spids", spids); //查询当月是否有推送过同类型的数据 List dataList = kuaishouDao.queryPush(userid, pushmonth, "2"); if(dataList == null || dataList.size() == 0){ KuaishouPushBean pushBean = new KuaishouPushBean(); String pushid = kuaishouDao.getNo(); pushBean.setSerial_number(userid); pushBean.setId(pushid); pushBean.setPushmonth(pushmonth); pushBean.setPushtype("2"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送 pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败 pushBean.setResultinfo("待处理"); pushBean.setSpids(spids); //添加数据到推送表 kuaishouDao.insertPush(pushBean); //添加到测试表 //kuaishouDao.insertPushZx(pushBean); realflag = "2"; //logger.info(id+",添加月初推送到推送表"); logMap.put("errorinfo", "添加月初推送到推送表"); }else{ logMap.put("errorinfo", "当月已推送"); //logger.info(id+",当月已推送,不处理"); } return realflag; } /** * 获取下次推送的默认时间(下个月1号零点三十分零一秒) * @return */ private String getCalculatedate() { SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MONTH, 1); calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH)); return dft.format(calendar.getTime())+"003001"; } }