f2485f511e23efdb1635f8dc1b6c2a138211263f.svn-base 8.5 KB


  1. package com.chinacreator.process.job;
  2. import java.net.URLEncoder;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.HashMap;
  7. import java.util.Iterator;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.Set;
  11. import java.util.concurrent.CountDownLatch;
  12. import java.util.concurrent.ExecutorService;
  13. import java.util.concurrent.LinkedBlockingQueue;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15. import java.util.concurrent.TimeUnit;
  16. import net.sf.json.JSONObject;
  17. import org.apache.log4j.Logger;
  18. import org.quartz.DisallowConcurrentExecution;
  19. import org.quartz.PersistJobDataAfterExecution;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import com.chinacreator.common.exception.BusinessException;
  22. import com.chinacreator.common.util.DESUtil;
  23. import com.chinacreator.common.util.MD5;
  24. import com.chinacreator.process.bean.BackShareOrderBean;
  25. import com.chinacreator.process.dao.BackBusiVipAsynDao;
  26. import com.chinacreator.process.dao.DictionaryDao;
  27. import com.chinacreator.process.util.JsonUtil;
  28. import com.chinacreator.process.util.URLUtil;
  29. /**
  30. * 后向调能力平台异步处理
  31. * @author xu.zhou
  32. * @date 20201022
  33. */
  34. @PersistJobDataAfterExecution
  35. @DisallowConcurrentExecution
  36. public class BackBusiShareAsynJob {
  37. private static Logger logger = Logger.getLogger("backbusisharesyn");
  38. @Autowired
  39. private BackBusiVipAsynDao asyndao;
  40. @Autowired
  41. private DictionaryDao dictionaryDao;
  42. public void doProcess() throws Exception {
  43. logger.info(Thread.currentThread().getName()+"定时任务开始");
  44. long beginTime = System.currentTimeMillis();
  45. List<HashMap> list = asyndao.getInvokeShareData();
  46. if(list != null && list.size() > 0){
  47. CountDownLatch threadSignal = new CountDownLatch(list.size());
  48. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  49. List<HashMap> dataList = paraseData(list);
  50. logger.info("数据库有效订购的用户数:"+dataList.size());
  51. for(HashMap vipmap : dataList){
  52. BackBusiShareSynService continueService = new BackBusiShareSynService(list.size(),threadSignal,vipmap,asyndao,dictionaryDao);
  53. executorService.execute(continueService);
  54. }
  55. executorService.shutdown();
  56. try {
  57. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  58. } catch (InterruptedException e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. logger.info(Thread.currentThread().getName()+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  63. }
  64. /**
  65. * 去除重复数据,防止赠送会员接口被并发限制
  66. * @param dataList
  67. * @return
  68. */
  69. private List<HashMap> paraseData(List<HashMap> dataList){
  70. //去重复后的数据集
  71. List<HashMap> reDataList = new ArrayList<HashMap>();
  72. HashMap<String, List> tmpMap = new HashMap<String, List>();
  73. for (HashMap dataMap : dataList) {
  74. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
  75. logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
  76. }else{
  77. reDataList.add(dataMap);
  78. List tmpList = new ArrayList();
  79. tmpList.add(dataMap.get("SPID"));
  80. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
  81. }
  82. }
  83. return reDataList;
  84. }
  85. }
  86. class BackBusiShareSynService implements Runnable {
  87. private static Logger log = Logger.getLogger("backbusisharesyn");
  88. private int totalSize;
  89. private CountDownLatch threadSignal;
  90. private HashMap vipmap;
  91. private DictionaryDao dictionaryDao;
  92. private BackBusiVipAsynDao asyndao;
  93. public BackBusiShareSynService(int totalSize,CountDownLatch threadSignal,HashMap vipmap,BackBusiVipAsynDao asyndao,DictionaryDao dictionaryDao){
  94. this.totalSize = totalSize;
  95. this.threadSignal = threadSignal;
  96. this.vipmap = vipmap;
  97. this.asyndao = asyndao;
  98. this.dictionaryDao = dictionaryDao;
  99. }
  100. @Override
  101. public void run() {
  102. long startime = System.currentTimeMillis();
  103. Map logMap = new HashMap();
  104. String id = vipmap.get("ID").toString();
  105. logMap.put("vipmap", vipmap);
  106. String resultcode = "-1";
  107. String errorinfo = "";
  108. String hasshare = "";
  109. try {
  110. List<HashMap> confList = asyndao.getBackBusiConf(vipmap.get("CPID").toString(), vipmap.get("SPID").toString());
  111. vipmap.put("confInfo", confList.get(0));
  112. hasshare = (confList.get(0).get("HASSHARE") == null ? "" : confList.get(0).get("HASSHARE").toString());
  113. //更新为正在赠送状态
  114. asyndao.updShareStatus(id,"4","纯免流和免流+会员产品异步处理中");
  115. //调能力平台
  116. shareOrder(hasshare);
  117. resultcode = "0";
  118. errorinfo = "成功";
  119. } catch (Exception e) {
  120. if (e instanceof BusinessException) {
  121. errorinfo = ((BusinessException) e).getMessage();
  122. resultcode = ((BusinessException) e).getCode();
  123. }else{
  124. e.printStackTrace();
  125. resultcode = "8000";
  126. errorinfo = "处理数据出现异常,"+e.getMessage();
  127. }
  128. }finally {
  129. threadSignal.countDown();
  130. String time = System.currentTimeMillis()-startime+"";
  131. try{
  132. //出现异常或调能力不成功,如果是不以接口结果为依据落订购关系的,resultcode设置为0
  133. if(!"0".equals(resultcode) && "1".equals(hasshare)){
  134. resultcode = "0";
  135. }
  136. //更新赠送会员结果出现异常
  137. asyndao.updShareStatus(id, resultcode, errorinfo);
  138. }catch(Exception e){
  139. log.error(vipmap.get("USERID")+"更新调能力平台结果出现异常,"+e.getMessage());
  140. }
  141. //写日志
  142. logMap.put("resultcode", resultcode);
  143. logMap.put("errorinfo", errorinfo);
  144. logMap.put("time", time);
  145. logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  146. log.info(JsonUtil.objectToJson(logMap));
  147. }
  148. }
  149. /**
  150. * 调能力平台
  151. * @param orderInfo
  152. * @param hasshare //调能力平台标识:0调能力平台并以结果落订购关系,1调能力平台其结果不影响订购关系,为空时不调能力平台,
  153. * @throws Exception
  154. */
  155. private void shareOrder(String hasshare) throws Exception{
  156. if("0".equals(hasshare) || "1".equals(hasshare)){
  157. String shareErcode = "0";
  158. String shareErnfo = "成功";
  159. try {
  160. this.invokeShare();
  161. } catch (BusinessException e) {
  162. e.printStackTrace();
  163. shareErcode = e.getCode();
  164. shareErnfo = e.getMessage();
  165. if("0".equals(hasshare)){
  166. throw new BusinessException("8888","调用能力共享平台订购接口失败");
  167. }
  168. } finally {
  169. saveBackShareLog(shareErcode,shareErnfo);
  170. }
  171. }
  172. }
  173. /**
  174. * 写后向订购能力平台日志
  175. * @param orderInfo
  176. * @param errorcode
  177. * @param errorinfo
  178. */
  179. private void saveBackShareLog(String errorcode,String errorinfo){
  180. BackShareOrderBean bso = new BackShareOrderBean();
  181. bso.setUserid(vipmap.get("USERID").toString());
  182. bso.setErrorcode(errorcode);
  183. bso.setErrorinfo(errorinfo);
  184. bso.setCpid(vipmap.get("CPID").toString());
  185. bso.setSpid(vipmap.get("SPID").toString());
  186. try {
  187. asyndao.addShareOrderLog(bso);
  188. } catch (Exception e) {
  189. e.printStackTrace();
  190. log.error("userid:"+vipmap.get("USERID")+",写后向订购能力平台日志出现异常,"+e.getMessage());
  191. }
  192. }
  193. /**
  194. * 调能力平台
  195. * @throws BusinessException
  196. */
  197. public void invokeShare() throws BusinessException{
  198. String result = "";
  199. String userid = "";
  200. String cpid = "";
  201. String spid = "";
  202. String url = "";
  203. try{
  204. userid = vipmap.get("USERID").toString();
  205. cpid = vipmap.get("CPID").toString();
  206. spid = vipmap.get("SPID").toString();
  207. url = dictionaryDao.getValue("shareOrderUrl");
  208. result = URLUtil.get(url+"?cpid="+cpid+"&spid="+spid+"&userid="+userid,15000);
  209. }catch (Exception e) {
  210. log.error("****:"+userid+cpid+spid+":"+e.getMessage());
  211. if(e.getMessage().indexOf("connect timed out")!= -1) { //如果连接超时就再重试一次
  212. try {
  213. result = URLUtil.get(url+"?cpid="+cpid+"&spid="+spid+"&userid="+userid,15000);
  214. log.info(userid+cpid+spid+"重试:"+result);
  215. if(result.indexOf("您已订购,请勿重复")!= -1){ //包含您已订购,请勿重复订购
  216. result = "{\"resultCode\":0,\"resultInfo\":\"成功!\",\"data\":null}";
  217. }
  218. }catch (Exception e1){
  219. throw new BusinessException("9170", "调用能力共享平台订购接口异常!");
  220. }
  221. }else{
  222. throw new BusinessException("9170", "调用能力共享平台订购接口异常");
  223. }
  224. }
  225. JSONObject obj = JSONObject.fromObject(result);
  226. if(!obj.getString("resultCode").equals("0")){
  227. throw new BusinessException(obj.getString("resultCode"), obj.getString("resultInfo"));
  228. }
  229. }
  230. }