Kaynağa Gözat

拷贝表到其他库的方法

huaerzx 2 yıl önce
ebeveyn
işleme
f1922922a0

+ 3 - 1
src/main/java/com/platomix/userprofile/enums/DsEnum.java

@@ -14,7 +14,9 @@ public enum DsEnum {
 	
 	oracle108,
 	
-	clickhouse;
+	clickhouse,
+
+	clickhouse91;
 	
 	public String getName() {
 		return this.name();

+ 13 - 38
src/main/java/com/platomix/userprofile/handler/FlatTagHandler.java

@@ -14,6 +14,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import com.platomix.userprofile.model.ShellCommond;
+import com.platomix.userprofile.utils.ShellUtils;
 import org.apache.commons.io.FileUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -38,7 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 @Service
 public class FlatTagHandler {
-	final static String SHELL_DIC = "shell/";
+
 	final static String CLUSTER_NAME = "cluster_3s_1r";
 	final static String[] INVALID_FIELD_NAMES = new String[] { "USERID", "MONTH", "DEVICE_NUMBER", "cTime" };
 
@@ -111,7 +113,7 @@ public class FlatTagHandler {
 		List<String> dataList = dynamicExecuteService.list(DsEnum.clickhouse, sqlBuilder.toString()).stream().map(item ->item.getString("USERID_V")).collect(Collectors.toList());
 
 		try {
-			FileUtils.forceMkdir(new File(SHELL_DIC));
+			FileUtils.forceMkdir(new File(ShellUtils.SHELL_DIC));
 			for(int i = 0; i < clickhouseDbCluster.length; i++) {
 				String leftUserId=i==0?"0": dataList.get(i-1);
 				String rightUserId=i==clickhouseDbCluster.length-1?"999": dataList.get(i);
@@ -128,7 +130,7 @@ public class FlatTagHandler {
 							.appendLn(_dealSelectColumnSql(columnList))
 							.appendLn("\nfrom ads." + TagKit.getTagTempTable(tempTableName,month))
 							.appendLn("where THEMONTH_V = '"+month+"' "+userIdSql);
-					_shellToCluster(new ShellCommond(clickhouseDbCluster[i],i==0,builder.toString()));
+					_shellToCluster(new ShellCommond(clickhouseDbCluster[i],i==0,false,builder.toString()));
 				}else{
 					builder.append("echo 'select ")
 //							.append(String.join(",", columnList))
@@ -139,11 +141,11 @@ public class FlatTagHandler {
 							.append(String.format(" --query='INSERT INTO ads.%s( ",cluTableName))
 							.append(String.join(",", columnList))
 							.appendLn(") FORMAT TabSeparated'");
-					_shellToCluster(new ShellCommond(clickhouseDbCluster[i],i==0,builder.toString()));
+					_shellToCluster(new ShellCommond(clickhouseDbCluster[i],i==0,false,builder.toString()));
 				}
 			}
-		}catch (IOException iOException){
-			log.info(iOException.getMessage());
+		} catch (IOException | InterruptedException e) {
+			log.info(e.getMessage());
 		}
 	}
 	
@@ -154,22 +156,13 @@ public class FlatTagHandler {
 	 * @throws IOException
 	 */
 	@Async("shellThreadPool")
-	public void _shellToCluster(ShellCommond shellData) throws IOException {
+	public void _shellToCluster(ShellCommond shellData) throws IOException, InterruptedException {
 		//如果是插入本地
-		if(shellData.getIsLocalIp()){
+		if (shellData.getIsLocalIp()) {
 			dynamicExecuteService.execute(DsEnum.clickhouse, shellData.getShellCmd(), "插入temp宽表 到本机clu表");
+		} else {
+			ShellUtils.shellToCluster(shellData);
 		}
-		else {
-			String filePath = getShellFilePath(shellData);
-			File shellFile = new File(filePath);
-			FileUtils.writeStringToFile(shellFile, shellData.getShellCmd(), "UTF-8");
-			Runtime.getRuntime().exec("chmod 777 " + filePath);
-
-			String commond="sh " + shellFile.getAbsolutePath();
-			log.info(commond);
-			Runtime.getRuntime().exec(commond);
-		}
-		//log.info("\n" + shellData.getShellCmd());
 	}
 	/**
 	 * 创建Flat表 [初始化执行一次]
@@ -294,26 +287,8 @@ public class FlatTagHandler {
 		return false;
 	}
 	
-	/**
-	 * 获取shell文件路径
-	 * @param shellData
-	 * @return
-	 */
-	private String getShellFilePath(ShellCommond shellData){
-		return 	SHELL_DIC + "to"+shellData.getIp().replace('.','_')+".sh";
-	}
 
-	@Data
-	public class ShellCommond {
-		private String ip;
-		private Boolean isLocalIp=false;
-		private String shellCmd;
-		public ShellCommond(String ip, Boolean isLocalIp,String shellCmd) {
-			this.ip=ip;
-			this.isLocalIp=isLocalIp;
-			this.shellCmd=shellCmd;
-		}
-	}
+
 	
 	/**
 	 * 其它标签处理

+ 52 - 70
src/main/java/com/platomix/userprofile/handler/support/XxhTagManager.java

@@ -5,15 +5,22 @@ package com.platomix.userprofile.handler.support;
 
 import cn.hutool.core.date.DateUtil;
 import cn.hutool.core.io.IoUtil;
+import com.alibaba.fastjson.JSONObject;
 import com.platomix.userprofile.enums.DsEnum;
+import com.platomix.userprofile.model.ShellCommond;
 import com.platomix.userprofile.service.DynamicExecuteService;
 import com.platomix.userprofile.service.MobileRangeService;
+import com.platomix.userprofile.utils.ShellUtils;
 import com.platomix.userprofile.utils.StrBuilder;
 import com.platomix.userprofile.utils.TagKit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.io.File;
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +40,16 @@ public class XxhTagManager {
 	@Autowired
 	private MobileRangeService mobileRangeService;
 
+	@Value("${spring.datasource.dynamic.datasource.clickhouse.username}")
+	private String clickhouseUserName;
+	@Value("${spring.datasource.dynamic.datasource.clickhouse.password}")
+	private String clickhousePassWord;
+
+	@Value("${spring.datasource.dynamic.datasource.clickhouse91.username}")
+	private String clickhouseUserName91;
+	@Value("${spring.datasource.dynamic.datasource.clickhouse91.password}")
+	private String clickhousePassWord91;
+
 	final static long _STEP=17000000L;
 	/**
 	 * 从etl表向 xxh表转化 转化时 重复消除重复
@@ -87,76 +104,11 @@ public class XxhTagManager {
 	/**
 	 * 删除表!!!
 	 */
-	public void deleteTables() {
-		String listStr=
-				"LINSHI_SXX_GUANGDONG\n" +
-				"LINSHI_SXX_ORDERCHANNEL\n" +
-				"LINSHI_SXX_ORDERCHANNEL\n" +
-				"LINSHI_TO_SHUKE_MD5\n" +
-				"LINSHI_YAQI_XUQIU\n" +
-				"LINSHI_YAQI_XUQIU2\n" +
-				"LINSHI_YOUXIAO_RSA\n" +
-				"LINSHI_hunan_QD\n" +
-				"LINSHI_hunan_QD_2\n" +
-				"MINGXI_B_202111\n" +
-				"TMP_ORDER_PRE_TX_1\n" +
-				"TMP_ORDER_PRE_TX\n" +
-				"XXH_ALL_APP_MONTH_ALL\n" +
-				"XXH_ALL_APP_MONTH_bak\n" +
-				"XXH_ALL_APP_MONTH_clu\n" +
-				"XXH_ALL_MONTH_ETL_test\n" +
-				"XXH_ALL_MONTH_bak\n" +
-				"XXH_ALL_MONTH_clu\n" +
-				"XXH_ALL_MONTH_test\n" +
-				"XXH_APP_temp\n" +
-				"XXH_APP_temp2\n" +
-				"XXH_USERS_XINZENG09\n" +
-				"XXH_XINZENG09_APP_temp\n" +
-				"XXH_XINZENG09_APP_temp2\n" +
-				"ALS_XXH_CANCEL_NEW_C1\n" +
-				"ALS_XXH_CANCEL_NEW_C2\n" +
-				"ALS_XXH_CANCEL_NEW_C3\n" +
-				"ALS_XXH_CANCEL_NEW_C3_dxflow_lowDim\n" +
-				"ALS_XXH_CANCEL_RESULT_base_dataC3_dxflow_low_e200\n" +
-				"ALS_XXH_CANCEL_RESULT_base_dataC3_low_e200\n" +
-				"ALS_XXH_CANCEL_RESULT_base_dataC3_low_spid6_e300\n" +
-				"ALS_XXH_CANCEL_RESULT_base_dataC3_low_spid979_e300\n" +
-				"ALS_XXH_CANCEL_RESULT_base_embed_data1\n" +
-				"ALS_XXH_CANCEL_RESULT_base_embed_data2\n" +
-				"ALS_XXH_CANCEL_RESULT_base_permodel_embed_data1\n" +
-				"ALS_XXH_CANCEL_RESULT_deepcross_embed_data1\n" +
-				"DW_SXX_202222\n" +
-				"DW_SXX_202224\n" +
-				"DW_SXX_2022242\n" +
-				"DW_SXX_2022243\n" +
-				"DW_SXX_2022244\n" +
-				"DW_SXX_NMG_220401\n" +
-				"DW_SXX_NMG_JIEGUO\n" +
-				"DW_XXH_CANCEL_NEW\n" +
-				"ETL_LTYG_HMD\n" +
-				"ETL_TD_ORDER_RELATIONS\n" +
-				"ETL_TD_ORDER_RELATIONS_ALL\n" +
-				"ETL_TD_ORDER_RELATIONS_MONTH\n" +
-				"ETL_TD_ORDER_RELATIONS_YEAR_ALL\n" +
-				"ETL_TD_ORDER_RELATIONS_YEAR_T1\n" +
-				"ETL_TD_ORDER_RELATIONS_YEAR_clu\n" +
-				"ETL_TD_ORDER_RELATIONS_clu\n" +
-				"ETL_TL_VIP_LOG_ALL\n" +
-				"ETL_TL_VIP_LOG_clu\n" +
-				"HIS_FLOW_ORDER_ALL\n" +
-				"HIS_FLOW_ORDER_RSA_ENCODED_20220602\n" +
-				"HIS_FLOW_ORDER_clu\n" +
-				"HIS_FLOW_ORDER_test\n" +
-				"TD_DXFLOWINPUT_MONTH_ALL\n" +
-				"TD_DXFLOWINPUT_MONTH_clu\n" +
-				"Z_USER_TAG_FLAT_temp_dev_202204\n" +
-				"USER_ORDER_CANCEL_TEMP\n" +
-				"V_USER_ORDER_CANCEL_TEMP\n" +
-				"temp_return_respond\n" +
-				"temp_user_map\n" +
-				"temp_userid_map\n" +
-				"z_user_uuid_temp_202106_202108\n" +
-				"ztemp_relation_users" ;
+	public void deleteTables(String listStr) {
+		//		String listStr=
+//				"table1\n" +
+//				"table2\n" +
+//				"table3";
 	    String[] s1=listStr.split("\\n");
 		Arrays.stream(s1).forEach(str-> {
 //			if (str.indexOf("_clu") > 0 || str.indexOf("_ALL") > 0)
@@ -171,4 +123,34 @@ public class XxhTagManager {
 		});
 	}
 
+	/**
+	 * 像91创建需要导的表
+	 * @param listStr 要处理的表名 用\n分割(直接从文本文件里复制所得)
+	 */
+	public void createTables(String listStr) throws IOException {
+		String[] s1=listStr.split("\\n");
+		FileUtils.forceMkdir(new File(ShellUtils.SHELL_DIC));
+		Arrays.stream(s1).forEach(str-> {
+//			if (str.indexOf("_clu") > 0 || str.indexOf("_ALL") > 0)
+			JSONObject jsonObject =dynamicExecuteService.get(DsEnum.clickhouse, String.format("select create_table_query from `system`.tables t where name ='%s' ",str));
+			if(jsonObject != null ){
+				String runCreateSql=jsonObject.getString("create_table_query");
+				log.info("\n" + runCreateSql);
+				dynamicExecuteService.execute(DsEnum.clickhouse91,runCreateSql.replace("CREATE TABLE","CREATE TABLE if not exists"), "手动创建复制库表!");
+
+				//写shell插入数据
+				StrBuilder builder=new StrBuilder();
+				builder.append("echo 'select * ")
+						.append(String.format(" from ads.%s ' | ",str))
+						.append(String.format(" curl %s:8123?database=ads -u%s:%s -d @- | ","10.0.1.135",clickhouseUserName,clickhousePassWord))
+						.append(String.format(" docker run -i --rm yandex/clickhouse-client --host %s --port 9007 --user %s --password %s --input_format_allow_errors_num=100","10.0.1.91",clickhouseUserName91,clickhousePassWord91))
+						.append(String.format(" --query='INSERT INTO ads.%s FORMAT TabSeparated' ",str));
+				try {
+					ShellUtils.shellToCluster(new ShellCommond(str+"_91",false,true,builder.toString()));
+				} catch (IOException | InterruptedException e) {
+					e.printStackTrace();
+				}
+			}
+		});
+	}
 }

+ 17 - 0
src/main/java/com/platomix/userprofile/model/ShellCommond.java

@@ -0,0 +1,17 @@
+package com.platomix.userprofile.model;
+
+import lombok.Data;
+
+@Data
+public class ShellCommond {
+    private String ip;
+    private Boolean isLocalIp=false;
+    private Boolean isWait=false;
+    private String shellCmd;
+    public ShellCommond(String ip, Boolean isLocalIp,Boolean isWait, String shellCmd) {
+        this.ip=ip;
+        this.isLocalIp=isLocalIp;
+        this.isWait=isWait;
+        this.shellCmd=shellCmd;
+    }
+}

+ 61 - 0
src/main/java/com/platomix/userprofile/utils/ShellUtils.java

@@ -0,0 +1,61 @@
+/**
+ * 
+ */
+package com.platomix.userprofile.utils;
+
+import com.platomix.userprofile.model.ShellCommond;
+import org.apache.commons.io.FileUtils;
+
+import java.io.*;
+import java.util.regex.Pattern;
+
+/**
+ * @author wookvn
+ *
+ *	辅助工具类
+ */
+public class ShellUtils {
+
+	final public static String SHELL_DIC = "shell/";
+
+	/**
+	 * 生成shell并执行 //	@Async("shellThreadPool")
+	 * @param shellData
+	 * @throws IOException
+	 */
+	public static void shellToCluster(ShellCommond shellData) throws IOException, InterruptedException {
+		String filePath = getShellFilePath(shellData);
+		File shellFile = new File(filePath);
+		FileUtils.writeStringToFile(shellFile, shellData.getShellCmd(), "UTF-8");
+		Runtime.getRuntime().exec("chmod 777 " + filePath);
+
+		String commond="sh " + shellFile.getAbsolutePath();
+		Process p= Runtime.getRuntime().exec(commond);
+		if(shellData.getIsWait()==true){
+			final InputStream is1 = p.getInputStream();
+			new Thread(() -> {
+				BufferedReader br = new BufferedReader(new InputStreamReader(is1));
+				try{
+					while(br.readLine() != null) { ; }
+				}
+				catch(Exception e) {
+					e.printStackTrace();
+				}
+			}).start();
+			InputStream is2 = p.getErrorStream();
+			BufferedReader br2 = new BufferedReader(new InputStreamReader(is2));
+			while(br2.readLine() != null){}
+			int i = p.waitFor();
+			System.out.println(i);
+		}
+	}
+
+	/**
+	 * 获取shell文件路径
+	 * @param shellData
+	 * @return
+	 */
+	private static String getShellFilePath(ShellCommond shellData){
+		return 	SHELL_DIC + "to"+shellData.getIp().replace('.','_')+".sh";
+	}
+}

+ 2 - 0
src/main/java/com/platomix/userprofile/utils/TagKit.java

@@ -86,4 +86,6 @@ public class TagKit {
 		LocalDate months = monthDate.plusMonths(1);
 		System.out.println(DateUtil.format(months.atStartOfDay(), "yyyyMM"));
 	}
+
+
 }

+ 5 - 0
src/main/resources/application-dev.yml

@@ -54,6 +54,11 @@ spring:
                url: jdbc:clickhouse://10.0.17.152:8123/ads?socket_timeout=30000000&batchsize=50000&numPartitions=1&rewriteBatchedStatements=true
                username: default
                password: ltbiydhao66
+            clickhouse91:
+               driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
+               url: jdbc:clickhouse://10.0.17.50:8123/ads?socket_timeout=30000000&batchsize=50000&numPartitions=1&rewriteBatchedStatements=true
+               username: default
+               password: ltbiydhao66
 dflow:
    use-debug: true
    #如果设置为true 会执行删除 更新 插入操作!!,flase只会打印sql

+ 5 - 0
src/main/resources/application-test.yml

@@ -42,6 +42,11 @@ spring:
                url: jdbc:clickhouse://10.0.17.152:8123/ads?socket_timeout=30000000&batchsize=50000&numPartitions=1&rewriteBatchedStatements=true
                username: default
                password: ltbiydhao66
+            clickhouse91:
+               driver-class-name: com.clickhouse.jdbc.ClickHouseDriver
+               url: jdbc:clickhouse://10.0.17.50:8123/ads?socket_timeout=30000000&batchsize=50000&numPartitions=1&rewriteBatchedStatements=true
+               username: default
+               password: ltbiydhao66
 dflow:
    #如果设置为true 会执行删除 更新 插入操作!!,flase只会打印sql
    run-execute: true