08f79113dbcd3aa0d4d020687fa3e4f5ecd0f718.svn-base 9.0 KB


  1. package com.chinacreator.process.dao;
  2. import java.sql.SQLException;
  3. import java.util.ArrayList;
  4. import java.util.HashMap;
  5. import java.util.List;
  6. import org.apache.log4j.Logger;
  7. import org.springframework.stereotype.Component;
  8. import com.alibaba.fastjson.JSONObject;
  9. import com.chinacreator.process.bean.ContinueBean;
  10. import com.chinacreator.process.bean.KuaishouPushBean;
  11. import com.chinacreator.process.bean.TdActiveclientRecBean;
  12. import com.chinacreator.process.util.DataSource;
  13. import com.frameworkset.common.poolman.PreparedDBUtil;
  14. import com.frameworkset.common.poolman.SQLExecutor;
  15. @Component
  16. public class KuaishouFlowMonthDao{
  17. private String tabname1 = "NET3G.TD_KUAISHOU_FIRSTMONTH";
  18. private String tabname2 = "NET3G.TD_ACTIVECLIENT_REC";
  19. //Table names: tabname1 is the table the flow-month queries/updates in this class run against, tabname2 is the target of insertActive. NOTE(review): the commented-out KCZX names below look like an alternate/test schema - confirm before re-enabling.
  20. //private String tabname1 = "KCZX.TD_KUAISHOU_FIRSTMONTH_0707";
  21. //private String tabname2 = "KCZX.TD_ACTIVECLIENT_REC_0707";
  22. /**
  23. * 根据分区获取要处理的数据,最多重试3次
  24. * @param partition
  25. * @param rows
  26. * @return
  27. * @throws SQLException
  28. */
  29. public List<HashMap> getFlowMonthByPart(String partition, int rows)throws SQLException {
  30. String sql = " SELECT ID, USERID, SPID, FLOWCOUNT, TO_CHAR(FLOWCALCULATEDATE,'YYYYMMDDHH24MISS') FLOWCALCULATEDATE FROM "+tabname1+" PARTITION("+partition+") "+
  31. " WHERE FLOWRESULTCODE NOT IN ('2', '0') AND SYSDATE >= FLOWCALCULATEDATE ";
  32. PreparedDBUtil pdb = new PreparedDBUtil();
  33. return pdb.executeSelectForList(DataSource.NET3G, sql,0,rows, HashMap.class);
  34. }
  35. /**
  36. * 更新状态为执行中
  37. * @param id
  38. * @return
  39. * @throws SQLException
  40. */
  41. public boolean updFlowMonthExecing(String id) throws SQLException {
  42. //更新数据状态为处理中
  43. String sql = "UPDATE "+tabname1+" SET FLOWRESULTCODE = '2' , FLOWCOUNT = FLOWCOUNT + 1, FLOWTIME = SYSDATE WHERE ID = ? ";
  44. Object rows = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, id);
  45. if(Integer.parseInt(rows.toString()) > 0){
  46. return true;
  47. }else{
  48. return false;
  49. }
  50. }
  51. /**
  52. * 执行成功更新查询结果,下次执行时间设置为下月最后一天凌晨1点
  53. * @param id
  54. * @return
  55. * @throws SQLException
  56. */
  57. public boolean updFlowMonthSucc(String id )throws SQLException {
  58. String sql = " UPDATE "+tabname1+" SET FLOWRESULTCODE = '1', FLOWRESULTINFO = 'ok', FLOWTIME = SYSDATE, FLOWCOUNT = 0, " +
  59. " FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(ADD_MONTHS(TRUNC(SYSDATE),1)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') WHERE ID = ? ";
  60. Object rows = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, id);
  61. if(Integer.parseInt(rows.toString()) > 0){
  62. return true;
  63. }else{
  64. return false;
  65. }
  66. }
  67. /**
  68. * 更新处理结果
  69. * @param data
  70. * @param resultcode
  71. * @param resultinfo
  72. * @return
  73. * @throws SQLException
  74. */
  75. public boolean updFlowMonthFailEnd(HashMap data, String resultcode, String resultinfo )throws SQLException {
  76. String sql = "";
  77. String id = data.get("ID").toString();
  78. int flowcount = Integer.parseInt(data.get("FLOWCOUNT").toString());
  79. if(flowcount >= 1 ||
  80. ("9055".equals(resultcode) || "9056".equals(resultcode) || "9052".equals(resultcode)
  81. || "9053".equals(resultcode) || "9054".equals(resultcode) || "9058".equals(resultcode))){//到达最大执行次数或者是指定错误编码,更新为最终状态(无订购关系,无配置信息,数据列表不为空但没有对应产品的数据信)
  82. //FLOWCALCULATEDATE,设置时间为月末最后第五天
  83. sql = " UPDATE "+tabname1+" SET FLOWRESULTCODE = ? , FLOWRESULTINFO = ? , FLOWTIME = SYSDATE, FLOWCOUNT = 0, " +
  84. " FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(ADD_MONTHS(TRUNC(SYSDATE),1)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') WHERE ID = ? ";
  85. //9052计算时间不是当月,设置成当月
  86. if("9052".equals(resultcode)){
  87. sql = " UPDATE "+tabname1+" SET FLOWRESULTCODE = ? , FLOWRESULTINFO = ? , FLOWTIME = SYSDATE, FLOWCOUNT = 0, " +
  88. " FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(ADD_MONTHS(TRUNC(SYSDATE),0)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') WHERE ID = ? ";
  89. }
  90. }else{
  91. sql = "UPDATE "+tabname1+" SET FLOWRESULTCODE = ? , FLOWRESULTINFO = ?, FLOWTIME = SYSDATE WHERE ID = ? ";
  92. }
  93. Object rows = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, resultcode,resultinfo, id);
  94. if(Integer.parseInt(rows.toString()) > 0){
  95. return true;
  96. }else{
  97. return false;
  98. }
  99. }
  100. /**
  101. * 添加快手流量数据
  102. * @param bean
  103. * @throws SQLException
  104. */
  105. public void insertActive(TdActiveclientRecBean bean) throws SQLException {
  106. String sql = "INSERT INTO "+tabname2+" (USERID, ACTIVETYPE, ACTIVEMONTH , ACTIVEDAY ,ACTIVELASTTIME ,FLOWTOTAL, PROVINCE, AREA, SPID, ACTIVESRC, SPNAME ) " +
  107. " VALUES(#[userid], '1', TO_CHAR(SYSDATE, 'YYYYMM'), TO_CHAR(SYSDATE, 'YYYYMMDD'),SYSDATE, #[flowtotal], #[province], #[area], #[spid], 'KSFLOW', #[spname] )";
  108. SQLExecutor.insertBean(DataSource.NET3G, sql, bean);
  109. }
  110. /**
  111. * 根据SPID查询SPNAME
  112. * @param spid
  113. * @return
  114. * @throws SQLException
  115. */
  116. public String qrySpname(String spid)throws SQLException {
  117. String sql = "SELECT SPNAME FROM NET3G.TB_SP_INFO WHERE SPID = ? ";
  118. String res = (String)SQLExecutor.queryFieldWithDBName(DataSource.NET3G,sql,spid);
  119. return res;
  120. }
  121. /**
  122. * 查询SPID在TB_SP_AOP_CONFIG表的产品ID,多条数据只返回一个结果,不报错
  123. * @param spid
  124. * @throws SQLException
  125. */
  126. public String qryAopProductid(String spid) throws SQLException {
  127. String sql = " SELECT PRODUCT_ID FROM NET3G.TB_SP_AOP_CONFIG WHERE SP_ID = ? ";
  128. String res = SQLExecutor.queryObjectWithDBName(String.class, DataSource.NET3G,sql, spid);
  129. return res;
  130. }
  131. /**
  132. * 查询KAFKA产品配置表,获取产品ID
  133. * @param spid
  134. * @return
  135. * @throws SQLException
  136. */
  137. public String qryKafkaProductid(String spid) throws SQLException {
  138. String sql = " SELECT PRODUCTID FROM NET3G.TB_KAFKA_PRODUCT_CONF WHERE SPID = ? ";
  139. String res = SQLExecutor.queryObjectWithDBName(String.class, DataSource.NET3G,sql, spid);
  140. return res;
  141. }
  142. /**
  143. * 根据ID查询订购关系
  144. * @param id
  145. * @return HASHMAP
  146. * @throws SQLException
  147. */
  148. public HashMap getOrderRealById(String id) throws SQLException {
  149. String sql = "SELECT SPID,USERID, TO_CHAR(ORDERTIME,'YYYYMMDDHH24MISS') ORDERTIME, PROVINCE, AREA, "+
  150. " (CASE WHEN ENDTIME IS NULL THEN 'NULL' ELSE TO_CHAR(ENDTIME,'YYYYMMDDHH24MISS') END) ENDTIME, "+
  151. " (CASE WHEN CANCELTIME IS NULL THEN 'NULL' ELSE TO_CHAR(CANCELTIME,'YYYYMMDDHH24MISS') END) CANCELTIME, "+
  152. " (CASE WHEN ENDTIME IS NULL THEN '0' WHEN ENDTIME >= SYSDATE THEN '1' ELSE '2' END) STATUS "+
  153. " FROM NET3G.TD_ORDER_RELATIONS WHERE ID = ? ";
  154. return SQLExecutor.queryObjectWithDBName(HashMap.class, DataSource.NET3G, sql, id);
  155. }
  156. /**
  157. * 获取接口信息
  158. * @param projname 工程名称
  159. * @param invokename 接口名称
  160. * @param invoketype 接口类型,1外部接口,2内部接口
  161. * @return
  162. * @throws Exception
  163. */
  164. public HashMap getInvokeUrlInfo(String projname, String invokename, String invoketype) throws Exception {
  165. String sql = "SELECT * FROM NET3G.TB_PROJINVOKEURL_CONF WHERE STATUS = '0' AND PROJNAME = ? AND INVOKENAME = ? AND INVOKETYPE = ? ";
  166. return SQLExecutor.queryObjectWithDBName(HashMap.class, DataSource.NET3G, sql, projname, invokename, invoketype);
  167. }
  168. /**
  169. * 更新执行状态为2且时间超过50分钟的数据为待处理
  170. * @param partition 分区标识
  171. * @return
  172. */
  173. public int updExecTimeout(String partition){
  174. int res = 0;
  175. String sql = "UPDATE "+tabname1+" PARTITION("+partition+") A SET A.FLOWRESULTCODE = '1' WHERE A.FLOWRESULTCODE = '2' AND A.FLOWTIME IS NOT NULL AND A.FLOWTIME < SYSDATE - 50/(24*60) ";
  176. try {
  177. Object obj = SQLExecutor.updateWithDBName(DataSource.NET3G, sql, null);
  178. res = (Integer)obj;
  179. } catch (SQLException e) {
  180. e.printStackTrace();
  181. }
  182. return res;
  183. }
  184. /**
  185. * 初始化新增数据的FLOWCALCULATEDATE值
  186. * 设置为当月月末最后第五天凌晨1点
  187. * @return
  188. * @throws SQLException
  189. */
  190. public int updFlowMonthInit(String partition) {
  191. int res = 0;
  192. //更新数据状态为处理中
  193. String sql = "UPDATE "+tabname1+" PARTITION("+partition+") SET FLOWCALCULATEDATE = TO_DATE(TO_CHAR(LAST_DAY(TRUNC(SYSDATE)) -4,'YYYYMMDD')||'010000','YYYYMMDDHH24MISS') "+
  194. " WHERE FLOWCALCULATEDATE IS NULL ";
  195. try {
  196. Object obj = SQLExecutor.updateWithDBName(DataSource.NET3G, sql);
  197. res = (Integer)obj;
  198. } catch (SQLException e) {
  199. e.printStackTrace();
  200. }
  201. return res;
  202. }
  203. public static void main(String[] args) throws SQLException {
  204. KuaishouFlowMonthDao dao = new KuaishouFlowMonthDao();
  205. //System.out.println(dao.updatePush("202005091510455306373", "0", "ok"));
  206. //System.out.println(dao.findOrderRelaBySpid("18673197465", "1022"));
  207. //System.out.println(dao.getNo());
  208. //System.out.println(dao.queryPush("202005091510455306373"));
  209. //System.out.println(dao.getOrderPush());
  210. //System.out.println(dao.hasSync("18673197465", "190"));
  211. //System.out.println(dao.getSpidById("20201022095726288224"));
  212. System.out.println(dao.updFlowMonthInit("T_HASH_P01"));
  213. }
  214. }