123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- package com.chinacreator.process.job;
- import java.net.URLEncoder;
- import java.sql.SQLException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.StringUtils;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.common.util.MD5;
- import com.chinacreator.process.bean.ChannelOrderBean;
- import com.chinacreator.process.bean.ContinueBean;
- import com.chinacreator.process.bean.KuaishouPushBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KuaishouDao;
- import com.chinacreator.process.util.DesUtil;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.URLUtil;
- import com.chinacreator.process.util.WriteLogUtil;
- import com.chinacreator.video.queue.MessageService;
- import com.chinacreator.video.queue.bean.MessagePipe;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- /**
- * 快手推送入库数据处理
- * 处理待推送的定时任务,负责把推送表中“待处理”的数据进行推送。
- * @author xu.zhou
- * @date 20200515
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KuaiShouPushMQJob {
- private Logger log = Logger.getLogger("kuaishoupush");
- @Autowired
- private DictionaryDao dictionaryDao;
-
- @Autowired
- private KuaishouDao kuaishouDao;
- public void doProcess() throws Exception {
- //log.info("接收快手推送数据队列JOB启动");
- //log.info(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务启动");
- WriteLogUtil.writeLong("KuaiShouPushMQJob处理流量消耗完定时任务开始", log, "KuaiShouPushMQJob");
- long beginTime = System.currentTimeMillis();
- //获取数据
- List<HashMap> dataList = kuaishouDao.qryProcPushListAll();
- //log.info("要处理的数据条数:"+ (dataList == null ? "0": dataList.size()));
- //去重复数据
- paraseData(dataList);
- //log.info("去重复后的数据条数:"+ (dataList == null ? "0": dataList.size()));
- if(dataList != null && dataList.size()>0){
- WriteLogUtil.writeLong("去重复后的数据条数:"+ dataList.size() ,log, "KuaiShouPushMQJob");
- CountDownLatch threadSignal = new CountDownLatch(dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(40, 50, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (Map reqBean : dataList) {
- KuaishouPushService service = new KuaishouPushService(dataList.size(),threadSignal,reqBean,dictionaryDao,kuaishouDao);
- executorService.execute(service);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //log.info(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
- WriteLogUtil.writeLong(Thread.currentThread().getName()+"KuaiShouPushMQJob处理流量消耗完定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", log, "KuaiShouPushMQJob");
- }
-
- /**
- * 去重复数据(userid+spid+ordermonth)
- * @param dataList
- * @return
- */
- private void paraseData(List<HashMap> dataList){
- if (dataList == null || dataList.size() == 0){
- return;
- }
- HashMap<String, Object> reData = new HashMap<String, Object>();
- String pushmonth = "";
- String userid = "";
- String spids = "";
- String pushtype = "";
- HashMap dataMap = null;
- for (int i=0; i<dataList.size(); i++){
- //防止有数据为空
- try {
- dataMap = dataList.get(i);
- pushmonth = dataMap.get("PUSHMONTH").toString();
- userid = dataMap.get("SERIAL_NUMBER").toString();
- spids = dataMap.get("SPIDS").toString();
- pushtype = dataMap.get("PUSHTYPE").toString();
- if(reData.containsKey(userid+spids+pushmonth+pushtype)){
- dataList.remove(i);
- i--;
- }else{
- reData.put(userid+spids+pushmonth, dataMap);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 解析数据
- * @param body
- * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号
- * orderNo 积分商城订单号
- * requestId 返回给客户的请求ID
- * @return
- */
- public Map transBean(Map<String, Object> body) {
- String jsonStr = JsonUtil.objectToJson(body);
- return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
- }
- }
- class KuaishouPushService implements Runnable {
-
- private Logger logger = Logger.getLogger("kuaishoupush");
- private int totalSize;
- private CountDownLatch threadSignal;
- private Map dataMap;
- private DictionaryDao dictionaryDao;
- private KuaishouDao kuaishouDao;
-
- public KuaishouPushService(int totalSize,CountDownLatch threadSignal, Map dataMap,DictionaryDao dictionaryDao,KuaishouDao kuaishouDao){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.dataMap = dataMap;
- this.dictionaryDao = dictionaryDao;
- this.kuaishouDao = kuaishouDao;
- }
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map logMap = new HashMap();
- logMap.put("data", dataMap);
- String resultcode = "-1";
- String errorinfo = "";
- try {
- KuaishouPushBean pushBean = new KuaishouPushBean();
- pushBean.setId(dataMap.get("ID").toString());
- pushBean.setSerial_number(dataMap.get("SERIAL_NUMBER").toString());
- pushBean.setPushmonth(dataMap.get("PUSHMONTH").toString());
- pushBean.setPushtype(dataMap.get("PUSHTYPE").toString());
- pushBean.setResultcode("2"); //推送结果编码, 1推送到队列,2推送中,0成功,其他失败
- pushBean.setResultinfo("处理中");
- pushBean.setSpids(dataMap.get("SPIDS").toString());
- kuaishouDao.updatePush(pushBean.getId(), pushBean.getResultcode(), pushBean.getResultinfo());
- String result = invokeKsPush(pushBean);
- if(!"".equals(result)){
- resultcode = JSON.parseObject(result).get("result").toString();
- errorinfo = result;
- }
- } catch (Exception e) {
- if (e instanceof BusinessException) {
- errorinfo = ((BusinessException) e).getMessage();
- resultcode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultcode = "8000";
- errorinfo = "处理数据出现异常,"+e.getMessage();
- }
- } finally{
- threadSignal.countDown();
- try {
- if(errorinfo != null && errorinfo.length()>500){
- errorinfo = errorinfo.substring(0, 500);
- }
- if("1".equals(resultcode)){ //快手接口返回1,代表失败,记录表的1代表待处理
- resultcode = "8001";
- }
- kuaishouDao.updatePush(dataMap.get("ID").toString(), resultcode, errorinfo);
- } catch (Exception e) {
- e.printStackTrace();
- errorinfo += "|更新推送结果出现异常,"+e.getMessage();
- //logger.error("更新推送结果出现异常,"+e.getMessage());
- }
- //写日志
- logMap.put("jobname", "KuaiShouPushMQJob");
- logMap.put("resultcode", resultcode);
- logMap.put("errorinfo", errorinfo);
- logMap.put("time", (System.currentTimeMillis()-startime)+"");
- //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
- logger.info(JsonUtil.objectToJson(logMap));
-
- }
- }
-
- /**
- * 推接调接口的参数
- * @param reqBean
- * @return
- * @throws Exception
- */
- private String getInvokeParams(KuaishouPushBean pushBean) throws Exception{
- String usermob = pushBean.getSerial_number();
- String fakeid = kuaishouDao.getFakeid(usermob, "kuaishou");//URLEncoder.encode("+R+6K/LYIjkgZnvW9gOZKQ==");
- if (StringUtils.isEmpty(fakeid)) {
- logger.info("订购关系表无fakeid, "+pushBean.getSerial_number());
- fakeid = getFakeidKuaishou(usermob);
- }
- if (StringUtils.isEmpty(fakeid)) {
- throw new BusinessException("9001","fakeid为空");
- }
- //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
- String pushtype = pushBean.getPushtype();
- String cpid = "kuaishou";
- String type = "1"; //0 流量未耗尽 1 流量已耗尽
- if(!"0".equals(pushtype)){//当前推送类型不是已耗尽,
- type = "0";
- }
- String SECRET = "2rFUrLZyKV9S";
- String deskey = "ksks1234";
- fakeid = URLEncoder.encode(fakeid);
- logger.info("fakeid=>"+fakeid);
- usermob = URLEncoder.encode(DesUtil.encode(usermob,deskey));
- //Md5(usermob+ fakeid+cpid+spid+type+SECRET)
- String signdata = usermob+cpid+SECRET ;
- String signstr = MD5.MD5Encode(signdata);
- String sign = signstr.length()>16?signstr.substring(0,16):signstr;
- //String url = "http://shishangwei5.test.gifshow.com/rest/n/partner/cucc/callback/quota";
- //url = "https://shishangwei5.test.gifshow.com/rest/n/partner/cucc/callback/quota";
- JSONObject params = new JSONObject();
- params.put("fakeid", fakeid);
- params.put("usermob", usermob);
- params.put("cpid", cpid);
- params.put("type", type);
- params.put("sign", sign);
- String data = params.toJSONString();
- return data;
- }
-
- /**
- * 调接口
- * @param reqBean
- * @return
- * @throws Exception
- */
- private String invokeKsPush(KuaishouPushBean pushBean) throws Exception{
- String result = ""; //调快手接口返回结果
- String pushurl = dictionaryDao.getValue("kuaishoupushurl");
- String jsonParams = getInvokeParams(pushBean);
- logger.info("pushurl=>"+pushurl+", jsonParams=>"+jsonParams);
- if(pushurl.startsWith("https")){
- result = HttpInvoke.sendhttpsReq("POST", pushurl, jsonParams, getProperty());
- }else{
- result = HttpInvoke.sendHttpByPost("POST", pushurl, jsonParams, getProperty());
- }
- //{"result":"0","errorcode":"","host-name":"bjm7-rs1892.jxq"}
- logger.info(pushBean.getId()+", "+pushBean.getSerial_number()+", 调快手推送接口返回结果:"+result);
- //去空格、换行符号
- if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", "");
- return result;
- }
- /**
- * 解析数据
- * @param body
- * orderId TD_POINTS_ORDER_REC表ID字段,我方生成的订单流水号
- * orderNo 积分商城订单号
- * requestId 返回给客户的请求ID
- * @return
- */
- public Map transBean(Map<String, Object> body) {
- String jsonStr = JsonUtil.objectToJson(body);
- return (Map) JsonUtil.jsonToBean(jsonStr, Map.class);
- }
-
- /**
- * 获取请求属性性
- * @return
- */
- private Map getProperty(){
- Map reqProperty = new HashMap();
- reqProperty.put("Content-type", "application/json;charset=UTF-8");
- return reqProperty;
- }
-
- /**
- * 调接口获取fakeid
- * @param userid
- * @return
- */
- public String getFakeidKuaishou(String userid){
- String fakeid = null;
- //logger.info("调接口获取fakeid,userid=>"+userid);
- try {
- final String baseurl="https://pts.10010.com:9005/api";
- final String user="000101";
- final String pwd="9nBrS4BV967z";
- final String service="0";
- final String function="3";
- final String appid="339302416384";
-
- Date curdate= new Date();
-
- String tick=(curdate.getTime()/1000)+"";
- String key=MD5.MD5Encode(user+tick+pwd).substring(0,16);
-
- Map<String,String> param = new HashMap<String,String>();
- param.put("user",user);
- param.put("tick",tick);
- param.put("key",key);
- param.put("service",service);
- param.put("function",function);
- param.put("mobile","86"+userid);
- param.put("appid",appid);
-
- String url = baseurl + "?" + HttpInvoke.mapToUrl(param);
- String resp = URLUtil.get(url);
-
- JSONObject resultObject = JSONObject.parseObject(resp);
- String result = resultObject.getString("result");
- String pcode = resultObject.getString("pcode");
- if("0".equals(result) && pcode != null && !"".equals(pcode)){
- fakeid = pcode;
- }
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("调接口获取fakeid出现异常,userid=>"+userid+", "+e.getMessage());
- }
- logger.info("调接口获取fakeid,userid=>"+userid+", fakeid=>"+fakeid);
- return fakeid;
- }
- }
|