123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- package com.chinacreator.process.job;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.bean.ContinueBean;
- import com.chinacreator.process.bean.ContinueLogBean;
- import com.chinacreator.process.bean.KuaishouPushBean;
- import com.chinacreator.process.dao.ContinueOrderDao;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KuaishouDao;
- import com.chinacreator.process.util.DesUtil;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.SpringUtils;
- import com.chinacreator.process.util.URLUtil;
- /**
- * 快手订购推送(已弃用)
- * 订购流量未耗尽的定时任务(KuaishouPushOrderJob)把快手订购表的未处理的数据判断是否要推送,是,则添加到推送表,否,则不处理。
- 在处理后数据把推送表的ID(如果要推送)添加到快手订购表(TD_KUAISHOU_ORDER);
- 1. 获取快手订购记录表数据TD_KUAISHOU_ORDER
- 2. 查询推送记录表TD_KAFKA_KUAISHOU_PUSH,看本月是否已有推送过流量未耗尽
- 3. 无,则把数据添加到推送记录表
- * @author xu.zhou
- * @date 20200515
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KuaishouPushOrderJob {
- private static Logger logger = Logger.getLogger("kuaishoupush");
-
- @Autowired
- private KuaishouDao kuaishouDao;
-
- @Autowired
- private DictionaryDao dictionaryDao;
-
- public void doProcess() throws Exception {
- long beginTime = System.currentTimeMillis();
- logger.info(Thread.currentThread().getName()+"KuaishouPushOrderJob处理订购快手流量未耗尽定时任务开始");
- List<HashMap> dataList = kuaishouDao.getOrderPush();
- logger.info("去重复前数据条数:"+ (dataList == null ? "0" : dataList.size()));
- paraseData(dataList);
- logger.info("去重复后数据条数:"+ (dataList == null ? "0" : dataList.size()));
- if(dataList != null && dataList.size() > 0){
- CountDownLatch threadSignal = new CountDownLatch(dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for(HashMap dataMap : dataList){
- KuaishouPushOrderService continueService = new KuaishouPushOrderService(dataList.size(),threadSignal,dataMap,kuaishouDao,dictionaryDao);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- logger.info(Thread.currentThread().getName()+"KuaishouPushOrderJob处理订购快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
- }
-
- /**
- * 去重复数据(userid+spid+ordermonth)
- * @param dataList
- * @return
- */
- private void paraseData(List<HashMap> dataList){
- if (dataList == null || dataList.size() == 0){
- return;
- }
- HashMap<String, Object> reData = new HashMap<String, Object>();
- String ordermonth = "";
- String userid = "";
- String spid = "";
- HashMap dataMap = null;
- for (int i=0; i<dataList.size(); i++){
- dataMap = dataList.get(i);
- ordermonth = dataMap.get("ORDERTIME").toString().substring(0, 6);
- userid = dataMap.get("USERID").toString();
- spid = dataMap.get("SPID").toString();
- if(reData.containsKey(userid+spid+ordermonth)){
- dataList.remove(i);
- i--;
- }else{
- reData.put(userid+spid+ordermonth, dataMap);
- }
- }
- }
- }
- class KuaishouPushOrderService implements Runnable {
- private static Logger logger = Logger.getLogger("kuaishoupush");
- private int totalSize;
- private CountDownLatch threadSignal;
- private HashMap dataMap;
- private DictionaryDao dictionaryDao;
- private KuaishouDao kuaishouDao;
-
- public KuaishouPushOrderService(int totalSize,CountDownLatch threadSignal,HashMap dataMap,KuaishouDao kuaishouDao,DictionaryDao dictionaryDao){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.dataMap = dataMap;
- this.kuaishouDao = kuaishouDao;
- this.dictionaryDao = dictionaryDao;
- }
-
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map logMap = new HashMap();
- logMap.put("dataMap", dataMap);
- String pushid = ""; //推送记录表ID
- String resultcode = "-1";
- String errorinfo = "";
- boolean hasinvoke = false; //是否调了接口
- try {
- String spid = (String)dataMap.get("SPID");
- String pushmonth = dataMap.get("ORDERTIME").toString().substring(0, 6);
- String userid = (String)dataMap.get("USERID");
- //判断CP是否已同步完成
- if(!kuaishouDao.hasSync(userid, spid)){
- throw new BusinessException("9055","数据CP同步未完成,暂不处理");
- }
- List<HashMap> pushList = kuaishouDao.queryPush(userid, pushmonth);
- logger.info("pushList=>"+pushList);
- if(pushList != null && pushList.size() > 0){
- List<String> spidsList = null;
- boolean haspush = false;
- for(HashMap tmpMap : pushList){
- spidsList = Arrays.asList(tmpMap.get("SPIDS").toString().split("#"));
- logger.info("spid=>"+spid+", spidsList=》"+spidsList);
- if(spidsList.contains(spid)){
- haspush = true;
- break;
- }
- }
- if(haspush){
- throw new BusinessException("9054","同一月份已推送相同数据");
- }
- }
- KuaishouPushBean pushBean = new KuaishouPushBean();
- pushid = kuaishouDao.getNo();
- pushBean.setSerial_number(userid);
- pushBean.setId(pushid);
- pushBean.setPushmonth(pushmonth);
- pushBean.setPushtype("1"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
- pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败
- pushBean.setResultinfo("待处理");
- pushBean.setSpids(spid);
- //添加数据到推送表
- kuaishouDao.insertPush(pushBean);
- resultcode = "0";
- errorinfo = "ok";
- } catch (Exception e) {
- if (e instanceof BusinessException) {
- errorinfo = ((BusinessException) e).getMessage();
- resultcode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultcode = "8000";
- errorinfo = "处理数据出现异常,"+e.getMessage();
- }
- }finally {
- threadSignal.countDown();
- try{
- //未CP同步的数据,延迟处理
- if(!"9055".equals(resultcode)){
- kuaishouDao.updOrderPush(dataMap.get("ID").toString(), "3", errorinfo, (System.currentTimeMillis()-startime)+"", pushid);
- }
- } catch (Exception e) {
- e.printStackTrace();
- errorinfo += "|更新ORDER表出现异常,"+e.getMessage();
- }
- //写日志
- logMap.put("jobname", "KuaishouPushOrderJob");
- logMap.put("time", (System.currentTimeMillis()-startime)+"");
- logMap.put("resultcode", resultcode);
- logMap.put("errorinfo", errorinfo);
- //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
- logger.info(JsonUtil.objectToJson(logMap));
- }
- }
-
- /**
- * 调接口
- * @param reqBean
- * @return
- * @throws Exception
- */
- private String invokeKsPush(KuaishouPushBean pushBean) throws Exception{
- String result = ""; //调快手接口返回结果
- String pushurl = dictionaryDao.getValue("kuaishoupushurl");
- String jsonParams = getInvokeParams(pushBean);
- logger.info("pushurl=>"+pushurl+", jsonParams=>"+jsonParams);
- if(pushurl.startsWith("https")){
- result = HttpInvoke.sendhttpsReq("POST", pushurl, jsonParams, getProperty());
- }else{
- result = HttpInvoke.sendHttpByPost("POST", pushurl, jsonParams, getProperty());
- }
- logger.info("调快手推送接口返回结果:"+result);
- //去空格、换行符号
- if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", "");
- return result;
- }
-
- /**
- * 推接调接口的参数
- * @param reqBean
- * @return
- * @throws Exception
- */
- private String getInvokeParams(KuaishouPushBean pushBean) throws Exception{
- String pwd = dictionaryDao.getValue("kuaishoupushpwd");
- String userid = DesUtil.encode(pushBean.getSerial_number(), pwd); //手机号码加密
- String timestamp= (System.currentTimeMillis())/1000+"";
- String month = pushBean.getPushmonth();
- String pushtype = pushBean.getPushtype();
- String signature = MD5.MD5Encode(userid+pushtype+month+timestamp+pwd);
- JSONObject json = new JSONObject();
- json.put("timestamp", timestamp);
- json.put("signature", signature);
- json.put("userid", userid);
- json.put("month", month);
- json.put("type", pushtype);
- return json.toJSONString();
- }
-
- /**
- * 获取请求属性性
- * @return
- */
- private static Map getProperty(){
- Map reqProperty = new HashMap();
- reqProperty.put("Content-type", "application/json;charset=UTF-8");
- return reqProperty;
- }
- }
|