package com.chinacreator.process.job;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;
import org.apache.log4j.Logger;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.beans.factory.annotation.Autowired;

import com.chinacreator.common.util.DESUtil;
import com.chinacreator.common.util.MD5;
import com.chinacreator.common.util.URLUtil;
import com.chinacreator.process.dao.DictionaryDao;
import com.chinacreator.process.util.HttpInvoke;
import com.chinacreator.process.util.JsonUtil;
import com.chinacreator.video.queue.MessageService;
import com.chinacreator.video.queue.bean.MessagePipe;

/**
 * Retry job for Douyin pass-through data that failed to be pushed.
 *
 * <p>Each run pulls a batch of messages from the {@code douyinretry} queue and
 * pushes every message's payload to the URL configured under the dictionary
 * key {@code douyinurl}. A push that throws is put back on the queue through
 * {@link #inserMq(String, String)}.
 *
 * @author xu.zhou
 * @date 20200610
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class DouyinRetryMQJob {

    /** Name of the queue this job consumes. */
    private static final String RETRY_QUEUE = "douyinretry";

    /** Maximum number of messages requested from the queue per run. */
    private static final int BATCH_SIZE = 500;

    /** Timeout handed to the HTTP helper, in milliseconds. */
    private static final int TIMEOUT_MS = 30 * 1000;

    /** Regex for the literal {@code |||} separator inside a message's data. */
    private static final String DATA_SEPARATOR_REGEX = "\\|\\|\\|";

    private static final Logger logger = Logger.getLogger("douyinretry");

    @Autowired
    private DictionaryDao dictionaryDao;

    @Autowired
    private MessageService messageService;

    /**
     * Job entry point: receives one batch from the retry queue and retries each
     * message.
     *
     * <p>The batch is only fetched while the dictionary value {@code recivemq}
     * equals {@code "0"}; any other value (including a missing entry) skips the
     * run. Messages are handled one by one so that a single malformed message
     * cannot discard the rest of an already-dequeued batch.
     *
     * @throws Exception if reading the switch or receiving the batch fails
     */
    public void doProcess() throws Exception {
        // NOTE(review): this text says "vipretry" although the job consumes
        // "douyinretry" -- looks like a copy/paste leftover; left unchanged in
        // case log monitoring matches on it.
        logger.info("接收vipretry队列JOB启动");

        // Constant-first comparison: a missing dictionary entry must not NPE.
        if (!"0".equals(dictionaryDao.getValue("recivemq"))) {
            logger.info("停止接收douyinretry队列消息");
            return;
        }

        long beginTime = System.currentTimeMillis();
        List<MessagePipe> list = messageService.reciveBatchMessage(RETRY_QUEUE, BATCH_SIZE);
        logger.info("接收douyinretry队列花费时间:" + (System.currentTimeMillis() - beginTime));

        if (list == null || list.isEmpty()) {
            return;
        }

        for (MessagePipe messagePipe : list) {
            Map<?, ?> body = null;
            try {
                body = messagePipe.getBody();
                Map<String, Object> reqBean = transBean(body);
                retry((String) reqBean.get("data"), (String) reqBean.get("filename"));
            } catch (Exception e) {
                // Conversion or cast failure for this message only; keep going
                // with the remaining messages of the batch.
                logger.error("处理douyinretry队列消息出错,已跳过,消息体=>" + body, e);
            }
        }
    }

    /**
     * Converts a message body into a plain map by serialising it to JSON and
     * parsing it back with {@link JsonUtil}.
     *
     * @param body message body as delivered by the queue
     * @return the map produced by {@code JsonUtil.jsonToBean}
     */
    @SuppressWarnings("unchecked") // jsonToBean is asked for Map.class; element types are not checked
    public Map<String, Object> transBean(Map<?, ?> body) {
        String jsonStr = JsonUtil.objectToJson(body);
        return (Map<String, Object>) JsonUtil.jsonToBean(jsonStr, Map.class);
    }

    /**
     * Pushes one record to the Douyin endpoint and re-enqueues it on failure.
     *
     * <p>The second {@code |||}-separated segment of {@code data} is encrypted
     * with {@link DESUtil} using the dictionary value {@code douyindespwd}; the
     * request body is a JSON object with {@code params}, {@code timestamp}
     * (epoch seconds) and {@code signature} (MD5 of params + timestamp + key).
     * Any exception during the push sends the record back to the queue.
     *
     * <p>Data that is {@code null} or has no second segment can never be pushed,
     * so it is logged at error level and dropped instead of being re-enqueued
     * forever.
     *
     * @param data     raw record; the payload is expected after the first {@code |||}
     * @param filename name of the file the record came from (used for logging
     *                 and carried along when re-enqueueing)
     */
    public void retry(String data, String filename) {
        long start = System.currentTimeMillis();
        String result = "";
        try {
            String[] parts = (data == null) ? new String[0] : data.split(DATA_SEPARATOR_REGEX);
            if (parts.length < 2) {
                result = "数据格式错误,放弃重试";
                logger.error("数据格式错误,无法推送且不再重新入队:文件名=》" + filename + ", 推送数据:" + data);
                return;
            }

            String invokeUrl = dictionaryDao.getValue("douyinurl");
            String douyindespwd = dictionaryDao.getValue("douyindespwd");
            String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
            String params = DESUtil.encode(parts[1], douyindespwd);
            String signature = MD5.MD5Encode(params + timestamp + douyindespwd);

            JSONObject reqJson = new JSONObject();
            reqJson.put("params", params);
            reqJson.put("timestamp", timestamp);
            reqJson.put("signature", signature);
            String paramsJson = reqJson.toJSONString();

            if (invokeUrl.startsWith("https")) {
                result = HttpInvoke.sendhttpsReq("POST", invokeUrl, paramsJson, getProperty(), TIMEOUT_MS);
            } else {
                result = HttpInvoke.sendHttpByPost("POST", invokeUrl, paramsJson, getProperty(), TIMEOUT_MS);
            }
            logger.info("调接口返回信息result=>" + result + ", 推送数据:" + paramsJson);
        } catch (Exception e) {
            logger.error("推送数据出错:文件名=》" + filename + ",异常信息=》" + e.getMessage() + ", 推送数据:" + data, e);
            result = "推送出现异常," + e.getMessage();
            inserMq(data, filename);
        } finally {
            logger.info("文件名:" + filename + ",耗时:" + (System.currentTimeMillis() - start) + " ms"
                    + ", 推送结果:" + result + ",data:" + data);
        }
    }

    /**
     * Builds the HTTP request properties sent with every push.
     *
     * @return a new map containing only the JSON/UTF-8 {@code Content-type} header
     */
    private static Map<String, String> getProperty() {
        // NOTE(review): assumes HttpInvoke accepts Map<String, String> headers -- confirm.
        Map<String, String> reqProperty = new HashMap<String, String>();
        reqProperty.put("Content-type", "application/json;charset=UTF-8");
        return reqProperty;
    }

    /**
     * Puts a failed record back on the retry queue.
     *
     * <p>Posts a JSON object with {@code data}, {@code filename} and
     * {@code type=douyinretry} to the URL stored under the dictionary key
     * {@code mqReciveUrl}. Failures are logged and not propagated.
     *
     * @param data     raw record to re-enqueue
     * @param filename name of the file the record came from
     */
    public void inserMq(String data, String filename) {
        try {
            logger.info("处理失败,重新推送入队列,filename=>" + filename + ",data=>" + data);
            Map<String, String> map = new HashMap<String, String>();
            map.put("data", data);
            map.put("filename", filename);
            map.put("type", "douyinretry");
            String mqReciveUrl = dictionaryDao.getValue("mqReciveUrl");
            URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map));
        } catch (Exception e) {
            // The record is lost from the queue at this point; keep it in the
            // error log so it can be recovered manually.
            logger.error("重新推送入队列失败,filename=>" + filename + ",data=>" + data, e);
        }
    }
}