|
- package com.chinacreator.process.job;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.common.util.DESUtil;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KafkaMusicDao;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.RsaUtils;
- import com.chinacreator.video.queue.MessageService;
- import com.chinacreator.video.queue.bean.MessagePipe;
- import org.apache.commons.codec.digest.DigestUtils;
- import org.apache.commons.lang.StringUtils;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import java.nio.charset.Charset;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.*;
- /**
- * 音乐公司订购关系推送
- *
- * @author yu.su
- * @date 2020.08.12
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KafkaMusicJob {
- private static Logger logger = Logger.getLogger("KafkaMusic");
- private static Logger log = Logger.getLogger(KafkaMusicJob.class);
- @Autowired
- private KafkaMusicDao kafkaMusicDao;
- @Autowired
- private MessageService messageService;
- @Autowired
- private DictionaryDao dictionaryDao;
- public void doProcess() throws Exception {
- log.info(Thread.currentThread().getName() + "KafkaMusicJob音乐公司订购关系推送定时任务开始");
- long beginTime = System.currentTimeMillis();
- List<MessagePipe> list = messageService.reciveBatchMessage("kafkaMusic", 500);
- if (list != null && list.size() > 0) {
- //对接路径
- /* String url = kafkaMusicDao.queryValue("kafkaMusic");
- if (StringUtils.isEmpty(url)) {
- logger.error("音乐公司订购关系路径不存在");
- throw new BusinessException("8000", "音乐公司订购关系路径不存在");
- }*/
- String url = dictionaryDao.getValue("kafkaMusicUrl");
- logger.info("音乐公司订购关系请求接口" + url);
- logger.info("音乐公司订购关系数据:" + list);
- CountDownLatch threadSignal = new CountDownLatch(list.size());
- ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (MessagePipe messagePipe : list) {
- Map<String, Object> params = messagePipe.getBody();
- logger.info("转换前的参数:" + params);
- try {
- Object param = params.get("param");
- if (param == null || param == "") {
- params = generateParams(params);
- logger.info("转换后的参数:" + JSONObject.toJSONString(params));
- }
- } catch (Exception e) {
- logger.info("参数转换发生错误========" + e.getMessage());
- e.printStackTrace();
- }
- if (params != null && params.size() > 0) {
- KafkaMusicEntity kafkaMusicEntity = new KafkaMusicEntity(list.size(), params, threadSignal, url);
- executorService.execute(kafkaMusicEntity);
- }
- }
- executorService.shutdown();
- }
- log.info(Thread.currentThread().getName() + "KuaishouPushMonthJob月初快手流量未耗尽定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒");
- }
- public Map<String, Object> generateParams(Map<String, Object> params) throws Exception {
- String userid = String.valueOf(params.get("userid"));
- String encode = DESUtil.encode(userid, "lol@#ewq");
- params.put("timestamp", System.currentTimeMillis());
- params.put("channel", "KafkaMusic");
- String type = params.get("type") + "";
- if ("0.0".equals(type)) {
- params.put("type", "0");
- } else if ("1.0".equals(type)) {
- params.put("type", "1");
- }
- params.put("userid", encode);
- params.put("endtime", params.get("endtime") == null ? "" : params.get("endtime"));
- params.put("opertime", params.get("opertime") == null ? "" : params.get("opertime"));
- String opertime = (String) params.get("opertime");
- params.put("signature", MD5.MD5Encode(opertime + params.get("endtime") +
- params.get("kafkaid") + params.get("cpid") + params.get("spid") + params.get("userid") + params.get("type")
- + params.get("channel") + params.get("timestamp") + "lol@#ewq"));
- logger.info("md5加密参数:" + "" + params.get("opertime") + params.get("endtime") +
- params.get("kafkaid") + params.get("cpid") + params.get("spid") + params.get("userid") + params.get("type")
- + params.get("channel") + params.get("timestamp") + "lol@#ewq");
- String param = "channel=" + params.get("channel") + "&cpid=" + params.get("cpid") + "&endtime=" + params.get("endtime")
- + "&kafkaid=" + params.get("kafkaid") + "&opertime=" + params.get("opertime") + "&signature=" + params.get("signature")
- + "&spid=" + params.get("spid") + "×tamp=" + params.get("timestamp") + "&type=" + params.get("type") + "&userid=" + params.get("userid");
- logger.info("param拼接参数" + param);
- params.put("param", RsaUtils.encryptWithPublicKey(param, "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAg2a0gTu0AukDz0QD1PLV5lYy9POeapXlZOvwkK52cS71IM9GJecmVlzlHUlpfxLuIkikCy7SPnVb9Py9G6zXQQ4B/1YKmgPobW3ATy+fPnNX1f86D41t00TO5WWiBIkQFh99Q7iWERgkEsMYVfd4mxDUqiqdV5FiY1EYbDUlHOdqhwqI1zYxiYjAD9ZqoD9nQ6/X9RXrrXxQLbJIjJ30vevV0oxMCpBqRFvIhfrCSv6TNp+A2r65Z44gQtUfToXVq0bRHC7ZKQW3u+00GbT5wQUpW3UiN1w655cQRezgJXqxkZxoOBhRFll+u0A2JS/DvXH+rtS9JnyogwklWhTG2QIDAQAB", Charset.defaultCharset()));
- return params;
- }
- class KafkaMusicEntity implements Runnable {
- private int totalSize;
- private Map<String, Object> params;
- private CountDownLatch threadSignal;
- private String url;
- public KafkaMusicEntity(int totalSize, Map<String, Object> params, CountDownLatch threadSignal, String url) {
- this.totalSize = totalSize;
- this.params = params;
- this.threadSignal = threadSignal;
- this.url = url;
- }
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map<String, Object> logMap = new HashMap<String, Object>();
- String data = JSONObject.toJSONString(params);
- String result = "";
- String resultCode = "";
- String msg = "";
- try {
- // result = URLUtil.post(url, data,0,"application/json;charset=utf-8"+";timestamp="+timestamp+";sign="+sign+";nonce="+nonce);
- result = HttpInvoke.sendHttpByPost("POST", url, data, getReqProperty());
- String code = "";
- if (StringUtils.isNotEmpty(result)) {
- Map<String, String> map = JSONObject.parseObject(result, Map.class);
- code = map.get("resultcode");
- if (!"0".equals(code)) {
- logger.info("音乐侧订购关系请求失败,重新推入队列");
- insertMusicMq(params);
- }
- } else {
- logger.info("音乐侧订购关系请求失败,重新推入队列");
- insertMusicMq(params);
- }
- resultCode = code;
- msg = "音乐公司订购关系对接流程无问题";
- } catch (Exception e) {
- resultCode = "8000";
- msg = "音乐公司订购关系对接出现异常=============" + e.getMessage();
- e.printStackTrace();
- insertMusicMq(params);
- } finally {
- threadSignal.countDown();
- //写日志
- logMap.put("jobname", "KafkaMusic");
- logMap.put("resultcode", resultCode);
- logMap.put("errorinfo", msg);
- logMap.put("time", (System.currentTimeMillis() - startime) + "");
- logMap.put("count", totalSize + "/" + (totalSize - threadSignal.getCount()));
- logMap.put("result", result);
- logMap.put("data", data);
- logger.info("音乐公司订购关系对接日志" + JsonUtil.objectToJson(logMap));
- }
- }
- public Map<String, String> getReqProperty() {
- Map<String, String> map = new HashMap<String, String>();
- String timestamp = System.currentTimeMillis() + "";
- String key = "VNEU8G4V";
- String sign = DigestUtils.md5Hex(timestamp + key);
- String nonce = (long) Math.floor(Math.random() * 200000 + 1) + "";
- map.put("timestamp", timestamp);
- map.put("sign", sign);
- map.put("nonce", nonce);
- map.put("Content-Type", "application/json");
- map.put("charset", "utf-8");
- return map;
- }
- }
- /*
- * 推送添加音乐公司订购关系最多三次
- * @param orderInfo
- */
- public void insertMusicMq(Map reqBean) {
- try {
- String sendcont = (String) reqBean.get("sendcont");
- if (StringUtils.isEmpty(sendcont)) {
- sendcont = "0";
- }
- if ("3".equals(sendcont)) {
- return;
- }
- MessagePipe mpipe = new MessagePipe();
- mpipe.setHeader("kafkaMusic");
- mpipe.addBody("kafkaid", reqBean.get("kafkaid"));
- mpipe.addBody("cpid", reqBean.get("cpid"));
- mpipe.addBody("channel", reqBean.get("channel"));
- mpipe.addBody("opertime", reqBean.get("opertime"));
- mpipe.addBody("endtime", reqBean.get("endtime"));
- mpipe.addBody("type", reqBean.get("type"));
- mpipe.addBody("userid", reqBean.get("userid"));
- mpipe.addBody("spid", reqBean.get("spid"));
- mpipe.addBody("sendcont", (Integer.parseInt(sendcont) + 1) + "");
- this.messageService.sendMessage(mpipe);
- } catch (Exception e) {
- logger.error("添加音乐公司订购关系队列推送出现异常," + e.getMessage());
- e.printStackTrace();
- }
- }
- }
|