Jelajahi Sumber

修改Application 通过args调度;增加run-execute控制sql的执行;修改部分handler分号段执行

huaerzx 2 tahun lalu
induk
melakukan
afe51b5481
24 mengubah file dengan 217 tambahan dan 85 penghapusan
  1. 116 21
      src/main/java/com/platomix/userprofile/Application.java
  2. 8 9
      src/main/java/com/platomix/userprofile/handler/FlatTagHandler.java
  3. 2 1
      src/main/java/com/platomix/userprofile/handler/core/ActivityJoinTagHandler.java
  4. 2 1
      src/main/java/com/platomix/userprofile/handler/core/AppPreferenceTagHandler.java
  5. 7 6
      src/main/java/com/platomix/userprofile/handler/core/CollectAllTagHandler.java
  6. 2 1
      src/main/java/com/platomix/userprofile/handler/core/ComplainTagHandler.java
  7. 2 1
      src/main/java/com/platomix/userprofile/handler/core/ForecastCancelTagHandler.java
  8. 2 1
      src/main/java/com/platomix/userprofile/handler/core/ForecastOrderTagHandler.java
  9. 2 1
      src/main/java/com/platomix/userprofile/handler/core/KuaishouTagHandler.java
  10. 4 2
      src/main/java/com/platomix/userprofile/handler/core/OrderSumDateTagHandler.java
  11. 2 1
      src/main/java/com/platomix/userprofile/handler/core/RmfTagHandler.java
  12. 7 2
      src/main/java/com/platomix/userprofile/handler/core/UserOrderStateTagHandler.java
  13. 8 3
      src/main/java/com/platomix/userprofile/handler/core/XxhAppTagHandler.java
  14. 7 6
      src/main/java/com/platomix/userprofile/handler/core/XxhLongTagHandler.java
  15. 9 3
      src/main/java/com/platomix/userprofile/handler/core/XxhOrderTagHandler.java
  16. 9 3
      src/main/java/com/platomix/userprofile/handler/core/XxhTagHandler.java
  17. 3 2
      src/main/java/com/platomix/userprofile/handler/support/ForecastCancelTagManager.java
  18. 2 2
      src/main/java/com/platomix/userprofile/handler/support/UserEventManager.java
  19. 3 3
      src/main/java/com/platomix/userprofile/handler/support/UserTagManager.java
  20. 4 4
      src/main/java/com/platomix/userprofile/handler/support/XxhTagManager.java
  21. 9 3
      src/main/java/com/platomix/userprofile/service/DynamicExecuteService.java
  22. 2 8
      src/main/java/com/platomix/userprofile/service/MobileRangeService.java
  23. 3 1
      src/main/resources/application-dev.yml
  24. 2 0
      src/main/resources/application-test.yml

+ 116 - 21
src/main/java/com/platomix/userprofile/Application.java

@@ -3,6 +3,13 @@
  */
 package com.platomix.userprofile;
 
+import cn.hutool.core.date.DateUtil;
+import com.platomix.userprofile.handler.AbstractTagHandler;
+import com.platomix.userprofile.handler.core.*;
+import com.platomix.userprofile.handler.support.ForecastCancelTagManager;
+import com.platomix.userprofile.handler.support.UserEventManager;
+import com.platomix.userprofile.handler.support.UserTagManager;
+import com.platomix.userprofile.handler.support.XxhTagManager;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cache.annotation.EnableCaching;
@@ -10,39 +17,127 @@ import org.springframework.context.ConfigurableApplicationContext;
 
 import com.platomix.userprofile.handler.FlatTagHandler;
 
