ccc0bf0019223a1b17526902060cc4ff51bcfd0b.svn-base 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package com.chinacreator.process.job;
  2. import java.util.ArrayList;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import com.alibaba.fastjson.JSONObject;
  7. import org.apache.log4j.Logger;
  8. import org.quartz.DisallowConcurrentExecution;
  9. import org.quartz.PersistJobDataAfterExecution;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import com.chinacreator.common.exception.BusinessException;
  12. import com.chinacreator.common.util.DESUtil;
  13. import com.chinacreator.common.util.MD5;
  14. import com.chinacreator.process.dao.DictionaryDao;
  15. import com.chinacreator.process.util.HttpInvoke;
  16. import com.chinacreator.process.util.JsonUtil;
  17. import com.chinacreator.process.util.URLUtil;
  18. import com.chinacreator.video.queue.MessageService;
  19. import com.chinacreator.video.queue.bean.MessagePipe;
  20. /**
  21. * 指定渠道订购芒果15和16元要推送给合作方
  22. * @author xu.zhou
  23. * @date 20200604
  24. */
  25. @PersistJobDataAfterExecution
  26. @DisallowConcurrentExecution
  27. public class MangtvPushMQJob {
  28. private Logger logger = Logger.getLogger("mangtvpush");
  29. @Autowired
  30. private DictionaryDao dictionaryDao;
  31. @Autowired
  32. private MessageService messageService;
  33. public void doProcess() throws Exception {
  34. logger.info("接收指定渠道订购芒果15和16元要推送给合作方推送消息队列JOB启动");
  35. if (dictionaryDao.getValue("recivemq").equals("0")) {
  36. long beginTime = System.currentTimeMillis();
  37. List<MessagePipe> list = messageService.reciveBatchMessage("mangtvpush", 500);
  38. logger.info("接收消息队列花费时间:" + (System.currentTimeMillis() - beginTime));
  39. List<Map> dataList = new ArrayList<Map>();
  40. for (MessagePipe messagePipe : list) {
  41. Map reqBean = transBean(messagePipe.getBody());
  42. dataList.add(reqBean);
  43. }
  44. if(dataList != null && dataList.size()>0){
  45. logger.info("待处理数据条数:"+dataList.size());
  46. for (Map reqBean : dataList) {
  47. this.handleOrder(reqBean);
  48. }
  49. }
  50. } else {
  51. logger.info("停止接收队列消息");
  52. }
  53. }
  54. /**
  55. * 业务处理
  56. * @param mqBean
  57. */
  58. public void handleOrder(Map reqBean){
  59. Map logMap = new HashMap();
  60. logMap.put("data", reqBean);
  61. long starttime = System.currentTimeMillis();
  62. String resultCode = "-1";
  63. String resultInfo = "";
  64. try {
  65. Map<String, String> reMap = execpush(reqBean);
  66. if(!"0".equals(reMap.get("resultcode"))){//赠送未成功
  67. resultCode = reMap.get("resultcode");
  68. resultInfo = reMap.get("resultinfo");
  69. }else{
  70. resultCode = "0";
  71. resultInfo = "成功";
  72. }
  73. } catch (Exception e) {
  74. if (e instanceof BusinessException) {
  75. resultInfo = ((BusinessException) e).getMessage();
  76. resultCode = ((BusinessException) e).getCode();
  77. }else{
  78. e.printStackTrace();
  79. resultCode = "8000";
  80. resultInfo = "系统错误,"+e.getMessage();
  81. }
  82. } finally{
  83. //写日志
  84. logMap.put("time", System.currentTimeMillis() - starttime);
  85. logMap.put("reusltCode", resultCode);
  86. logMap.put("resultInfo", resultInfo);
  87. logger.info(JsonUtil.objectToJson(logMap));
  88. }
  89. }
  90. /**
  91. * 推送消息
  92. * @return
  93. * @throws Exception
  94. */
  95. private Map<String, String> execpush(Map reqBean){
  96. Map<String, String> reMap = new HashMap<String,String>();
  97. String resultcode = "-1"; //失败
  98. String resultinfo = ""; //失败
  99. try {
  100. int timeout = 30 * 1000; //超时时间
  101. String pwd = this.dictionaryDao.getValue("mangtvpushpwd");
  102. String invokeUrl = this.dictionaryDao.getValue("mangtvpushurl");
  103. String userid = (String)reqBean.get("userid");
  104. userid = DESUtil.encode(userid, pwd);
  105. String channel = (String)reqBean.get("orderchannel");
  106. String spid = (String)reqBean.get("spid");
  107. String cpid = (String)reqBean.get("cpid");
  108. String ordertime = (String)reqBean.get("ordertime");
  109. String timestamp = (System.currentTimeMillis() / 1000) + "";
  110. String signature = MD5.MD5Encode(userid+cpid+spid+channel+ordertime+timestamp+pwd);
  111. JSONObject reqJson = new JSONObject();
  112. reqJson.put("userid", userid);
  113. reqJson.put("channel", channel);
  114. reqJson.put("spid", spid);
  115. reqJson.put("cpid", cpid);
  116. reqJson.put("ordertime", ordertime);
  117. reqJson.put("timestamp", timestamp);
  118. reqJson.put("signature", signature);
  119. String paramsJson = reqJson.toJSONString();
  120. logger.info("invokeUrl: "+invokeUrl+",paramsJson: "+paramsJson);
  121. String result = "";
  122. if(invokeUrl.startsWith("https")){
  123. result = HttpInvoke.sendhttpsReq("POST", invokeUrl, paramsJson, getProperty(), timeout);
  124. }else{
  125. result = HttpInvoke.sendHttpByPost("POST", invokeUrl, paramsJson, getProperty(), timeout);
  126. }
  127. logger.info("调接口返回结果=> userid: " +reqBean.get("userid")+" , result: "+result);
  128. Map<?,?> map = JsonUtil.jsonToMap(result);
  129. resultcode = (String)map.get("resultcode");
  130. resultinfo = (String)map.get("errorinfo");
  131. } catch (Exception e) {
  132. e.printStackTrace();
  133. logger.error("userid: "+reqBean.get("userid")+"推送消息失败,"+e);
  134. resultcode = "8000";
  135. resultinfo = e.getMessage();
  136. }
  137. reMap.put("resultcode", resultcode);
  138. reMap.put("resultinfo", resultinfo);
  139. return reMap;
  140. }
  141. /**
  142. * 解析数据
  143. * @param body
  144. * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号
  145. * orderNo 积分商城订单号
  146. * requestId 返回给客户的请求ID
  147. * @return
  148. */
  149. public Map transBean(Map<String, Object> body) {
  150. String jsonStr = JsonUtil.objectToJson(body);
  151. return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
  152. }
  153. /**
  154. * 获取请求属性性
  155. * @return
  156. */
  157. private static Map getProperty(){
  158. Map reqProperty = new HashMap();
  159. reqProperty.put("Content-type", "application/json;charset=UTF-8");
  160. return reqProperty;
  161. }
  162. }