|
@@ -17,6 +17,7 @@ import java.util.stream.Collectors;
|
|
|
import java.util.stream.IntStream;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
+import com.platomix.userprofile.utils.TagKit;
|
|
|
import org.apache.commons.io.FileUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
@@ -54,8 +55,6 @@ public class FlatTagHandler {
|
|
|
|
|
|
final static String[] INVALID_FIELD_NAMES = new String[] { "USERID", "MONTH", "DEVICE_NUMBER", "cTime" };
|
|
|
|
|
|
- @Value("${dflow.use-debug}")
|
|
|
- private Boolean useDebug;
|
|
|
|
|
|
@Value("${dbcluster.clickhouse}")
|
|
|
private String[] clickhouseDbCluster;
|
|
@@ -77,7 +76,7 @@ public class FlatTagHandler {
|
|
|
*/
|
|
|
public void tempDataToFlatView(String month) {
|
|
|
//校验并处理列变动
|
|
|
- _checkAndDealColumn();
|
|
|
+ _checkAndDealColumn(month);
|
|
|
|
|
|
//删除月份分区数据
|
|
|
String dropViewDataSql = String.format("alter table ads.%s on cluster %s drop partition '%s'", cluTableName, CLUSTER_NAME, month);
|
|
@@ -85,7 +84,7 @@ public class FlatTagHandler {
|
|
|
dynamicExecuteService.execute(DsEnum.clickhouse, dropViewDataSql, "删除分区[month="+month+"]数据完成");
|
|
|
|
|
|
//再次查询列变动后的字段名
|
|
|
- List<JSONObject> checkTableDiffList = _getDiffColumnList();
|
|
|
+ List<JSONObject> checkTableDiffList = _getDiffColumnList(month);
|
|
|
List<String> columnList = new ArrayList<>();
|
|
|
for(JSONObject columnObj : checkTableDiffList) {
|
|
|
String tempName = columnObj.getString("temp_name"),
|
|
@@ -116,49 +115,49 @@ public class FlatTagHandler {
|
|
|
*/
|
|
|
@Deprecated
|
|
|
public void tmpDataToFlat(String month) {
|
|
|
- List<JSONObject> sourceColumnList = dynamicExecuteService.list(DsEnum.clickhouse, "select name from system.columns where `table`='" + tempTableName + "'");
|
|
|
- List<String> sourceColumnNameList = sourceColumnList.stream().map(item -> item.getString("name")).collect(Collectors.toList());
|
|
|
-
|
|
|
- List<JSONObject> targetColumnList = dynamicExecuteService.list(DsEnum.clickhouse, "select name from system.columns where `table`='" + cluTableName + "'");
|
|
|
- Map<String, String> targetColumnMap = new HashMap<String, String>();
|
|
|
- for (JSONObject jsonObject : targetColumnList) {
|
|
|
- targetColumnMap.put(jsonObject.getString("name"), jsonObject.getString("type"));
|
|
|
- }
|
|
|
-
|
|
|
- StrBuilder sqlBuilder = new StrBuilder();
|
|
|
- sqlBuilder.appendLn("insert into " + flatAllViewName + "(");
|
|
|
- targetColumnMap.keySet().stream().filter(f -> sourceColumnNameList.contains(f)).map(item -> "`" + item + "`")
|
|
|
- .forEach(sqlBuilder::appendLn);
|
|
|
- sqlBuilder.appendLn(")").appendLn("with");
|
|
|
- // 省份
|
|
|
- String provinceSql = userprofileTagService.genSelectSql("省份", "b.PROVINCE", null, "TMP_PROVINCE");
|
|
|
- sqlBuilder.append(provinceSql).appendLn(",");
|
|
|
- // 城市
|
|
|
- String citySql = userprofileTagService.genSelectSql("城市", "b.AREA", null, "TMP_AREA");
|
|
|
- sqlBuilder.appendLn(citySql);
|
|
|
-
|
|
|
- sqlBuilder.appendLn("select THEMONTH_V,USERID_V,");
|
|
|
- for (Entry<String, String> entry : targetColumnMap.entrySet()) {
|
|
|
- String name = entry.getKey(), type = entry.getValue();
|
|
|
- // 包含一些特殊字段/或者源表中不存在该字段(大宽表比临时表多的字段),不处理
|
|
|
- if(Stream.of(INVALID_FIELD_NAMES).anyMatch(f -> name.contains(f))
|
|
|
- || !sourceColumnNameList.contains(name)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if( // 投诉标签处理
|
|
|
- _handleComplain(sqlBuilder, name) ||
|
|
|
- // 省份标签处理
|
|
|
- _handleProvince(sqlBuilder, name) ||
|
|
|
- // 城市标签处理
|
|
|
- _handleCity(sqlBuilder, name) ||
|
|
|
- //其它标签处理
|
|
|
- _handleOthers(sqlBuilder, name, type)) {
|
|
|
- }
|
|
|
- }
|
|
|
- sqlBuilder.deleteLastChar().deleteLastChar();
|
|
|
- sqlBuilder.appendLn("\nfrom ads." + tempTableName + " a left join ETL_VW_TB_CALLER_AREACODE b on substring(a.USERID_V , 1, 7) = b.CALLERHEAD");
|
|
|
- log.info("\n" + sqlBuilder.toString());
|
|
|
- //dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "临时表导入视图生成");
|
|
|
+// List<JSONObject> sourceColumnList = dynamicExecuteService.list(DsEnum.clickhouse, "select name from system.columns where `table`='" + tempTableName + "'");
|
|
|
+// List<String> sourceColumnNameList = sourceColumnList.stream().map(item -> item.getString("name")).collect(Collectors.toList());
|
|
|
+//
|
|
|
+// List<JSONObject> targetColumnList = dynamicExecuteService.list(DsEnum.clickhouse, "select name from system.columns where `table`='" + cluTableName + "'");
|
|
|
+// Map<String, String> targetColumnMap = new HashMap<String, String>();
|
|
|
+// for (JSONObject jsonObject : targetColumnList) {
|
|
|
+// targetColumnMap.put(jsonObject.getString("name"), jsonObject.getString("type"));
|
|
|
+// }
|
|
|
+//
|
|
|
+// StrBuilder sqlBuilder = new StrBuilder();
|
|
|
+// sqlBuilder.appendLn("insert into " + flatAllViewName + "(");
|
|
|
+// targetColumnMap.keySet().stream().filter(f -> sourceColumnNameList.contains(f)).map(item -> "`" + item + "`")
|
|
|
+// .forEach(sqlBuilder::appendLn);
|
|
|
+// sqlBuilder.appendLn(")").appendLn("with");
|
|
|
+// // 省份
|
|
|
+// String provinceSql = userprofileTagService.genSelectSql("省份", "b.PROVINCE", null, "TMP_PROVINCE");
|
|
|
+// sqlBuilder.append(provinceSql).appendLn(",");
|
|
|
+// // 城市
|
|
|
+// String citySql = userprofileTagService.genSelectSql("城市", "b.AREA", null, "TMP_AREA");
|
|
|
+// sqlBuilder.appendLn(citySql);
|
|
|
+//
|
|
|
+// sqlBuilder.appendLn("select THEMONTH_V,USERID_V,");
|
|
|
+// for (Entry<String, String> entry : targetColumnMap.entrySet()) {
|
|
|
+// String name = entry.getKey(), type = entry.getValue();
|
|
|
+// // 包含一些特殊字段/或者源表中不存在该字段(大宽表比临时表多的字段),不处理
|
|
|
+// if(Stream.of(INVALID_FIELD_NAMES).anyMatch(f -> name.contains(f))
|
|
|
+// || !sourceColumnNameList.contains(name)) {
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// if( // 投诉标签处理
|
|
|
+// _handleComplain(sqlBuilder, name) ||
|
|
|
+// // 省份标签处理
|
|
|
+// _handleProvince(sqlBuilder, name) ||
|
|
|
+// // 城市标签处理
|
|
|
+// _handleCity(sqlBuilder, name) ||
|
|
|
+// //其它标签处理
|
|
|
+// _handleOthers(sqlBuilder, name, type)) {
|
|
|
+// }
|
|
|
+// }
|
|
|
+// sqlBuilder.deleteLastChar().deleteLastChar();
|
|
|
+// sqlBuilder.appendLn("\nfrom ads." + tempTableName + " a left join ETL_VW_TB_CALLER_AREACODE b on substring(a.USERID_V , 1, 7) = b.CALLERHEAD");
|
|
|
+// log.info("\n" + sqlBuilder.toString());
|
|
|
+// //dynamicExecuteService.execute(DsEnum.clickhouse, sqlBuilder.toString(), "临时表导入视图生成");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -172,9 +171,9 @@ public class FlatTagHandler {
|
|
|
//按手机号 查找分段
|
|
|
Set<String> setps=new HashSet<>();
|
|
|
StrBuilder sqlBuilder=new StrBuilder();
|
|
|
- sqlBuilder.appendLn(String.format("with intDiv((select COUNT() from ads.%s),%d) as step",tempTableName,clickhouseDbCluster.length))
|
|
|
+ sqlBuilder.appendLn(String.format("with intDiv((select COUNT() from ads.%s),%d) as step", TagKit.getTagTempTable(tempTableName,month),clickhouseDbCluster.length))
|
|
|
.appendLn("select USERID_V,(rowNumberInAllBlocks()+1) rownum from (")
|
|
|
- .appendLn(String.format("select USERID_V from ads.%s order by USERID_V ",tempTableName)+")")
|
|
|
+ .appendLn(String.format("select USERID_V from ads.%s order by USERID_V ",TagKit.getTagTempTable(tempTableName,month))+")")
|
|
|
.append("where rownum in (");
|
|
|
IntStream.range(1, clickhouseDbCluster.length).forEachOrdered(idx -> {setps.add(String.format("step*%d",idx));});
|
|
|
sqlBuilder.append(String.join(",", setps));
|
|
@@ -197,13 +196,13 @@ public class FlatTagHandler {
|
|
|
.appendLn(")")
|
|
|
.appendLn("select")
|
|
|
.appendLn(String.join(",", columnList))
|
|
|
- .appendLn("\nfrom ads." + tempTableName)
|
|
|
+ .appendLn("\nfrom ads." + TagKit.getTagTempTable(tempTableName,month))
|
|
|
.appendLn("where THEMONTH_V = '"+month+"' "+userIdSql);
|
|
|
_shellToCluster(new ShellCommond(clickhouseDbCluster[i],i==0,builder.toString()));
|
|
|
}else{
|
|
|
builder.append("echo 'select ")
|
|
|
.append(String.join(",", columnList))
|
|
|
- .append(String.format(" from ads.%s where THEMONTH_V = toString(%s) %s' | ",tempTableName,month,userIdSql))
|
|
|
+ .append(String.format(" from ads.%s where THEMONTH_V = toString(%s) %s' | ",TagKit.getTagTempTable(tempTableName,month),month,userIdSql))
|
|
|
.append(String.format(" curl %s:8123?database=ads -u%s:%s -d @- | ",firstDb,clickhouseUserName,clickhousePassWord))
|
|
|
.append(String.format(" docker run -i --rm yandex/clickhouse-client --host %s --port 9007 --user %s --password %s --input_format_allow_errors_num=100",clickhouseDbCluster[i],clickhouseUserName,clickhousePassWord))
|
|
|
.append(String.format(" --query='INSERT INTO ads.%s( ",cluTableName))
|
|
@@ -243,8 +242,8 @@ public class FlatTagHandler {
|
|
|
/**
|
|
|
* 创建Flat表 [初始化执行一次]
|
|
|
*/
|
|
|
- public void initCreateFlatTable() {
|
|
|
- List<JSONObject> columnList = dynamicExecuteService.list(DsEnum.clickhouse, "select name,`type` from system.columns where `table`='" + tempTableName + "'");
|
|
|
+ public void initCreateFlatTable(String month) {
|
|
|
+ List<JSONObject> columnList = dynamicExecuteService.list(DsEnum.clickhouse, "select name,`type` from system.columns where `table`='" + TagKit.getTagTempTable(tempTableName,month) + "'");
|
|
|
StrBuilder sqlBuilder = new StrBuilder();
|
|
|
sqlBuilder.appendLn("create table ads." + cluTableName + " on cluster " + CLUSTER_NAME + " (")
|
|
|
.appendLn("`THEMONTH_V` String, ").appendLn("`USERID_V` String,");
|
|
@@ -264,8 +263,8 @@ public class FlatTagHandler {
|
|
|
/**
|
|
|
* 校验字段列是否有改变
|
|
|
*/
|
|
|
- private void _checkAndDealColumn() {
|
|
|
- List<JSONObject> checkTableDiffList = _getDiffColumnList();
|
|
|
+ private void _checkAndDealColumn(String month) {
|
|
|
+ List<JSONObject> checkTableDiffList = _getDiffColumnList(month);
|
|
|
List<String> addColumns = new ArrayList<>();
|
|
|
// List<String> delColumns = new ArrayList<>();
|
|
|
for(JSONObject dataObj : checkTableDiffList) {
|
|
@@ -284,7 +283,7 @@ public class FlatTagHandler {
|
|
|
//是否需要增删字段; 是->先drop视图
|
|
|
if(!addColumns.isEmpty()) { //|| !delColumns.isEmpty()
|
|
|
//1.drop视图
|
|
|
- String dropViewSql = String.format("drop table %s ON cluster %s", flatAllViewName, CLUSTER_NAME);
|
|
|
+ String dropViewSql = String.format("drop table IF EXISTS %s ON cluster %s", flatAllViewName, CLUSTER_NAME);
|
|
|
log.info("\n" + dropViewSql);
|
|
|
dynamicExecuteService.execute(DsEnum.clickhouse, dropViewSql, "删除视图完成");
|
|
|
log.info("删除视图完成,开始等待8s处理增删列...");
|
|
@@ -307,12 +306,12 @@ public class FlatTagHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private List<JSONObject> _getDiffColumnList(){
|
|
|
+ private List<JSONObject> _getDiffColumnList(String month){
|
|
|
StrBuilder checkTableDiffSql = new StrBuilder()
|
|
|
.appendLn("select temp.name as temp_name,temp.type as tp,clu.name as clu_name from")
|
|
|
- .appendLn("(select name,`type` from system.columns where `table`='" + tempTableName +(useDebug==true?"_dev":"") +"')temp")
|
|
|
+ .appendLn("(select name,`type` from system.columns where `table`='" + TagKit.getTagTempTable(tempTableName,month) +"')temp")
|
|
|
.appendLn("full outer join")
|
|
|
- .appendLn("(select name,`type` from system.columns where `table`='" + cluTableName +(useDebug==true?"_dev":"")+ "')clu")
|
|
|
+ .appendLn("(select name,`type` from system.columns where `table`='" + cluTableName + "')clu")
|
|
|
.appendLn("on temp.name=clu.name");
|
|
|
return dynamicExecuteService.list(DsEnum.clickhouse, checkTableDiffSql.toString());
|
|
|
}
|