123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- package com.chinacreator.process.job;
- import java.math.BigDecimal;
- import java.math.RoundingMode;
- import java.sql.SQLException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.util.StringUtils;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.common.exception.BusinessException;
- import com.chinacreator.process.bean.KuaishouPushBean;
- import com.chinacreator.process.bean.TdActiveclientRecBean;
- import com.chinacreator.process.dao.DictionaryDao;
- import com.chinacreator.process.dao.KuaishouDao;
- import com.chinacreator.process.dao.KuaishouFlowMonthDao;
- import com.chinacreator.process.util.HttpInvoke;
- import com.chinacreator.process.util.JsonUtil;
- import com.chinacreator.process.util.WriteLogUtil;
- /**
- * 快手月末查流量
- * @author xu.zhou
- * @date 20220708
- */
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class KuaishouFlowMonthJob {
- private static Logger logger = Logger.getLogger("kuaishouFlowMonth");
-
- @Autowired
- private KuaishouFlowMonthDao kuaishouDao; // = new KuaishouFlowMonthDao();
-
- @Autowired
- private DictionaryDao dictionaryDao; // = new DictionaryDao();
-
- public void doProcess() throws Exception {
- //执行开始时间
- long beginTime = System.currentTimeMillis();
- int startpart = 1;
- int endpart = 25;
- String jobname = "KuaishouFlowMonthJob";
- try {
- WriteLogUtil.writeLong(jobname+"定时任务开始", logger, jobname);
- int count = 0;
- int rows = 800; //每次取数据条数
- String confrows = dictionaryDao.getValue("kuaishouflowrows");
- if (!StringUtils.isEmpty(confrows)) {
- try {
- rows = Integer.parseInt(confrows);
- } catch (Exception e) {
- rows = 800;
- }
- }
- //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
- String partition = "T_HASH_P";
- for(int i = startpart; i <= endpart; i++){
- count = 0;
- partition = "T_HASH_P";
- try {
- if(i < 10){
- partition = partition + "0" + i;
- }else{
- partition = partition + i;
- }
-
- //按分区标识获取订购数据
- List<HashMap> dataList = kuaishouDao.getFlowMonthByPart(partition,rows);
- count = (dataList != null ? dataList.size() : 0);
- if(dataList != null && dataList.size() > 0){
- dataList = paraseData(dataList); //去重复数据
- logger.info(partition+",去重复前用户数:"+count+",去重复后用户数:"+dataList.size());
-
- CountDownLatch threadSignal = new CountDownLatch(dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- //把数据更新为正在处理状态,执行次数加1
- for(HashMap hm : dataList){
- boolean res = kuaishouDao.updFlowMonthExecing(hm.get("ID").toString());
- }
- for(HashMap hm : dataList){
- KuaishouFlowMonthService continueService = new KuaishouFlowMonthService(dataList.size(), threadSignal, hm, kuaishouDao, jobname);
- executorService.execute(continueService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } catch (Exception e) {
- logger.info(partition+",执行出现异常,"+e.getMessage());
- e.printStackTrace();
- }
-
- //每取一轮数据,休眠100毫秒
- Thread.sleep(100);
-
- //更新执行状态为2且执行时间超过50分钟的数据为待处理
- int res = kuaishouDao.updExecTimeout(partition);
- if(res > 0){
- logger.info("更新状态为2处理中且执行时间超过50分钟的数据为待处理的异常数据条数【"+res+"】");
- }
- //初始化新增数据的FLOWCALCULATEDATE值
- res = kuaishouDao.updFlowMonthInit(partition);
- if(res > 0){
- logger.info("初始化新增数据的FLOWCALCULATEDATE值的数据条数【"+res+"】");
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- logger.info(jobname+"定时任务执行出现异常,"+e.getMessage());
- } finally {
- WriteLogUtil.writeLong(jobname+"定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, jobname);
- }
- }
-
-
- /**
- * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
- * @param dataList
- * @return
- */
- private List<HashMap> paraseData(List<HashMap> dataList){
- //去重复后的数据集
- List<HashMap> reDataList = new ArrayList<HashMap>();
- HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
- for (HashMap dataMap : dataList) {
- if(tmpMap.containsKey(dataMap.get("USERID"))){
- logger.info("重复数据,"+dataMap);
- }else{
- tmpMap.put(dataMap.get("USERID"), dataMap);
- reDataList.add(dataMap);
- }
- }
- return reDataList;
- }
-
- public static void main(String[] args) {
- KuaishouFlowMonthJob job = new KuaishouFlowMonthJob();
- try {
- job.doProcess();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 处理类
- * @author xu.zhou
- * @date 20220708
- */
- class KuaishouFlowMonthService implements Runnable {
- private static Logger logger = Logger.getLogger("kuaishouFlowMonth");
- private int totalSize;
- private CountDownLatch threadSignal;
- private List<String> spidsList;
- private KuaishouFlowMonthDao kuaishouFMDao;
- private HashMap hm; //用户数据,ID和USERID
- private String jobname;
-
- public KuaishouFlowMonthService(int totalSize,CountDownLatch threadSignal,HashMap hm,KuaishouFlowMonthDao kuaishouFMDao, String jobname){
- this.totalSize = totalSize;
- this.threadSignal = threadSignal;
- this.hm = hm;
- this.kuaishouFMDao = kuaishouFMDao;
- this.jobname = jobname;
- }
-
- @Override
- public void run() {
- long startime = System.currentTimeMillis();
- Map logMap = new HashMap();
- logMap.put("data", hm);
- String resultcode = "5000";
- String errorinfo = "";
- String useflow = ""; //useflow
- String maxflow = ""; //最大流量
- String surflow = ""; //surflow
- boolean hasexists = false;
- boolean haskafka = false; //是否从KAFKA配置表得到的产品ID
- String id = hm.get("ID").toString();
- long starttime = System.currentTimeMillis();
- //String calculatedate = getCalculatedate(); //下次推送时间,一般为下月的第一天0点30分01秒
- TdActiveclientRecBean bean = new TdActiveclientRecBean();
- try {
- //判断流量查询日期是否已跨月
- if(!valiFlowCalculateDate()){
- throw new BusinessException("9052","当前月份不等于流量查询月份FLOWCALCULATEDATE");
- }
-
- String spid = hm.get("SPID") == null ? null : hm.get("SPID")+"";
-
- //获取订购关系
- HashMap orderRealMap = kuaishouFMDao.getOrderRealById(id);
- if(orderRealMap == null){
- throw new BusinessException("9055","订购关系表无订购数据");
- }
-
- bean.setProvince(orderRealMap.get("PROVINCE")+"");
- bean.setArea(orderRealMap.get("AREA")+"");
- bean.setSpid(orderRealMap.get("SPID").toString());
- bean.setUserid(orderRealMap.get("USERID").toString());
-
- //订购关系已失效
- if("2".equals(orderRealMap.get("STATUS"))){
- throw new BusinessException("9056","当月无有效订购关系");
- }
-
- //SPID为空取订购关系的SPID
- if(StringUtils.isEmpty(spid) || "null".equals(spid)){
- spid = orderRealMap.get("SPID").toString();
- }
-
- //获取SPNAME
- bean.setSpname(kuaishouFMDao.qrySpname(spid));
-
- //获取产品ID
- String productid = kuaishouFMDao.qryAopProductid(spid);
- if(StringUtils.isEmpty(productid)){
- throw new BusinessException("9053","无产品ID配置信息");
- //从kafka同步的订购关系,在TB_SP_AOP_INFO表没有,要到TB_KAFKA_PRODUCT_CONF表查询
- /***
- * 从KAFKA落的快手订购关系,不包含为活跃客户
- productid = kuaishouFMDao.qryKafkaProductid(spid);
- if(StringUtils.isEmpty(productid)){
- throw new BusinessException("9053","无产品ID配置信息");
- }
- haskafka = true;
- ***/
- }
- logMap.put("productid", productid);
- //调接口获取流量信息
- String result = invoke();
- //logMap.put("result", result);
- if(!StringUtils.isEmpty(result)){
- JSONObject resObj = new JSONObject().parseObject(result);
- if("0".equals(resObj.get("resultCode")+"")){//接口返回成功
- JSONArray resdata = resObj.getJSONArray("data");
- if(resdata != null && resObj.getJSONArray("data").size()>0){
- hasexists = true;
- JSONObject tmpobj = null;
- /***筛选逻辑
- 1、FEE_POLICY_NAME包含“快手”筛选
- 2、ADDUP_UPPER(免流总量)不为0筛选
- 3、根据以上条件筛选后,取X_USED_VALUE(使用量)
- 1)有一个使用量,直接取值
- 2)有多个使用量,若使用量相同,直接取相同值
- 3)有多个使用量,若使用量不同。
- a、排除非查询号码的值 //移到最上面,与1和2并列
- b、若还有多个值,取最大值
- ***/
- for(int i = 0; i< resdata.size(); i++){
- tmpobj = (JSONObject)resdata.get(i);
- if(productid.equals(tmpobj.getString("PRODUCT_ID"))){
- if(logMap.get("result") == null){
- List<String> tmpList = new ArrayList<String>();
- tmpList.add(tmpobj.toJSONString());
- logMap.put("result", tmpList);
- logMap.put("resultsize", 1);
- }else{
- List<String> tmpList = (List<String>)logMap.get("result");
- tmpList.add(tmpobj.toJSONString());
- logMap.put("result", tmpList);
- logMap.put("resultsize", (Integer)logMap.get("resultsize")+1);
- }
- //从kafka同步的订购关系,REAL_RESOURCE_TYPE是13
- if(haskafka && !"13".equals(tmpobj.get("REAL_RESOURCE_TYPE"))){
- continue;
- }
- //产品名称不包含快手
- if(tmpobj.getString("FEE_POLICY_NAME").indexOf("快手") == -1){
- continue;
- }
- //ADDUP_UPPER(免流总量)不为0筛选
- if("0".equals(tmpobj.get("ADDUP_UPPER"))){
- continue;
- }
- //手机号码不相同,不取值
- if(!hm.get("USERID").equals(tmpobj.get("USER_NUMBER"))){
- continue;
- }
- //最大流量
- maxflow = tmpobj.getString("ADDUP_UPPER");
- //剩余流量
- surflow = tmpobj.getString("X_CANUSE_VALUE");
-
- //已使用流量
- if("".equals(useflow)){
- useflow = tmpobj.getString("X_USED_VALUE");
- }else{
- //取最大值
- try {
- if(Integer.parseInt(tmpobj.getString("X_USED_VALUE")) > Integer.parseInt(useflow)){
- useflow = tmpobj.getString("X_USED_VALUE");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }else{
- throw new BusinessException("9059","调接口失败");
- }
- }
-
- if(!hasexists){
- throw new BusinessException("9057","接口返回数据列表为空");
- }
- //有返回流量数据,但没有对应产品的信息
- if("".equals(useflow)){
- throw new BusinessException("9058","接口返回数据列表无"+productid+"产品ID数据");
- }
- //未出现异常,设置为成功
- resultcode = "0";
- //设置已使用流量值,由KB转换成MB,保留三位小数
- bean.setFlowtotal(formatDouble(Double.parseDouble(useflow)/1024));
- } catch (Exception e) {
- if (e instanceof BusinessException) {
- errorinfo = ((BusinessException) e).getMessage();
- resultcode = ((BusinessException) e).getCode();
- }else{
- e.printStackTrace();
- resultcode = "8000";
- errorinfo = "处理数据出现异常,"+e.getMessage();
- }
- }finally {
-
- threadSignal.countDown();
- try{
- //执行SQL是否成功
- boolean res = false;
- //暂未同步或者处理出现异常,恢复初始状态,下次再处理
- if("0".equals(resultcode)){//成功
- //更新状态为成功
- res = kuaishouFMDao.updFlowMonthSucc(id);
- //添加数据到活跃表
- kuaishouFMDao.insertActive(bean);
- logMap.put("insertActive", true );
- }else{//失败
- res = kuaishouFMDao.updFlowMonthFailEnd(hm,resultcode,errorinfo);
- }
- logMap.put("updFlowMonth", res );
- }catch(Exception e){
- logMap.put("updFlowMonth", false );
- e.printStackTrace();
- errorinfo = "更新数据出现异常,"+e.getMessage();
- resultcode = "8001";
- }
- //写日志
- logMap.put("haskafka", haskafka);
- logMap.put("resultcode", resultcode);
- logMap.put("errorinfo", errorinfo);
- //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
- logMap.put("times", System.currentTimeMillis() - starttime);
- logger.info(JsonUtil.objectToJson(logMap));
- }
- }
-
- /**
- * 验证计算月份是否为当月
- * @return
- */
- private boolean valiFlowCalculateDate(){
- boolean res = false;
- SimpleDateFormat dft = new SimpleDateFormat("yyyyMM");
- String currMonth = dft.format(new Date()); //当前月
- String flowCalculateDate = hm.get("FLOWCALCULATEDATE").toString();
- if(currMonth.equals(flowCalculateDate.substring(0, 6))){
- res = true;
- }
- return res;
- }
-
- /**
- * 调接口
- * @param reqBean
- * @return
- * @throws Exception
- */
- private String invoke() throws Exception{
- String result = ""; //调快手接口返回结果
- HashMap confMap = kuaishouFMDao.getInvokeUrlInfo("shareorder","selectallow", "2");
- if(confMap == null || confMap.size() == 0){
- throw new BusinessException("9054","无接口配置数据");
- }
-
- String invokeurl = confMap.get("INVOKEURL").toString();
- String userId = hm.get("USERID").toString();
- SimpleDateFormat dft = new SimpleDateFormat("yyyyMM");
- //查询周期,当月
- String cycleId = dft.format(new Date());
- String params = "userId="+userId+"&cycleId="+cycleId;
- int timeout = 20 * 1000; //超时时间
- //logger.info("invokeurl=>"+invokeurl+", params=>"+params);
- if(invokeurl.startsWith("https")){
- result = HttpInvoke.sendhttpsReq("GET", invokeurl, params, getProperty(), timeout);
- }else{
- result = HttpInvoke.sendHttpByGet(invokeurl+"?"+params, timeout);
- }
- //{"result":"0","errorcode":"","host-name":"bjm7-rs1892.jxq"}
- //logger.info(params+", 调快接口返回结果:"+result);
- //去空格、换行符号
- if(result != null) result = result.replaceAll("\r|\n", "").replaceAll(" ", "").replaceAll(" ", "");
- return result;
- }
-
- /**
- * 获取请求属性性
- * @return
- */
- private Map getProperty(){
- Map reqProperty = new HashMap();
- reqProperty.put("Content-type", "application/json;charset=UTF-8");
- return reqProperty;
- }
-
- /**
- * 获取下次推送的默认时间(下个月1号零点三十分零一秒)
- * @return
- */
- private String getCalculatedate() {
- SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd");
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.MONTH, 1);
- calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
- return dft.format(calendar.getTime())+"003001";
- }
-
- /**
- * double类型如果小数点后为零则显示整数否则保留三位小数,比如23.0,转换后为23
- * @param d
- * @return
- */
- public String formatDouble(double d) {
- String res = "";
- String tmp = "";
- BigDecimal bg = new BigDecimal(d).setScale(4, RoundingMode.UP);
- double num = bg.doubleValue();
- //防止double值大于8位数出现科学计数法格式
- java.text.NumberFormat nf = java.text.NumberFormat.getInstance();
- nf.setGroupingUsed(false);
- tmp = nf.format(num);
- //以小数点分隔为两部分
- String [] arry = tmp.split("\\.");
- //如果有小数点
- if(arry.length == 2){
- //小数点前面的是0,则小数点后面的置为空,不要了
- if("0".equals(arry[1])){
- arry[1] = "";
- }else if(arry[1].length() > 3){//小数点后面超过3位数,只要前三位
- arry[1] = arry[1].substring(0,3);
- }
- //去除小数点后的无效0(最后一位为0,去除),执行三次,因为前面保留了最多小数点后的三位小数
- if(arry[1].lastIndexOf("0") == arry[1].length()-1){
- arry[1] = arry[1].substring(0, arry[1].length()-1);
- }
- if(arry[1].lastIndexOf("0") == arry[1].length()-1){
- arry[1] = arry[1].substring(0, arry[1].length()-1);
- }
- if(arry[1].lastIndexOf("0") == arry[1].length()-1){
- arry[1] = arry[1].substring(0, arry[1].length()-1);
- }
- //如果第二部分为空,则只返回第一部分的数据
- if("".equals(arry[1])){
- res = arry[0];
- }else{
- res = arry[0]+"."+arry[1];
- }
- }else{
- res = tmp;
- }
- return res;
- }
- }
|