123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
- package com.chinacreator.process.job;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.common.util.DESUtil;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.bean.KuaiShouActivationBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KuaishouDao;
- import com.chinacreator.process.util.IdGenerateUtil;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.URLUtil;
- import com.chinacreator.process.util.WriteLogUtil;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.StringUtils;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.*;
- import java.util.concurrent.*;
- /**
- * kuaishou查询激活状态,月初采集
- * <p>
- * 每月执行一次
- *
- * @author shuiying.ou
- * @date 20220705
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KuaiShouActivationStateMonthJob {
- private static Logger logger = Logger.getLogger("kuaishouactivationstatemonth");
- @Autowired
- private KuaishouDao kuaishouDao;
- @Autowired
- private DictionaryDao dictionaryDao;
- public void doProcess() throws Exception {
- WriteLogUtil.writeLong("KuaiShouActivationStateMonthJob月初查询快手用户激活状态定时任务开始", logger, "KuaiShouActivationStateMonthJob");
- int count = 0;
- long beginTime = System.currentTimeMillis();
- int rows = 800; //每次取数据条数
- String kuaishoufmrows = dictionaryDao.getValue("kuaishouasrows");
- if (!StringUtils.isEmpty(kuaishoufmrows)) {
- try {
- rows = Integer.parseInt(kuaishoufmrows);
- } catch (Exception e) {
- rows = 800;
- }
- }
- //TD_KUAISHOU_FIRSTMONTH 分区标识,分区标识从T_HASH_P01到T_HASH_P50
- String partition = "T_HASH_P";
- for (int i = 1; i <= 50; i++) {
- count = 0;
- partition = "T_HASH_P";
- try {
- if (i < 10) {
- partition = partition + "0" + i;
- } else {
- partition = partition + i;
- }
- //第一步、按分区标识获取快手有效用户数据
- List<KuaiShouActivationBean> dataList = kuaishouDao.getOrderByPart(partition, rows);
- String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
- logger.info(partition + ",获取到全量快手有效数据:" + (dataList != null ? dataList.size() : "0") + "---time" + time);
- if (dataList != null && dataList.size() > 0) {
- CountDownLatch threadSignal = new CountDownLatch(dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- //把数据更新为正在处理状态
- for(KuaiShouActivationBean hm : dataList){
- kuaishouDao.updOrdertMonth(hm.getOrderid());
- }
- for (KuaiShouActivationBean hm : dataList) {
- KuaiShouActivationStateMonthService continueService = new KuaiShouActivationStateMonthService(threadSignal, hm, kuaishouDao);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- /* //第二步、按分区标识获取查询失败数据
- List<HashMap<String, Object>> failDataList = kuaishouDao.getFailDataByPart(partition, rows);
- time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
- logger.info(partition + ",获取到失败快手查询数据:" + (failDataList != null ? failDataList.size() : "0") + "---time" + time);
- if (failDataList != null && failDataList.size() > 0) {
- *//*failDataList = paraseData(failDataList); //去重复数据
- logger.info(partition + ",去重复前用户数:" + count + ",去重复后用户数:" + failDataList.size());*//*
- CountDownLatch threadSignal = new CountDownLatch(failDataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (HashMap<String, Object> hm : failDataList) {
- KuaiShouActivationFailDataService continueService = new KuaiShouActivationFailDataService(threadSignal, hm, kuaishouDao);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }*/
- } catch (Exception e) {
- logger.info(partition + ",执行出现异常," + e.getMessage());
- e.printStackTrace();
- }
- Thread.sleep(100);
- }
- //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
- WriteLogUtil.writeLong("KuaiShouActivationStateMonthJob月初查询快手用户激活状态定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒", logger, "KuaiShouActivationStateMonthJob");
- }
- /**
- * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
- *
- * @param dataList
- * @return
- */
- private List<HashMap<String, Object>> paraseData(List<HashMap<String, Object>> dataList) {
- //去重复后的数据集
- List<HashMap<String, Object>> reDataList = new ArrayList<HashMap<String, Object>>();
- HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
- for (HashMap<String, Object> dataMap : dataList) {
- if (tmpMap.containsKey(dataMap.get("USERID"))) {
- logger.info("重复数据," + dataMap);
- } else {
- tmpMap.put(dataMap.get("USERID"), dataMap);
- reDataList.add(dataMap);
- }
- }
- return reDataList;
- }
- public static void main(String[] args) {
- KuaishouPushMonthNewJob job = new KuaishouPushMonthNewJob();
- try {
- job.doProcess();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- class KuaiShouActivationStateMonthService implements Runnable {
- private static Logger logger = Logger.getLogger("kuaishouactivationstatemonth");
- private CountDownLatch threadSignal;
- private KuaishouDao kuaishouDao;
- private KuaiShouActivationBean hm; //用户数据,ID和USERID
- public KuaiShouActivationStateMonthService(CountDownLatch threadSignal, KuaiShouActivationBean hm, KuaishouDao kuaishouDao) {
- this.threadSignal = threadSignal;
- this.hm = hm;
- this.kuaishouDao = kuaishouDao;
- }
- @Override
- public void run() {
- Map<String, Object> logMap = new HashMap<>();
- logMap.put("userinfo", hm);
- String userid = hm.getUserid();
- String id = hm.getOrderid();
- KuaiShouActivationBean activationBean = new KuaiShouActivationBean();
- activationBean.setUserid(userid);
- activationBean.setOrderid(id);
- String resultcode = "0";
- try {
- //处理业务逻辑 调快手激活状态接口
- String resp = queryService(logMap);
- /* resp = "{\n" +
- " \"result\": \"0\",\n" +
- " \"host-name\": \"public-jswg-rs-kce-node959.idcyz.hb1.kwaidc.com\",\n" +
- " \"data\": {\n" +
- " \"actype\": \"0\",\n" +
- " \"updatetime\": \"20220709181033\",\n" +
- " \"status\": \"0\"\n" +
- " },\n" +
- " \"errorcode\": \"\"\n" +
- "}";*/
- JSONObject json = JSON.parseObject(resp); //调用接口后获取的结果
- Map map = JSONObject.parseObject(resp, Map.class);
- // 解析接口数据
- if (json != null && "0".equals(json.getString("result"))) {
- //未出现异常,设置为成功
- activationBean.setResultcode(map.get("result").toString());
- activationBean.setErrorinfo(map.get("errorcode").toString());
- activationBean.setData(map.get("data").toString());
- Map infoMessage = (Map) map.get("data");
- String status = null;
- String actype = null;
- String updatetim = null;
- if (!infoMessage.isEmpty()) {
- try {
- status = infoMessage.get("status").toString();
- } catch (Exception e) {
- status = null;
- }
- try {
- actype = infoMessage.get("actype").toString();
- } catch (Exception e) {
- actype = null;
- }
- try {
- updatetim = infoMessage.get("updatetime").toString();
- } catch (Exception e) {
- updatetim = null;
- }
- }
- activationBean.setStatus(status);
- activationBean.setActype(actype);
- activationBean.setUpdatetime(updatetim);
- insertOrUpdateData(hm, activationBean);
- } else if (json != null && "1".equals(json.getString("result"))) {
- // result = 1 结果识别码失败
- activationBean.setResultcode(map.get("result").toString());
- activationBean.setErrorinfo(map.get("errorcode").toString());
- insertOrUpdateData(hm, activationBean);
- } else {
- resultcode = "3";
- logger.info(userid + "=======>调快手激活状态查询接口失败");
- }
- } catch (Exception e) {
- resultcode = "3";
- logger.info(userid +"=======>处理数据失败" + e.getMessage() + e);
- System.out.println(userid +"=======>处理数据失败" + e.getMessage() + e);
- } finally {
- threadSignal.countDown();
- try {
- //如果为3,下次再处理
- boolean flag = kuaishouDao.updOrderValidData(id, resultcode);
- if (!flag) {
- logger.info("无数据更新");
- }
- } catch (Exception e) {
- e.printStackTrace();
- logger.info("更新数据出现异常," + e.getMessage());
- }
- //写日志
- logMap.put("jobname", "KuaiShouActivationStateMonthJob");
- String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
- logMap.put("endtime", time);
- logger.info(JsonUtil.objectToJson(logMap));
- }
- }
- /**
- * 更新或者新增
- * @param activationBean
- *
- */
- private void insertOrUpdateData(KuaiShouActivationBean hm, KuaiShouActivationBean activationBean) throws SQLException {
- // 根据orderid查TD_KUAISHOU_ACTIVATESTATUS是否存在数据,存在则更新,不存在则新建
- KuaiShouActivationBean bean = kuaishouDao.getKSActivate(activationBean.getOrderid());
- if (bean != null) {
- // 更新
- if (!StringUtils.isEmpty(hm.getOrdertime())) {
- activationBean.setOrdertime(hm.getOrdertime());
- }
- if (!StringUtils.isEmpty(hm.getOrderchannel())) {
- activationBean.setOrderchannel(hm.getOrderchannel());
- }
- if (!StringUtils.isEmpty(hm.getOrderchannel2())) {
- activationBean.setOrderchannel2(hm.getOrderchannel2());
- }
- if (!StringUtils.isEmpty(hm.getCanceltime())) {
- activationBean.setCanceltime(hm.getCanceltime());
- }
- kuaishouDao.updKSActivate(activationBean);
- } else {
- // 新建
- activationBean.setId(IdGenerateUtil.uuid6());
- if (!StringUtils.isEmpty(hm.getSpid())) {
- activationBean.setSpid(hm.getSpid());
- }if (!StringUtils.isEmpty(hm.getSpname())) {
- activationBean.setSpname(hm.getSpname());
- }
- if (!StringUtils.isEmpty(hm.getOrdertime())) {
- activationBean.setOrdertime(hm.getOrdertime());
- }
- if (!StringUtils.isEmpty(hm.getOrderchannel())) {
- activationBean.setOrderchannel(hm.getOrderchannel());
- }
- if (!StringUtils.isEmpty(hm.getOrderchannel2())) {
- activationBean.setOrderchannel2(hm.getOrderchannel2());
- }
- if (!StringUtils.isEmpty(hm.getCanceltime())) {
- activationBean.setCanceltime(hm.getCanceltime());
- }
- kuaishouDao.inserttimeKSActivate(activationBean);
- }
- }
- /* *
- * 调查询接口
- * @param logMap
- * @return
- * */
- private String queryService(Map<String, Object> logMap) {
- String baseurl = "http://api.gifshow.com/rest/n/partner/cucc/callback/check/active";
- HashMap<String, Object> paramsMap = new HashMap<>();
- String userid = hm.getUserid();
- try {
- userid = DESUtil.encode(userid, "ksks1234");
- paramsMap.put("usermob", userid);
- logMap.put("userid", userid);
- } catch (Exception e) {
- e.printStackTrace();
- }
- StringBuffer stringBuffer = new StringBuffer();
- String str = stringBuffer.append(userid).append("2rFUrLZyKV9S").toString();
- String sign = MD5.MD5Encode(str);
- if (sign != null && sign.length() > 16) {
- sign = sign.substring(0, 16);
- }
- paramsMap.put("sign", sign);
- logMap.put("sign", sign);
- String result = "";
- try {
- result = URLUtil.postJson(baseurl, JsonUtil.objectToJson(paramsMap));
- } catch (Exception e) {
- e.printStackTrace();
- logger.info("调查询快手激活状态接口失败" + e.getMessage() + e);
- }
- return result;
- }
- }
|