101f7562406f263ee0370199efc53991e61a4959.svn-base 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package com.chinacreator.process.job;
  2. import java.net.URLEncoder;
  3. import java.util.ArrayList;
  4. import java.util.Date;
  5. import java.util.HashMap;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.concurrent.CountDownLatch;
  9. import java.util.concurrent.ExecutorService;
  10. import java.util.concurrent.LinkedBlockingQueue;
  11. import java.util.concurrent.ThreadPoolExecutor;
  12. import java.util.concurrent.TimeUnit;
  13. import org.apache.commons.lang.StringUtils;
  14. import org.apache.log4j.Logger;
  15. import org.quartz.DisallowConcurrentExecution;
  16. import org.quartz.PersistJobDataAfterExecution;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import com.alibaba.fastjson.JSONObject;
  19. import com.chinacreator.common.exception.BusinessException;
  20. import com.chinacreator.common.util.DESUtil;
  21. import com.chinacreator.common.util.MD5;
  22. import com.chinacreator.process.dao.ChannelOrderAsynDao;
  23. import com.chinacreator.process.util.JsonUtil;
  24. import com.chinacreator.process.util.URLUtil;
  25. /**
  26. * 已弃用,葫芦侠已改为通用CP同步
  27. * 合作方订购数据同步
  28. * @author xu.zhou
  29. * @date 20210115
  30. */
  31. @PersistJobDataAfterExecution
  32. @DisallowConcurrentExecution
  33. public class ChannelOrderAsynJob {
  34. private static Logger logger = Logger.getLogger("channelordersyn");
  35. @Autowired
  36. private ChannelOrderAsynDao asyndao;
  37. public void doProcess() throws Exception {
  38. //try{
  39. logger.info(Thread.currentThread().getName()+"定时任务开始");
  40. long beginTime = System.currentTimeMillis();
  41. //asyndao = new ChannelOrderAsynDao();
  42. //获取要处理的数据
  43. List<HashMap> list = asyndao.getAsynData();
  44. //获取业务配置数据
  45. List<HashMap> confList = asyndao.getAsynConf();
  46. if(list != null && list.size() > 0 && confList != null && confList.size() > 0){
  47. //去除重复数据
  48. List<HashMap> dataList = paraseData(list);
  49. logger.info("有效用户数:"+dataList.size());
  50. //更新为正处理状态
  51. asyndao.batchUpdSyncStatus(getIds(dataList));
  52. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  53. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  54. for(HashMap busimap : dataList){
  55. ChannelOrderAsynService continueService = new ChannelOrderAsynService(list.size(),threadSignal,busimap,asyndao,confList);
  56. executorService.execute(continueService);
  57. }
  58. executorService.shutdown();
  59. try {
  60. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  66. //} catch (Exception e) {
  67. //logger.error("定时任务启动处理出现异常,"+e.getMessage(),e);
  68. //}
  69. }
  70. /**
  71. * 拼接ID,用于更新数据
  72. * @param dataList
  73. * @return
  74. */
  75. private String getIds(List<HashMap> dataList){
  76. String ids = "";
  77. if(dataList != null && dataList.size()>0){
  78. for(HashMap tmphm : dataList){
  79. ids += ",'"+tmphm.get("ID")+"'";
  80. }
  81. }
  82. if(!"".equals(ids)){
  83. ids = ids.substring(1);
  84. }
  85. logger.info("ids=>"+ids);
  86. return ids;
  87. }
  88. /**
  89. * 去除重复数据
  90. * @param dataList
  91. * @return
  92. */
  93. private List<HashMap> paraseData(List<HashMap> dataList){
  94. logger.info("去重复前数据条数:"+dataList.size());
  95. //去重复后的数据集
  96. List<HashMap> reDataList = new ArrayList<HashMap>();
  97. HashMap<String, List> tmpMap = new HashMap<String, List>();
  98. for (HashMap dataMap : dataList) {
  99. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
  100. logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
  101. }else{
  102. reDataList.add(dataMap);
  103. List tmpList = new ArrayList();
  104. tmpList.add(dataMap.get("SPID"));
  105. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
  106. }
  107. }
  108. logger.info("去重复后数据条数:"+reDataList.size());
  109. return reDataList;
  110. }
  111. /**
  112. public static void main(String[] args) {
  113. ChannelOrderAsynJob job = new ChannelOrderAsynJob();
  114. try {
  115. job.doProcess();
  116. } catch (Exception e) {
  117. // TODO Auto-generated catch block
  118. e.printStackTrace();
  119. }
  120. }
  121. ***/
  122. }
  123. class ChannelOrderAsynService implements Runnable {
  124. private static Logger log = Logger.getLogger("channelordersyn");
  125. private int totalSize;
  126. private CountDownLatch threadSignal;
  127. private HashMap busimap;
  128. private ChannelOrderAsynDao asyndao;
  129. private List<HashMap> confList;
  130. public ChannelOrderAsynService(int totalSize,CountDownLatch threadSignal,HashMap busimap,ChannelOrderAsynDao asyndao,List<HashMap> confList){
  131. this.totalSize = totalSize;
  132. this.threadSignal = threadSignal;
  133. this.busimap = busimap;
  134. this.asyndao = asyndao;
  135. this.confList = confList;
  136. }
  137. @Override
  138. public void run() {
  139. long startime = System.currentTimeMillis();
  140. Map logMap = new HashMap();
  141. String id = busimap.get("ID").toString();
  142. logMap.put("busimap", busimap);
  143. String resultcode = "-1";
  144. String errorinfo = "";
  145. try {
  146. String result = invokeFace();
  147. log.info("调接口返回结果:"+result);
  148. //{"resultcode":"10006","errorInfo":"签名错误"}
  149. logMap.put("result", result);
  150. if (!StringUtils.isEmpty(result)) {
  151. Map<?, ?> map = JsonUtil.jsonToMap(result);
  152. resultcode = (String) map.get("resultcode");
  153. errorinfo = (String) map.get("errorInfo");
  154. }else{
  155. resultcode = "5000";
  156. errorinfo = "无返回信息";
  157. }
  158. //log.error("resultcode1=>"+resultcode);
  159. } catch (Exception e) {
  160. //log.error("resultcode1.1=>"+resultcode);
  161. if (e instanceof BusinessException) {
  162. errorinfo = ((BusinessException) e).getMessage();
  163. resultcode = ((BusinessException) e).getCode();
  164. }else{
  165. e.printStackTrace();
  166. resultcode = "8000";
  167. errorinfo = "同步出现异常,"+e.getMessage();
  168. log.error("同步出现异常,"+e.getMessage(),e);
  169. }
  170. //log.error("resultcode2=>"+resultcode);
  171. }finally {
  172. //log.error("resultcode3=>"+resultcode);
  173. threadSignal.countDown();
  174. String time = System.currentTimeMillis()-startime+"";
  175. try{
  176. if(errorinfo != null && errorinfo.length() > 500){
  177. errorinfo = errorinfo.substring(0,500);
  178. }
  179. //更新赠送会员结果出现异常
  180. asyndao.updSyncRes(id, resultcode, errorinfo, time);
  181. }catch(Exception e){
  182. log.error(busimap.get("USERID")+"更新处理结果出现异常,"+e.getMessage());
  183. resultcode = "8000";
  184. errorinfo = "更新处理结果出现异常,"+e.getMessage();
  185. }
  186. //写日志
  187. logMap.put("resultcode", resultcode);
  188. logMap.put("errorinfo", errorinfo);
  189. logMap.put("time", time);
  190. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  191. log.info(JsonUtil.objectToJson(logMap));
  192. }
  193. }
  194. /**
  195. * 调接口
  196. * @return
  197. * @throws Exception
  198. */
  199. private String invokeFace() throws Exception {
  200. String result = "";
  201. try {
  202. String invokeurl = ""; //接口地址
  203. String pwd = ""; //加密KEY
  204. String method = ""; //方法,POST/GET
  205. int timeout = 10; //超时时间
  206. String canceltime = busimap.get("CANCELTIME") != null ? busimap.get("CANCELTIME").toString() : "";
  207. String ordertime = busimap.get("ORDERTIME") != null ? busimap.get("ORDERTIME").toString() : "";
  208. String endtime = busimap.get("ENDTIME") != null ? busimap.get("ENDTIME").toString() : "";
  209. String orderid = busimap.get("ORDERID") != null ? busimap.get("ORDERID").toString() : "";
  210. String channel = busimap.get("CHANNEL").toString();
  211. String type = busimap.get("TYPE").toString();
  212. String cpid = busimap.get("CPID").toString();
  213. String spid = busimap.get("SPID").toString();
  214. String userid = busimap.get("USERID").toString();
  215. for(HashMap confMap : confList){
  216. if(spid.equals(confMap.get("SPID"))){
  217. invokeurl = confMap.get("INVOKEURL").toString();
  218. pwd = confMap.get("PWD").toString();
  219. method = confMap.get("METHOD").toString();
  220. timeout = Integer.parseInt(confMap.get("TIMEOUT").toString());
  221. break;
  222. }
  223. }
  224. if (StringUtils.isEmpty(invokeurl)
  225. || StringUtils.isEmpty(pwd)
  226. || StringUtils.isEmpty(method)) {
  227. throw new BusinessException("5001", "接口参数不完整(invokeurl、pwd、method、timeout)" , new String[0]);
  228. }
  229. String timestamp = (System.currentTimeMillis())/1000+"";
  230. userid = DESUtil.encode(userid, pwd);
  231. String signature = orderid+cpid+spid+userid+type+channel+timestamp+pwd;
  232. signature = MD5.MD5Encode(orderid+cpid+spid+userid+type+channel+timestamp+pwd);
  233. if("POST".equals(method)){
  234. JSONObject params = new JSONObject();
  235. params.put("orderid", orderid);
  236. params.put("type", type);
  237. params.put("userid", userid);
  238. params.put("channel", channel);
  239. params.put("cpid", cpid);
  240. params.put("spid", spid);
  241. params.put("timestamp", timestamp);
  242. params.put("signature", signature);
  243. params.put("canceltime", canceltime);
  244. params.put("ordertime", ordertime);
  245. params.put("endtime", endtime);
  246. log.info("调接口地址:"+invokeurl+",调接口参数:"+params.toJSONString());
  247. //调接口
  248. result = URLUtil.postJson(invokeurl,params.toJSONString(),timeout*1000);
  249. }else{
  250. String params = "?endtime="+endtime+"&ordertime="+ordertime+"&canceltime="+canceltime+"&orderid="+orderid+"&type="+type+"&userid="+URLEncoder.encode(userid,"utf-8")+"&channel="+channel+"&cpid="+cpid+"&spid="+spid+"&timestamp="+timestamp+"&signature="+signature;
  251. invokeurl += params;
  252. //调接口
  253. result = URLUtil.get(invokeurl,timeout*1000);
  254. }
  255. } catch (Exception e) {
  256. e.printStackTrace();
  257. log.info("调接口出现异常,"+e.getMessage());
  258. throw e;
  259. }
  260. return result;
  261. }
  262. }