+import java.lang.reflect.Array;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.stream.Stream;
+
 /**
  * @author wookvn
  */
 @EnableCaching
 @SpringBootApplication
 public class Application {
-
+	//全量运行时的handler  !!注意!! 一定要保证 collectAllTagHandler是最后一个
+	static final String[] classHandlers=new String[]
+			{ "xxhLongTagHandler", "xxhTagHandler", "xxhAppTagHandler", "xxhOrderTagHandler",
+					"userOrderStateTagHandler", "orderSumDateTagHandler", "appPreferenceTagHandler",
+					"complainTagHandler","forecastCancelTagHandler", "forecastOrderTagHandler",
+					"collectAllTagHandler"};
+	//运行时需要取前2个月的handler
+	static final String[] classPerHandlers=new String[]
+			{ "xxhTagHandler", "xxhAppTagHandler","appPreferenceTagHandler"};
 	/**
+	 * # arg[0]=调用类别:
+	 * 		zt_tag(生成标签) dic_tag(更新mysql字典表) xxh_etl(更新原始信息化etl数据到xxh表)
+	 * 		z_cancel_data(准备退订数据) join_all(只合并宽表)  single_handler(单独调用 指定handler)
+	 * # arg[1]=月份: month
+	 * # arg[2]=HandlerName: [比如:java -jar xxx.jar xxhAppTagHandler(使用类名的小写开头的字符串)]
+	 *
 	 */
 	public static void main(String[] args) {
 		ConfigurableApplicationContext applicationContext = SpringApplication.run(Application.class, args);
-		//传参数方式执行对应的标签[比如:java -jar xxx.jar xxhAppTagHandler(使用类名的小写开头的字符串)]
-//		AbstractTagHandler tagHandler = applicationContext.getBean(args[0], AbstractTagHandler.class);
-		//单个标签操作
-//		AbstractTagHandler tagHandler = applicationContext.getBean(XxhAppTagHandler.class);
-//		tagHandler.handle("202111","1310","1312");
+
+//		AbstractTagHandler tagHandler = applicationContext.getBean("orderSumDateTagHandler", AbstractTagHandler.class);
+//		tagHandler.handle("202111");
+
 		//批量
 //		tagHandler.handleMultiMonth("202111", false, 3);
-		
-		//Flat表操作
-		FlatTagHandler tagHandler = applicationContext.getBean(FlatTagHandler.class);
-		tagHandler.tempDataToFlatView("202111");
-
-//		UserEventManager userEventHandler = applicationContext.getBean(UserEventManager.class);
-//		userEventHandler.genEventPropToMysql();
-		
-//		ForecastCancelTagManager manager = applicationContext.getBean(ForecastCancelTagManager.class);
-//		manager.tagCancelDataC4("202203", 1, false);
-		
-//		UserTagManager manager = applicationContext.getBean(UserTagManager.class);
-//		manager.genSomeTagToMysql();
-		
+//		//Flat表操作
+//		FlatTagHandler tagHandler = applicationContext.getBean(FlatTagHandler.class);
+//		tagHandler.tempDataToFlatView("202111");
+		//运行程序
+		StartProg(applicationContext,args);
 		applicationContext.close();
 	}
-	
+
+	/**
+	 * 根据args参数
+	 * @param args
+	 */
+	static void StartProg(ConfigurableApplicationContext applicationContext ,String[] args){
+		//arg0 调用类别
+		String sendTp = "";
+		//arg1 月份
+		String permonth = DateUtil.format(DateUtil.offsetMonth(new Date(), -1), "yyyyMM");
+		String p_permonth =  DateUtil.format(DateUtil.offsetMonth(new Date(), -2), "yyyyMM");
+		//arg2 HandlerName
+		String handlerName = "";
+		if (args != null && args.length > 0) {
+			sendTp = args[0];
+			if (args.length > 1) {
+				permonth = args[1];
+				p_permonth = DateUtil.format(DateUtil.offsetMonth(DateUtil.parse(permonth.substring(0, 4) + "-" + permonth.substring(4) + "-01", "yyyy-MM-dd"), -1), "yyyyMM");
+			}
+			if (args.length > 2) {
+				handlerName = args[2];
+			}
+		}
+
+		/**
+		 * 生成指定月份全部标签(不传月份参数会默认跑前一个月的标签)
+		 */
+		if (sendTp.equals("zt_tag")) {
+			for (Integer i = 0; i < classHandlers.length; i++) {
+				AbstractTagHandler tagHandler = applicationContext.getBean(classHandlers[i], AbstractTagHandler.class);
+				if (Arrays.asList(classPerHandlers).contains(classHandlers[i])) {
+					tagHandler.handle(p_permonth);
+				} else {
+					tagHandler.handle(permonth);
+				}
+			}
+			//生成rfm表 todo
+			//Flat表操作
+			FlatTagHandler flatTagHandler = applicationContext.getBean(FlatTagHandler.class);
+			flatTagHandler.tempDataToFlatView(permonth);
+		}
+		/**
+		 * 单独调用指定handler
+		 */
+		if (sendTp.equals("single_handler")) {
+			AbstractTagHandler tagHandler = applicationContext.getBean(handlerName, AbstractTagHandler.class);
+			tagHandler.handle(permonth);
+		}
+		/**
+		 * 更新mysql中的字典表
+		 */
+		if (sendTp.equals("dic_tag")) {
+			UserEventManager userEventHandler = applicationContext.getBean(UserEventManager.class);
+			userEventHandler.genEventPropToMysql();
+			UserTagManager manager = applicationContext.getBean(UserTagManager.class);
+			manager.genSomeTagToMysql();
+		}
+		/**
+		 * 更新etl2->xxh
+		 */
+		if (sendTp.equals("xxh_etl")) {
+			XxhTagManager xxhTagManager = applicationContext.getBean(XxhTagManager.class);
+			xxhTagManager.xxhFromEtlSp(permonth);
+		}
+		/**
+		 * 退订订ding  todo
+		 */
+		//退订数据准备  先调用这个准备数据,然后取TensorFlow训练,并预测 ?月结果
+		if (sendTp.equals("z_cancel_data")) {
+			ForecastCancelTagManager manager = applicationContext.getBean(ForecastCancelTagManager.class);
+			manager.tagCancelDataC4(permonth,-1,true);
+			manager.tagCancelDataC4V2(permonth);
+		}
+
+		if(sendTp.equals("join_all")){
+			FlatTagHandler flatTagHandler = applicationContext.getBean(FlatTagHandler.class);
+			flatTagHandler.tempDataToFlatView(permonth);
+		}
+	}
 }

