123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package com.chinacreator.process.job;
- import com.alibaba.druid.util.StringUtils;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.chinacreator.process.dao.IntensiveSyncOrderDao;
- import com.chinacreator.process.util.AesUtilByIntensive;
- import com.chinacreator.process.util.URLUtil;
- import org.apache.log4j.Logger;
- import org.quartz.DisallowConcurrentExecution;
- import org.quartz.PersistJobDataAfterExecution;
- import org.springframework.beans.factory.annotation.Autowired;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- @PersistJobDataAfterExecution
- @DisallowConcurrentExecution
- public class IntensiveSyncOrderJob {
- private Logger logger = Logger.getLogger("intensiveSyncOrder");
- @Autowired
- private IntensiveSyncOrderDao intensiveSyncOrderDao;
- public void doProcess() throws Exception {
- logger.info("开始处理9楼集约化平台订购关系同步job");
- try {
- //获取表中需要订购的数据
- List<HashMap> dataList = intensiveSyncOrderDao.queryList();
- logger.info("获取数据条数" + dataList.size());
- if (dataList != null && dataList.size() > 0) {
- dataList = paraseData(dataList); //去重复数据
- logger.info("去重复后用户数:" + dataList.size());
- ExecutorService executorService = new ThreadPoolExecutor(10, 20, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
- for (HashMap<String, String> data : dataList) {
- intensiveSyncOrderDao.updataQueryList(data.get("ID"), "2");
- }
- for (HashMap<String, String> data : dataList) {
- IntensiveSyncOrderService intensiveSyncOrderService = new IntensiveSyncOrderService(intensiveSyncOrderDao,data);
- executorService.execute(intensiveSyncOrderService);
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(5L, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- Thread.sleep(100);
- //获取执行状态为2且执行时间超过10分钟的数据为待处理
- int i = intensiveSyncOrderDao.updExecTimeout();
- if (i > 0) {
- logger.info("更新状态为0处理中且执行时间超过10分钟的数据为待处理的异常数据条数【" + i + "】");
- }
- }
- }catch (Exception ee){
- ee.printStackTrace();
- logger.info("9楼集约化平台订购关系同步job执行出现异常,"+ee.getMessage());
- }
- logger.info("结束9楼集约化平台订购关系同步job");
- }
- /**
- * 按手机号码与spid去除重复数据,确保获取的同一组数据里手机号码与spid不相同
- * @param dataList
- * @return
- */
- private List<HashMap> paraseData(List<HashMap> dataList){
- //去重复后的数据集
- List<HashMap> reDataList = new ArrayList<HashMap>();
- HashMap<Object, Object> tmpMap = new HashMap<Object, Object>();
- for (HashMap dataMap : dataList) {
- if(tmpMap.containsKey(dataMap.get("USERID").toString()+dataMap.get("SPID").toString())){
- logger.info("重复数据,"+dataMap);
- }else{
- tmpMap.put(dataMap.get("USERID").toString()+dataMap.get("SPID").toString(), dataMap);
- reDataList.add(dataMap);
- }
- }
- return reDataList;
- }
- }
- class IntensiveSyncOrderService implements Runnable{
- private static Logger logger = Logger.getLogger("intensiveSyncOrder");
- private IntensiveSyncOrderDao intensiveSyncOrderDao;
- private HashMap<String,String> data;
- public IntensiveSyncOrderService(IntensiveSyncOrderDao intensiveSyncOrderDao, HashMap<String, String> data) {
- this.intensiveSyncOrderDao = intensiveSyncOrderDao;
- this.data = data;
- }
- @Override
- public void run() {
- String resultcode = "3000";
- String resultinfo = "无返回";
- String resultdata = "";
- String id = data.get("ID");
- String syncount = data.get("SYNCOUNT");
- HashMap<String,Object> logmap = new HashMap<String, Object>();
- logmap.put("data",data);
- try {
- List<HashMap> invokeUrl = intensiveSyncOrderDao.getInvokeUrl("actvieprocess", "intensiveSyncOrder");
- logmap.put("invokeUrl",invokeUrl);
- String cpid = "";
- String busiid = "";
- String aespwd = "";
- String channel = "";
- String url = "";
- if (invokeUrl!=null&&invokeUrl.size()>0){
- HashMap invokeUrlConf = invokeUrl.get(0);
- String param1 = invokeUrlConf.get("PARAM1").toString();
- JSONObject object = JSON.parseObject(param1);
- cpid = object.getString("cpid");
- busiid = object.getString("busiid");
- aespwd = object.getString("aespwd");
- channel = object.getString("channel");
- url = invokeUrlConf.get("INVOKEURL").toString();
- }
- String jiyueChannel = intensiveSyncOrderDao.getChannel(data.get("SPID"));
- if (!StringUtils.isEmpty(jiyueChannel)){
- channel = jiyueChannel;
- }
- String spid = data.get("SPID");
- String userid = data.get("USERID");
- String ordertime = data.get("ORDERTIME");
- String canceltime = data.get("CANCELTIME");
- String productid = intensiveSyncOrderDao.getProductid(spid);
- HashMap<String,String> map = new HashMap<String,String>();
- map.put("busiid",busiid);
- map.put("serialid",id);
- map.put("usermob",userid);
- map.put("productid",productid);
- map.put("ordertime",ordertime);
- map.put("canceltime",canceltime);
- map.put("channel",channel);
- logmap.put("reqDate",map);
- HashMap<String,Object> hashMap = new HashMap<String,Object>();
- hashMap.put("cpid",cpid);
- hashMap.put("data", AesUtilByIntensive.encrypt(JSON.toJSONString(map),aespwd));
- String resp = URLUtil.postJson(url, JSON.toJSONString(hashMap));
- logmap.put("resp",resp);
- if (!StringUtils.isEmpty(resp)){
- JSONObject object = JSON.parseObject(resp);
- resultcode = object.getString("resultCode");
- resultinfo = object.getString("resultInfo");
- resultdata = object.getString("data");
- }
- if (!StringUtils.isEmpty(resultdata)){
- try {
- resultdata = AesUtilByIntensive.decrypt(resultdata,aespwd);
- }catch (Exception e){
- logger.error("解密data出错"+e);
- }
- }else {
- resultdata = resp;
- }
- }catch (Exception e){
- e.printStackTrace();
- resultcode = "8000";
- resultinfo = e.getMessage();
- resultdata = "";
- }finally {
- logmap.put("resultcode",resultcode);
- logmap.put("resultinfo",resultinfo);
- logmap.put("resultdata",resultdata);
- try {
- intensiveSyncOrderDao.updata(id,resultcode,resultinfo,resultdata,Integer.parseInt(syncount)+1);
- }catch (Exception e){
- e.printStackTrace();
- logger.error("id: "+id+"修改失败:"+e.getMessage());
- }
- logger.info(logmap);
- }
- }
- }
|