84c696b92464cfff8748582994073c2b5f23f456.svn-base 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. package com.chinacreator.process.job;
  2. import java.net.URLEncoder;
  3. import java.sql.SQLException;
  4. import java.util.ArrayList;
  5. import java.util.Arrays;
  6. import java.util.Date;
  7. import java.util.HashMap;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.concurrent.CountDownLatch;
  11. import java.util.concurrent.ExecutorService;
  12. import java.util.concurrent.LinkedBlockingQueue;
  13. import java.util.concurrent.ThreadPoolExecutor;
  14. import java.util.concurrent.TimeUnit;
  15. import org.apache.log4j.Logger;
  16. import org.quartz.DisallowConcurrentExecution;
  17. import org.quartz.PersistJobDataAfterExecution;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.util.StringUtils;
  20. import com.chinacreator.common.exception.BusinessException;
  21. import com.chinacreator.common.util.MD5;
  22. import com.chinacreator.process.bean.ChannelOrderBean;
  23. import com.chinacreator.process.bean.ContinueBean;
  24. import com.chinacreator.process.bean.KuaishouPushBean;
  25. import com.chinacreator.process.dao.DictionaryDao;
  26. import com.chinacreator.process.dao.KuaishouDao;
  27. import com.chinacreator.process.util.DesUtil;
  28. import com.chinacreator.process.util.HttpInvoke;
  29. import com.chinacreator.process.util.JsonUtil;
  30. import com.chinacreator.process.util.URLUtil;
  31. import com.chinacreator.process.util.WriteLogUtil;
  32. import com.chinacreator.video.queue.MessageService;
  33. import com.chinacreator.video.queue.bean.MessagePipe;
  34. import com.alibaba.fastjson.JSON;
  35. import com.alibaba.fastjson.JSONObject;
  36. /**
  37. * 快手推送入库数据处理
  38. * 处理待推送的定时任务,负责把推送表中“待处理”的数据进行推送。
  39. * @author xu.zhou
  40. * @date 20200515
  41. */
  42. @PersistJobDataAfterExecution
  43. @DisallowConcurrentExecution
  44. public class KuaiShouPushMQJob {
  45. private Logger log = Logger.getLogger("kuaishoupush");
  46. @Autowired
  47. private DictionaryDao dictionaryDao;
  48. @Autowired
  49. private KuaishouDao kuaishouDao;
  50. public void doProcess() throws Exception {
  51. //log.info("接收快手推送数据队列JOB启动");
  52. //log.info(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务启动");
  53. WriteLogUtil.writeLong("KuaiShouPushMQJob处理流量消耗完定时任务开始", log, "KuaiShouPushMQJob");
  54. long beginTime = System.currentTimeMillis();
  55. //获取数据
  56. List<HashMap> dataList = kuaishouDao.qryProcPushListAll();
  57. //log.info("要处理的数据条数:"+ (dataList == null ? "0": dataList.size()));
  58. //去重复数据
  59. paraseData(dataList);
  60. //log.info("去重复后的数据条数:"+ (dataList == null ? "0": dataList.size()));
  61. if(dataList != null && dataList.size()>0){
  62. WriteLogUtil.writeLong("去重复后的数据条数:"+ dataList.size() ,log, "KuaiShouPushMQJob");
  63. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  64. ExecutorService executorService = new ThreadPoolExecutor(40, 50, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  65. for (Map reqBean : dataList) {
  66. KuaishouPushService service = new KuaishouPushService(dataList.size(),threadSignal,reqBean,dictionaryDao,kuaishouDao);
  67. executorService.execute(service);
  68. }
  69. executorService.shutdown();
  70. try {
  71. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. //log.info(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  77. WriteLogUtil.writeLong(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", log, "KuaiShouPushMQJob");
  78. }
  79. /**
  80. * 去重复数据(userid+spid+ordermonth)
  81. * @param dataList
  82. * @return
  83. */
  84. private void paraseData(List<HashMap> dataList){
  85. if (dataList == null || dataList.size() == 0){
  86. return;
  87. }
  88. HashMap<String, Object> reData = new HashMap<String, Object>();
  89. String pushmonth = "";
  90. String userid = "";
  91. String spids = "";
  92. String pushtype = "";
  93. HashMap dataMap = null;
  94. for (int i=0; i<dataList.size(); i++){
  95. //防止有数据为空
  96. try {
  97. dataMap = dataList.get(i);
  98. pushmonth = dataMap.get("PUSHMONTH").toString();
  99. userid = dataMap.get("SERIAL_NUMBER").toString();
  100. spids = dataMap.get("SPIDS").toString();
  101. pushtype = dataMap.get("PUSHTYPE").toString();
  102. if(reData.containsKey(userid+spids+pushmonth+pushtype)){
  103. dataList.remove(i);
  104. i--;
  105. }else{
  106. reData.put(userid+spids+pushmonth, dataMap);
  107. }
  108. } catch (Exception e) {
  109. e.printStackTrace();
  110. }
  111. }
  112. }
  113. /**
  114. * 解析数据
  115. * @param body
  116. * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号
  117. * orderNo 积分商城订单号
  118. * requestId 返回给客户的请求ID
  119. * @return
  120. */
  121. public Map transBean(Map<String, Object> body) {
  122. String jsonStr = JsonUtil.objectToJson(body);
  123. return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
  124. }
  125. }
  126. class KuaishouPushService implements Runnable {
  127. private Logger logger = Logger.getLogger("kuaishoupush");
  128. private int totalSize;
  129. private CountDownLatch threadSignal;
  130. private Map dataMap;
  131. private DictionaryDao dictionaryDao;
  132. private KuaishouDao kuaishouDao;
  133. public KuaishouPushService(int totalSize,CountDownLatch threadSignal, Map dataMap,DictionaryDao dictionaryDao,KuaishouDao kuaishouDao){
  134. this.totalSize = totalSize;
  135. this.threadSignal = threadSignal;
  136. this.dataMap = dataMap;
  137. this.dictionaryDao = dictionaryDao;
  138. this.kuaishouDao = kuaishouDao;
  139. }
  140. @Override
  141. public void run() {
  142. long startime = System.currentTimeMillis();
  143. Map logMap = new HashMap();
  144. logMap.put("data", dataMap);
  145. String resultcode = "-1";
  146. String errorinfo = "";
  147. try {
  148. KuaishouPushBean pushBean = new KuaishouPushBean();
  149. pushBean.setId(dataMap.get("ID").toString());
  150. pushBean.setSerial_number(dataMap.get("SERIAL_NUMBER").toString());
  151. pushBean.setPushmonth(dataMap.get("PUSHMONTH").toString());
  152. pushBean.setPushtype(dataMap.get("PUSHTYPE").toString());
  153. pushBean.setResultcode("2"); //推送结果编码, 1推送到队列,2推送中,0成功,其他失败
  154. pushBean.setResultinfo("处理中");
  155. pushBean.setSpids(dataMap.get("SPIDS").toString());
  156. kuaishouDao.updatePush(pushBean.getId(), pushBean.getResultcode(), pushBean.getResultinfo());
  157. String result = invokeKsPush(pushBean);
  158. if(!"".equals(result)){
  159. resultcode = JSON.parseObject(result).get("result").toString();
  160. errorinfo = result;
  161. }
  162. } catch (Exception e) {
  163. if (e instanceof BusinessException) {
  164. errorinfo = ((BusinessException) e).getMessage();
  165. resultcode = ((BusinessException) e).getCode();
  166. }else{
  167. e.printStackTrace();
  168. resultcode = "8000";
  169. errorinfo = "处理数据出现异常,"+e.getMessage();
  170. }
  171. } finally{
  172. threadSignal.countDown();
  173. try {
  174. if(errorinfo != null && errorinfo.length()>500){
  175. errorinfo = errorinfo.substring(0, 500);
  176. }
  177. if("1".equals(resultcode)){ //快手接口返回1,代表失败,记录表的1代表待处理
  178. resultcode = "8001";
  179. }
  180. kuaishouDao.updatePush(dataMap.get("ID").toString(), resultcode, errorinfo);
  181. } catch (Exception e) {
  182. e.printStackTrace();
  183. errorinfo += "|更新推送结果出现异常,"+e.getMessage();
  184. //logger.error("更新推送结果出现异常,"+e.getMessage());
  185. }
  186. //写日志
  187. logMap.put("jobname", "KuaiShouPushMQJob");
  188. logMap.put("resultcode", resultcode);
  189. logMap.put("errorinfo", errorinfo);
  190. logMap.put("time", (System.currentTimeMillis()-startime)+"");
  191. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  192. logger.info(JsonUtil.objectToJson(logMap));
  193. }
  194. }
  195. /**
  196. * 推接调接口的参数
  197. * @param reqBean
  198. * @return
  199. * @throws Exception
  200. */
  201. private String getInvokeParams(KuaishouPushBean pushBean) throws Exception{
  202. String usermob = pushBean.getSerial_number();
  203. String fakeid = kuaishouDao.getFakeid(usermob, "kuaishou");//URLEncoder.encode("+R+6K/LYIjkgZnvW9gOZKQ==");
  204. if (StringUtils.isEmpty(fakeid)) {
  205. logger.info("订购关系表无fakeid, "+pushBean.getSerial_number());
  206. fakeid = getFakeidKuaishou(usermob);
  207. }
  208. if (StringUtils.isEmpty(fakeid)) {
  209. throw new BusinessException("9001","fakeid为空");
  210. }
  211. //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
  212. String pushtype = pushBean.getPushtype();
  213. String cpid = "kuaishou";
  214. String type = "1"; //0 流量未耗尽 1 流量已耗尽
  215. if(!"0".equals(pushtype)){//当前推送类型不是已耗尽,
  216. type = "0";
  217. }
  218. String SECRET = "2rFUrLZyKV9S";
  219. String deskey = "ksks1234";
  220. fakeid = URLEncoder.encode(fakeid);
  221. logger.info("fakeid=>"+fakeid);
  222. usermob = URLEncoder.encode(DesUtil.encode(usermob,deskey));
  223. //Md5(usermob+ fakeid+cpid+spid+type+SECRET)
  224. String signdata = usermob+cpid+SECRET ;
  225. String signstr = MD5.MD5Encode(signdata);
  226. String sign = signstr.length()>16?signstr.substring(0,16):signstr;
  227. //String url = "http://shishangwei5.test.gifshow.com/rest/n/partner/cucc/callback/quota";
  228. //url = "https://shishangwei5.test.gifshow.com/rest/n/partner/cucc/callback/quota";
  229. JSONObject params = new JSONObject();
  230. params.put("fakeid", fakeid);
  231. params.put("usermob", usermob);
  232. params.put("cpid", cpid);
  233. params.put("type", type);
  234. params.put("sign", sign);
  235. String data = params.toJSONString();
  236. return data;
  237. }
  238. /**
  239. * 调接口
  240. * @param reqBean
  241. * @return
  242. * @throws Exception
  243. */
  244. private String invokeKsPush(KuaishouPushBean pushBean) throws Exception{
  245. String result = ""; //调快手接口返回结果
  246. String pushurl = dictionaryDao.getValue("kuaishoupushurl");
  247. String jsonParams = getInvokeParams(pushBean);
  248. logger.info("pushurl=>"+pushurl+", jsonParams=>"+jsonParams);
  249. if(pushurl.startsWith("https")){
  250. result = HttpInvoke.sendhttpsReq("POST", pushurl, jsonParams, getProperty());
  251. }else{
  252. result = HttpInvoke.sendHttpByPost("POST", pushurl, jsonParams, getProperty());
  253. }
  254. //{"result":"0","errorcode":"","host-name":"bjm7-rs1892.jxq"}
  255. logger.info(pushBean.getId()+", "+pushBean.getSerial_number()+", 调快手推送接口返回结果:"+result);
  256. //去空格、换行符号
  257. if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", "");
  258. return result;
  259. }
  260. /**
  261. * 解析数据
  262. * @param body
  263. * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号
  264. * orderNo 积分商城订单号
  265. * requestId 返回给客户的请求ID
  266. * @return
  267. */
  268. public Map transBean(Map<String, Object> body) {
  269. String jsonStr = JsonUtil.objectToJson(body);
  270. return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
  271. }
  272. /**
  273. * 获取请求属性性
  274. * @return
  275. */
  276. private Map getProperty(){
  277. Map reqProperty = new HashMap();
  278. reqProperty.put("Content-type", "application/json;charset=UTF-8");
  279. return reqProperty;
  280. }
  281. /**
  282. * 调接口获取fakeid
  283. * @param userid
  284. * @return
  285. */
  286. public String getFakeidKuaishou(String userid){
  287. String fakeid = null;
  288. //logger.info("调接口获取fakeid,userid=>"+userid);
  289. try {
  290. final String baseurl="https://pts.10010.com:9005/api";
  291. final String user="000101";
  292. final String pwd="9nBrS4BV967z";
  293. final String service="0";
  294. final String function="3";
  295. final String appid="339302416384";
  296. Date curdate= new Date();
  297. String tick=(curdate.getTime()/1000)+"";
  298. String key=MD5.MD5Encode(user+tick+pwd).substring(0,16);
  299. Map<String,String> param = new HashMap<String,String>();
  300. param.put("user",user);
  301. param.put("tick",tick);
  302. param.put("key",key);
  303. param.put("service",service);
  304. param.put("function",function);
  305. param.put("mobile","86"+userid);
  306. param.put("appid",appid);
  307. String url = baseurl + "?" + HttpInvoke.mapToUrl(param);
  308. String resp = URLUtil.get(url);
  309. JSONObject resultObject = JSONObject.parseObject(resp);
  310. String result = resultObject.getString("result");
  311. String pcode = resultObject.getString("pcode");
  312. if("0".equals(result) && pcode != null && !"".equals(pcode)){
  313. fakeid = pcode;
  314. }
  315. } catch (Exception e) {
  316. e.printStackTrace();
  317. logger.error("调接口获取fakeid出现异常,userid=>"+userid+", "+e.getMessage());
  318. }
  319. logger.info("调接口获取fakeid,userid=>"+userid+", fakeid=>"+fakeid);
  320. return fakeid;
  321. }
  322. }