123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package com.chinacreator.process.job;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.chinacreator.common.util.DESUtil;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.common.util.URLUtil;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.video.queue.MessageService;
- import com.chinacreator.video.queue.bean.MessagePipe;
- /**
- * 抖音数据透传异常数据重试
- * @author xu.zhou
- * @date 20200610
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class DouyinRetryMQJob {
- private Logger logger = Logger.getLogger("douyinretry");
-
- @Autowired
- private DictionaryDao dictionaryDao;
-
- @Autowired
- private MessageService messageService;
- public void doProcess() throws Exception {
- logger.info("接收vipretry队列JOB启动");
- if (dictionaryDao.getValue("recivemq").equals("0")) {
- long beginTime = System.currentTimeMillis();
- List<MessagePipe> list = messageService.reciveBatchMessage("douyinretry", 500);
- logger.info("接收douyinretry队列花费时间:" + (System.currentTimeMillis() - beginTime));
- List<Map> dataList = new ArrayList<Map>();
- for (MessagePipe messagePipe : list) {
- Map reqBean = transBean(messagePipe.getBody());
- dataList.add(reqBean);
- }
-
- if(dataList != null && dataList.size()>0){
- for (Map reqBean : dataList) {
- retry((String)reqBean.get("data"),(String)reqBean.get("filename"));
- }
- }
- } else {
- logger.info("停止接收douyinretry队列消息");
- }
- }
- public Map transBean(Map<String, Object> body) {
- String jsonStr = JsonUtil.objectToJson(body);
- return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
- }
-
- public void retry(String data,String filename) {
- long start = System.currentTimeMillis();
- String result = "";
- try {
- int timeout = 30 * 1000; //超时时间
- String invokeUrl = dictionaryDao.getValue("douyinurl");
- String douyindespwd = dictionaryDao.getValue("douyindespwd");
- String params = data.split("\\|\\|\\|")[1];
- String timestamp= (System.currentTimeMillis())/1000+"";
- params = DESUtil.encode(params, douyindespwd);
- String signature = MD5.MD5Encode(params+timestamp+douyindespwd);
- JSONObject reqJson = new JSONObject();
- reqJson.put("params", params);
- reqJson.put("timestamp", timestamp);
- reqJson.put("signature", signature);
- String paramsJson = reqJson.toJSONString();
- if(invokeUrl.startsWith("https")){
- result = HttpInvoke.sendhttpsReq("POST", invokeUrl, paramsJson, getProperty(), timeout);
- }else{
- result = HttpInvoke.sendHttpByPost("POST", invokeUrl, paramsJson, getProperty(), timeout);
- }
- logger.info("调接口返回信息result=>"+result+", 推送数据:"+paramsJson);
- } catch (Exception e) {
- //logger.error("推送数据出错:文件名=》"+filename+",异常信息=》"+e.getMessage()+", 推送数据:"+data);
- e.printStackTrace();
- result = "推送出现异常,"+e.getMessage();
- inserMq(data, filename);
- } finally {
- logger.info("文件名:"+filename+",耗时:"+(System.currentTimeMillis() - start)+" ms"+", 推送结果:"+result+",data:"+data);
- }
- }
- /**
- * 获取请求属性性
- * @return
- */
- private static Map getProperty(){
- Map reqProperty = new HashMap();
- reqProperty.put("Content-type", "application/json;charset=UTF-8");
- return reqProperty;
- }
-
- /**
- * 推送办理成功的短信到队列
- * @param orderInfo
- */
- public void inserMq(String data,String filename){
- try{
- logger.info("处理失败,重新推送入队列,filename=>"+filename+",data=>"+data);
- Map<String, String> map = new HashMap<String, String>();
- map.put("data", data);
- map.put("filename", filename);
- map.put("type", "douyinretry");
- String mqReciveUrl = dictionaryDao.getValue("mqReciveUrl");
- URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map));
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
|