69fa21fcf146c4b688b9a7961c14027ff611b6af.svn-base 10 KB


  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.chinacreator.common.util.DESUtil;
  4. import com.chinacreator.common.util.MD5;
  5. import com.chinacreator.process.dao.DictionaryDao;
  6. import com.chinacreator.process.dao.KafkaMusicDao;
  7. import com.chinacreator.process.util.HttpInvoke;
  8. import com.chinacreator.process.util.JsonUtil;
  9. import com.chinacreator.process.util.RsaUtils;
  10. import com.chinacreator.video.queue.MessageService;
  11. import com.chinacreator.video.queue.bean.MessagePipe;
  12. import org.apache.commons.codec.digest.DigestUtils;
  13. import org.apache.commons.lang.StringUtils;
  14. import org.apache.log4j.Logger;
  15. import org.quartz.DisallowConcurrentExecution;
  16. import org.quartz.PersistJobDataAfterExecution;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import java.nio.charset.Charset;
  19. import java.util.HashMap;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.concurrent.*;
  23. /**
  24. * 音乐公司订购关系推送
  25. *
  26. * @author yu.su
  27. * @date 2020.08.12
  28. */
  29. @PersistJobDataAfterExecution
  30. @DisallowConcurrentExecution
  31. public class KafkaMusicJob {
  32. private static Logger logger = Logger.getLogger("KafkaMusic");
  33. private static Logger log = Logger.getLogger(KafkaMusicJob.class);
  34. @Autowired
  35. private KafkaMusicDao kafkaMusicDao;
  36. @Autowired
  37. private MessageService messageService;
  38. @Autowired
  39. private DictionaryDao dictionaryDao;
  40. public void doProcess() throws Exception {
  41. log.info(Thread.currentThread().getName() + "KafkaMusicJob音乐公司订购关系推送定时任务开始");
  42. long beginTime = System.currentTimeMillis();
  43. List<MessagePipe> list = messageService.reciveBatchMessage("kafkaMusic", 500);
  44. if (list != null && list.size() > 0) {
  45. //对接路径
  46. /* String url = kafkaMusicDao.queryValue("kafkaMusic");
  47. if (StringUtils.isEmpty(url)) {
  48. logger.error("音乐公司订购关系路径不存在");
  49. throw new BusinessException("8000", "音乐公司订购关系路径不存在");
  50. }*/
  51. String url = dictionaryDao.getValue("kafkaMusicUrl");
  52. logger.info("音乐公司订购关系请求接口" + url);
  53. logger.info("音乐公司订购关系数据:" + list);
  54. CountDownLatch threadSignal = new CountDownLatch(list.size());
  55. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  56. for (MessagePipe messagePipe : list) {
  57. Map<String, Object> params = messagePipe.getBody();
  58. logger.info("转换前的参数:" + params);
  59. try {
  60. Object param = params.get("param");
  61. if (param == null || param == "") {
  62. params = generateParams(params);
  63. logger.info("转换后的参数:" + JSONObject.toJSONString(params));
  64. }
  65. } catch (Exception e) {
  66. logger.info("参数转换发生错误========" + e.getMessage());
  67. e.printStackTrace();
  68. }
  69. if (params != null && params.size() > 0) {
  70. KafkaMusicEntity kafkaMusicEntity = new KafkaMusicEntity(list.size(), params, threadSignal, url);
  71. executorService.execute(kafkaMusicEntity);
  72. }
  73. }
  74. executorService.shutdown();
  75. }
  76. log.info(Thread.currentThread().getName() + "KuaishouPushMonthJob月初快手流量未耗尽定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒");
  77. }
  78. public Map<String, Object> generateParams(Map<String, Object> params) throws Exception {
  79. String userid = String.valueOf(params.get("userid"));
  80. String encode = DESUtil.encode(userid, "lol@#ewq");
  81. params.put("timestamp", System.currentTimeMillis());
  82. params.put("channel", "KafkaMusic");
  83. String type = params.get("type") + "";
  84. if ("0.0".equals(type)) {
  85. params.put("type", "0");
  86. } else if ("1.0".equals(type)) {
  87. params.put("type", "1");
  88. }
  89. params.put("userid", encode);
  90. params.put("endtime", params.get("endtime") == null ? "" : params.get("endtime"));
  91. params.put("opertime", params.get("opertime") == null ? "" : params.get("opertime"));
  92. String opertime = (String) params.get("opertime");
  93. params.put("signature", MD5.MD5Encode(opertime + params.get("endtime") +
  94. params.get("kafkaid") + params.get("cpid") + params.get("spid") + params.get("userid") + params.get("type")
  95. + params.get("channel") + params.get("timestamp") + "lol@#ewq"));
  96. logger.info("md5加密参数:" + "" + params.get("opertime") + params.get("endtime") +
  97. params.get("kafkaid") + params.get("cpid") + params.get("spid") + params.get("userid") + params.get("type")
  98. + params.get("channel") + params.get("timestamp") + "lol@#ewq");
  99. String param = "channel=" + params.get("channel") + "&cpid=" + params.get("cpid") + "&endtime=" + params.get("endtime")
  100. + "&kafkaid=" + params.get("kafkaid") + "&opertime=" + params.get("opertime") + "&signature=" + params.get("signature")
  101. + "&spid=" + params.get("spid") + "&timestamp=" + params.get("timestamp") + "&type=" + params.get("type") + "&userid=" + params.get("userid");
  102. logger.info("param拼接参数" + param);
  103. params.put("param", RsaUtils.encryptWithPublicKey(param, "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAg2a0gTu0AukDz0QD1PLV5lYy9POeapXlZOvwkK52cS71IM9GJecmVlzlHUlpfxLuIkikCy7SPnVb9Py9G6zXQQ4B/1YKmgPobW3ATy+fPnNX1f86D41t00TO5WWiBIkQFh99Q7iWERgkEsMYVfd4mxDUqiqdV5FiY1EYbDUlHOdqhwqI1zYxiYjAD9ZqoD9nQ6/X9RXrrXxQLbJIjJ30vevV0oxMCpBqRFvIhfrCSv6TNp+A2r65Z44gQtUfToXVq0bRHC7ZKQW3u+00GbT5wQUpW3UiN1w655cQRezgJXqxkZxoOBhRFll+u0A2JS/DvXH+rtS9JnyogwklWhTG2QIDAQAB", Charset.defaultCharset()));
  104. return params;
  105. }
  106. class KafkaMusicEntity implements Runnable {
  107. private int totalSize;
  108. private Map<String, Object> params;
  109. private CountDownLatch threadSignal;
  110. private String url;
  111. public KafkaMusicEntity(int totalSize, Map<String, Object> params, CountDownLatch threadSignal, String url) {
  112. this.totalSize = totalSize;
  113. this.params = params;
  114. this.threadSignal = threadSignal;
  115. this.url = url;
  116. }
  117. @Override
  118. public void run() {
  119. long startime = System.currentTimeMillis();
  120. Map<String, Object> logMap = new HashMap<String, Object>();
  121. String data = JSONObject.toJSONString(params);
  122. String result = "";
  123. String resultCode = "";
  124. String msg = "";
  125. try {
  126. // result = URLUtil.post(url, data,0,"application/json;charset=utf-8"+";timestamp="+timestamp+";sign="+sign+";nonce="+nonce);
  127. result = HttpInvoke.sendHttpByPost("POST", url, data, getReqProperty());
  128. String code = "";
  129. if (StringUtils.isNotEmpty(result)) {
  130. Map<String, String> map = JSONObject.parseObject(result, Map.class);
  131. code = map.get("resultcode");
  132. if (!"0".equals(code)) {
  133. logger.info("音乐侧订购关系请求失败,重新推入队列");
  134. insertMusicMq(params);
  135. }
  136. } else {
  137. logger.info("音乐侧订购关系请求失败,重新推入队列");
  138. insertMusicMq(params);
  139. }
  140. resultCode = code;
  141. msg = "音乐公司订购关系对接流程无问题";
  142. } catch (Exception e) {
  143. resultCode = "8000";
  144. msg = "音乐公司订购关系对接出现异常=============" + e.getMessage();
  145. e.printStackTrace();
  146. insertMusicMq(params);
  147. } finally {
  148. threadSignal.countDown();
  149. //写日志
  150. logMap.put("jobname", "KafkaMusic");
  151. logMap.put("resultcode", resultCode);
  152. logMap.put("errorinfo", msg);
  153. logMap.put("time", (System.currentTimeMillis() - startime) + "");
  154. logMap.put("count", totalSize + "/" + (totalSize - threadSignal.getCount()));
  155. logMap.put("result", result);
  156. logMap.put("data", data);
  157. logger.info("音乐公司订购关系对接日志" + JsonUtil.objectToJson(logMap));
  158. }
  159. }
  160. public Map<String, String> getReqProperty() {
  161. Map<String, String> map = new HashMap<String, String>();
  162. String timestamp = System.currentTimeMillis() + "";
  163. String key = "VNEU8G4V";
  164. String sign = DigestUtils.md5Hex(timestamp + key);
  165. String nonce = (long) Math.floor(Math.random() * 200000 + 1) + "";
  166. map.put("timestamp", timestamp);
  167. map.put("sign", sign);
  168. map.put("nonce", nonce);
  169. map.put("Content-Type", "application/json");
  170. map.put("charset", "utf-8");
  171. return map;
  172. }
  173. }
  174. /*
  175. * 推送添加音乐公司订购关系最多三次
  176. * @param orderInfo
  177. */
  178. public void insertMusicMq(Map reqBean) {
  179. try {
  180. String sendcont = (String) reqBean.get("sendcont");
  181. if (StringUtils.isEmpty(sendcont)) {
  182. sendcont = "0";
  183. }
  184. if ("3".equals(sendcont)) {
  185. return;
  186. }
  187. MessagePipe mpipe = new MessagePipe();
  188. mpipe.setHeader("kafkaMusic");
  189. mpipe.addBody("kafkaid", reqBean.get("kafkaid"));
  190. mpipe.addBody("cpid", reqBean.get("cpid"));
  191. mpipe.addBody("channel", reqBean.get("channel"));
  192. mpipe.addBody("opertime", reqBean.get("opertime"));
  193. mpipe.addBody("endtime", reqBean.get("endtime"));
  194. mpipe.addBody("type", reqBean.get("type"));
  195. mpipe.addBody("userid", reqBean.get("userid"));
  196. mpipe.addBody("spid", reqBean.get("spid"));
  197. mpipe.addBody("sendcont", (Integer.parseInt(sendcont) + 1) + "");
  198. this.messageService.sendMessage(mpipe);
  199. } catch (Exception e) {
  200. logger.error("添加音乐公司订购关系队列推送出现异常," + e.getMessage());
  201. e.printStackTrace();
  202. }
  203. }
  204. }