c07febbe8f04151ed5bf579759e55efc55978641.svn-base 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package com.chinacreator.process.job;
  2. import java.util.ArrayList;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import com.alibaba.fastjson.JSONObject;
  7. import org.apache.log4j.Logger;
  8. import org.quartz.DisallowConcurrentExecution;
  9. import org.quartz.PersistJobDataAfterExecution;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import com.chinacreator.common.util.DESUtil;
  12. import com.chinacreator.common.util.MD5;
  13. import com.chinacreator.common.util.URLUtil;
  14. import com.chinacreator.process.dao.DictionaryDao;
  15. import com.chinacreator.process.util.HttpInvoke;
  16. import com.chinacreator.process.util.JsonUtil;
  17. import com.chinacreator.video.queue.MessageService;
  18. import com.chinacreator.video.queue.bean.MessagePipe;
  19. /**
  20. * 抖音数据透传异常数据重试
  21. * @author xu.zhou
  22. * @date 20200610
  23. */
  24. @PersistJobDataAfterExecution
  25. @DisallowConcurrentExecution
  26. public class DouyinRetryMQJob {
  27. private Logger logger = Logger.getLogger("douyinretry");
  28. @Autowired
  29. private DictionaryDao dictionaryDao;
  30. @Autowired
  31. private MessageService messageService;
  32. public void doProcess() throws Exception {
  33. logger.info("接收vipretry队列JOB启动");
  34. if (dictionaryDao.getValue("recivemq").equals("0")) {
  35. long beginTime = System.currentTimeMillis();
  36. List<MessagePipe> list = messageService.reciveBatchMessage("douyinretry", 500);
  37. logger.info("接收douyinretry队列花费时间:" + (System.currentTimeMillis() - beginTime));
  38. List<Map> dataList = new ArrayList<Map>();
  39. for (MessagePipe messagePipe : list) {
  40. Map reqBean = transBean(messagePipe.getBody());
  41. dataList.add(reqBean);
  42. }
  43. if(dataList != null && dataList.size()>0){
  44. for (Map reqBean : dataList) {
  45. retry((String)reqBean.get("data"),(String)reqBean.get("filename"));
  46. }
  47. }
  48. } else {
  49. logger.info("停止接收douyinretry队列消息");
  50. }
  51. }
  52. public Map transBean(Map<String, Object> body) {
  53. String jsonStr = JsonUtil.objectToJson(body);
  54. return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
  55. }
  56. public void retry(String data,String filename) {
  57. long start = System.currentTimeMillis();
  58. String result = "";
  59. try {
  60. int timeout = 30 * 1000; //超时时间
  61. String invokeUrl = dictionaryDao.getValue("douyinurl");
  62. String douyindespwd = dictionaryDao.getValue("douyindespwd");
  63. String params = data.split("\\|\\|\\|")[1];
  64. String timestamp= (System.currentTimeMillis())/1000+"";
  65. params = DESUtil.encode(params, douyindespwd);
  66. String signature = MD5.MD5Encode(params+timestamp+douyindespwd);
  67. JSONObject reqJson = new JSONObject();
  68. reqJson.put("params", params);
  69. reqJson.put("timestamp", timestamp);
  70. reqJson.put("signature", signature);
  71. String paramsJson = reqJson.toJSONString();
  72. if(invokeUrl.startsWith("https")){
  73. result = HttpInvoke.sendhttpsReq("POST", invokeUrl, paramsJson, getProperty(), timeout);
  74. }else{
  75. result = HttpInvoke.sendHttpByPost("POST", invokeUrl, paramsJson, getProperty(), timeout);
  76. }
  77. logger.info("调接口返回信息result=>"+result+", 推送数据:"+paramsJson);
  78. } catch (Exception e) {
  79. //logger.error("推送数据出错:文件名=》"+filename+",异常信息=》"+e.getMessage()+", 推送数据:"+data);
  80. e.printStackTrace();
  81. result = "推送出现异常,"+e.getMessage();
  82. inserMq(data, filename);
  83. } finally {
  84. logger.info("文件名:"+filename+",耗时:"+(System.currentTimeMillis() - start)+" ms"+", 推送结果:"+result+",data:"+data);
  85. }
  86. }
  87. /**
  88. * 获取请求属性性
  89. * @return
  90. */
  91. private static Map getProperty(){
  92. Map reqProperty = new HashMap();
  93. reqProperty.put("Content-type", "application/json;charset=UTF-8");
  94. return reqProperty;
  95. }
  96. /**
  97. * 推送办理成功的短信到队列
  98. * @param orderInfo
  99. */
  100. public void inserMq(String data,String filename){
  101. try{
  102. logger.info("处理失败,重新推送入队列,filename=>"+filename+",data=>"+data);
  103. Map<String, String> map = new HashMap<String, String>();
  104. map.put("data", data);
  105. map.put("filename", filename);
  106. map.put("type", "douyinretry");
  107. String mqReciveUrl = dictionaryDao.getValue("mqReciveUrl");
  108. URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map));
  109. }catch (Exception e){
  110. e.printStackTrace();
  111. }
  112. }
  113. }