package com.chinacreator.process.dao; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.chinacreator.process.bean.ContinueBean; import com.chinacreator.process.bean.KuaishouPushBean; import com.chinacreator.process.bean.TdActiveclientRecBean; import com.chinacreator.process.util.DataSource; import com.frameworkset.common.poolman.PreparedDBUtil; import com.frameworkset.common.poolman.SQLExecutor; @Component public class KuaishouFlowMonthDao{ private String tabname1 = "NET3G.TD_KUAISHOU_FIRSTMONTH"; private String tabname2 = "NET3G.TD_ACTIVECLIENT_REC"; //表名 //private String tabname1 = "KCZX.TD_KUAISHOU_FIRSTMONTH_0707"; //private String tabname2 = "KCZX.TD_ACTIVECLIENT_REC_0707"; /** * 根据分区获取要处理的数据,最多重试3次 * @param partition * @param rows * @return * @throws SQLException */ public List getFlowMonthByPart(String partition, int rows)throws SQLException { String sql = " SELECT ID, USERID, SPID, FLOWCOUNT, TO_CHAR(FLOWCALCULATEDATE,'YYYYMMDDHH24MISS') FLOWCALCULATEDATE FROM "+tabname1+" PARTITION("+partition+") "+ " WHERE FLOWRESULTCODE NOT IN ('2', '0') AND SYSDATE >= FLOWCALCULATEDATE "; PreparedDBUtil pdb = new PreparedDBUtil(); return pdb.executeSelectForList(DataSource.NET3G, sql,0,rows, HashMap.class); } /** * 更新状态为执行中 * @param id * @return * @throws SQLException */ public boolean updFlowMonthExecing(String id) throws SQLException { //更新数据状态为处理中 String sql = "UPDATE "+tabname1+" SET FLOWRESULTCODE = '2' , FLOWCOUNT = FLOWCOUNT + 1, FLOWTIME = SYSDATE WHERE ID = ? "; Object rows = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, id); if(Integer.parseInt(rows.toString()) > 0){ return true; }else{ return false; } } /** * 执行成功更新查询结果,下次执行时间设置为下月最后一天凌晨1点 * @param id * @return * @throws SQLException */ public boolean updFlowMonthSucc(String id )throws SQLException { String sql = " UPDATE "+tabname1+" SET FLOWRESULTCODE = '1', FLOWRESULTINFO = 'ok', FLOWTIME = SYSDATE, FLOWCOUNT = 0, " + " FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(ADD_MONTHS(TRUNC(SYSDATE),1)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') WHERE ID = ? "; Object rows = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, id); if(Integer.parseInt(rows.toString()) > 0){ return true; }else{ return false; } } /** * 更新处理结果 * @param data * @param resultcode * @param resultinfo * @return * @throws SQLException */ public boolean updFlowMonthFailEnd(HashMap data, String resultcode, String resultinfo )throws SQLException { String sql = ""; String id = data.get("ID").toString(); int flowcount = Integer.parseInt(data.get("FLOWCOUNT").toString()); if(flowcount >= 1 || ("9055".equals(resultcode) || "9056".equals(resultcode) || "9052".equals(resultcode) || "9053".equals(resultcode) || "9054".equals(resultcode) || "9058".equals(resultcode))){//到达最大执行次数或者是指定错误编码,更新为最终状态(无订购关系,无配置信息,数据列表不为空但没有对应产品的数据信) //FLOWCALCULATEDATE,设置时间为月末最后第五天 sql = " UPDATE "+tabname1+" SET FLOWRESULTCODE = ? , FLOWRESULTINFO = ? , FLOWTIME = SYSDATE, FLOWCOUNT = 0, " + " FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(ADD_MONTHS(TRUNC(SYSDATE),1)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') WHERE ID = ? "; //9052计算时间不是当月,设置成当月 if("9052".equals(resultcode)){ sql = " UPDATE "+tabname1+" SET FLOWRESULTCODE = ? , FLOWRESULTINFO = ? , FLOWTIME = SYSDATE, FLOWCOUNT = 0, " + " FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(ADD_MONTHS(TRUNC(SYSDATE),0)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') WHERE ID = ? "; } }else{ sql = "UPDATE "+tabname1+" SET FLOWRESULTCODE = ? , FLOWRESULTINFO = ?, FLOWTIME = SYSDATE WHERE ID = ? "; } Object rows = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, resultcode,resultinfo, id); if(Integer.parseInt(rows.toString()) > 0){ return true; }else{ return false; } } /** * 添加快手流量数据 * @param bean * @throws SQLException */ public void insertActive(TdActiveclientRecBean bean) throws SQLException { String sql = "INSERT INTO "+tabname2+" (USERID, ACTIVETYPE, ACTIVEMONTH , ACTIVEDAY ,ACTIVELASTTIME ,FLOWTOTAL, PROVINCE, AREA, SPID, ACTIVESRC, SPNAME ) " + " VALUES(#[userid], '1', TO_CHAR(SYSDATE, 'YYYYMM'), TO_CHAR(SYSDATE, 'YYYYMMDD'),SYSDATE, #[flowtotal], #[province], #[area], #[spid], 'KSFLOW', #[spname] )"; SQLExecutor.insertBean(DataSource.NET3G, sql, bean); } /** * 根据SPID查询SPNAME * @param spid * @return * @throws SQLException */ public String qrySpname(String spid)throws SQLException { String sql = "SELECT SPNAME FROM NET3G.TB_SP_INFO WHERE SPID = ? "; String res = (String)SQLExecutor.queryFieldWithDBName(DataSource.NET3G,sql,spid); return res; } /** * 查询SPID在TB_SP_AOP_CONFIG表的产品ID,多条数据只返回一个结果,不报错 * @param spid * @throws SQLException */ public String qryAopProductid(String spid) throws SQLException { String sql = " SELECT PRODUCT_ID FROM NET3G.TB_SP_AOP_CONFIG WHERE SP_ID = ? "; String res = SQLExecutor.queryObjectWithDBName(String.class, DataSource.NET3G,sql, spid); return res; } /** * 查询KAFKA产品配置表,获取产品ID * @param spid * @return * @throws SQLException */ public String qryKafkaProductid(String spid) throws SQLException { String sql = " SELECT PRODUCTID FROM NET3G.TB_KAFKA_PRODUCT_CONF WHERE SPID = ? "; String res = SQLExecutor.queryObjectWithDBName(String.class, DataSource.NET3G,sql, spid); return res; } /** * 根据ID查询订购关系 * @param id * @return HASHMAP * @throws SQLException */ public HashMap getOrderRealById(String id) throws SQLException { String sql = "SELECT SPID,USERID, TO_CHAR(ORDERTIME,'YYYYMMDDHH24MISS') ORDERTIME, PROVINCE, AREA, "+ " (CASE WHEN ENDTIME IS NULL THEN 'NULL' ELSE TO_CHAR(ENDTIME,'YYYYMMDDHH24MISS') END) ENDTIME, "+ " (CASE WHEN CANCELTIME IS NULL THEN 'NULL' ELSE TO_CHAR(CANCELTIME,'YYYYMMDDHH24MISS') END) CANCELTIME, "+ " (CASE WHEN ENDTIME IS NULL THEN '0' WHEN ENDTIME >= SYSDATE THEN '1' ELSE '2' END) STATUS "+ " FROM NET3G.TD_ORDER_RELATIONS WHERE ID = ? "; return SQLExecutor.queryObjectWithDBName(HashMap.class, DataSource.NET3G, sql, id); } /** * 获取接口信息 * @param projname 工程名称 * @param invokename 接口名称 * @param invoketype 接口类型,1外部接口,2内部接口 * @return * @throws Exception */ public HashMap getInvokeUrlInfo(String projname, String invokename, String invoketype) throws Exception { String sql = "SELECT * FROM NET3G.TB_PROJINVOKEURL_CONF WHERE STATUS = '0' AND PROJNAME = ? AND INVOKENAME = ? AND INVOKETYPE = ? "; return SQLExecutor.queryObjectWithDBName(HashMap.class, DataSource.NET3G, sql, projname, invokename, invoketype); } /** * 更新执行状态为2且时间超过50分钟的数据为待处理 * @param partition 分区标识 * @return */ public int updExecTimeout(String partition){ int res = 0; String sql = "UPDATE "+tabname1+" PARTITION("+partition+") A SET A.FLOWRESULTCODE = '1' WHERE A.FLOWRESULTCODE = '2' AND A.FLOWTIME IS NOT NULL AND A.FLOWTIME < SYSDATE - 50/(24*60) "; try { Object obj = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, null); res = (Integer)obj; } catch (SQLException e) { e.printStackTrace(); } return res; } /** * 初始化新增数据的FLOWCALCULATEDATE值 * 设置为当月月末最后第五天凌晨1点 * @return * @throws SQLException */ public int updFlowMonthInit(String partition) { int res = 0; //更新数据状态为处理中 String sql = "UPDATE "+tabname1+" PARTITION("+partition+") SET FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(TRUNC(SYSDATE)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') "+ " WHERE FLOWCALCULATEDATE IS NULL "; try { Object obj = SQLExecutor.updateWithDBName(DataSource.NET3G, sql); res = (Integer)obj; } catch (SQLException e) { e.printStackTrace(); } return res; } public static void main(String[] args) throws SQLException { KuaishouFlowMonthDao dao = new KuaishouFlowMonthDao(); //System.out.println(dao.updatePush("202005091510455306373", "0", "ok")); //System.out.println(dao.findOrderRelaBySpid("18673197465", "1022")); //System.out.println(dao.getNo()); //System.out.println(dao.queryPush("202005091510455306373")); //System.out.println(dao.getOrderPush()); //System.out.println(dao.hasSync("18673197465", "190")); //System.out.println(dao.getSpidById("20201022095726288224")); System.out.println(dao.updFlowMonthInit("T_HASH_P01")); } }