package com.chinacreator.process.job; import java.net.URLEncoder; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.common.util.MD5; import com.chinacreator.process.bean.ChannelOrderBean; import com.chinacreator.process.bean.ContinueBean; import com.chinacreator.process.bean.KuaishouPushBean; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.KuaishouDao; import com.chinacreator.process.util.DesUtil; import com.chinacreator.process.util.HttpInvoke; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.process.util.URLUtil; import com.chinacreator.process.util.WriteLogUtil; import com.chinacreator.video.queue.MessageService; import com.chinacreator.video.queue.bean.MessagePipe; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; /** * 快手推送入库数据处理 * 处理待推送的定时任务,负责把推送表中“待处理”的数据进行推送。 * @author xu.zhou * @date 20200515 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class KuaiShouPushMQJob { private Logger log = Logger.getLogger("kuaishoupush"); @Autowired private DictionaryDao dictionaryDao; @Autowired private KuaishouDao kuaishouDao; public void doProcess() throws Exception { //log.info("接收快手推送数据队列JOB启动"); //log.info(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务启动"); WriteLogUtil.writeLong("KuaiShouPushMQJob处理流量消耗完定时任务开始", log, "KuaiShouPushMQJob"); long beginTime = System.currentTimeMillis(); //获取数据 List dataList = kuaishouDao.qryProcPushListAll(); //log.info("要处理的数据条数:"+ (dataList == null ? "0": dataList.size())); //去重复数据 paraseData(dataList); //log.info("去重复后的数据条数:"+ (dataList == null ? "0": dataList.size())); if(dataList != null && dataList.size()>0){ WriteLogUtil.writeLong("去重复后的数据条数:"+ dataList.size() ,log, "KuaiShouPushMQJob"); CountDownLatch threadSignal = new CountDownLatch(dataList.size()); ExecutorService executorService = new ThreadPoolExecutor(40, 50, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); for (Map reqBean : dataList) { KuaishouPushService service = new KuaishouPushService(dataList.size(),threadSignal,reqBean,dictionaryDao,kuaishouDao); executorService.execute(service); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } //log.info(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); WriteLogUtil.writeLong(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", log, "KuaiShouPushMQJob"); } /** * 去重复数据(userid+spid+ordermonth) * @param dataList * @return */ private void paraseData(List dataList){ if (dataList == null || dataList.size() == 0){ return; } HashMap reData = new HashMap(); String pushmonth = ""; String userid = ""; String spids = ""; String pushtype = ""; HashMap dataMap = null; for (int i=0; i body) { String jsonStr = JsonUtil.objectToJson(body); return (Map) JsonUtil.jsonToBean(jsonStr, Map.class); } } class KuaishouPushService implements Runnable { private Logger logger = Logger.getLogger("kuaishoupush"); private int totalSize; private CountDownLatch threadSignal; private Map dataMap; private DictionaryDao dictionaryDao; private KuaishouDao kuaishouDao; public KuaishouPushService(int totalSize,CountDownLatch threadSignal, Map dataMap,DictionaryDao dictionaryDao,KuaishouDao kuaishouDao){ this.totalSize = totalSize; this.threadSignal = threadSignal; this.dataMap = dataMap; this.dictionaryDao = dictionaryDao; this.kuaishouDao = kuaishouDao; } @Override public void run() { long startime = System.currentTimeMillis(); Map logMap = new HashMap(); logMap.put("data", dataMap); String resultcode = "-1"; String errorinfo = ""; try { KuaishouPushBean pushBean = new KuaishouPushBean(); pushBean.setId(dataMap.get("ID").toString()); pushBean.setSerial_number(dataMap.get("SERIAL_NUMBER").toString()); pushBean.setPushmonth(dataMap.get("PUSHMONTH").toString()); pushBean.setPushtype(dataMap.get("PUSHTYPE").toString()); pushBean.setResultcode("2"); //推送结果编码, 1推送到队列,2推送中,0成功,其他失败 pushBean.setResultinfo("处理中"); pushBean.setSpids(dataMap.get("SPIDS").toString()); kuaishouDao.updatePush(pushBean.getId(), pushBean.getResultcode(), pushBean.getResultinfo()); String result = invokeKsPush(pushBean); if(!"".equals(result)){ resultcode = JSON.parseObject(result).get("result").toString(); errorinfo = result; } } catch (Exception e) { if (e instanceof BusinessException) { errorinfo = ((BusinessException) e).getMessage(); resultcode = ((BusinessException) e).getCode(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "处理数据出现异常,"+e.getMessage(); } } finally{ threadSignal.countDown(); try { if(errorinfo != null && errorinfo.length()>500){ errorinfo = errorinfo.substring(0, 500); } if("1".equals(resultcode)){ //快手接口返回1,代表失败,记录表的1代表待处理 resultcode = "8001"; } kuaishouDao.updatePush(dataMap.get("ID").toString(), resultcode, errorinfo); } catch (Exception e) { e.printStackTrace(); errorinfo += "|更新推送结果出现异常,"+e.getMessage(); //logger.error("更新推送结果出现异常,"+e.getMessage()); } //写日志 logMap.put("jobname", "KuaiShouPushMQJob"); logMap.put("resultcode", resultcode); logMap.put("errorinfo", errorinfo); logMap.put("time", (System.currentTimeMillis()-startime)+""); //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount())); logger.info(JsonUtil.objectToJson(logMap)); } } /** * 推接调接口的参数 * @param reqBean * @return * @throws Exception */ private String getInvokeParams(KuaishouPushBean pushBean) throws Exception{ String usermob = pushBean.getSerial_number(); String fakeid = kuaishouDao.getFakeid(usermob, "kuaishou");//URLEncoder.encode("+R+6K/LYIjkgZnvW9gOZKQ=="); if (StringUtils.isEmpty(fakeid)) { logger.info("订购关系表无fakeid, "+pushBean.getSerial_number()); fakeid = getFakeidKuaishou(usermob); } if (StringUtils.isEmpty(fakeid)) { throw new BusinessException("9001","fakeid为空"); } //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送 String pushtype = pushBean.getPushtype(); String cpid = "kuaishou"; String type = "1"; //0 流量未耗尽 1 流量已耗尽 if(!"0".equals(pushtype)){//当前推送类型不是已耗尽, type = "0"; } String SECRET = "2rFUrLZyKV9S"; String deskey = "ksks1234"; fakeid = URLEncoder.encode(fakeid); logger.info("fakeid=>"+fakeid); usermob = URLEncoder.encode(DesUtil.encode(usermob,deskey)); //Md5(usermob+ fakeid+cpid+spid+type+SECRET) String signdata = usermob+cpid+SECRET ; String signstr = MD5.MD5Encode(signdata); String sign = signstr.length()>16?signstr.substring(0,16):signstr; //String url = "http://shishangwei5.test.gifshow.com/rest/n/partner/cucc/callback/quota"; //url = "https://shishangwei5.test.gifshow.com/rest/n/partner/cucc/callback/quota"; JSONObject params = new JSONObject(); params.put("fakeid", fakeid); params.put("usermob", usermob); params.put("cpid", cpid); params.put("type", type); params.put("sign", sign); String data = params.toJSONString(); return data; } /** * 调接口 * @param reqBean * @return * @throws Exception */ private String invokeKsPush(KuaishouPushBean pushBean) throws Exception{ String result = ""; //调快手接口返回结果 String pushurl = dictionaryDao.getValue("kuaishoupushurl"); String jsonParams = getInvokeParams(pushBean); logger.info("pushurl=>"+pushurl+", jsonParams=>"+jsonParams); if(pushurl.startsWith("https")){ result = HttpInvoke.sendhttpsReq("POST", pushurl, jsonParams, getProperty()); }else{ result = HttpInvoke.sendHttpByPost("POST", pushurl, jsonParams, getProperty()); } //{"result":"0","errorcode":"","host-name":"bjm7-rs1892.jxq"} logger.info(pushBean.getId()+", "+pushBean.getSerial_number()+", 调快手推送接口返回结果:"+result); //去空格、换行符号 if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", ""); return result; } /** * 解析数据 * @param body * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号 * orderNo 积分商城订单号 * requestId 返回给客户的请求ID * @return */ public Map transBean(Map body) { String jsonStr = JsonUtil.objectToJson(body); return (Map) JsonUtil.jsonToBean(jsonStr, Map.class); } /** * 获取请求属性性 * @return */ private Map getProperty(){ Map reqProperty = new HashMap(); reqProperty.put("Content-type", "application/json;charset=UTF-8"); return reqProperty; } /** * 调接口获取fakeid * @param userid * @return */ public String getFakeidKuaishou(String userid){ String fakeid = null; //logger.info("调接口获取fakeid,userid=>"+userid); try { final String baseurl="https://pts.10010.com:9005/api"; final String user="000101"; final String pwd="9nBrS4BV967z"; final String service="0"; final String function="3"; final String appid="339302416384"; Date curdate= new Date(); String tick=(curdate.getTime()/1000)+""; String key=MD5.MD5Encode(user+tick+pwd).substring(0,16); Map param = new HashMap(); param.put("user",user); param.put("tick",tick); param.put("key",key); param.put("service",service); param.put("function",function); param.put("mobile","86"+userid); param.put("appid",appid); String url = baseurl + "?" + HttpInvoke.mapToUrl(param); String resp = URLUtil.get(url); JSONObject resultObject = JSONObject.parseObject(resp); String result = resultObject.getString("result"); String pcode = resultObject.getString("pcode"); if("0".equals(result) && pcode != null && !"".equals(pcode)){ fakeid = pcode; } } catch (Exception e) { e.printStackTrace(); logger.error("调接口获取fakeid出现异常,userid=>"+userid+", "+e.getMessage()); } logger.info("调接口获取fakeid,userid=>"+userid+", fakeid=>"+fakeid); return fakeid; } }