04c537b028e9516838112825daec59ece0b16c98.svn-base 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.chinacreator.common.util.DESUtil;
  5. import com.chinacreator.common.util.MD5;
  6. import com.chinacreator.process.bean.KuaiShouActivationBean;
  7. import com.chinacreator.process.dao.DictionaryDao;
  8. import com.chinacreator.process.dao.KuaishouDao;
  9. import com.chinacreator.process.util.IdGenerateUtil;
  10. import com.chinacreator.process.util.JsonUtil;
  11. import com.chinacreator.process.util.URLUtil;
  12. import com.chinacreator.process.util.WriteLogUtil;
  13. import org.apache.log4j.Logger;
  14. import org.quartz.DisallowConcurrentExecution;
  15. import org.quartz.PersistJobDataAfterExecution;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.util.StringUtils;
  18. import java.sql.SQLException;
  19. import java.text.SimpleDateFormat;
  20. import java.util.*;
  21. import java.util.concurrent.*;
  22. /**
  23. * kuaishou查询激活状态,月初采集
  24. * <p>
  25. * 每月执行一次
  26. *
  27. * @author shuiying.ou
  28. * @date 20220705
  29. */
  30. @PersistJobDataAfterExecution
  31. @DisallowConcurrentExecution
  32. public class KuaiShouActivationStateMonthJob {
  33. private static Logger logger = Logger.getLogger("kuaishouactivationstatemonth");
  34. @Autowired
  35. private KuaishouDao kuaishouDao;
  36. @Autowired
  37. private DictionaryDao dictionaryDao;
  38. public void doProcess() throws Exception {
  39. WriteLogUtil.writeLong("KuaiShouActivationStateMonthJob月初查询快手用户激活状态定时任务开始", logger, "KuaiShouActivationStateMonthJob");
  40. int count = 0;
  41. long beginTime = System.currentTimeMillis();
  42. int rows = 800; //每次取数据条数
  43. String kuaishoufmrows = dictionaryDao.getValue("kuaishouasrows");
  44. if (!StringUtils.isEmpty(kuaishoufmrows)) {
  45. try {
  46. rows = Integer.parseInt(kuaishoufmrows);
  47. } catch (Exception e) {
  48. rows = 800;
  49. }
  50. }
  51. //TD_KUAISHOU_FIRSTMONTH 分区标识,分区标识从T_HASH_P01到T_HASH_P50
  52. String partition = "T_HASH_P";
  53. for (int i = 1; i <= 50; i++) {
  54. count = 0;
  55. partition = "T_HASH_P";
  56. try {
  57. if (i < 10) {
  58. partition = partition + "0" + i;
  59. } else {
  60. partition = partition + i;
  61. }
  62. //第一步、按分区标识获取快手有效用户数据
  63. List<KuaiShouActivationBean> dataList = kuaishouDao.getOrderByPart(partition, rows);
  64. String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  65. logger.info(partition + ",获取到全量快手有效数据:" + (dataList != null ? dataList.size() : "0") + "---time" + time);
  66. if (dataList != null && dataList.size() > 0) {
  67. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  68. ExecutorService executorService = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  69. //把数据更新为正在处理状态
  70. for(KuaiShouActivationBean hm : dataList){
  71. kuaishouDao.updOrdertMonth(hm.getOrderid());
  72. }
  73. for (KuaiShouActivationBean hm : dataList) {
  74. KuaiShouActivationStateMonthService continueService = new KuaiShouActivationStateMonthService(threadSignal, hm, kuaishouDao);
  75. executorService.execute(continueService);
  76. }
  77. executorService.shutdown();
  78. try {
  79. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  80. } catch (InterruptedException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. /* //第二步、按分区标识获取查询失败数据
  85. List<HashMap<String, Object>> failDataList = kuaishouDao.getFailDataByPart(partition, rows);
  86. time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  87. logger.info(partition + ",获取到失败快手查询数据:" + (failDataList != null ? failDataList.size() : "0") + "---time" + time);
  88. if (failDataList != null && failDataList.size() > 0) {
  89. *//*failDataList = paraseData(failDataList); //去重复数据
  90. logger.info(partition + ",去重复前用户数:" + count + ",去重复后用户数:" + failDataList.size());*//*
  91. CountDownLatch threadSignal = new CountDownLatch(failDataList.size());
  92. ExecutorService executorService = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  93. for (HashMap<String, Object> hm : failDataList) {
  94. KuaiShouActivationFailDataService continueService = new KuaiShouActivationFailDataService(threadSignal, hm, kuaishouDao);
  95. executorService.execute(continueService);
  96. }
  97. executorService.shutdown();
  98. try {
  99. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  100. } catch (InterruptedException e) {
  101. e.printStackTrace();
  102. }
  103. }*/
  104. } catch (Exception e) {
  105. logger.info(partition + ",执行出现异常," + e.getMessage());
  106. e.printStackTrace();
  107. }
  108. Thread.sleep(100);
  109. }
  110. //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  111. WriteLogUtil.writeLong("KuaiShouActivationStateMonthJob月初查询快手用户激活状态定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒", logger, "KuaiShouActivationStateMonthJob");
  112. }
  113. /**
  114. * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
  115. *
  116. * @param dataList
  117. * @return
  118. */
  119. private List<HashMap<String, Object>> paraseData(List<HashMap<String, Object>> dataList) {
  120. //去重复后的数据集
  121. List<HashMap<String, Object>> reDataList = new ArrayList<HashMap<String, Object>>();
  122. HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
  123. for (HashMap<String, Object> dataMap : dataList) {
  124. if (tmpMap.containsKey(dataMap.get("USERID"))) {
  125. logger.info("重复数据," + dataMap);
  126. } else {
  127. tmpMap.put(dataMap.get("USERID"), dataMap);
  128. reDataList.add(dataMap);
  129. }
  130. }
  131. return reDataList;
  132. }
  133. public static void main(String[] args) {
  134. KuaishouPushMonthNewJob job = new KuaishouPushMonthNewJob();
  135. try {
  136. job.doProcess();
  137. } catch (Exception e) {
  138. e.printStackTrace();
  139. }
  140. }
  141. }
  142. class KuaiShouActivationStateMonthService implements Runnable {
  143. private static Logger logger = Logger.getLogger("kuaishouactivationstatemonth");
  144. private CountDownLatch threadSignal;
  145. private KuaishouDao kuaishouDao;
  146. private KuaiShouActivationBean hm; //用户数据,ID和USERID
  147. public KuaiShouActivationStateMonthService(CountDownLatch threadSignal, KuaiShouActivationBean hm, KuaishouDao kuaishouDao) {
  148. this.threadSignal = threadSignal;
  149. this.hm = hm;
  150. this.kuaishouDao = kuaishouDao;
  151. }
  152. @Override
  153. public void run() {
  154. Map<String, Object> logMap = new HashMap<>();
  155. logMap.put("userinfo", hm);
  156. String userid = hm.getUserid();
  157. String id = hm.getOrderid();
  158. KuaiShouActivationBean activationBean = new KuaiShouActivationBean();
  159. activationBean.setUserid(userid);
  160. activationBean.setOrderid(id);
  161. String resultcode = "0";
  162. try {
  163. //处理业务逻辑 调快手激活状态接口
  164. String resp = queryService(logMap);
  165. /* resp = "{\n" +
  166. " \"result\": \"0\",\n" +
  167. " \"host-name\": \"public-jswg-rs-kce-node959.idcyz.hb1.kwaidc.com\",\n" +
  168. " \"data\": {\n" +
  169. " \"actype\": \"0\",\n" +
  170. " \"updatetime\": \"20220709181033\",\n" +
  171. " \"status\": \"0\"\n" +
  172. " },\n" +
  173. " \"errorcode\": \"\"\n" +
  174. "}";*/
  175. JSONObject json = JSON.parseObject(resp); //调用接口后获取的结果
  176. Map map = JSONObject.parseObject(resp, Map.class);
  177. // 解析接口数据
  178. if (json != null && "0".equals(json.getString("result"))) {
  179. //未出现异常,设置为成功
  180. activationBean.setResultcode(map.get("result").toString());
  181. activationBean.setErrorinfo(map.get("errorcode").toString());
  182. activationBean.setData(map.get("data").toString());
  183. Map infoMessage = (Map) map.get("data");
  184. String status = null;
  185. String actype = null;
  186. String updatetim = null;
  187. if (!infoMessage.isEmpty()) {
  188. try {
  189. status = infoMessage.get("status").toString();
  190. } catch (Exception e) {
  191. status = null;
  192. }
  193. try {
  194. actype = infoMessage.get("actype").toString();
  195. } catch (Exception e) {
  196. actype = null;
  197. }
  198. try {
  199. updatetim = infoMessage.get("updatetime").toString();
  200. } catch (Exception e) {
  201. updatetim = null;
  202. }
  203. }
  204. activationBean.setStatus(status);
  205. activationBean.setActype(actype);
  206. activationBean.setUpdatetime(updatetim);
  207. insertOrUpdateData(hm, activationBean);
  208. } else if (json != null && "1".equals(json.getString("result"))) {
  209. // result = 1 结果识别码失败
  210. activationBean.setResultcode(map.get("result").toString());
  211. activationBean.setErrorinfo(map.get("errorcode").toString());
  212. insertOrUpdateData(hm, activationBean);
  213. } else {
  214. resultcode = "3";
  215. logger.info(userid + "=======>调快手激活状态查询接口失败");
  216. }
  217. } catch (Exception e) {
  218. resultcode = "3";
  219. logger.info(userid +"=======>处理数据失败" + e.getMessage() + e);
  220. System.out.println(userid +"=======>处理数据失败" + e.getMessage() + e);
  221. } finally {
  222. threadSignal.countDown();
  223. try {
  224. //如果为3,下次再处理
  225. boolean flag = kuaishouDao.updOrderValidData(id, resultcode);
  226. if (!flag) {
  227. logger.info("无数据更新");
  228. }
  229. } catch (Exception e) {
  230. e.printStackTrace();
  231. logger.info("更新数据出现异常," + e.getMessage());
  232. }
  233. //写日志
  234. logMap.put("jobname", "KuaiShouActivationStateMonthJob");
  235. String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  236. logMap.put("endtime", time);
  237. logger.info(JsonUtil.objectToJson(logMap));
  238. }
  239. }
  240. /**
  241. * 更新或者新增
  242. * @param activationBean
  243. *
  244. */
  245. private void insertOrUpdateData(KuaiShouActivationBean hm, KuaiShouActivationBean activationBean) throws SQLException {
  246. // 根据orderid查TD_KUAISHOU_ACTIVATESTATUS是否存在数据,存在则更新,不存在则新建
  247. KuaiShouActivationBean bean = kuaishouDao.getKSActivate(activationBean.getOrderid());
  248. if (bean != null) {
  249. // 更新
  250. if (!StringUtils.isEmpty(hm.getOrdertime())) {
  251. activationBean.setOrdertime(hm.getOrdertime());
  252. }
  253. if (!StringUtils.isEmpty(hm.getOrderchannel())) {
  254. activationBean.setOrderchannel(hm.getOrderchannel());
  255. }
  256. if (!StringUtils.isEmpty(hm.getOrderchannel2())) {
  257. activationBean.setOrderchannel2(hm.getOrderchannel2());
  258. }
  259. if (!StringUtils.isEmpty(hm.getCanceltime())) {
  260. activationBean.setCanceltime(hm.getCanceltime());
  261. }
  262. kuaishouDao.updKSActivate(activationBean);
  263. } else {
  264. // 新建
  265. activationBean.setId(IdGenerateUtil.uuid6());
  266. if (!StringUtils.isEmpty(hm.getSpid())) {
  267. activationBean.setSpid(hm.getSpid());
  268. }if (!StringUtils.isEmpty(hm.getSpname())) {
  269. activationBean.setSpname(hm.getSpname());
  270. }
  271. if (!StringUtils.isEmpty(hm.getOrdertime())) {
  272. activationBean.setOrdertime(hm.getOrdertime());
  273. }
  274. if (!StringUtils.isEmpty(hm.getOrderchannel())) {
  275. activationBean.setOrderchannel(hm.getOrderchannel());
  276. }
  277. if (!StringUtils.isEmpty(hm.getOrderchannel2())) {
  278. activationBean.setOrderchannel2(hm.getOrderchannel2());
  279. }
  280. if (!StringUtils.isEmpty(hm.getCanceltime())) {
  281. activationBean.setCanceltime(hm.getCanceltime());
  282. }
  283. kuaishouDao.inserttimeKSActivate(activationBean);
  284. }
  285. }
  286. /* *
  287. * 调查询接口
  288. * @param logMap
  289. * @return
  290. * */
  291. private String queryService(Map<String, Object> logMap) {
  292. String baseurl = "http://api.gifshow.com/rest/n/partner/cucc/callback/check/active";
  293. HashMap<String, Object> paramsMap = new HashMap<>();
  294. String userid = hm.getUserid();
  295. try {
  296. userid = DESUtil.encode(userid, "ksks1234");
  297. paramsMap.put("usermob", userid);
  298. logMap.put("userid", userid);
  299. } catch (Exception e) {
  300. e.printStackTrace();
  301. }
  302. StringBuffer stringBuffer = new StringBuffer();
  303. String str = stringBuffer.append(userid).append("2rFUrLZyKV9S").toString();
  304. String sign = MD5.MD5Encode(str);
  305. if (sign != null && sign.length() > 16) {
  306. sign = sign.substring(0, 16);
  307. }
  308. paramsMap.put("sign", sign);
  309. logMap.put("sign", sign);
  310. String result = "";
  311. try {
  312. result = URLUtil.postJson(baseurl, JsonUtil.objectToJson(paramsMap));
  313. } catch (Exception e) {
  314. e.printStackTrace();
  315. logger.info("调查询快手激活状态接口失败" + e.getMessage() + e);
  316. }
  317. return result;
  318. }
  319. }