c3ce70cd7a6483650a341ad4ad47c3c20d30b614.svn-base 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package com.chinacreator.process.job;
  2. import java.sql.SQLException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Arrays;
  6. import java.util.Date;
  7. import java.util.HashMap;
  8. import java.util.Iterator;
  9. import java.util.List;
  10. import java.util.Map;
  11. import java.util.Set;
  12. import java.util.concurrent.CountDownLatch;
  13. import java.util.concurrent.ExecutorService;
  14. import java.util.concurrent.LinkedBlockingQueue;
  15. import java.util.concurrent.ThreadPoolExecutor;
  16. import java.util.concurrent.TimeUnit;
  17. import org.apache.log4j.Logger;
  18. import org.quartz.DisallowConcurrentExecution;
  19. import org.quartz.PersistJobDataAfterExecution;
  20. import org.springframework.beans.factory.annotation.Autowired;
  21. import com.alibaba.fastjson.JSON;
  22. import com.alibaba.fastjson.JSONObject;
  23. import com.chinacreator.common.exception.BusinessException;
  24. import com.chinacreator.common.util.MD5;
  25. import com.chinacreator.process.bean.ContinueBean;
  26. import com.chinacreator.process.bean.ContinueLogBean;
  27. import com.chinacreator.process.bean.KuaishouPushBean;
  28. import com.chinacreator.process.dao.ContinueOrderDao;
  29. import com.chinacreator.process.dao.DictionaryDao;
  30. import com.chinacreator.process.dao.KuaishouDao;
  31. import com.chinacreator.process.util.DesUtil;
  32. import com.chinacreator.process.util.HttpInvoke;
  33. import com.chinacreator.process.util.JsonUtil;
  34. import com.chinacreator.process.util.SpringUtils;
  35. import com.chinacreator.process.util.URLUtil;
  36. /**
  37. * 快手订购推送(已弃用)
  38. * 订购流量未耗尽的定时任务(KuaishouPushOrderJob)把快手订购表的未处理的数据判断是否要推送,是,则添加到推送表,否,则不处理。
  39. 在处理后数据把推送表的ID(如果要推送)添加到快手订购表(TD_KUAISHOU_ORDER);
  40. 1. 获取快手订购记录表数据TD_KUAISHOU_ORDER
  41. 2. 查询推送记录表TD_KAFKA_KUAISHOU_PUSH,看本月是否已有推送过流量未耗尽
  42. 3. 无,则把数据添加到推送记录表
  43. * @author xu.zhou
  44. * @date 20200515
  45. */
  46. @PersistJobDataAfterExecution
  47. @DisallowConcurrentExecution
  48. public class KuaishouPushOrderJob {
  49. private static Logger logger = Logger.getLogger("kuaishoupush");
  50. @Autowired
  51. private KuaishouDao kuaishouDao;
  52. @Autowired
  53. private DictionaryDao dictionaryDao;
  54. public void doProcess() throws Exception {
  55. long beginTime = System.currentTimeMillis();
  56. logger.info(Thread.currentThread().getName()+"KuaishouPushOrderJob处理订购快手流量未耗尽定时任务开始");
  57. List<HashMap> dataList = kuaishouDao.getOrderPush();
  58. logger.info("去重复前数据条数:"+ (dataList == null ? "0" : dataList.size()));
  59. paraseData(dataList);
  60. logger.info("去重复后数据条数:"+ (dataList == null ? "0" : dataList.size()));
  61. if(dataList != null && dataList.size() > 0){
  62. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  63. ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  64. for(HashMap dataMap : dataList){
  65. KuaishouPushOrderService continueService = new KuaishouPushOrderService(dataList.size(),threadSignal,dataMap,kuaishouDao,dictionaryDao);
  66. executorService.execute(continueService);
  67. }
  68. executorService.shutdown();
  69. try {
  70. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  71. } catch (InterruptedException e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. logger.info(Thread.currentThread().getName()+"KuaishouPushOrderJob处理订购快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  76. }
  77. /**
  78. * 去重复数据(userid+spid+ordermonth)
  79. * @param dataList
  80. * @return
  81. */
  82. private void paraseData(List<HashMap> dataList){
  83. if (dataList == null || dataList.size() == 0){
  84. return;
  85. }
  86. HashMap<String, Object> reData = new HashMap<String, Object>();
  87. String ordermonth = "";
  88. String userid = "";
  89. String spid = "";
  90. HashMap dataMap = null;
  91. for (int i=0; i<dataList.size(); i++){
  92. dataMap = dataList.get(i);
  93. ordermonth = dataMap.get("ORDERTIME").toString().substring(0, 6);
  94. userid = dataMap.get("USERID").toString();
  95. spid = dataMap.get("SPID").toString();
  96. if(reData.containsKey(userid+spid+ordermonth)){
  97. dataList.remove(i);
  98. i--;
  99. }else{
  100. reData.put(userid+spid+ordermonth, dataMap);
  101. }
  102. }
  103. }
  104. }
  105. class KuaishouPushOrderService implements Runnable {
  106. private static Logger logger = Logger.getLogger("kuaishoupush");
  107. private int totalSize;
  108. private CountDownLatch threadSignal;
  109. private HashMap dataMap;
  110. private DictionaryDao dictionaryDao;
  111. private KuaishouDao kuaishouDao;
  112. public KuaishouPushOrderService(int totalSize,CountDownLatch threadSignal,HashMap dataMap,KuaishouDao kuaishouDao,DictionaryDao dictionaryDao){
  113. this.totalSize = totalSize;
  114. this.threadSignal = threadSignal;
  115. this.dataMap = dataMap;
  116. this.kuaishouDao = kuaishouDao;
  117. this.dictionaryDao = dictionaryDao;
  118. }
  119. @Override
  120. public void run() {
  121. long startime = System.currentTimeMillis();
  122. Map logMap = new HashMap();
  123. logMap.put("dataMap", dataMap);
  124. String pushid = ""; //推送记录表ID
  125. String resultcode = "-1";
  126. String errorinfo = "";
  127. boolean hasinvoke = false; //是否调了接口
  128. try {
  129. String spid = (String)dataMap.get("SPID");
  130. String pushmonth = dataMap.get("ORDERTIME").toString().substring(0, 6);
  131. String userid = (String)dataMap.get("USERID");
  132. //判断CP是否已同步完成
  133. if(!kuaishouDao.hasSync(userid, spid)){
  134. throw new BusinessException("9055","数据CP同步未完成,暂不处理");
  135. }
  136. List<HashMap> pushList = kuaishouDao.queryPush(userid, pushmonth);
  137. logger.info("pushList=>"+pushList);
  138. if(pushList != null && pushList.size() > 0){
  139. List<String> spidsList = null;
  140. boolean haspush = false;
  141. for(HashMap tmpMap : pushList){
  142. spidsList = Arrays.asList(tmpMap.get("SPIDS").toString().split("#"));
  143. logger.info("spid=>"+spid+", spidsList=》"+spidsList);
  144. if(spidsList.contains(spid)){
  145. haspush = true;
  146. break;
  147. }
  148. }
  149. if(haspush){
  150. throw new BusinessException("9054","同一月份已推送相同数据");
  151. }
  152. }
  153. KuaishouPushBean pushBean = new KuaishouPushBean();
  154. pushid = kuaishouDao.getNo();
  155. pushBean.setSerial_number(userid);
  156. pushBean.setId(pushid);
  157. pushBean.setPushmonth(pushmonth);
  158. pushBean.setPushtype("1"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
  159. pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败
  160. pushBean.setResultinfo("待处理");
  161. pushBean.setSpids(spid);
  162. //添加数据到推送表
  163. kuaishouDao.insertPush(pushBean);
  164. resultcode = "0";
  165. errorinfo = "ok";
  166. } catch (Exception e) {
  167. if (e instanceof BusinessException) {
  168. errorinfo = ((BusinessException) e).getMessage();
  169. resultcode = ((BusinessException) e).getCode();
  170. }else{
  171. e.printStackTrace();
  172. resultcode = "8000";
  173. errorinfo = "处理数据出现异常,"+e.getMessage();
  174. }
  175. }finally {
  176. threadSignal.countDown();
  177. try{
  178. //未CP同步的数据,延迟处理
  179. if(!"9055".equals(resultcode)){
  180. kuaishouDao.updOrderPush(dataMap.get("ID").toString(), "3", errorinfo, (System.currentTimeMillis()-startime)+"", pushid);
  181. }
  182. } catch (Exception e) {
  183. e.printStackTrace();
  184. errorinfo += "|更新ORDER表出现异常,"+e.getMessage();
  185. }
  186. //写日志
  187. logMap.put("jobname", "KuaishouPushOrderJob");
  188. logMap.put("time", (System.currentTimeMillis()-startime)+"");
  189. logMap.put("resultcode", resultcode);
  190. logMap.put("errorinfo", errorinfo);
  191. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  192. logger.info(JsonUtil.objectToJson(logMap));
  193. }
  194. }
  195. /**
  196. * 调接口
  197. * @param reqBean
  198. * @return
  199. * @throws Exception
  200. */
  201. private String invokeKsPush(KuaishouPushBean pushBean) throws Exception{
  202. String result = ""; //调快手接口返回结果
  203. String pushurl = dictionaryDao.getValue("kuaishoupushurl");
  204. String jsonParams = getInvokeParams(pushBean);
  205. logger.info("pushurl=>"+pushurl+", jsonParams=>"+jsonParams);
  206. if(pushurl.startsWith("https")){
  207. result = HttpInvoke.sendhttpsReq("POST", pushurl, jsonParams, getProperty());
  208. }else{
  209. result = HttpInvoke.sendHttpByPost("POST", pushurl, jsonParams, getProperty());
  210. }
  211. logger.info("调快手推送接口返回结果:"+result);
  212. //去空格、换行符号
  213. if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", "");
  214. return result;
  215. }
  216. /**
  217. * 推接调接口的参数
  218. * @param reqBean
  219. * @return
  220. * @throws Exception
  221. */
  222. private String getInvokeParams(KuaishouPushBean pushBean) throws Exception{
  223. String pwd = dictionaryDao.getValue("kuaishoupushpwd");
  224. String userid = DesUtil.encode(pushBean.getSerial_number(), pwd); //手机号码加密
  225. String timestamp= (System.currentTimeMillis())/1000+"";
  226. String month = pushBean.getPushmonth();
  227. String pushtype = pushBean.getPushtype();
  228. String signature = MD5.MD5Encode(userid+pushtype+month+timestamp+pwd);
  229. JSONObject json = new JSONObject();
  230. json.put("timestamp", timestamp);
  231. json.put("signature", signature);
  232. json.put("userid", userid);
  233. json.put("month", month);
  234. json.put("type", pushtype);
  235. return json.toJSONString();
  236. }
  237. /**
  238. * 获取请求属性性
  239. * @return
  240. */
  241. private static Map getProperty(){
  242. Map reqProperty = new HashMap();
  243. reqProperty.put("Content-type", "application/json;charset=UTF-8");
  244. return reqProperty;
  245. }
  246. }