189dcd12b06377c46965a4bfd52640231bfe6c62.svn-base 12 KB


  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.chinacreator.common.util.DESUtil;
  5. import com.chinacreator.common.util.MD5;
  6. import com.chinacreator.process.bean.KuaiShouActivationBean;
  7. import com.chinacreator.process.bean.OrderRealationBean;
  8. import com.chinacreator.process.dao.KuaishouDao;
  9. import com.chinacreator.process.util.IdGenerateUtil;
  10. import com.chinacreator.process.util.JsonUtil;
  11. import com.chinacreator.process.util.URLUtil;
  12. import com.chinacreator.process.util.WriteLogUtil;
  13. import org.apache.log4j.Logger;
  14. import org.quartz.DisallowConcurrentExecution;
  15. import org.quartz.PersistJobDataAfterExecution;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.util.StringUtils;
  18. import java.text.SimpleDateFormat;
  19. import java.util.*;
  20. import java.util.concurrent.*;
  21. /**
  22. * kuaishou查询激活状态,每周更新
  23. * <p>
  24. * 每2分钟执行一次
  25. *
  26. * @author shuiying.ou
  27. * @date 20220705
  28. */
  29. @PersistJobDataAfterExecution
  30. @DisallowConcurrentExecution
  31. public class KuaishouActivationStateJob {
  32. private static Logger logger = Logger.getLogger("kuaishouactivationstate");
  33. @Autowired
  34. private KuaishouDao kuaishouDao;
  35. public void doProcess() {
  36. WriteLogUtil.writeLong("KuaishouActivationStateJob查询快手用户激活状态定时任务开始", logger, "KuaishouActivationStateJob");
  37. long beginTime = System.currentTimeMillis();
  38. try {
  39. int rows = 800; //每次取数据条数
  40. //按分区标识获取快手订购数据
  41. List<KuaiShouActivationBean> dataList = kuaishouDao.getOrderKuaiShou(rows);
  42. if (dataList != null) {
  43. logger.info("新增快手有效用户量:" + dataList.size());
  44. }
  45. List<KuaiShouActivationBean> unActivationList = kuaishouDao.getUnActivationStatus(rows);
  46. if (dataList != null && unActivationList != null) {
  47. dataList.addAll(unActivationList);
  48. }else if(dataList == null && unActivationList != null){
  49. dataList = unActivationList;
  50. }
  51. String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  52. if (dataList != null) {
  53. logger.info("获取到总数据:" + dataList.size() + "---time" + time);
  54. }
  55. if (dataList!= null && dataList.size() > 0) {
  56. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  57. ExecutorService executorService = new ThreadPoolExecutor(2, 2, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  58. //把数据更新为正在处理状态
  59. for (KuaiShouActivationBean hm : dataList) {
  60. String acticationstatus = "2";
  61. kuaishouDao.updOrderKuaiShouDataToTwo(hm.getId(), acticationstatus);
  62. }
  63. for (KuaiShouActivationBean hm : dataList) {
  64. KuaishouActivationStateService continueService = new KuaishouActivationStateService(threadSignal, hm, kuaishouDao);
  65. executorService.execute(continueService);
  66. }
  67. executorService.shutdown();
  68. try {
  69. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }
  73. }else {
  74. logger.info("无新增数据与未激活数据");
  75. }
  76. Thread.sleep(100);
  77. } catch (Exception e) {
  78. e.printStackTrace();
  79. logger.info("查询失败" + e.getMessage() + e);
  80. }
  81. WriteLogUtil.writeLong("KuaishouActivationStateJob查询快手用户激活状态定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒", logger, "KuaishouActivationStateJob");
  82. }
  83. public static void main(String[] args) {
  84. KuaishouPushMonthNewJob job = new KuaishouPushMonthNewJob();
  85. try {
  86. job.doProcess();
  87. } catch (Exception e) {
  88. e.printStackTrace();
  89. }
  90. }
  91. }
  92. class KuaishouActivationStateService implements Runnable {
  93. private static Logger logger = Logger.getLogger("kuaishouactivationstate");
  94. private CountDownLatch threadSignal;
  95. private KuaishouDao kuaishouDao;
  96. private KuaiShouActivationBean hm; //用户数据,ID和USERID
  97. public KuaishouActivationStateService(CountDownLatch threadSignal, KuaiShouActivationBean hm, KuaishouDao kuaishouDao) {
  98. this.threadSignal = threadSignal;
  99. this.hm = hm;
  100. this.kuaishouDao = kuaishouDao;
  101. }
  102. @Override
  103. public void run() {
  104. Map<String, Object> logMap = new HashMap<>();
  105. logMap.put("userinfo", hm);
  106. String acticationstatus = "1";
  107. String id = hm.getId();
  108. String userid = hm.getUserid();
  109. KuaiShouActivationBean activationBean = new KuaiShouActivationBean();
  110. //未出现异常,默认设置为成功
  111. acticationstatus = "0";
  112. try {
  113. //处理业务逻辑 调快手激活状态接口
  114. String resp = queryService(logMap);
  115. JSONObject json = JSON.parseObject(resp); //调用接口后获取的结果
  116. Map map = JSONObject.parseObject(resp, Map.class);
  117. // 解析接口数据
  118. if (json != null && "0".equals(json.getString("result"))) {
  119. activationBean.setResultcode(map.get("result").toString());
  120. activationBean.setErrorinfo(map.get("errorcode").toString());
  121. activationBean.setData(map.get("data").toString());
  122. Map infoMessage = (Map) map.get("data");
  123. String status = null;
  124. String actype = null;
  125. String updatetim = null;
  126. if (!infoMessage.isEmpty()) {
  127. try {
  128. status = infoMessage.get("status").toString();
  129. } catch (Exception e) {
  130. status = null;
  131. }
  132. try {
  133. actype = infoMessage.get("actype").toString();
  134. } catch (Exception e) {
  135. actype = null;
  136. }
  137. try {
  138. updatetim = infoMessage.get("updatetime").toString();
  139. } catch (Exception e) {
  140. updatetim = null;
  141. }
  142. }
  143. activationBean.setStatus(status);
  144. activationBean.setActype(actype);
  145. activationBean.setUpdatetime(updatetim);
  146. } else if (json != null && "1".equals(json.getString("result"))) {
  147. // result = 1 接口返回失败
  148. activationBean.setResultcode(map.get("result").toString());
  149. activationBean.setErrorinfo(map.get("errorcode").toString());
  150. } else {
  151. //查询失败,设置为失败
  152. acticationstatus = "3";
  153. logger.info(id + "=======>调快手激活状态接口失败");
  154. }
  155. if ("0".equals(acticationstatus)) {
  156. // 根据TD_KUAISHOU_FIRSTMONTH表中id查订购关系表中的数据
  157. OrderRealationBean orderRealationBean = kuaishouDao.getOrderRealationById(id);
  158. if (orderRealationBean != null && !StringUtils.isEmpty(orderRealationBean.getId())) {
  159. activationBean.setOrderid(id);
  160. // 根据orderid查 TD_KUAISHOU_VALID_STATUS 是否存在数据,存在则更新,不存在则新建
  161. KuaiShouActivationBean bean = kuaishouDao.getKSActivate(id);
  162. if (bean != null) {
  163. // 更新
  164. if (!StringUtils.isEmpty(orderRealationBean.getOrdertime())) {
  165. activationBean.setOrdertime(orderRealationBean.getOrdertime());
  166. }
  167. if (!StringUtils.isEmpty(orderRealationBean.getOrderchannel())) {
  168. activationBean.setOrderchannel(orderRealationBean.getOrderchannel());
  169. }
  170. if (!StringUtils.isEmpty(orderRealationBean.getCanceltime())) {
  171. activationBean.setCanceltime(orderRealationBean.getCanceltime());
  172. }
  173. kuaishouDao.updKSActivate(activationBean);
  174. } else {
  175. // 新建
  176. activationBean.setId(IdGenerateUtil.uuid6());
  177. activationBean.setUserid(orderRealationBean.getUserid());
  178. if (!StringUtils.isEmpty(orderRealationBean.getSpid())) {
  179. activationBean.setSpid(orderRealationBean.getSpid());
  180. }
  181. if (!StringUtils.isEmpty(orderRealationBean.getSpname())) {
  182. activationBean.setSpname(orderRealationBean.getSpname());
  183. }
  184. if (!StringUtils.isEmpty(orderRealationBean.getOrdertime())) {
  185. activationBean.setOrdertime(orderRealationBean.getOrdertime());
  186. }
  187. if (!StringUtils.isEmpty(orderRealationBean.getOrderchannel())) {
  188. activationBean.setOrderchannel(orderRealationBean.getOrderchannel());
  189. }
  190. if (!StringUtils.isEmpty(orderRealationBean.getCanceltime())) {
  191. activationBean.setCanceltime(orderRealationBean.getCanceltime());
  192. }
  193. kuaishouDao.inserttimeKSActivate(activationBean);
  194. }
  195. } else {
  196. logger.info(id + "=======>订购关系表中无该数据");
  197. }
  198. } else {
  199. logger.info(userid + "=======>调快手激活状态查询接口失败");
  200. }
  201. } catch (Exception e) {
  202. //出现异常,设置为失败
  203. acticationstatus = "3";
  204. } finally {
  205. threadSignal.countDown();
  206. try {
  207. //如果为3,下次再处理
  208. kuaishouDao.updOrderKuaiShouData(id, acticationstatus);
  209. } catch (Exception e) {
  210. e.printStackTrace();
  211. logger.info("更新数据出现异常," + e.getMessage());
  212. }
  213. //写日志
  214. logMap.put("jobname", "KuaishouActivationStateJob");
  215. logMap.put("activationBean", activationBean);
  216. String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  217. logMap.put("time", time);
  218. logger.info(JsonUtil.objectToJson(logMap));
  219. }
  220. }
  221. private String queryService(Map<String, Object> logMap) {
  222. String baseurl = "http://api.gifshow.com/rest/n/partner/cucc/callback/check/active";
  223. HashMap<String, Object> paramsMap = new HashMap<>();
  224. String userid = hm.getUserid();
  225. try {
  226. userid = DESUtil.encode(userid, "ksks1234");
  227. paramsMap.put("usermob", userid);
  228. logMap.put("userid", userid);
  229. } catch (Exception e) {
  230. e.printStackTrace();
  231. }
  232. StringBuffer stringBuffer = new StringBuffer();
  233. String str = stringBuffer.append(userid).append("2rFUrLZyKV9S").toString();
  234. String sign = MD5.MD5Encode(str);
  235. if (sign != null && sign.length() > 16) {
  236. sign = sign.substring(0, 16);
  237. }
  238. paramsMap.put("sign", sign);
  239. logMap.put("sign", sign);
  240. String result = "";
  241. try {
  242. result = URLUtil.postJson(baseurl, JsonUtil.objectToJson(paramsMap));
  243. } catch (Exception e) {
  244. e.printStackTrace();
  245. logger.info("调查询快手激活状态接口失败" + e.getMessage() + e);
  246. }
  247. return result;
  248. }
  249. }