123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- package com.chinacreator.process.job;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.common.util.DESUtil;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.URLUtil;
- import com.chinacreator.video.queue.MessageService;
- import com.chinacreator.video.queue.bean.MessagePipe;
- /**
- * 指定渠道订购芒果15和16元要推送给合作方
- * @author xu.zhou
- * @date 20200604
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class MangtvPushMQJob {
-
- private Logger logger = Logger.getLogger("mangtvpush");
- @Autowired
- private DictionaryDao dictionaryDao;
-
- @Autowired
- private MessageService messageService;
- public void doProcess() throws Exception {
- logger.info("接收指定渠道订购芒果15和16元要推送给合作方推送消息队列JOB启动");
- if (dictionaryDao.getValue("recivemq").equals("0")) {
- long beginTime = System.currentTimeMillis();
- List<MessagePipe> list = messageService.reciveBatchMessage("mangtvpush", 500);
- logger.info("接收消息队列花费时间:" + (System.currentTimeMillis() - beginTime));
- List<Map> dataList = new ArrayList<Map>();
- for (MessagePipe messagePipe : list) {
- Map reqBean = transBean(messagePipe.getBody());
- dataList.add(reqBean);
- }
- if(dataList != null && dataList.size()>0){
- logger.info("待处理数据条数:"+dataList.size());
- for (Map reqBean : dataList) {
- this.handleOrder(reqBean);
- }
- }
- } else {
- logger.info("停止接收队列消息");
- }
- }
-
- /**
- * 业务处理
- * @param mqBean
- */
- public void handleOrder(Map reqBean){
- Map logMap = new HashMap();
- logMap.put("data", reqBean);
- long starttime = System.currentTimeMillis();
- String resultCode = "-1";
- String resultInfo = "";
- try {
- Map<String, String> reMap = execpush(reqBean);
- if(!"0".equals(reMap.get("resultcode"))){//赠送未成功
- resultCode = reMap.get("resultcode");
- resultInfo = reMap.get("resultinfo");
- }else{
- resultCode = "0";
- resultInfo = "成功";
- }
- } catch (Exception e) {
- if (e instanceof BusinessException) {
- resultInfo = ((BusinessException) e).getMessage();
- resultCode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultCode = "8000";
- resultInfo = "系统错误,"+e.getMessage();
- }
- } finally{
- //写日志
- logMap.put("time", System.currentTimeMillis() - starttime);
- logMap.put("reusltCode", resultCode);
- logMap.put("resultInfo", resultInfo);
- logger.info(JsonUtil.objectToJson(logMap));
- }
- }
-
-
- /**
- * 推送消息
- * @return
- * @throws Exception
- */
- private Map<String, String> execpush(Map reqBean){
- Map<String, String> reMap = new HashMap<String,String>();
- String resultcode = "-1"; //失败
- String resultinfo = ""; //失败
- try {
- int timeout = 30 * 1000; //超时时间
- String pwd = this.dictionaryDao.getValue("mangtvpushpwd");
- String invokeUrl = this.dictionaryDao.getValue("mangtvpushurl");
- String userid = (String)reqBean.get("userid");
- userid = DESUtil.encode(userid, pwd);
- String channel = (String)reqBean.get("orderchannel");
- String spid = (String)reqBean.get("spid");
- String cpid = (String)reqBean.get("cpid");
- String ordertime = (String)reqBean.get("ordertime");
- String timestamp = (System.currentTimeMillis() / 1000) + "";
- String signature = MD5.MD5Encode(userid+cpid+spid+channel+ordertime+timestamp+pwd);
- JSONObject reqJson = new JSONObject();
- reqJson.put("userid", userid);
- reqJson.put("channel", channel);
- reqJson.put("spid", spid);
- reqJson.put("cpid", cpid);
- reqJson.put("ordertime", ordertime);
- reqJson.put("timestamp", timestamp);
- reqJson.put("signature", signature);
- String paramsJson = reqJson.toJSONString();
- logger.info("invokeUrl: "+invokeUrl+",paramsJson: "+paramsJson);
- String result = "";
- if(invokeUrl.startsWith("https")){
- result = HttpInvoke.sendhttpsReq("POST", invokeUrl, paramsJson, getProperty(), timeout);
- }else{
- result = HttpInvoke.sendHttpByPost("POST", invokeUrl, paramsJson, getProperty(), timeout);
- }
- logger.info("调接口返回结果=> userid: " +reqBean.get("userid")+" , result: "+result);
- Map<?,?> map = JsonUtil.jsonToMap(result);
- resultcode = (String)map.get("resultcode");
- resultinfo = (String)map.get("errorinfo");
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("userid: "+reqBean.get("userid")+"推送消息失败,"+e);
- resultcode = "8000";
- resultinfo = e.getMessage();
- }
-
- reMap.put("resultcode", resultcode);
- reMap.put("resultinfo", resultinfo);
-
- return reMap;
- }
-
- /**
- * 解析数据
- * @param body
- * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号
- * orderNo 积分商城订单号
- * requestId 返回给客户的请求ID
- * @return
- */
- public Map transBean(Map<String, Object> body) {
- String jsonStr = JsonUtil.objectToJson(body);
- return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
- }
-
- /**
- * 获取请求属性性
- * @return
- */
- private static Map getProperty(){
- Map reqProperty = new HashMap();
- reqProperty.put("Content-type", "application/json;charset=UTF-8");
- return reqProperty;
- }
- }
|