package com.chinacreator.process.job; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.process.bean.KuaishouPushBean; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.KuaishouDao; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.process.util.WriteLogUtil; /** * 快手订购推送 * 替换KuaishouPushOrderJob * 订购流量未耗尽的定时任务(KuaishouPushOrderNewJob)把快手订购表的未处理的数据判断是否要推送,是,则添加到推送表,否,则不处理。 * 获取TD_KUAISHOU_FIRSTMONTH表CALCULATEDATE为空的数据(通过订购关系表的触发器在订购时添加到表) * 查询推送记录表TD_KAFKA_KUAISHOU_PUSH,看本月是否已有推送过流量未耗尽 * 无,则把数据添加到推送记录表 * TD_KUAISHOU_FIRSTMONTH来源于订购关系表的触发器,在新增和再次订购时会触发把数据添加到此表 * @author xu.zhou * @date 20210331 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class KuaishouPushOrderNewJob { private static Logger logger = Logger.getLogger("kuaishouorderpush"); @Autowired private KuaishouDao kuaishouDao; // = new KuaishouDao(); @Autowired private DictionaryDao dictionaryDao; // = new DictionaryDao(); public void doProcess() throws Exception { //logger.info(Thread.currentThread().getName()+"KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务开始"); WriteLogUtil.writeLong("KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务开始", logger, "KuaishouPushOrderNewJob"); int count = 0; long beginTime = System.currentTimeMillis(); try { int rows = 200; //每次取数据条数 String kuaishoufmrows = dictionaryDao.getValue("kuaishouorrows"); if (!StringUtils.isEmpty(kuaishoufmrows)) { try { rows = Integer.parseInt(kuaishoufmrows); } catch (Exception e) { rows = 200; } } List dataList = kuaishouDao.getOrderPushNew(rows); count = (dataList != null ? dataList.size() : 0); if(dataList != null && dataList.size() > 0){ dataList = paraseData(dataList); logger.info("去重复后数据条数:"+ (dataList == null ? "0" : dataList.size())+",去重复前用户数:"+count ); CountDownLatch threadSignal = new CountDownLatch(dataList.size()); ExecutorService executorService = new ThreadPoolExecutor(20, 30, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); //把数据更新为正在处理状态 for(HashMap hm : dataList){ boolean res = kuaishouDao.updFirstMonthExecing(hm.get("ID").toString()); //System.out.println(res); //logger.info("更新结果 :"+res); } for(HashMap dataMap : dataList){ KuaishouPushOrderNewService continueService = new KuaishouPushOrderNewService(dataList.size(),threadSignal,dataMap,kuaishouDao); executorService.execute(continueService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { logger.info("执行出现异常,"+e.getMessage()); e.printStackTrace(); } //logger.info(Thread.currentThread().getName()+"KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); WriteLogUtil.writeLong("KuaishouPushOrderNewJob处理订购快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, "KuaishouPushOrderNewJob"); } /** * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常 * @param dataList * @return */ private List paraseData(List dataList){ //去重复后的数据集 List reDataList = new ArrayList(); HashMap tmpMap = new HashMap(); for (HashMap dataMap : dataList) { if(tmpMap.containsKey(dataMap.get("USERID"))){ logger.info("重复数据,"+dataMap); }else{ tmpMap.put(dataMap.get("USERID"), dataMap); reDataList.add(dataMap); } } return reDataList; } public static void main(String[] args) throws Exception { KuaishouPushOrderNewJob job = new KuaishouPushOrderNewJob(); job.doProcess(); } } class KuaishouPushOrderNewService implements Runnable { private static Logger logger = Logger.getLogger("kuaishouorderpush"); private int totalSize; private CountDownLatch threadSignal; private HashMap hm; private KuaishouDao kuaishouDao; public KuaishouPushOrderNewService(int totalSize,CountDownLatch threadSignal,HashMap hm,KuaishouDao kuaishouDao){ this.totalSize = totalSize; this.threadSignal = threadSignal; this.hm = hm; this.kuaishouDao = kuaishouDao; } @Override public void run() { long startime = System.currentTimeMillis(); Map logMap = new HashMap(); logMap.put("userinfo", hm); String resultcode = "-1"; String errorinfo = ""; String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送 String id = hm.get("ID").toString(); String userid = hm.get("USERID").toString(); String spid = hm.get("SPID").toString(); String calculatedate = getCalculatedate(); //下次推送时间,一般为下月的第一天0点30分01秒 try { //获取订购关系 HashMap orderRealMap = kuaishouDao.getOrderRealById(id); if(orderRealMap == null){ throw new BusinessException("9056","订购关系表无订购数据"); } //查询SPID是否在TB_SP_AOP_CONFIG配置,因为快手流量消耗数据是根据元素ID与此表匹配得到SPID,如果SPID没有此表内,代表流量数据是不会从KAFKA推送过来,也就是这种快手订购不要触发流量耗尽或有流量的推送 if(!kuaishouDao.hasAopConf(spid)){ throw new BusinessException("9057",spid+"在TB_SP_AOP_CONFIG表无配置"); } //当前月份 String currmonth = new SimpleDateFormat("yyyyMM").format(new Date()); //订购月份 String pushmonth = orderRealMap.get("ORDERTIME").toString().substring(0, 6); //订购月份小于当前月份,计算时间设置为当月,防止在都零点时收到上个月的数据 if(Integer.parseInt(pushmonth) < Integer.parseInt(currmonth)){ calculatedate = currmonth+"01003001"; } logMap.put("spids", spid); //判断CP是否已同步完成 //if(!kuaishouDao.hasSync(userid, spid)){ if(!kuaishouDao.hasSync(id)){ throw new BusinessException("9055","数据CP同步未完成,暂不处理"); } //判断同一月份是否推送过相同的SPID数据 List pushList = kuaishouDao.queryPush(userid, pushmonth); logger.info("pushList=>"+pushList); if(pushList != null && pushList.size() > 0){ List spidsList = null; boolean haspush = false; for(HashMap tmpMap : pushList){ spidsList = Arrays.asList(tmpMap.get("SPIDS").toString().split("#")); if(spidsList.contains(spid)){ haspush = true; break; } } logMap.put("spidList", spidsList); //当月已推送过,且不是后向产品(后向产品可以重复订购) if(haspush && !kuaishouDao.hasBackBusi(spid)){ throw new BusinessException("9054","同一月份已推送相同数据且不是后向产品"); } } KuaishouPushBean pushBean = new KuaishouPushBean(); pushBean.setSerial_number(userid); pushBean.setId(kuaishouDao.getNo()); pushBean.setPushmonth(pushmonth); pushBean.setPushtype("1"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送 pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败 pushBean.setResultinfo("待处理"); pushBean.setSpids(spid); //添加数据到推送表 kuaishouDao.insertPush(pushBean); realflag = "1"; errorinfo = "添加订购推送到推送表"; //未出现异常,设置为成功 resultcode = "0"; if("".equals(errorinfo)){ errorinfo = "ok"; } } catch (Exception e) { if (e instanceof BusinessException) { errorinfo = ((BusinessException) e).getMessage(); resultcode = ((BusinessException) e).getCode(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "处理数据出现异常,"+e.getMessage(); } }finally { threadSignal.countDown(); try{ boolean res = false; //暂未同步或者处理出现异常,恢复初始状态,下次再处理 if("9055".equals(resultcode) || "8000".equals(resultcode)){ res = kuaishouDao.updFirstMonthRc(id,"1",errorinfo,null, null); }else{ resultcode = "0"; //更新TD_KUAISHOU_FIRSTMONTH表为月初推送数据,由KuaishouPushMonthNewJob处理 res = kuaishouDao.updFirstMonthRc(id,resultcode,errorinfo, realflag, calculatedate); } //logger.info(id+",更新执行结果:"+res); }catch(Exception e){ e.printStackTrace(); errorinfo = "更新数据出现异常,"+e.getMessage(); resultcode = "8001"; } //写日志 logMap.put("jobname", "KuaishouPushOrderNewJob"); logMap.put("resultcode", resultcode); logMap.put("errorinfo", errorinfo); logMap.put("realflag", realflag); logMap.put("calculatedate", calculatedate); //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount())); logger.info(JsonUtil.objectToJson(logMap)); } } /** * 获取下次推送的默认时间(下个月1号零点三十分零一秒) * @return */ private String getCalculatedate() { SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MONTH, 1); calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH)); return dft.format(calendar.getTime())+"003001"; } }