+ 8 - 9
src/main/java/com/platomix/userprofile/handler/FlatTagHandler.java

@@ -82,7 +82,7 @@ public class FlatTagHandler {
 		//删除月份分区数据
 		String dropViewDataSql = String.format("alter table ads.%s on cluster %s drop partition '%s'", cluTableName, CLUSTER_NAME, month);
 		log.info("\n" + dropViewDataSql);
-//		dynamicExecuteService.execute(DsEnum.clickhouse, dropViewDataSql, "删除分区[month="+month+"]数据完成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, dropViewDataSql, "删除分区[month="+month+"]数据完成");
 		
 		//再次查询列变动后的字段名
 		List<JSONObject> checkTableDiffList = _getDiffColumnList();
@@ -226,17 +226,17 @@ public class FlatTagHandler {
 	public void _shellToCluster(ShellCommond shellData) throws IOException {
 		//如果是插入本地
 		if(shellData.getIsLocalIp()){
-			//dynamicExecuteService.execute(DsEnum.clickhouse, shellData.getShellCmd(), "插入temp宽表 到本机clu表");
+			dynamicExecuteService.execute(DsEnum.clickhouse, shellData.getShellCmd(), "插入temp宽表 到本机clu表");
 		}
 		else {
 			String filePath = getShellFilePath(shellData);
 			File shellFile = new File(filePath);
 			FileUtils.writeStringToFile(shellFile, shellData.getShellCmd(), "UTF-8");
-			//Runtime.getRuntime().exec("chmod 777 " + filePath);
+			Runtime.getRuntime().exec("chmod 777 " + filePath);
 
 			String commond="sh " + shellFile.getAbsolutePath();
 			log.info(commond);
-			//Runtime.getRuntime().exec(commond);
+			Runtime.getRuntime().exec(commond);
 		}
 		//log.info("\n" + shellData.getShellCmd());
 	}
@@ -258,8 +258,7 @@ public class FlatTagHandler {
 		sqlBuilder.deleteLastChar().deleteLastChar();
 		sqlBuilder.appendLn(") ENGINE = MergeTree PARTITION BY THEMONTH_V ORDER BY USERID_V");
 		log.info("\n" + sqlBuilder.toString());
-		// dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(),
-		// 	"创建ads.Z_USER_TAG_FLAT_clu表完成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "创建ads.Z_USER_TAG_FLAT_clu表完成");
 	}
 
 	/**
@@ -287,7 +286,7 @@ public class FlatTagHandler {
 				//1.drop视图
 				String dropViewSql = String.format("drop table %s ON cluster %s", flatAllViewName, CLUSTER_NAME);
 				log.info("\n" + dropViewSql);
-//				dynamicExecuteService.execute(DsEnum.clickhouse, dropViewSql, "删除视图完成");
+				dynamicExecuteService.execute(DsEnum.clickhouse, dropViewSql, "删除视图完成");
 				log.info("删除视图完成,开始等待8s处理增删列...");
 				TimeUnit.SECONDS.sleep(8);
 				
@@ -299,7 +298,7 @@ public class FlatTagHandler {
 				String createViewSql = String.format("CREATE TABLE %s ON cluster %s AS ads.%s ENGINE = Distributed(%s, ads, %s, rand())", 
 						flatAllViewName, CLUSTER_NAME, cluTableName, CLUSTER_NAME, cluTableName);
 				log.info("\n" + createViewSql);
-//				dynamicExecuteService.execute(DsEnum.clickhouse, createViewSql, "创建视图完成");
+				dynamicExecuteService.execute(DsEnum.clickhouse, createViewSql, "创建视图完成");
 				log.info("创建视图完成,开始等待8s进入下一操作...");
 				TimeUnit.SECONDS.sleep(8);
 			}
@@ -326,7 +325,7 @@ public class FlatTagHandler {
 		if(!columns.isEmpty()) {
 			String addColumnSql = String.format("alter table ads.%s ON cluster %s %s", cluTableName, CLUSTER_NAME, String.join(",", columns));
 			log.info("\n" + addColumnSql);
-			//dynamicExecuteService.execute(DsEnum.clickhouse, addColumnSql, "更新字段完成");
+			dynamicExecuteService.execute(DsEnum.clickhouse, addColumnSql, "更新字段完成");
 		}
 	}
 	

+ 2 - 1
src/main/java/com/platomix/userprofile/handler/core/ActivityJoinTagHandler.java

@@ -6,6 +6,7 @@ package com.platomix.userprofile.handler.core;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -49,7 +50,7 @@ public class ActivityJoinTagHandler extends AbstractTagHandler {
 				  .appendLn("	and  match(MEMBER_PHONE,'\\d{11}')")
 				  .appendLn(")");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 	
 	@Override

+ 2 - 1
src/main/java/com/platomix/userprofile/handler/core/AppPreferenceTagHandler.java

@@ -9,6 +9,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -65,7 +66,7 @@ public class AppPreferenceTagHandler extends AbstractTagHandler {
 				  .appendLn("where MONTH_ID >= '"+threeMonthAgo+"' and MONTH_ID <= '"+month+"' and ONE_FLUX > 1024")
 				  .appendLn("group by DEVICE_NUMBER");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 	
 	@Override

+ 7 - 6
src/main/java/com/platomix/userprofile/handler/core/CollectAllTagHandler.java

@@ -107,12 +107,7 @@ public class CollectAllTagHandler extends AbstractTagHandler{
 		log.info("\n" + sqlBuilder.toString());
 		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
-	
-	@Override
-	public boolean executeInBatchs() {
-		return false;
-	}
-	
+
 	private String _getLrMobileSql(String mobileLeft, String mobileRight, String fieldName) {
 		return mobileLeft == null || mobileRight == null ? "" : String.format(" and %s>='%s' and %s<'%s'", fieldName, mobileLeft, fieldName, mobileRight);
 	}
@@ -127,4 +122,10 @@ public class CollectAllTagHandler extends AbstractTagHandler{
 			cols.add(col);
 		}
 	}
+
+	@Override
+	public boolean executeInBatchs() {
+		return true;
+	}
+
 }

+ 2 - 1
src/main/java/com/platomix/userprofile/handler/core/ComplainTagHandler.java

@@ -7,6 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -61,6 +62,6 @@ public class ComplainTagHandler extends AbstractTagHandler {
 				  .appendLn("where THEMONTH < '"+month+"'")
 				  .appendLn("group by ACCEPT_PHONENUM");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 }

+ 2 - 1
src/main/java/com/platomix/userprofile/handler/core/ForecastCancelTagHandler.java

@@ -3,6 +3,7 @@
  */
 package com.platomix.userprofile.handler.core;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -42,7 +43,7 @@ public class ForecastCancelTagHandler extends AbstractTagHandler {
 				"	where TARGET_MONTH ='"+month+"'\n" + 
 				") t");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "退订预测标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "退订预测标签生成");
 	}
 
 }

