973e79cb1138b6cc87ab88e0d2952d157fee0ccc.svn-base 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package com.chinacreator.process.job;
  2. import java.sql.SQLException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Arrays;
  6. import java.util.Calendar;
  7. import java.util.Date;
  8. import java.util.HashMap;
  9. import java.util.Iterator;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.Set;
  13. import java.util.concurrent.CountDownLatch;
  14. import java.util.concurrent.ExecutorService;
  15. import java.util.concurrent.LinkedBlockingQueue;
  16. import java.util.concurrent.ThreadPoolExecutor;
  17. import java.util.concurrent.TimeUnit;
  18. import org.apache.log4j.Logger;
  19. import org.quartz.DisallowConcurrentExecution;
  20. import org.quartz.PersistJobDataAfterExecution;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.util.StringUtils;
  23. import com.chinacreator.common.exception.BusinessException;
  24. import com.chinacreator.process.bean.KuaishouPushBean;
  25. import com.chinacreator.process.dao.DictionaryDao;
  26. import com.chinacreator.process.dao.KuaishouDao;
  27. import com.chinacreator.process.util.JsonUtil;
  28. import com.chinacreator.process.util.WriteLogUtil;
  29. /**
  30. * 快手月初推送
  31. * 替换KuaishouPushMonthJob
  32. * 月初推送定时任务(KuaishouPushMonthNewJob)只负责把表(TD_KUAISHOU_FIRSTMONTH)添加到表TD_KAFKA_KUAISHOU_PUSH,状态为待处理(resultcode=1)
  33. * 每2分钟执行一次
  34. * @author xu.zhou
  35. * @date 20210323
  36. */
  37. @PersistJobDataAfterExecution
  38. @DisallowConcurrentExecution
  39. public class KuaishouPushMonthNewJob {
  40. private static Logger logger = Logger.getLogger("kuaishoupushmonth");
  41. @Autowired
  42. private KuaishouDao kuaishouDao; // = new KuaishouDao();
  43. @Autowired
  44. private DictionaryDao dictionaryDao; // = new DictionaryDao();
  45. public void doProcess() throws Exception {
  46. //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务开始");
  47. WriteLogUtil.writeLong("KuaishouPushMonthNewJob月初快手流量未耗尽定时任务开始", logger, "KuaishouPushMonthNewJob");
  48. int count = 0;
  49. long beginTime = System.currentTimeMillis();
  50. int rows = 800; //每次取数据条数
  51. String kuaishoufmrows = dictionaryDao.getValue("kuaishoufmrows");
  52. if (!StringUtils.isEmpty(kuaishoufmrows)) {
  53. try {
  54. rows = Integer.parseInt(kuaishoufmrows);
  55. } catch (Exception e) {
  56. rows = 800;
  57. }
  58. }
  59. //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
  60. String partition = "T_HASH_P";
  61. for(int i = 1; i <= 50; i++){
  62. count = 0;
  63. partition = "T_HASH_P";
  64. try {
  65. if(i < 10){
  66. partition = partition + "0" + i;
  67. }else{
  68. partition = partition + i;
  69. }
  70. //按分区标识获取订购数据
  71. List<HashMap> dataList = kuaishouDao.getFirstMonthByPart(partition,rows);
  72. count = (dataList != null ? dataList.size() : 0);
  73. //logger.info(partition+",用户数:"+(list != null ? list.size() : "0"));
  74. if(dataList != null && dataList.size() > 0){
  75. //logger.info(partition+",去重复前用户数:"+list.size());
  76. dataList = paraseData(dataList); //去重复数据
  77. logger.info(partition+",去重复前用户数:"+count+",去重复后用户数:"+dataList.size());
  78. //推送月份
  79. String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date());
  80. CountDownLatch threadSignal = new CountDownLatch(dataList.size());
  81. ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  82. //把数据更新为正在处理状态
  83. for(HashMap hm : dataList){
  84. boolean res = kuaishouDao.updFirstMonthExecing(hm.get("ID").toString());
  85. }
  86. for(HashMap hm : dataList){
  87. KuaishouPushMonthNewService continueService = new KuaishouPushMonthNewService(dataList.size(),threadSignal,hm,pushmonth,kuaishouDao);
  88. executorService.execute(continueService);
  89. }
  90. executorService.shutdown();
  91. try {
  92. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  93. } catch (InterruptedException e) {
  94. e.printStackTrace();
  95. }
  96. }
  97. } catch (Exception e) {
  98. logger.info(partition+",执行出现异常,"+e.getMessage());
  99. e.printStackTrace();
  100. }
  101. Thread.sleep(100);
  102. }
  103. //logger.info(Thread.currentThread().getName()+"KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  104. WriteLogUtil.writeLong("KuaishouPushMonthNewJob月初快手流量未耗尽定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒", logger, "KuaishouPushMonthNewJob");
  105. }
  106. /**
  107. * 按手机号码去除重复数据,确保获取的同一组数据里手机号码不相同,防止同一手机号码处理并发而造成处理异常
  108. * @param dataList
  109. * @return
  110. */
  111. private List<HashMap> paraseData(List<HashMap> dataList){
  112. //去重复后的数据集
  113. List<HashMap> reDataList = new ArrayList<HashMap>();
  114. HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
  115. for (HashMap dataMap : dataList) {
  116. if(tmpMap.containsKey(dataMap.get("USERID"))){
  117. logger.info("重复数据,"+dataMap);
  118. }else{
  119. tmpMap.put(dataMap.get("USERID"), dataMap);
  120. reDataList.add(dataMap);
  121. }
  122. }
  123. return reDataList;
  124. }
  125. public static void main(String[] args) {
  126. KuaishouPushMonthNewJob job = new KuaishouPushMonthNewJob();
  127. try {
  128. job.doProcess();
  129. } catch (Exception e) {
  130. e.printStackTrace();
  131. }
  132. }
  133. }
  134. class KuaishouPushMonthNewService implements Runnable {
  135. private static Logger logger = Logger.getLogger("kuaishoupushmonth");
  136. private int totalSize;
  137. private CountDownLatch threadSignal;
  138. private List<String> spidsList;
  139. private String pushmonth;
  140. private KuaishouDao kuaishouDao;
  141. private HashMap hm; //用户数据,ID和USERID
  142. public KuaishouPushMonthNewService(int totalSize,CountDownLatch threadSignal,HashMap hm,String pushmonth,KuaishouDao kuaishouDao){
  143. this.totalSize = totalSize;
  144. this.threadSignal = threadSignal;
  145. this.hm = hm;
  146. this.pushmonth = pushmonth;
  147. this.kuaishouDao = kuaishouDao;
  148. }
  149. @Override
  150. public void run() {
  151. long startime = System.currentTimeMillis();
  152. Map logMap = new HashMap();
  153. logMap.put("userinfo", hm);
  154. logMap.put("pushmonth", pushmonth);
  155. String resultcode = "-1";
  156. String errorinfo = "";
  157. String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送
  158. String id = hm.get("ID").toString();
  159. String calculatedate = getCalculatedate(); //下次推送时间,一般为下月的第一天0点30分01秒
  160. try {
  161. //处理业务逻辑
  162. realflag = pushService(logMap);
  163. if(logMap.get("errorinfo") != null){
  164. errorinfo = logMap.get("errorinfo").toString();
  165. }
  166. //未出现异常,设置为成功
  167. resultcode = "0";
  168. if("".equals(errorinfo)){
  169. errorinfo = "ok";
  170. }
  171. } catch (Exception e) {
  172. if (e instanceof BusinessException) {
  173. errorinfo = ((BusinessException) e).getMessage();
  174. resultcode = ((BusinessException) e).getCode();
  175. }else{
  176. e.printStackTrace();
  177. resultcode = "8000";
  178. errorinfo = "处理数据出现异常,"+e.getMessage();
  179. }
  180. }finally {
  181. threadSignal.countDown();
  182. try{
  183. //暂未同步或者处理出现异常,恢复初始状态,下次再处理
  184. if("9055".equals(resultcode) || "8000".equals(resultcode)){
  185. kuaishouDao.updFirstMonthRc(id,"1", null, null, null);
  186. }else{
  187. resultcode = "0";
  188. kuaishouDao.updFirstMonthRc(id,resultcode,errorinfo, realflag, calculatedate);
  189. }
  190. }catch(Exception e){
  191. e.printStackTrace();
  192. errorinfo = "更新数据出现异常,"+e.getMessage();
  193. resultcode = "8001";
  194. }
  195. //写日志
  196. logMap.put("jobname", "KuaishouPushMonthNewJob");
  197. logMap.put("resultcode", resultcode);
  198. logMap.put("errorinfo", errorinfo);
  199. logMap.put("realflag", realflag);
  200. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  201. logger.info(JsonUtil.objectToJson(logMap));
  202. }
  203. }
  204. /**
  205. * 添加数据到推送表TD_KAFKA_KUAISHOU_PUSH
  206. * calculatedate为空,代表是新增的订购数据,处理订购推送
  207. * calculatedate不为空,代表是月初要推送的数据,处理月初推送
  208. * @param logMap
  209. * @throws Exception
  210. */
  211. private String pushService(Map logMap) throws Exception{
  212. String realflag = ""; //添加到推送表标识,空,未推送,1订购推送,2月未推送
  213. String userid = hm.get("USERID").toString();
  214. String id = hm.get("ID").toString();
  215. String spid = hm.get("SPID").toString();
  216. //查询SPID是否在TB_SP_AOP_CONFIG配置,因为快手流量消耗数据是根据元素ID与此表匹配得到SPID,如果SPID没有此表内,代表流量数据是不会从KAFKA推送过来,也就是这种快手订购不要触发流量耗尽或有流量的推送
  217. if(!kuaishouDao.hasAopConf(spid)){
  218. throw new BusinessException("9057",spid+"在TB_SP_AOP_CONFIG表无配置");
  219. }
  220. //获取订购关系
  221. HashMap orderRealMap = kuaishouDao.getOrderRealById(id);
  222. if(orderRealMap == null){
  223. throw new BusinessException("9056","订购关系表无订购数据");
  224. }
  225. //查询用户当前有效的快手订购关系
  226. List<HashMap> realList = kuaishouDao.getRealByUserid(hm.get("USERID").toString());
  227. if(realList == null || realList.size() == 0){
  228. throw new BusinessException("9056","当月无有效订购关系");
  229. }
  230. List<String> spidsList = new ArrayList<String>();
  231. for(HashMap realtmp : realList){
  232. if(!spidsList.contains(realtmp.get("SPID").toString())){
  233. spidsList.add(realtmp.get("SPID").toString());
  234. }
  235. }
  236. //拼接所有有效订购的SPID
  237. String spids = "";
  238. for(String tmp : spidsList){
  239. if("".equals(spids)){
  240. spids += tmp;
  241. }else{
  242. spids += "#"+tmp;
  243. }
  244. }
  245. logMap.put("spids", spids);
  246. //查询当月是否有推送过同类型的数据
  247. List<HashMap> dataList = kuaishouDao.queryPush(userid, pushmonth, "2");
  248. if(dataList == null || dataList.size() == 0){
  249. KuaishouPushBean pushBean = new KuaishouPushBean();
  250. String pushid = kuaishouDao.getNo();
  251. pushBean.setSerial_number(userid);
  252. pushBean.setId(pushid);
  253. pushBean.setPushmonth(pushmonth);
  254. pushBean.setPushtype("2"); //推送类型:0为流量已耗尽,1为流量未耗尽,2为月末推送
  255. pushBean.setResultcode("1");//推送结果编码, 1待处理,2处理中,0成功,其他失败
  256. pushBean.setResultinfo("待处理");
  257. pushBean.setSpids(spids);
  258. //添加数据到推送表
  259. kuaishouDao.insertPush(pushBean);
  260. //添加到测试表
  261. //kuaishouDao.insertPushZx(pushBean);
  262. realflag = "2";
  263. //logger.info(id+",添加月初推送到推送表");
  264. logMap.put("errorinfo", "添加月初推送到推送表");
  265. }else{
  266. logMap.put("errorinfo", "当月已推送");
  267. //logger.info(id+",当月已推送,不处理");
  268. }
  269. return realflag;
  270. }
  271. /**
  272. * 获取下次推送的默认时间(下个月1号零点三十分零一秒)
  273. * @return
  274. */
  275. private String getCalculatedate() {
  276. SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd");
  277. Calendar calendar = Calendar.getInstance();
  278. calendar.add(Calendar.MONTH, 1);
  279. calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
  280. return dft.format(calendar.getTime())+"003001";
  281. }
  282. }