123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- package com.chinacreator.process.job;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.bean.ContinueBean;
- import com.chinacreator.process.bean.ContinueLogBean;
- import com.chinacreator.process.bean.KuaishouPushBean;
- import com.chinacreator.process.dao.ContinueOrderDao;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KuaishouDao;
- import com.chinacreator.process.util.DesUtil;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.SpringUtils;
- import com.chinacreator.process.util.URLUtil;
- /**
- * 已废弃,不再使用
- * 快手月初推送
- * 月初推送定时任务(KuaishouPushMonthJob)只负责把订购关系表(TD_ORDER_RELATIONS)当前有订购快手业务且未失效的数据添加到推送表就结束,状态为待处理(resultcode=1)
- * 每月1号凌晨开始执行
- * @author xu.zhou
- * @date 20200515
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KuaishouPushMonthJob {
- private static Logger logger = Logger.getLogger("kuaishoupush");
-
- @Autowired
- private KuaishouDao kuaishouDao;
-
- @Autowired
- private DictionaryDao dictionaryDao;
-
- public void doProcess() throws Exception {
- logger.info(Thread.currentThread().getName()+"KuaishouPushMonthJob月初快手流量未耗尽定时任务开始");
- long beginTime = System.currentTimeMillis();
- //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
- String partition = "T_HASH_P";
- for(int i = 1; i <= 50; i++){
- partition = "T_HASH_P";
- try {
- if(i < 10){
- partition = partition + "0" + i;
- }else{
- partition = partition + i;
- }
-
- logger.info("执行分区:"+partition);
- //按分区标识获取订购数据
- List<HashMap> list = kuaishouDao.getFirstMonthRealByPart(partition);
- if(list != null && list.size() > 0){
- //推送月份
- String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date());
- CountDownLatch threadSignal = new CountDownLatch(list.size());
- ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- HashMap<String, List<String>> dataMap = paraseData(list);
- logger.info(partition+",数据库有效订购的用户数:"+dataMap.size());
- Set<String> keySet = dataMap.keySet();
- Iterator<String> it = keySet.iterator();
- String userid = "";
- while(it.hasNext()){
- userid = it.next();
- KuaishouPushMonthService continueService = new KuaishouPushMonthService(list.size(),threadSignal,userid,dataMap.get(userid),pushmonth,kuaishouDao,dictionaryDao);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }else{
- logger.info(partition+", 无订购数据");
- }
- } catch (Exception e) {
- logger.info(partition+",执行出现异常,"+e.getMessage());
- e.printStackTrace();
- }
-
- Thread.sleep(5000);
- }
- logger.info(Thread.currentThread().getName()+"KuaishouPushMonthJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
- }
-
- /**
- * 数据重组,每个号码一个key,值是spid
- * @param dataList
- * @return
- */
- private HashMap<String, List<String>> paraseData(List<HashMap> dataList){
- HashMap<String, List<String>> reData = new HashMap<String, List<String>>();
- for (HashMap dataMap : dataList) {
- if(reData.containsKey(dataMap.get("USERID"))){
- reData.get(dataMap.get("USERID")).add(dataMap.get("SPID").toString());
- }else{
- List<String> tmpList = new ArrayList<String>();
- tmpList.add(dataMap.get("SPID").toString());
- reData.put(dataMap.get("USERID").toString(), tmpList);
- }
- }
- return reData;
- }
- }
- class KuaishouPushMonthService implements Runnable {
- private static Logger logger = Logger.getLogger("kuaishoupush");
- private int totalSize;
- private CountDownLatch threadSignal;
- private String userid;
- private List<String> spidsList;
- private String pushmonth;
- private DictionaryDao dictionaryDao;
- private KuaishouDao kuaishouDao;
-
- public KuaishouPushMonthService(int totalSize,CountDownLatch threadSignal,String userid,List<String> spidsList,String pushmonth,KuaishouDao kuaishouDao,DictionaryDao dictionaryDao){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.userid = userid;
- this.spidsList = spidsList;
- this.pushmonth = pushmonth;
- this.kuaishouDao = kuaishouDao;
- this.dictionaryDao = dictionaryDao;
- }
-
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map logMap = new HashMap();
- logMap.put("userid", userid);
- logMap.put("pushmonth", pushmonth);
- String pushid = ""; //推送记录表ID
- String resultcode = "-1";
- String errorinfo = "";
- boolean hasinvoke = false; //是否调了接口
- try {
- String spids = "";
- for(String tmp : spidsList){
- if("".equals(spids)){
- spids += tmp;
- }else{
- spids += "#"+tmp;
- }
- }
- logMap.put("spids", spids);
- List<HashMap> dataList = kuaishouDao.queryPush(userid, pushmonth, "2");
- if(dataList != null && dataList.size()>0){
- throw new BusinessException("9061","重复推送");
- }
- KuaishouPushBean pushBean = new KuaishouPushBean();
- pushid = kuaishouDao.getNo();
- pushBean.setSerial_number(userid);
- pushBean.setId(pushid);
- pushBean.setPushmonth(pushmonth);
- pushBean.setPushtype("2"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
- pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败
- pushBean.setResultinfo("待处理");
- pushBean.setSpids(spids);
- //添加数据到推送表
- kuaishouDao.insertPush(pushBean);
- resultcode = "0";
- errorinfo = "ok";
- } catch (Exception e) {
- if (e instanceof BusinessException) {
- errorinfo = ((BusinessException) e).getMessage();
- resultcode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultcode = "8000";
- errorinfo = "处理数据出现异常,"+e.getMessage();
- }
- }finally {
- threadSignal.countDown();
- //写日志
- logMap.put("jobname", "KuaishouPushMonthJob");
- logMap.put("resultcode", resultcode);
- logMap.put("errorinfo", errorinfo);
- //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
- logger.info(JsonUtil.objectToJson(logMap));
- }
- }
- }
|