|
@@ -18,6 +18,7 @@ import org.apache.commons.io.FileUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
@@ -33,6 +34,9 @@ import java.util.concurrent.TimeUnit;
|
|
|
@Component
|
|
|
public class XxhTagManager {
|
|
|
|
|
|
+ final static Long stepSize=50000000L;
|
|
|
+ final static Long allUserSize=300000000L;
|
|
|
+ final static String tempTableSuf="_tempRN";
|
|
|
final static String ETL_TABLE_NAME = "ads.XXH_ALL_MONTH_ETL2";
|
|
|
@Autowired
|
|
|
protected DynamicExecuteService dynamicExecuteService;
|
|
@@ -153,4 +157,109 @@ public class XxhTagManager {
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 更新 etl库 历史数据带手机号的
|
|
|
+ * @param listStr 要处理的表名 【表名,userid字段】 用\n分割(直接从文本文件里复制所得)
|
|
|
+ */
|
|
|
+ public void updateHistoryTable2RowNo (String listStr) {
|
|
|
+ String[] s1=listStr.split("\\n");
|
|
|
+ Arrays.stream(s1).forEach(str-> {
|
|
|
+ String[] s2=str.split(",");
|
|
|
+ String tableName=s2[0];
|
|
|
+ String userId=s2[1];
|
|
|
+ JSONObject jsonObject =dynamicExecuteService.get(DsEnum.clickhouse, String.format("select create_table_query,partition_key from `system`.tables t where name ='%s' ",tableName));
|
|
|
+ if(jsonObject != null ){
|
|
|
+ String runCreateSql=jsonObject.getString("create_table_query");
|
|
|
+ String partition_key=jsonObject.getString("partition_key");
|
|
|
+ String tempRowNumber=tableName+tempTableSuf;
|
|
|
+ //【1】创建temp表
|
|
|
+ runCreateSql=runCreateSql.replace("CREATE TABLE","CREATE TABLE if not exists")
|
|
|
+ .replace(tableName,tempRowNumber);
|
|
|
+ log.info("\n" + runCreateSql);
|
|
|
+ //!!!
|
|
|
+ //dynamicExecuteService.execute(DsEnum.clickhouse,runCreateSql, "手动创建复制库表!");
|
|
|
+ //【2】查询列
|
|
|
+ List<JSONObject> colList= dynamicExecuteService.list(DsEnum.clickhouse, String.format("select name from system.columns where `table`='%s'",tableName));
|
|
|
+ SortedSet<String> columns = new TreeSet<String>();
|
|
|
+ colList.stream().map(item -> item.getString("name")).forEach(columns::add);
|
|
|
+
|
|
|
+ //【3】查询数据总量
|
|
|
+ if(!partition_key.isEmpty()){
|
|
|
+ List<JSONObject> partitionObjList =dynamicExecuteService.list(DsEnum.clickhouse,
|
|
|
+ String.format("select distinct `%s` as pcol from ads.%s ",partition_key,tableName));
|
|
|
+ List<String> partitionList = new ArrayList<>();
|
|
|
+ partitionObjList.stream().map(item -> item.getString("pcol")).forEach(partitionList::add);
|
|
|
+ if(partitionList.size()>0){
|
|
|
+ partitionList.stream().forEach(pcol->{
|
|
|
+ String _sql=String.format("with (select round(round((%s/(round(COUNT()/%s)+1)))/1000)*1000 from ads.%s where %s ='%s') as step\n",
|
|
|
+ allUserSize.toString(),stepSize.toString(),tableName,partition_key,pcol) +
|
|
|
+ "SELECT USERID from (select USERID from ads.CFG_ZT_USERS order by USERID) where rowNumberInAllBlocks() % (step/1000) =0\n";
|
|
|
+ List<JSONObject> userSpList =dynamicExecuteService.list(DsEnum.clickhouse,_sql);
|
|
|
+ //分区 分号段 计算
|
|
|
+ if(userSpList.size()>1){
|
|
|
+ SortedSet<String> mobileList = new TreeSet<String>();
|
|
|
+ userSpList.stream().map(item -> item.getString("USERID")).forEach(mobileList::add);
|
|
|
+ mobileList.add("999");
|
|
|
+
|
|
|
+ String mobileLeft = mobileList.first();
|
|
|
+ for(Iterator<String> it = mobileList.iterator(); it.hasNext(); ) {
|
|
|
+ String mobileRight = it.next();
|
|
|
+ if(mobileLeft.equals(mobileRight)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ _getJoinSql(tableName,columns,userId,partition_key,pcol,mobileLeft,mobileRight);
|
|
|
+ mobileLeft = mobileRight;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //分区 计算
|
|
|
+ else{
|
|
|
+ _getJoinSql(tableName,columns,userId,partition_key,pcol,"","");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ //直接 计算
|
|
|
+ _getJoinSql(tableName,columns,userId,"","","","");
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取join sql
|
|
|
+ * @param tableName
|
|
|
+ * @param partitionCol
|
|
|
+ * @param partitionVal
|
|
|
+ * @param leftUserId
|
|
|
+ * @param rigthUserId
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String _getJoinSql(String tableName,SortedSet<String> columns, String userId, String partitionCol,String partitionVal,String leftUserId,String rigthUserId){
|
|
|
+ StrBuilder builder=new StrBuilder();
|
|
|
+ String joinStr=String.join(",", columns);
|
|
|
+ if(userId.equals("PHONE")){
|
|
|
+ joinStr=(joinStr+",").replace("PHONE,","s.PHONE,");
|
|
|
+ joinStr=joinStr.substring(0,joinStr.length()-1);
|
|
|
+ }else{
|
|
|
+ joinStr=(joinStr+",").replace(userId+",","s1.ROWNUMBER as "+userId+",");
|
|
|
+ joinStr=joinStr.substring(0,joinStr.length()-1);
|
|
|
+ }
|
|
|
+ builder.append(String.format("insert into ads.%s select ",tableName+tempTableSuf))
|
|
|
+ .appendLn(joinStr)
|
|
|
+ .appendLn(String.format(" from ads.%s s left join db92.DIC_PHONE_ROWNUMBER s1 on s.%s=s1.PHONE ",tableName,userId));
|
|
|
+
|
|
|
+ if(!partitionCol.isEmpty()){
|
|
|
+ builder.appendLn(String.format(" where %s='%s' ",partitionCol,partitionVal));
|
|
|
+ if(!leftUserId.isEmpty()){
|
|
|
+ builder.appendLn(String.format(" and s.%s>=%s and s.%s<%s",userId,leftUserId,userId,rigthUserId));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("\n" + builder.toString());
|
|
|
+ //!!!
|
|
|
+ //dynamicExecuteService.execute(DsEnum.clickhouse, builder.toString(), "插入temp表");
|
|
|
+ return builder.toString();
|
|
|
+ }
|
|
|
}
|