package com.chinacreator.process.job; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.quartz.DisallowConcurrentExecution; import org.quartz.PersistJobDataAfterExecution; import org.springframework.beans.factory.annotation.Autowired; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.chinacreator.common.exception.BusinessException; import com.chinacreator.common.util.MD5; import com.chinacreator.process.bean.ContinueBean; import com.chinacreator.process.bean.ContinueLogBean; import com.chinacreator.process.bean.KuaishouPushBean; import com.chinacreator.process.dao.ContinueOrderDao; import com.chinacreator.process.dao.DictionaryDao; import com.chinacreator.process.dao.KuaishouDao; import com.chinacreator.process.util.DesUtil; import com.chinacreator.process.util.HttpInvoke; import com.chinacreator.process.util.JsonUtil; import com.chinacreator.process.util.SpringUtils; import com.chinacreator.process.util.URLUtil; /** * 快手订购推送(已弃用) * 订购流量未耗尽的定时任务(KuaishouPushOrderJob)把快手订购表的未处理的数据判断是否要推送,是,则添加到推送表,否,则不处理。 在处理后数据把推送表的ID(如果要推送)添加到快手订购表(TD_KUAISHOU_ORDER); 1. 获取快手订购记录表数据TD_KUAISHOU_ORDER 2. 查询推送记录表TD_KAFKA_KUAISHOU_PUSH,看本月是否已有推送过流量未耗尽 3. 无,则把数据添加到推送记录表 * @author xu.zhou * @date 20200515 */ @PersistJobDataAfterExecution @DisallowConcurrentExecution public class KuaishouPushOrderJob { private static Logger logger = Logger.getLogger("kuaishoupush"); @Autowired private KuaishouDao kuaishouDao; @Autowired private DictionaryDao dictionaryDao; public void doProcess() throws Exception { long beginTime = System.currentTimeMillis(); logger.info(Thread.currentThread().getName()+"KuaishouPushOrderJob处理订购快手流量未耗尽定时任务开始"); List dataList = kuaishouDao.getOrderPush(); logger.info("去重复前数据条数:"+ (dataList == null ? "0" : dataList.size())); paraseData(dataList); logger.info("去重复后数据条数:"+ (dataList == null ? "0" : dataList.size())); if(dataList != null && dataList.size() > 0){ CountDownLatch threadSignal = new CountDownLatch(dataList.size()); ExecutorService executorService = new ThreadPoolExecutor(12, 16, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); for(HashMap dataMap : dataList){ KuaishouPushOrderService continueService = new KuaishouPushOrderService(dataList.size(),threadSignal,dataMap,kuaishouDao,dictionaryDao); executorService.execute(continueService); } executorService.shutdown(); try { executorService.awaitTermination(5L, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info(Thread.currentThread().getName()+"KuaishouPushOrderJob处理订购快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒"); } /** * 去重复数据(userid+spid+ordermonth) * @param dataList * @return */ private void paraseData(List dataList){ if (dataList == null || dataList.size() == 0){ return; } HashMap reData = new HashMap(); String ordermonth = ""; String userid = ""; String spid = ""; HashMap dataMap = null; for (int i=0; i pushList = kuaishouDao.queryPush(userid, pushmonth); logger.info("pushList=>"+pushList); if(pushList != null && pushList.size() > 0){ List spidsList = null; boolean haspush = false; for(HashMap tmpMap : pushList){ spidsList = Arrays.asList(tmpMap.get("SPIDS").toString().split("#")); logger.info("spid=>"+spid+", spidsList=》"+spidsList); if(spidsList.contains(spid)){ haspush = true; break; } } if(haspush){ throw new BusinessException("9054","同一月份已推送相同数据"); } } KuaishouPushBean pushBean = new KuaishouPushBean(); pushid = kuaishouDao.getNo(); pushBean.setSerial_number(userid); pushBean.setId(pushid); pushBean.setPushmonth(pushmonth); pushBean.setPushtype("1"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送 pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败 pushBean.setResultinfo("待处理"); pushBean.setSpids(spid); //添加数据到推送表 kuaishouDao.insertPush(pushBean); resultcode = "0"; errorinfo = "ok"; } catch (Exception e) { if (e instanceof BusinessException) { errorinfo = ((BusinessException) e).getMessage(); resultcode = ((BusinessException) e).getCode(); }else{ e.printStackTrace(); resultcode = "8000"; errorinfo = "处理数据出现异常,"+e.getMessage(); } }finally { threadSignal.countDown(); try{ //未CP同步的数据,延迟处理 if(!"9055".equals(resultcode)){ kuaishouDao.updOrderPush(dataMap.get("ID").toString(), "3", errorinfo, (System.currentTimeMillis()-startime)+"", pushid); } } catch (Exception e) { e.printStackTrace(); errorinfo += "|更新ORDER表出现异常,"+e.getMessage(); } //写日志 logMap.put("jobname", "KuaishouPushOrderJob"); logMap.put("time", (System.currentTimeMillis()-startime)+""); logMap.put("resultcode", resultcode); logMap.put("errorinfo", errorinfo); //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount())); logger.info(JsonUtil.objectToJson(logMap)); } } /** * 调接口 * @param reqBean * @return * @throws Exception */ private String invokeKsPush(KuaishouPushBean pushBean) throws Exception{ String result = ""; //调快手接口返回结果 String pushurl = dictionaryDao.getValue("kuaishoupushurl"); String jsonParams = getInvokeParams(pushBean); logger.info("pushurl=>"+pushurl+", jsonParams=>"+jsonParams); if(pushurl.startsWith("https")){ result = HttpInvoke.sendhttpsReq("POST", pushurl, jsonParams, getProperty()); }else{ result = HttpInvoke.sendHttpByPost("POST", pushurl, jsonParams, getProperty()); } logger.info("调快手推送接口返回结果:"+result); //去空格、换行符号 if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", ""); return result; } /** * 推接调接口的参数 * @param reqBean * @return * @throws Exception */ private String getInvokeParams(KuaishouPushBean pushBean) throws Exception{ String pwd = dictionaryDao.getValue("kuaishoupushpwd"); String userid = DesUtil.encode(pushBean.getSerial_number(), pwd); //手机号码加密 String timestamp= (System.currentTimeMillis())/1000+""; String month = pushBean.getPushmonth(); String pushtype = pushBean.getPushtype(); String signature = MD5.MD5Encode(userid+pushtype+month+timestamp+pwd); JSONObject json = new JSONObject(); json.put("timestamp", timestamp); json.put("signature", signature); json.put("userid", userid); json.put("month", month); json.put("type", pushtype); return json.toJSONString(); } /** * 获取请求属性性 * @return */ private static Map getProperty(){ Map reqProperty = new HashMap(); reqProperty.put("Content-type", "application/json;charset=UTF-8"); return reqProperty; } }