0646a4a1fc1a59d83644195f55cbacfe3b50a1a9.svn-base 13 KB


  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.chinacreator.common.exception.BusinessException;
  5. import com.chinacreator.common.util.DESUtil;
  6. import com.chinacreator.process.bean.ContractProductPushBean;
  7. import com.chinacreator.process.bean.KuaishouPushBean;
  8. import com.chinacreator.process.dao.ContractProductDao;
  9. import com.chinacreator.process.dao.DictionaryDao;
  10. import com.chinacreator.process.dao.KuaishouDao;
  11. import com.chinacreator.process.util.JsonUtil;
  12. import com.chinacreator.process.util.SHAUtil;
  13. import com.chinacreator.process.util.URLUtil;
  14. import org.apache.log4j.Logger;
  15. import org.quartz.DisallowConcurrentExecution;
  16. import org.quartz.PersistJobDataAfterExecution;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.util.StringUtils;
  19. import java.net.URLEncoder;
  20. import java.text.DateFormat;
  21. import java.text.ParseException;
  22. import java.text.SimpleDateFormat;
  23. import java.util.*;
  24. import java.util.concurrent.*;
  25. /**
  26. * 合约产品合约期到期后短信通知(每月25号10点至22点)
  27. * ContractProductJob将TD_CONTRACT_PRODUCT表里面符合条件的数据添加到表TD_CONTRACT_PRODUCT_PUSH 并推送
  28. * 每个月25号10点至20点期间每10秒钟执行一次
  29. * @author can.he
  30. * @date 20210825
  31. */
  32. @PersistJobDataAfterExecution
  33. @DisallowConcurrentExecution
  34. public class ContractProductJob {
  35. private static Logger logger = Logger.getLogger("contractpush");
  36. @Autowired
  37. private ContractProductDao contractProductDao;
  38. @Autowired
  39. private DictionaryDao dictionaryDao;
  40. // private ContractProductDao contractProductDao = new ContractProductDao();
  41. // private DictionaryDao dictionaryDao = new DictionaryDao();
  42. public void doProcess() throws Exception {
  43. //判断是否在当前时间范围(每个月的25号10点至20点之间)
  44. Date nowDate = new Date();
  45. if(!getBetweenTime(nowDate)){
  46. return;
  47. }
  48. logger.info(Thread.currentThread().getName()+"ContractProductJob合约产品定时任务开始");
  49. long beginTime = System.currentTimeMillis();
  50. int rows = 800; //每次取数据条数
  51. String contractfmrows = dictionaryDao.getValue("contractfmrows");
  52. if (!StringUtils.isEmpty(contractfmrows)) {
  53. try {
  54. rows = Integer.parseInt(contractfmrows);
  55. } catch (Exception e) {
  56. rows = 800;
  57. }
  58. }
  59. //当前时间往前推5个月的时间
  60. String newDate = getStepMonth(nowDate, -5);
  61. logger.info("当前日期往前推5个月是:"+newDate);
  62. //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
  63. String partition = "T_HASH_P";
  64. for(int i = 1; i <= 20; i++){
  65. partition = "T_HASH_P";
  66. try {
  67. if(i < 10){
  68. partition = partition + "0" + i;
  69. }else{
  70. partition = partition + i;
  71. }
  72. //按分区标识获取订购数据
  73. // List<HashMap> list = contractProductDao.getContractProductByPart(partition,rows,newDate);
  74. List<HashMap> list = contractProductDao.getContractProductByPart(partition,rows,newDate);
  75. logger.info(partition+",用户数:"+(list != null ? list.size() : "0"));
  76. if(list != null && list.size() > 0){
  77. logger.info(partition+",去重复前用户数:"+list.size());
  78. list = paraseData(list);
  79. logger.info(partition+",去重复后用户数:"+list.size());
  80. //把数据更新为正在处理状态
  81. contractProductDao.batchUpdSyncStatus(getIds(list));
  82. //推送月份
  83. String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date());
  84. CountDownLatch threadSignal = new CountDownLatch(list.size());
  85. ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  86. // //把数据更新为正在处理状态
  87. // for(HashMap hm : list){
  88. // boolean res = contractProductDao.updContractProductExecing(hm.get("ID").toString());
  89. // }
  90. for(HashMap hm : list){
  91. ContractProductService continueService = new ContractProductService(list.size(),threadSignal,hm,pushmonth,dictionaryDao,contractProductDao);
  92. executorService.execute(continueService);
  93. }
  94. executorService.shutdown();
  95. try {
  96. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  97. } catch (InterruptedException e) {
  98. e.printStackTrace();
  99. }
  100. }
  101. } catch (Exception e) {
  102. logger.info(partition+",执行出现异常,"+e.getMessage());
  103. e.printStackTrace();
  104. }
  105. }
  106. logger.info(Thread.currentThread().getName()+"ContractProductJob合约产品定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  107. }
  108. /**
  109. * 拼接ID,用于更新数据
  110. * @param dataList
  111. * @return
  112. */
  113. private String getIds(List<HashMap> dataList){
  114. String ids = "";
  115. if(dataList != null && dataList.size()>0){
  116. for(HashMap tmphm : dataList){
  117. ids += ",'"+tmphm.get("ID")+"'";
  118. }
  119. }
  120. if(!"".equals(ids)){
  121. ids = ids.substring(1);
  122. }
  123. logger.info("ids=>"+ids);
  124. return ids;
  125. }
  126. private List<HashMap> paraseData(List<HashMap> dataList){
  127. //去重复后的数据集
  128. List<HashMap> reDataList = new ArrayList<HashMap>();
  129. HashMap<String, List> tmpMap = new HashMap<String, List>();
  130. for (HashMap dataMap : dataList) {
  131. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
  132. logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
  133. }else{
  134. reDataList.add(dataMap);
  135. List tmpList = new ArrayList();
  136. tmpList.add(dataMap.get("SPID"));
  137. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
  138. }
  139. }
  140. return reDataList;
  141. }
  142. /**
  143. * 在给定的日期加上或减去指定月份后的日期
  144. *
  145. * @param sourceDate 原始时间
  146. * @param month 要调整的月份,向前为负数,向后为正数
  147. * @return
  148. */
  149. public String getStepMonth(Date sourceDate, int month) {
  150. DateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMM");
  151. Calendar c = Calendar.getInstance();
  152. c.setTime(sourceDate);
  153. c.add(Calendar.MONTH, month);
  154. return DATE_FORMAT.format(c.getTime());
  155. }
  156. //获取当前时间
  157. public String getNowDate(Date date) {
  158. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  159. return format.format(date);
  160. }
  161. //获取当前日期
  162. public static boolean getBetweenTime(Date nowTime) {
  163. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  164. SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM");
  165. try {
  166. Date startTime = format.parse(format1.format(nowTime)+"-25 10:00:00");
  167. Date endTime = format.parse(format1.format(nowTime)+"-25 19:50:00");
  168. // Date endTime = format.parse(format1.format(nowTime)+"-31 23:59:00");
  169. return isEffectiveDate(nowTime, startTime, endTime);
  170. } catch (ParseException e) {
  171. e.printStackTrace();
  172. }
  173. return false;
  174. }
  175. //判断是否在当前时间范围
  176. public static boolean isEffectiveDate(Date nowTime, Date startTime, Date endTime) {
  177. if (nowTime.getTime() == startTime.getTime()
  178. || nowTime.getTime() == endTime.getTime()) {
  179. return true;
  180. }
  181. Calendar date = Calendar.getInstance();
  182. date.setTime(nowTime);
  183. Calendar begin = Calendar.getInstance();
  184. begin.setTime(startTime);
  185. Calendar end = Calendar.getInstance();
  186. end.setTime(endTime);
  187. if (date.after(begin) && date.before(end)) {
  188. return true;
  189. } else {
  190. return false;
  191. }
  192. }
  193. }
  194. class ContractProductService implements Runnable {
  195. private static Logger logger = Logger.getLogger("contractpush");
  196. private int totalSize;
  197. private CountDownLatch threadSignal;
  198. private List<String> spidsList;
  199. private String pushmonth;
  200. private ContractProductDao contractProductDao;
  201. private DictionaryDao dictionaryDao;
  202. private HashMap hm; //用户数据,ID和USERID
  203. public ContractProductService(int totalSize,CountDownLatch threadSignal,HashMap hm,String pushmonth,DictionaryDao dictionaryDao,ContractProductDao contractProductDao){
  204. this.totalSize = totalSize;
  205. this.threadSignal = threadSignal;
  206. this.hm = hm;
  207. this.pushmonth = pushmonth;
  208. this.contractProductDao = contractProductDao;
  209. this.dictionaryDao = dictionaryDao;
  210. }
  211. @Override
  212. public void run() {
  213. long startime = System.currentTimeMillis();
  214. Map logMap = new HashMap();
  215. logMap.put("userinfo", hm);
  216. logMap.put("pushmonth", pushmonth);
  217. String resultcode = "-1"; //0,成功、1采集推送数据、短信推送数据处理失败,2短信推送失败
  218. String errorinfo = "";
  219. String realflag = ""; //添加到推送表标识,空,未推送,1已经推送 1订购推送,2月未推送
  220. String id = hm.get("ID").toString();
  221. try {
  222. //处理业务逻辑
  223. realflag = pushService(logMap);
  224. if(logMap.get("errorinfo") != null){
  225. errorinfo = logMap.get("errorinfo").toString();
  226. }
  227. //未出现异常,设置为成功
  228. resultcode = "0";
  229. if("".equals(errorinfo)){
  230. errorinfo = "ok"; //目前是那种不是订购状态的数据,不需要推送的
  231. }
  232. } catch (Exception e) {
  233. if (e instanceof BusinessException) {
  234. errorinfo = ((BusinessException) e).getMessage();
  235. resultcode = ((BusinessException) e).getCode();
  236. }else{
  237. e.printStackTrace();
  238. resultcode = "8000";
  239. errorinfo = "处理数据出现异常,"+e.getMessage();
  240. }
  241. }finally {
  242. threadSignal.countDown();
  243. try{
  244. //暂未同步或者处理出现异常,恢复初始状态,下次再处理
  245. if("9056".equals(resultcode) || ("8000".equals(resultcode)&& !errorinfo.contains("唯一约束") && !errorinfo.contains("unique") )){
  246. resultcode = "1"; //推送失败的继续推
  247. }else{
  248. resultcode = "0";
  249. }
  250. if(realflag.equals("1")){ //短信推送失败的
  251. resultcode = "3";
  252. }
  253. contractProductDao.updContractProduct(id,resultcode,errorinfo);
  254. }catch(Exception e){
  255. e.printStackTrace();
  256. errorinfo = "更新数据出现异常,"+e.getMessage();
  257. resultcode = "8001";
  258. }
  259. //写日志
  260. logMap.put("jobname", "ContractProductJob");
  261. logMap.put("resultcode", resultcode);
  262. logMap.put("errorinfo", errorinfo);
  263. logMap.put("realflag", realflag);
  264. // logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  265. logger.info(JsonUtil.objectToJson(logMap));
  266. }
  267. }
  268. /**
  269. * 添加数据到推送表TD_CONTRACT_PRODUCT_PUSH且发送短信
  270. * @param logMap
  271. * @throws Exception
  272. */
  273. private String pushService(Map logMap) throws Exception{
  274. String realflag = ""; //添加到推送表标识,空,未推送,1,已经推送 1订购推送,2月未推送
  275. String userid = hm.get("USERID").toString();
  276. String id = hm.get("ID").toString();
  277. String spid = hm.get("SPID").toString();
  278. String cpid = hm.get("CPID").toString();
  279. //获取订购关系 (进行数据比对 确认是否为订购的)
  280. HashMap orderRealMap = contractProductDao.getOrderRealById(id);
  281. if(orderRealMap == null){
  282. throw new BusinessException("9056","订购关系表无订购数据");
  283. }
  284. //失效时间(如果是null 则代表是订购)
  285. String ENDTIME = orderRealMap.get("ENDTIME").toString();
  286. if(ENDTIME.equals("NULL")){ //为空代表是订购的
  287. ContractProductPushBean pushBean = new ContractProductPushBean();
  288. String pushid = contractProductDao.getNo();
  289. pushBean.setUserid(userid);
  290. pushBean.setId(pushid);
  291. pushBean.setPushmonth(pushmonth);
  292. pushBean.setCpid(cpid);
  293. pushBean.setSpid(spid);
  294. pushBean.setPushid(id);
  295. //添加数据到推送表
  296. contractProductDao.insertPush(pushBean);
  297. String spname ="产品";
  298. List<HashMap> spList = contractProductDao.findSpInfo(spid);
  299. if (spList.get(0).get("SPNAME") != null && !StringUtils.isEmpty(spList.get(0).get("SPNAME").toString())){
  300. spname = spList.get(0).get("SPNAME").toString();
  301. }
  302. String content = dictionaryDao.getValue("contractpushcontent");
  303. // String content = "【中国联通】您订购的"+spname+"合约已到期,到期后退订不再扣取违约金,如不退订次月将自动续订,感谢您的支持!";
  304. //发送短信
  305. String smsInfo= "失败";
  306. String smsCode = "-1";
  307. smsCode= send(pushid,userid,content.replace("#",spname));
  308. if(smsCode!= null && smsCode.equals("0")){ //短信发送成功
  309. logMap.put("errorinfo", "短信发送成功");
  310. smsInfo="成功";
  311. }else{
  312. logMap.put("errorinfo", "短信发送失败");
  313. realflag = "1"; //短信推送失败
  314. }
  315. contractProductDao.updatePush(pushid,smsCode,smsInfo);
  316. }
  317. return realflag;
  318. }
  319. //发送短信
  320. public String send(String pushid,String userid,String content) throws Exception {
  321. System.out.println("content=="+content);
  322. String url = dictionaryDao.getValue("contractpushurl");
  323. // String url = "http://111.206.133.54/smsbusi/sms/send";
  324. String smsid = "10655117";
  325. String pwd = "wo6bslq2";
  326. userid = DESUtil.encode(userid,pwd);
  327. String timestamp = System.currentTimeMillis()/1000+"";
  328. String sign = SHAUtil.shaEncode(smsid+userid+timestamp+content+pwd).toLowerCase();
  329. userid = URLEncoder.encode(userid,"utf-8");
  330. content = URLEncoder.encode(content,"utf-8");
  331. url = url+"?smsid="+smsid+"&userid="+userid+"&timestamp="+timestamp+"&sign="+sign+"&content="+content;
  332. String result = "";
  333. try {
  334. result = URLUtil.get(url);
  335. } catch (Exception e) {
  336. e.printStackTrace();
  337. }
  338. logger.info(pushid+", "+userid+", 调短信接口地址返回结果:"+result);
  339. if(result!="" && result!=null){
  340. JSONObject obj = JSON.parseObject(result);
  341. String resultcode = obj.getString("resultcode");
  342. return resultcode;
  343. }
  344. return null;
  345. }
  346. public static void main(String[] args) {
  347. ContractProductJob job = new ContractProductJob();
  348. try {
  349. job.doProcess();
  350. // job.test();
  351. } catch (Exception e) {
  352. e.printStackTrace();
  353. }
  354. }
  355. }