+ 2 - 1
src/main/java/com/platomix/userprofile/handler/core/ForecastOrderTagHandler.java

@@ -9,6 +9,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -77,7 +78,7 @@ public class ForecastOrderTagHandler extends AbstractTagHandler {
 			sqlBuilder.appendLn(_genOrderPreferenceSql(param, tagListMap));
 		}
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "订购偏好预测[临时表]生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "订购偏好预测[临时表]生成");
 	}
 	
 	/**

+ 2 - 1
src/main/java/com/platomix/userprofile/handler/core/KuaishouTagHandler.java

@@ -7,6 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -55,7 +56,7 @@ public class KuaishouTagHandler extends AbstractTagHandler {
 		sqlBuilder.appendLn("from ads.XXH_KUAISHOU_YUCEJI_RESULT")
 				  .appendLn("where MONTH_ID = '"+month+"'");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 
 	@Override

+ 4 - 2
src/main/java/com/platomix/userprofile/handler/core/OrderSumDateTagHandler.java

@@ -7,6 +7,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
+import com.platomix.userprofile.service.MobileRangeService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -65,12 +67,12 @@ public class OrderSumDateTagHandler extends AbstractTagHandler {
 				  .appendLn("	group by USERID")
 				  .appendLn(") t");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 	
 	@Override
 	public boolean executeInBatchs() {
 		return false;
 	}
-	
+
 }

+ 2 - 1
src/main/java/com/platomix/userprofile/handler/core/RmfTagHandler.java

@@ -7,6 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -56,7 +57,7 @@ public class RmfTagHandler extends AbstractTagHandler {
 		sqlBuilder.appendLn(sql);
 		sqlBuilder.appendLn("from ads.ALS_RMF_CLUSTER2_RESULT");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 
 	@Override

+ 7 - 2
src/main/java/com/platomix/userprofile/handler/core/UserOrderStateTagHandler.java

@@ -6,6 +6,7 @@ package com.platomix.userprofile.handler.core;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -25,7 +26,7 @@ import lombok.extern.slf4j.Slf4j;
 public class UserOrderStateTagHandler extends AbstractTagHandler {
 	
 	final static String ZT_TABLE_NAME = "ads.ZT_USER_ORDER_STATE";
-
+	final static long _STEP=50000000L;
 	@Autowired
 	private UserprofileTagService userprofileTagService;
 
@@ -72,12 +73,16 @@ public class UserOrderStateTagHandler extends AbstractTagHandler {
 				  .appendLn("	ORDER BY CANCELTIME1 DESC")
 				  .appendLn(") GROUP BY USERID");
 		log.info("\n" + sqlBuilder.toString());
-		//dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "用户订购状态标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "用户订购状态标签生成");
 	}
 	
 	@Override
 	public boolean executeInBatchs() {
 		return false;
 	}
+	@Override
+	public long calcMobileRangeStep() {
+		return _STEP;
+	}
 
 }

+ 8 - 3
src/main/java/com/platomix/userprofile/handler/core/XxhAppTagHandler.java

@@ -7,6 +7,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -26,7 +27,7 @@ import lombok.extern.slf4j.Slf4j;
 public class XxhAppTagHandler extends AbstractTagHandler {
 	
 	final static String ZT_TABLE_NAME = "ads.ZT_USER_TAG_XXH_APP";
-	
+	final static long _STEP=50000000L;
 	@Autowired
 	private UserprofileTagService userprofileTagService;
 	
@@ -76,11 +77,15 @@ public class XxhAppTagHandler extends AbstractTagHandler {
 				  .appendLn(") g ") 
 				  .appendLn("group by MONTH_ID,DEVICE_NUMBER");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 	
 	@Override
 	public boolean executeInBatchs() {
-		return false;
+		return true;
+	}
+	@Override
+	public long calcMobileRangeStep() {
+		return _STEP;
 	}
 }

+ 7 - 6
src/main/java/com/platomix/userprofile/handler/core/XxhLongTagHandler.java

@@ -5,6 +5,7 @@ package com.platomix.userprofile.handler.core;
 
 import java.util.concurrent.TimeUnit;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -51,7 +52,7 @@ public class XxhLongTagHandler extends AbstractTagHandler {
 		sqlBuilder.appendLn("	order by MONTH_ID desc ");
 		sqlBuilder.appendLn(") group by DEVICE_NUMBER ");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 
 	
@@ -64,10 +65,10 @@ public class XxhLongTagHandler extends AbstractTagHandler {
 	 */
 	@Override
 	public void beforeInvoke(String month) {
-//		dynamicExecuteService.execute(DsEnum.clickhouse, String.format("drop table if exists %s",XXH_TABLE_NAME), "删除long标签原始数据");
-//		dynamicExecuteService.execute(DsEnum.clickhouse, String.format("drop table if exists %s",ZT_TABLE_NAME), "删除long标签缓存数据");
+		dynamicExecuteService.execute(DsEnum.clickhouse, String.format("drop table if exists %s",XXH_TABLE_NAME), "删除long标签原始数据");
+		dynamicExecuteService.execute(DsEnum.clickhouse, String.format("drop table if exists %s",ZT_TABLE_NAME), "删除long标签缓存数据");
 		try {
-			TimeUnit.SECONDS.sleep(20);
+			TimeUnit.SECONDS.sleep(10);
 		} catch (InterruptedException e) {
 			e.printStackTrace();
 		}
@@ -104,9 +105,9 @@ public class XxhLongTagHandler extends AbstractTagHandler {
 		String ageSql = userprofileTagService.genSelectSql("年龄阶段", "CERT_AGE", null, null);
 		sqlBuilder.appendLn(ageSql);
 
-		sqlBuilder.appendLn("from ads.XXH_ALL_MONTH_LONG ");
+		sqlBuilder.appendLn(String.format("from %s ",XXH_TABLE_NAME));
 		log.info("\n" + sqlBuilder.toString());
-		//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 
 	}
 

+ 9 - 3
src/main/java/com/platomix/userprofile/handler/core/XxhOrderTagHandler.java

@@ -10,6 +10,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -30,7 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 public class XxhOrderTagHandler extends AbstractTagHandler {
 
 	final static String ZT_TABLE_NAME = "ads.ZT_USER_TAG_ORDER";
-
+	final static long _STEP=50000000L;
 	@Autowired
 	private UserprofileTagService userprofileTagService;
 
@@ -167,7 +168,7 @@ public class XxhOrderTagHandler extends AbstractTagHandler {
 				"left join ads.ETL_V_TB_SP_INFO d  on s1.SPID=d.SPID").appendLn(
 				") s");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 	
 	/**
@@ -276,6 +277,11 @@ public class XxhOrderTagHandler extends AbstractTagHandler {
 	
 	@Override
 	public boolean executeInBatchs() {
-		return false;
+		return true;
+	}
+
+	@Override
+	public long calcMobileRangeStep() {
+		return _STEP;
 	}
 }

+ 9 - 3
src/main/java/com/platomix/userprofile/handler/core/XxhTagHandler.java

@@ -8,6 +8,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -28,7 +29,7 @@ import lombok.extern.slf4j.Slf4j;
 public class XxhTagHandler extends AbstractTagHandler{
 	
 	final static String ZT_TABLE_NAME = "ads.ZT_USER_TAG_XXH";
-	
+	final static long _STEP=50000000L;
 	@Autowired
 	private UserprofileTagService userprofileTagService;
 
@@ -93,7 +94,7 @@ public class XxhTagHandler extends AbstractTagHandler{
 				  .appendLn("TOTAL_FLUX, TOTAL_FEE, MON_TOTAL_FEE,FLUX_M, RESOURCE_NAME, PKG_NAME, MANU_NAME, MODEL_NAME, IS_ACCT")
 				  .appendLn(") t");
 		log.info("\n" + sqlBuilder.toString());
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "信息化标签生成");
 	}
 	
 	/**
@@ -229,6 +230,11 @@ public class XxhTagHandler extends AbstractTagHandler{
 	 */
 	@Override
 	public boolean executeInBatchs() {
-		return false;
+		return true;
+	}
+
+	@Override
+	public long calcMobileRangeStep() {
+		return _STEP;
 	}
 }

+ 3 - 2
src/main/java/com/platomix/userprofile/handler/support/ForecastCancelTagManager.java

@@ -9,6 +9,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import com.platomix.userprofile.enums.DsEnum;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -80,7 +81,7 @@ public class ForecastCancelTagManager {
 		String sqlText = IoUtil.read(Thread.currentThread().getContextClassLoader().getResourceAsStream("sql/ALS_XXH_CANCEL_NEW_C4.sql"), Charset.forName("utf-8"));
 		String sql = TagKit.sqlMatchReplace(sqlText, params);
 		log.info("\n" + sql);
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "ALS_XXH_CANCEL_NEW_C4数据生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sql, "ALS_XXH_CANCEL_NEW_C4数据生成");
 	}
 	
 	/**
@@ -93,7 +94,7 @@ public class ForecastCancelTagManager {
 		String sqlText = IoUtil.read(Thread.currentThread().getContextClassLoader().getResourceAsStream("sql/ALS_XXH_CANCEL_NEW_C4_lowDim.sql"), Charset.forName("utf-8"));
 		String sql = TagKit.sqlMatchReplace(sqlText, params);
 		log.info("\n" + sql);
-//		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "ALS_XXH_CANCEL_NEW_C4_lowDim数据生成");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sql, "ALS_XXH_CANCEL_NEW_C4_lowDim数据生成");
 	}
 	
 	private void _dropPartition(String tableName, String partitionMonth) {

+ 2 - 2
src/main/java/com/platomix/userprofile/handler/support/UserEventManager.java

@@ -91,7 +91,7 @@ public class UserEventManager {
 					log.info("\n" + uptSql);
 					//更新mysql
 					String logRemark = "update attr[id="+id+"]";
-//					dynamicExecuteService.execute(DsEnum.master, uptSql, logRemark);
+					dynamicExecuteService.execute(DsEnum.master, uptSql, logRemark);
 				}
 			}
 		}
@@ -118,7 +118,7 @@ public class UserEventManager {
 				log.info("\n" + uptSql);
 				//更新mysql
 				String logRemark = "update attr[id="+id+"]";
-//				dynamicExecuteService.execute(DsEnum.master, uptSql, logRemark);
+				dynamicExecuteService.execute(DsEnum.master, uptSql, logRemark);
 			}
 		}
 	}

+ 3 - 3
src/main/java/com/platomix/userprofile/handler/support/UserTagManager.java

@@ -34,9 +34,9 @@ public class UserTagManager {
 	 * 生成标签到mysql数据库
 	 */
 	public void genSomeTagToMysql() {
-//		genOrderProductClass();
-//		genOrderChannel();
-//		genOrderProducts();
+		genOrderProductClass();
+		genOrderChannel();
+		genOrderProducts();
 	}
 	
 	/**

+ 4 - 4
src/main/java/com/platomix/userprofile/handler/support/XxhTagManager.java

@@ -37,7 +37,7 @@ public class XxhTagManager {
 	 * 从etl表向 xxh表转化
 	 * @param month 月份
 	 */
-	private void _xxhFromEtl(String month, String mobileLeft, String mobileRight) {
+	private void xxhFromEtl(String month, String mobileLeft, String mobileRight) {
 		String lrMobileSql = mobileLeft == null || mobileRight == null ? "" : String.format(" and DEVICE_NUMBER>='%s' and DEVICE_NUMBER<'%s'", mobileLeft, mobileRight);
 		StrBuilder sqlBuilder = new StrBuilder();
 		sqlBuilder.appendLn("insert into ads.XXH_ALL_MONTH")
@@ -47,7 +47,7 @@ public class XxhTagManager {
 				.appendLn(String.format("from %s",ETL_TABLE_NAME))
 				.appendLn("where MONTH_ID ='"+month+"'" + lrMobileSql);
 		log.info("\n" + sqlBuilder.toString());
-		//dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "ETL导入数据转化为XXH基础数据1");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "ETL导入数据转化为XXH基础数据1");
 
 		StrBuilder sqlBuilderApp = new StrBuilder();
 		sqlBuilderApp.appendLn("insert into ads.XXH_ALL_APP_MONTH")
@@ -59,7 +59,7 @@ public class XxhTagManager {
 				.appendLn(") s")
 				.appendLn("group by MONTH_ID,DEVICE_NUMBER, APP_PRE, CON_PRE, APP_NAME, TIMES, MINS, ONE_FLUX");
 		log.info("\n" + sqlBuilderApp.toString());
-		//dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilderApp.toString(), "ETL导入数据转化为XXH基础数据2");
+		dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilderApp.toString(), "ETL导入数据转化为XXH基础数据2");
 
 
 	}
@@ -76,7 +76,7 @@ public class XxhTagManager {
 			if(mobileLeft.equals(mobileRight)) {
 				continue;
 			}
-			_xxhFromEtl(month, mobileLeft, mobileRight);
+			xxhFromEtl(month, mobileLeft, mobileRight);
 			mobileLeft = mobileRight;
 		}
 	}

+ 9 - 3
src/main/java/com/platomix/userprofile/service/DynamicExecuteService.java

@@ -6,6 +6,7 @@ package com.platomix.userprofile.service;
 import java.util.List;
 
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import com.alibaba.fastjson.JSONObject;
@@ -26,8 +27,11 @@ public class DynamicExecuteService {
 
 	@Autowired
 	private DynamicExecuteMapper dynamicExecuteMapper;
-	
-	
+
+	@Value("${dflow.run-execute}")
+	private Boolean runExecute;
+
+
 	/**
 	 * 
 	 * @param dsEnum
@@ -36,7 +40,9 @@ public class DynamicExecuteService {
 	@DS("#dsEnum.name")
 	public void execute(DsEnum dsEnum, String sql, String logRemark) {
 		long startTime = System.currentTimeMillis();
-		dynamicExecuteMapper.execute(sql);
+		if(runExecute==true){
+			dynamicExecuteMapper.execute(sql);
+		}
 		log.info(String.format("%s|cost: %ss", logRemark, (System.currentTimeMillis() - startTime)/1000.0f));
 	}
 	

+ 2 - 8
src/main/java/com/platomix/userprofile/service/MobileRangeService.java

@@ -41,18 +41,12 @@ public class MobileRangeService {
 	 */
 	@Cacheable(cacheNames = "userProfileCache", key = "'mobile_range_'+#p0")
 	public SortedSet<String> getList(long step){
+		//todo
 		log.info("计算用户手机号范围开始.....");
 		long startTime = System.currentTimeMillis();
 		StrBuilder sqlBuilder = new StrBuilder();
 		sqlBuilder.appendLn("SELECT rowNumberInAllBlocks() % " + step + " as rNum, USERID")
-				  .appendLn(" from (")
-				  .appendLn(" select DISTINCT USERID")
-				  .appendLn(" from (")
-				  .appendLn(" select DEVICE_NUMBER as USERID from ads.XXH_ALL_MONTH where MONTH_ID ='"+DEFAULT_MONTH+"'")
-				  .appendLn("    UNION ALL ")
-				  .appendLn(" select USERID from ads.ETL_TD_ORDER_RELATIONS_YEAR  where THEMONTH ='"+DEFAULT_MONTH+"'")
-				  .appendLn(" ) order by USERID")
-				  .appendLn(") where rNum=0");
+				  .appendLn("from (select DEVICE_NUMBER as USERID from ads.XXH_ALL_MONTH_LONG order by USERID) where rNum=0");
 		List<JSONObject> dataList = dynamicExecuteService.list(DsEnum.clickhouse, sqlBuilder.toString());
 		log.info("计算用户手机号范围结束.....|cost:{}s", (System.currentTimeMillis()-startTime)/1000);
 		SortedSet<String> resultSet = new TreeSet<String>();

+ 3 - 1
src/main/resources/application-dev.yml

@@ -56,12 +56,14 @@ spring:
                password: ltbiydhao66
 dflow:
    use-debug: true
+   #如果设置为true 会执行删除 更新 插入操作!!,flase只会打印sql
+   run-execute: false
    flat-temp-table-name: Z_USER_TAG_FLAT_temp_dev
    flat-clu-table-name: Z_USER_TAG_FLAT_clu1_dev
    flat-all-view-name: ads.Z_USER_TAG_FLAT_ALL_dev
    
 dbcluster:
-   #确保第一个是(源数据地址) 向(目标地址)分发
+   #确保第一个是(源数据地址) 它会向(目标地址)分发
    clickhouse:
       10.0.1.135,
       10.0.1.150,

+ 2 - 0
src/main/resources/application-test.yml

@@ -44,6 +44,8 @@ spring:
                password: ltbiydhao66
 dflow:
    use-debug: false
+   #如果设置为true 会执行删除 更新 插入操作!!,flase只会打印sql
+   run-execute: true
    flat-temp-table-name: Z_USER_TAG_FLAT_temp
    flat-clu-table-name: Z_USER_TAG_FLAT_clu1
    flat-all-view-name: ads.Z_USER_TAG_FLAT_ALL