f7c8023566fd8897478edf8b3a6ecf65657eb14f.svn-base 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.chinacreator.common.exception.BusinessException;
  5. import com.chinacreator.common.util.DESUtil;
  6. import com.chinacreator.common.util.URLUtil;
  7. import com.chinacreator.process.bean.PointShopMqBean;
  8. import com.chinacreator.process.bean.PointShopOrderBean;
  9. import com.chinacreator.process.dao.BackBusiVipAsynDao;
  10. import com.chinacreator.process.dao.DictionaryDao;
  11. import com.chinacreator.process.dao.TaobDao;
  12. import com.chinacreator.process.util.HttpInvoke;
  13. import com.chinacreator.process.util.JsonUtil;
  14. import org.apache.log4j.Logger;
  15. import org.quartz.DisallowConcurrentExecution;
  16. import org.quartz.PersistJobDataAfterExecution;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import java.net.URLEncoder;
  19. import java.util.*;
  20. import java.util.concurrent.*;
  21. /**
  22. * 淘宝业务异步处理(需要回调)
  23. *
  24. * @author zhengrong.yan
  25. * @date 20210127
  26. */
  27. @PersistJobDataAfterExecution
  28. @DisallowConcurrentExecution
  29. public class TaobSyncJob {
  30. private Logger log = Logger.getLogger("taobsync");
  31. @Autowired
  32. private DictionaryDao dictionaryDao;
  33. @Autowired
  34. private TaobDao taobDao;
  35. @Autowired
  36. private BackBusiVipAsynDao asyndao;
  37. public void doProcess() throws Exception {
  38. log.info(Thread.currentThread().getName() + "定时任务开始");
  39. long beginTime = System.currentTimeMillis();
  40. List<HashMap> list = taobDao.getAsynData();
  41. if (list != null && list.size() > 0) {
  42. CountDownLatch threadSignal = new CountDownLatch(list.size());
  43. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  44. //去除重复数据
  45. List<HashMap> dataList = paraseData(list);
  46. log.info("数据库有效需要处理的数:" + dataList.size());
  47. for (HashMap vipmap : dataList) {
  48. TaobSynService continueService = new TaobSynService(list.size(), threadSignal, vipmap, asyndao, dictionaryDao, taobDao);
  49. executorService.execute(continueService);
  50. }
  51. executorService.shutdown();
  52. try {
  53. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. log.info(Thread.currentThread().getName() + "定时任务完成,耗时:" + (System.currentTimeMillis() - beginTime) / 1000 + " 秒");
  59. }
  60. /**
  61. * 去除重复数据,防止赠送会员接口被并发限制
  62. *
  63. * @param dataList
  64. * @return
  65. */
  66. private List<HashMap> paraseData(List<HashMap> dataList) {
  67. //去重复后的数据集
  68. List<HashMap> reDataList = new ArrayList<HashMap>();
  69. HashMap<String, List> tmpMap = new HashMap<String, List>();
  70. for (HashMap dataMap : dataList) {
  71. if (tmpMap.containsKey(dataMap.get("USERID").toString())) {
  72. log.info("重复数据," + dataMap.get("USERID"));
  73. } else {
  74. reDataList.add(dataMap);
  75. List tmpList = new ArrayList();
  76. tmpList.add(dataMap.get("SPID"));
  77. tmpMap.put(dataMap.get("USERID").toString(), tmpList);
  78. }
  79. }
  80. return reDataList;
  81. }
  82. public static void main(String[] args) {
  83. TaobSyncJob job = new TaobSyncJob();
  84. try {
  85. job.doProcess();
  86. } catch (Exception e) {
  87. // TODO Auto-generated catch block
  88. e.printStackTrace();
  89. }
  90. }
  91. }
  92. class TaobSynService implements Runnable {
  93. private static Logger log = Logger.getLogger("taobsync");
  94. private int totalSize;
  95. private CountDownLatch threadSignal;
  96. private HashMap busimap;
  97. private DictionaryDao dictionaryDao;
  98. private BackBusiVipAsynDao asyndao;
  99. private TaobDao taobDao;
  100. public TaobSynService(int totalSize, CountDownLatch threadSignal, HashMap busimap, BackBusiVipAsynDao asyndao, DictionaryDao dictionaryDao, TaobDao taobDao) {
  101. this.totalSize = totalSize;
  102. this.threadSignal = threadSignal;
  103. this.busimap = busimap;
  104. this.asyndao = asyndao;
  105. this.dictionaryDao = dictionaryDao;
  106. this.taobDao = taobDao;
  107. }
  108. /**
  109. * 业务处理
  110. *
  111. * @param
  112. */
  113. public void run() {
  114. long startime = System.currentTimeMillis();
  115. Map logMap = new HashMap();
  116. logMap.put("data", busimap);
  117. PointShopOrderBean psoBean = new PointShopOrderBean();
  118. String resultCode = "-1";
  119. String resultInfo = "";
  120. String backresult = "";
  121. try {
  122. //办理结果编码,0成功,1待处理,2处理中,其他为异常
  123. psoBean.setId(busimap.get("ID").toString());
  124. psoBean.setUserid(busimap.get("USERID").toString());
  125. psoBean.setSpid(busimap.get("SPID").toString());
  126. psoBean.setCpid(busimap.get("CPID").toString());
  127. psoBean.setOrderchannel(busimap.get("CHANNEL").toString());
  128. psoBean.setBusiType(busimap.get("TYPE").toString());
  129. psoBean.setOrderNo(busimap.get("ORDERID").toString());
  130. psoBean.setResultCode("2");
  131. psoBean.setResultInfo("处理中");
  132. taobDao.upStatus(psoBean);
  133. //更新订购状态,调接口
  134. String result = invokeVideoif(psoBean);
  135. JSONObject jsonObject = JSON.parseObject(result);
  136. resultCode = jsonObject.getString("resultcode");
  137. resultInfo = jsonObject.getString("resultinfo");
  138. if (jsonObject.get("resultinfo") == null) {
  139. resultInfo = jsonObject.getString("errorinfo");
  140. }
  141. } catch (Exception e) {
  142. e.printStackTrace();
  143. if (e instanceof BusinessException) {
  144. resultInfo = ((BusinessException) e).getMessage();
  145. resultCode = ((BusinessException) e).getCode();
  146. } else {
  147. resultCode = "8000";
  148. resultInfo = "系统错误," + e.getMessage();
  149. }
  150. } finally {
  151. threadSignal.countDown();
  152. //更新订购操作表信息,
  153. try {
  154. if (resultInfo != null && resultInfo.length() > 250) {
  155. resultInfo = resultInfo.substring(0, 250);
  156. }
  157. if (psoBean != null) {
  158. psoBean.setResultCode(resultCode);
  159. psoBean.setResultInfo(resultInfo);
  160. //更新后向产品订购记录表处理结果
  161. taobDao.updSendStatus(psoBean.getResultCode(), psoBean.getResultInfo(), psoBean.getId());
  162. }
  163. } catch (Exception e) {
  164. e.printStackTrace();
  165. log.error("更新数据出现异常," + psoBean.getUserid() + ", resultCode:" + resultCode + ", resultInfo:" + resultInfo);
  166. }
  167. if ("0".equals(resultCode)) {
  168. inserSmstMq(psoBean); //发送短信
  169. }
  170. try {
  171. backresult = this.callBack(psoBean.getUserid(), psoBean.getCpid(), psoBean.getSpid(), psoBean.getBusiType(),psoBean.getId(),busimap.get("ORDERID").toString(),resultCode,resultInfo);
  172. } catch (Exception e2) {
  173. e2.printStackTrace();
  174. }
  175. logMap.put("data2", psoBean);
  176. logMap.put("reusltCode", resultCode);
  177. logMap.put("resultInfo", resultInfo);
  178. logMap.put("backresult", backresult);
  179. log.info(JsonUtil.objectToJson(logMap));
  180. }
  181. }
  182. /**
  183. * 拼接转发URL参数
  184. *
  185. * @param orderInfo
  186. * @return
  187. * @throws
  188. */
  189. private String getParams(PointShopOrderBean orderInfo) {
  190. String params = "";
  191. try {
  192. String cpid = orderInfo.getCpid();
  193. String spid = orderInfo.getSpid();
  194. String type = orderInfo.getBusiType();
  195. String userid = orderInfo.getUserid();
  196. String timestamp = (System.currentTimeMillis() / 1000) + "";
  197. String seqnumber = "";
  198. String seqnumberParams = "";
  199. if (!"".equals(seqnumber)) {
  200. seqnumberParams = "&seqnumber=" + seqnumber;
  201. }
  202. String pwd = taobDao.getPwd(cpid, spid); //TB_CP_ACCOUNT_CONFIG表NETPWD字段
  203. if ("0".equals(type)) {//订购
  204. params += "ordertype=0&access=1&apptype=2&channel=" + orderInfo.getOrderchannel() + "&cpid=" + cpid + "&spid=" + spid + "&timestamp=" + timestamp + "&token=";
  205. } else {//退订
  206. params += "ordertype=0&access=1&apptype=2&channel=" + orderInfo.getOrderchannel() + "&cpid=" + cpid + "&spid=" + spid + "&timestamp=" + timestamp;
  207. }
  208. userid = DESUtil.encode(userid, pwd);//手机号码加密
  209. params += "&userid=" + URLEncoder.encode(userid, "utf-8"); //重新编码
  210. params += seqnumberParams;
  211. } catch (Exception e) {
  212. e.printStackTrace();
  213. }
  214. return params;
  215. }
  216. /**
  217. * 调videoif-order.do接口
  218. *
  219. * @param
  220. * @return
  221. * @throws Exception
  222. */
  223. private String invokeVideoif(PointShopOrderBean orderBean) throws Exception {
  224. String result = "";
  225. try {
  226. String url = dictionaryDao.getValue("commonOrderUrl");
  227. if (orderBean.getBusiType().equals("0")) {
  228. url += "order.do?";
  229. } else {
  230. url = "cancelOrder.do?";
  231. }
  232. String params = getParams(orderBean);
  233. url = url + params;
  234. log.info("tburl: " + url);
  235. result = URLUtil.get(url); //调赠送会员接口,超时时间设置为10秒
  236. log.info("调videoif结果=> userid: " + orderBean.getUserid() + ", orderid: " + orderBean.getOrderNo() + " , result: " + result);
  237. } catch (Exception e) {
  238. e.printStackTrace();
  239. log.error("id=>" + orderBean.getId() + "=>userid: " + orderBean.getUserid() + "调videoif失败," + e);
  240. }
  241. return result;
  242. }
  243. /**
  244. * 回调淘宝接口
  245. *
  246. * @param
  247. * @param id
  248. */
  249. private String callBack(String userid, String cpid, String spid, String status,String id,String orderid,String resultCode,String resultinfo) {
  250. String bcstatus = "1"; //回调状态,1未回调,0回调完成,2回调异常
  251. String cbReqParams = ""; //回调请求报文
  252. String cbRspParams = ""; //回调响应报文
  253. try {
  254. PointShopOrderBean bean = taobDao.getOrderRec(userid, cpid, spid);
  255. TreeMap<String, String> orderOutObj = null;
  256. orderOutObj = new TreeMap<String, String>();
  257. orderOutObj.put("orderid", orderid);
  258. orderOutObj.put("userid", userid);
  259. orderOutObj.put("cpid", cpid);
  260. orderOutObj.put("spid", spid);
  261. orderOutObj.put("resultCode", resultCode);
  262. orderOutObj.put("resultinfo", resultinfo);
  263. if (bean != null) {
  264. String reusltCode = bean.getResultCode(); //办理结果编码,0成功,1待处理,2处理中,其他为异常
  265. if ("0".equals(reusltCode)) {
  266. orderOutObj.put("status", status);//状态 订购0退订1
  267. if ("0".equals(status)) {
  268. orderOutObj.put("ordertime", bean.getOrderTime());
  269. } else {
  270. orderOutObj.put("canceltime", bean.getCanceltime());
  271. }
  272. }
  273. orderOutObj.put("resultcode", reusltCode);
  274. }
  275. String callBackUrl = this.dictionaryDao.getValue("taobCallBackUrl");
  276. cbReqParams = JsonUtil.objectToJson(orderOutObj);
  277. log.info("===========回调地址:" + callBackUrl);
  278. log.info("===========回调请求参数:" + cbReqParams);
  279. cbRspParams = HttpInvoke.sendHttpByPost("POST", callBackUrl, cbReqParams, getProperty());
  280. log.info("============回调响应参数:" + cbRspParams);
  281. if ("0000".equals(cbRspParams)) {
  282. bcstatus = "0";
  283. }
  284. } catch (Exception e) {
  285. bcstatus = "2";
  286. e.printStackTrace();
  287. log.error("订单=>" + userid + "=>cpid=>" + cpid + "=>spid=>" + spid + ",回调接口出现异常," + e.getMessage());
  288. cbRspParams = "返回报文: " + cbRspParams + ",回调异常," + e.getMessage();
  289. } finally {
  290. if (cbRspParams != null && cbRspParams.length() > 250) {
  291. cbRspParams = cbRspParams.substring(0, 250);
  292. }
  293. log.info("数据库需处理id==>" + id + "=>cbReqParams: " + cbReqParams + ", cbRspParams=>" + cbRspParams);
  294. addParamsByCb(cbRspParams, id);
  295. }
  296. return bcstatus;
  297. }
  298. /**
  299. * 添加回调报文
  300. *
  301. * @param requestId
  302. */
  303. private void addParamsByCb(String rspParams, String requestId) {
  304. try {
  305. //一个中文占两个 字节
  306. if (rspParams != null && rspParams.length() > 100) {
  307. rspParams = rspParams.substring(0, 100);
  308. }
  309. taobDao.updParamsByCb(rspParams, requestId);
  310. } catch (Exception e) {
  311. e.printStackTrace();
  312. log.error("requestId: " + requestId + ", 更新callBack状态出现异常," + e.getMessage());
  313. }
  314. }
  315. /**
  316. * 解析数据
  317. *
  318. * @param body orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号
  319. * orderNo 积分商城订单号
  320. * requestId 返回给客户的请求ID
  321. * @return
  322. */
  323. public PointShopMqBean transBean(Map<String, Object> body) {
  324. String jsonStr = JsonUtil.objectToJson(body);
  325. return (PointShopMqBean) JsonUtil.jsonToBean(jsonStr, PointShopMqBean.class);
  326. }
  327. /**
  328. * 发送短信
  329. *
  330. * @param psoBean
  331. */
  332. public void inserSmstMq(PointShopOrderBean psoBean) {
  333. try {
  334. if (psoBean != null) {
  335. Map<String, String> map = new HashMap<String, String>();
  336. map.put("userid", psoBean.getUserid());
  337. map.put("cpid", psoBean.getCpid());
  338. map.put("spid", psoBean.getSpid());
  339. map.put("result", "0");
  340. map.put("channel", "tbcjhd");
  341. map.put("style", "0000");
  342. map.put("times", "");
  343. map.put("orderType", "");
  344. map.put("type", "cssms");
  345. if (psoBean.getBusiType().equals("0")) {
  346. map.put("busiType", "tran_succ_tb");
  347. } else {
  348. map.put("busiType", "cancel_succ_tb");
  349. }
  350. log.info(JsonUtil.objectToJson(map));
  351. String mqReciveUrl = dictionaryDao.getValue("mqReciveUrl");
  352. URLUtil.post(mqReciveUrl, JsonUtil.objectToJson(map));
  353. }
  354. } catch (Exception e) {
  355. e.printStackTrace();
  356. }
  357. }
  358. /**
  359. * 获取请求属性性
  360. *
  361. * @return
  362. */
  363. private static Map getProperty() {
  364. Map reqProperty = new HashMap();
  365. reqProperty.put("Content-type", "application/json;charset=UTF-8");
  366. return reqProperty;
  367. }
  368. }