85d430861cb34d6b29c23af5f2d8baa37608b3c6.svn-base 14 KB


  1. package com.chinacreator.process.job;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.chinacreator.common.exception.BusinessException;
  5. import com.chinacreator.common.util.DESUtil;
  6. import com.chinacreator.process.bean.ContractProductPushBean;
  7. import com.chinacreator.process.dao.ContractProductDao;
  8. import com.chinacreator.process.dao.DictionaryDao;
  9. import com.chinacreator.process.util.JsonUtil;
  10. import com.chinacreator.process.util.SHAUtil;
  11. import com.chinacreator.process.util.URLUtil;
  12. import org.apache.log4j.Logger;
  13. import org.quartz.DisallowConcurrentExecution;
  14. import org.quartz.PersistJobDataAfterExecution;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.util.StringUtils;
  17. import java.net.URLEncoder;
  18. import java.text.DateFormat;
  19. import java.text.ParseException;
  20. import java.text.SimpleDateFormat;
  21. import java.util.*;
  22. import java.util.concurrent.*;
  23. /**
  24. * 合约产品合约期到期后短信通知(每月25号10点至22点)
  25. * ContractProductJob将TD_CONTRACT_PRODUCT表里面符合条件的数据添加到表TD_CONTRACT_PRODUCT_PUSH 并推送
  26. * 每个月25号10点至20点期间每10秒钟执行一次
  27. * @author can.he
  28. * @date 20210825
  29. */
  30. @PersistJobDataAfterExecution
  31. @DisallowConcurrentExecution
  32. public class ContractProductTestJob {
  33. private static Logger logger = Logger.getLogger("contractpush");
  34. @Autowired
  35. private ContractProductDao contractProductDao;
  36. @Autowired
  37. private DictionaryDao dictionaryDao;
  38. // private ContractProductDao contractProductDao = new ContractProductDao();
  39. // private DictionaryDao dictionaryDao = new DictionaryDao();
  40. // @Autowired
  41. // private KuaishouDao kuaishouDao; // = new ContractProductDao();
  42. //
  43. public void doProcess() throws Exception {
  44. System.out.println(new Date()+"-----------------------------------------------");
  45. logger.info(new Date()+"-----------------------------------------------");
  46. //判断是否在当前时间范围(每个月的25号10点至20点之间)
  47. Date nowDate = new Date();
  48. if(!getBetweenTime(nowDate)){
  49. return;
  50. }
  51. logger.info(Thread.currentThread().getName()+"ContractProductJob合约产品定时任务开始");
  52. long beginTime = System.currentTimeMillis();
  53. int rows = 800; //每次取数据条数
  54. String contractfmrows = dictionaryDao.getValue("contractfmrows");
  55. if (!StringUtils.isEmpty(contractfmrows)) {
  56. try {
  57. rows = Integer.parseInt(contractfmrows);
  58. } catch (Exception e) {
  59. rows = 800;
  60. }
  61. }
  62. //当前时间往前推5个月的时间
  63. String newDate = getStepMonth(nowDate, -5);
  64. logger.info("当前日期往前推5个月是:"+newDate);
  65. //订购关系表分区标识,分区标识从T_HASH_P01到T_HASH_P50
  66. String partition = "T_HASH_P";
  67. for(int i = 1; i <= 20; i++){
  68. partition = "T_HASH_P";
  69. try {
  70. if(i < 10){
  71. partition = partition + "0" + i;
  72. }else{
  73. partition = partition + i;
  74. }
  75. //按分区标识获取订购数据
  76. // List<HashMap> list = contractProductDao.getContractProductByPart(partition,rows,newDate);
  77. List<HashMap> list = contractProductDao.getContractProductByPart(partition,rows,"202108");
  78. logger.info(partition+",用户数:"+(list != null ? list.size() : "0"));
  79. if(list != null && list.size() > 0){
  80. logger.info(partition+",去重复前用户数:"+list.size());
  81. // list = paraseData(list); //hecan
  82. logger.info(partition+",去重复后用户数:"+list.size());
  83. //把数据更新为正在处理状态
  84. contractProductDao.batchUpdSyncStatus(getIds(list));
  85. //推送月份
  86. String pushmonth = new SimpleDateFormat("yyyyMM").format(new Date());
  87. CountDownLatch threadSignal = new CountDownLatch(list.size());
  88. ExecutorService executorService = new ThreadPoolExecutor(30, 40, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
  89. // //把数据更新为正在处理状态
  90. // for(HashMap hm : list){
  91. // boolean res = contractProductDao.updContractProductExecing(hm.get("ID").toString());
  92. // }
  93. for(HashMap hm : list){
  94. ContractProductTestJobService continueService = new ContractProductTestJobService(list.size(),threadSignal,hm,pushmonth,dictionaryDao,contractProductDao);
  95. executorService.execute(continueService);
  96. }
  97. executorService.shutdown();
  98. try {
  99. executorService.awaitTermination(5L, TimeUnit.MINUTES);
  100. } catch (InterruptedException e) {
  101. e.printStackTrace();
  102. }
  103. }
  104. } catch (Exception e) {
  105. logger.info(partition+",执行出现异常,"+e.getMessage());
  106. e.printStackTrace();
  107. }
  108. Thread.sleep(100);
  109. }
  110. logger.info(Thread.currentThread().getName()+"ContractProductJob合约产品定时任务完成,耗时:"+(System.currentTimeMillis()-beginTime)/1000+" 秒");
  111. }
  112. /**
  113. * 拼接ID,用于更新数据
  114. * @param dataList
  115. * @return
  116. */
  117. private String getIds(List<HashMap> dataList){
  118. String ids = "";
  119. if(dataList != null && dataList.size()>0){
  120. for(HashMap tmphm : dataList){
  121. ids += ",'"+tmphm.get("ID")+"'";
  122. }
  123. }
  124. if(!"".equals(ids)){
  125. ids = ids.substring(1);
  126. }
  127. logger.info("ids=>"+ids);
  128. return ids;
  129. }
  130. private List<HashMap> paraseData(List<HashMap> dataList){
  131. //去重复后的数据集
  132. List<HashMap> reDataList = new ArrayList<HashMap>();
  133. HashMap<String, List> tmpMap = new HashMap<String, List>();
  134. for (HashMap dataMap : dataList) {
  135. if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"))){
  136. logger.info("重复数据,"+dataMap.get("USERID")+dataMap.get("CPID")+dataMap.get("SPID"));
  137. }else{
  138. reDataList.add(dataMap);
  139. List tmpList = new ArrayList();
  140. tmpList.add(dataMap.get("SPID"));
  141. tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("CPID")+dataMap.get("SPID"), tmpList);
  142. }
  143. }
  144. return reDataList;
  145. }
  146. /**
  147. * 在给定的日期加上或减去指定月份后的日期
  148. *
  149. * @param sourceDate 原始时间
  150. * @param month 要调整的月份,向前为负数,向后为正数
  151. * @return
  152. */
  153. public String getStepMonth(Date sourceDate, int month) {
  154. DateFormat DATE_FORMAT = new SimpleDateFormat("yyyyMM");
  155. Calendar c = Calendar.getInstance();
  156. c.setTime(sourceDate);
  157. c.add(Calendar.MONTH, month);
  158. return DATE_FORMAT.format(c.getTime());
  159. }
  160. //获取当前时间
  161. public String getNowDate(Date date) {
  162. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  163. return format.format(date);
  164. }
  165. //获取当前日期
  166. public static boolean getBetweenTime(Date nowTime) {
  167. SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  168. SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM");
  169. try {
  170. Date startTime = format.parse(format1.format(nowTime)+"-25 08:00:00");
  171. Date endTime = format.parse(format1.format(nowTime)+"-29 19:50:00");
  172. return isEffectiveDate(nowTime, startTime, endTime);
  173. } catch (ParseException e) {
  174. e.printStackTrace();
  175. }
  176. return false;
  177. }
  178. //判断是否在当前时间范围
  179. public static boolean isEffectiveDate(Date nowTime, Date startTime, Date endTime) {
  180. if (nowTime.getTime() == startTime.getTime()
  181. || nowTime.getTime() == endTime.getTime()) {
  182. return true;
  183. }
  184. Calendar date = Calendar.getInstance();
  185. date.setTime(nowTime);
  186. Calendar begin = Calendar.getInstance();
  187. begin.setTime(startTime);
  188. Calendar end = Calendar.getInstance();
  189. end.setTime(endTime);
  190. if (date.after(begin) && date.before(end)) {
  191. return true;
  192. } else {
  193. return false;
  194. }
  195. }
  196. }
  197. class ContractProductTestJobService implements Runnable {
  198. private static Logger logger = Logger.getLogger("contractpush");
  199. private int totalSize;
  200. private CountDownLatch threadSignal;
  201. private List<String> spidsList;
  202. private String pushmonth;
  203. private ContractProductDao contractProductDao;
  204. private DictionaryDao dictionaryDao;
  205. private HashMap hm; //用户数据,ID和USERID
  206. public ContractProductTestJobService(int totalSize,CountDownLatch threadSignal,HashMap hm,String pushmonth,DictionaryDao dictionaryDao,ContractProductDao contractProductDao){
  207. this.totalSize = totalSize;
  208. this.threadSignal = threadSignal;
  209. this.hm = hm;
  210. this.pushmonth = pushmonth;
  211. this.contractProductDao = contractProductDao;
  212. this.dictionaryDao = dictionaryDao;
  213. }
  214. @Override
  215. public void run() {
  216. long startime = System.currentTimeMillis();
  217. Map logMap = new HashMap();
  218. logMap.put("userinfo", hm);
  219. logMap.put("pushmonth", pushmonth);
  220. String resultcode = "-1"; //0,成功、1采集推送数据、短信推送数据处理失败,2短信推送失败
  221. String errorinfo = "";
  222. String realflag = ""; //添加到推送表标识,空,未推送,1已经推送 1订购推送,2月未推送
  223. String id = hm.get("ID").toString();
  224. try {
  225. //处理业务逻辑
  226. realflag = pushService(logMap);
  227. if(logMap.get("errorinfo") != null){
  228. errorinfo = logMap.get("errorinfo").toString();
  229. }
  230. //未出现异常,设置为成功
  231. resultcode = "0";
  232. if("".equals(errorinfo)){
  233. errorinfo = "ok"; //目前是那种不是订购状态的数据,不需要推送的
  234. }
  235. } catch (Exception e) {
  236. if (e instanceof BusinessException) {
  237. errorinfo = ((BusinessException) e).getMessage();
  238. resultcode = ((BusinessException) e).getCode();
  239. }else{
  240. e.printStackTrace();
  241. resultcode = "8000";
  242. errorinfo = "处理数据出现异常,"+e.getMessage();
  243. }
  244. }finally {
  245. threadSignal.countDown();
  246. try{
  247. //暂未同步或者处理出现异常,恢复初始状态,下次再处理
  248. if("9056".equals(resultcode) || ("8000".equals(resultcode)&& !errorinfo.contains("唯一约束"))){
  249. resultcode = "1"; //推送失败的继续推
  250. }else{
  251. resultcode = "0";
  252. }
  253. if(realflag.equals("1")){ //短信推送失败的
  254. resultcode = "3";
  255. }
  256. contractProductDao.updContractProduct(id,resultcode,errorinfo);
  257. }catch(Exception e){
  258. e.printStackTrace();
  259. errorinfo = "更新数据出现异常,"+e.getMessage();
  260. resultcode = "8001";
  261. }
  262. //写日志
  263. logMap.put("jobname", "ContractProductJob");
  264. logMap.put("resultcode", resultcode);
  265. logMap.put("errorinfo", errorinfo);
  266. logMap.put("realflag", realflag);
  267. //logMap.put("count", totalSize+"/"+(totalSize - threadSignal.getCount()));
  268. logger.info(JsonUtil.objectToJson(logMap));
  269. }
  270. }
  271. /**
  272. * 添加数据到推送表TD_CONTRACT_PRODUCT_PUSH且发送短信
  273. * @param logMap
  274. * @throws Exception
  275. */
  276. private String pushService(Map logMap) throws Exception{
  277. String realflag = ""; //添加到推送表标识,空,未推送,1,已经推送 1订购推送,2月未推送
  278. String userid = hm.get("USERID").toString();
  279. String id = hm.get("ID").toString();
  280. String spid = hm.get("SPID").toString();
  281. String cpid = hm.get("CPID").toString();
  282. //获取订购关系 (进行数据比对 确认是否为订购的)
  283. HashMap orderRealMap = contractProductDao.getOrderRealById(id); //hecan
  284. // if(orderRealMap == null){ //hecan
  285. // throw new BusinessException("9056","订购关系表无订购数据"); //hecan
  286. // } //hecan
  287. //失效时间(如果是null 则代表是订购)
  288. // String ENDTIME = orderRealMap.get("ENDTIME").toString(); //hecan
  289. // if(ENDTIME.equals("NULL")){ //为空代表是订购的 //hecan
  290. ContractProductPushBean pushBean = new ContractProductPushBean();
  291. String pushid = contractProductDao.getNo();
  292. pushBean.setUserid(userid);
  293. pushBean.setId(pushid);
  294. pushBean.setPushmonth(pushmonth);
  295. pushBean.setCpid(cpid);
  296. pushBean.setSpid(spid);
  297. pushBean.setPushid(id);
  298. //添加数据到推送表
  299. contractProductDao.insertPush(pushBean);
  300. String spname ="产品";
  301. List<HashMap> spList = contractProductDao.findSpInfo(spid);
  302. if (spList.get(0).get("spname") != null && !StringUtils.isEmpty(spList.get(0).get("spname").toString())){
  303. spname = spList.get(0).get("spname").toString();
  304. }
  305. String content = "【中国联通】您订购的"+spname+"合约已到期,到期后退订不再扣取违约金,如不退订次月将自动续订,感谢您的支持!";
  306. //发送短信
  307. String smsInfo= "失败";
  308. String smsCode = "-1";
  309. // smsCode= send(pushid,userid,content); //hecan
  310. if(smsCode!= null && smsCode.equals("0")){ //短信发送成功
  311. logMap.put("errorinfo", "短信发送成功");
  312. smsInfo="成功";
  313. }else{
  314. logMap.put("errorinfo", "短信发送失败");
  315. realflag = "1"; //短信推送失败
  316. }
  317. contractProductDao.updatePush(pushid,smsCode,smsInfo);
  318. // } //hecan
  319. return realflag;
  320. }
  321. //发送短信
  322. public String send(String pushid,String userid,String content) throws Exception {
  323. String url = dictionaryDao.getValue("contractpushurl");
  324. // String url = "http://111.206.133.54/smsbusi/sms/send";
  325. String smsid = "10655117";
  326. String pwd = "wo6bslq2";
  327. userid = DESUtil.encode(userid,pwd);
  328. String timestamp = System.currentTimeMillis()/1000+"";
  329. String sign = SHAUtil.shaEncode(smsid+userid+timestamp+content+pwd).toLowerCase();
  330. userid = URLEncoder.encode(userid,"utf-8");
  331. content = URLEncoder.encode(content,"utf-8");
  332. url = url+"?smsid="+smsid+"&userid="+userid+"&timestamp="+timestamp+"&sign="+sign+"&content="+content;
  333. String result = "";
  334. try {
  335. result = URLUtil.get(url);
  336. } catch (Exception e) {
  337. e.printStackTrace();
  338. }
  339. logger.info(pushid+", "+userid+", 调短信接口地址返回结果:"+result);
  340. if(result!="" && result!=null){
  341. JSONObject obj = JSON.parseObject(result);
  342. String resultcode = obj.getString("resultcode");
  343. // System.out.println(resultcode);
  344. return resultcode;
  345. }
  346. return null;
  347. }
  348. /**
  349. * 获取下次推送的默认时间(下个月1号零点三十分零一秒)
  350. * @return
  351. */
  352. private String getCalculatedate() {
  353. SimpleDateFormat dft = new SimpleDateFormat("yyyyMMdd");
  354. Calendar calendar = Calendar.getInstance();
  355. calendar.add(Calendar.MONTH, 1);
  356. calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
  357. return dft.format(calendar.getTime())+"003001";
  358. }
  359. public static void main(String[] args) {
  360. ContractProductJob job = new ContractProductJob();
  361. try {
  362. job.doProcess();
  363. } catch (Exception e) {
  364. e.printStackTrace();
  365. }
  366. }
  367. }