From 04b2dfde54f4ccbbd4b9f07bb48deac2a6678422 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 13 Sep 2024 00:03:49 +0800 Subject: [PATCH] 2 --- be/src/vec/exec/scan/vmeta_scanner.cpp | 22 +++++ be/src/vec/exec/scan/vmeta_scanner.h | 2 + .../org/apache/doris/nereids/DorisParser.g4 | 6 +- .../functions/table/PartitionValues.java | 4 +- .../tablefunction/MetadataGenerator.java | 83 ++++++++++++++++++- .../MetadataTableValuedFunction.java | 2 - .../PartitionValuesTableValuedFunction.java | 55 ++---------- .../tablefunction/TableValuedFunctionIf.java | 2 + gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/PlanNodes.thrift | 8 ++ gensrc/thrift/Types.thrift | 3 +- 11 files changed, 131 insertions(+), 57 deletions(-) diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index f5864924a389fa7..512fd2a9a6aa45d 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -241,6 +241,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { case TMetadataType::TASKS: RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request)); break; + case TMetadataType::PARTITION_VALUES: + RETURN_IF_ERROR(_build_partition_values_metadata_request(meta_scan_range, &request)); + break; default: _meta_eos = true; return Status::OK(); @@ -461,6 +464,25 @@ Status VMetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_sc return Status::OK(); } +Status VMetaScanner::_build_partition_values_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request) { + VLOG_CRITICAL << "VMetaScanner::_build_partition_values_metadata_request"; + if (!meta_scan_range.__isset.partition_values_params) { + return Status::InternalError("Can not find TPartitionValuesMetadataParams from meta_scan_range."); + } + + // create request + request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE); + + // create TMetadataTableRequestParams + 
TMetadataTableRequestParams metadata_table_params; + metadata_table_params.__set_metadata_type(TMetadataType::PARTITION_VALUES); + metadata_table_params.__set_partition_values_metadata_params(meta_scan_range.partition_values_params); + + request->__set_metada_table_params(metadata_table_params); + return Status::OK(); +} + Status VMetaScanner::close(RuntimeState* state) { VLOG_CRITICAL << "VMetaScanner::close"; RETURN_IF_ERROR(VScanner::close(state)); diff --git a/be/src/vec/exec/scan/vmeta_scanner.h b/be/src/vec/exec/scan/vmeta_scanner.h index a9975300cdc5002..3a68c9d869c901b 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.h +++ b/be/src/vec/exec/scan/vmeta_scanner.h @@ -86,6 +86,8 @@ class VMetaScanner : public VScanner { TFetchSchemaTableDataRequest* request); Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request); + Status _build_partition_values_metadata_request(const TMetaScanRange& meta_scan_range, + TFetchSchemaTableDataRequest* request); bool _meta_eos; TupleId _tuple_id; TUserIdentity _user_identity; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 013847337ac58db..249e94aedc738f1 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -1265,13 +1265,13 @@ optScanParams : ATSIGN funcName=identifier LEFT_PAREN (properties=propertyItemList)? RIGHT_PAREN ; -sysTableParams - : DOLLARSIGN sysTableName=identifier +metaTableParams + : DOLLARSIGN metaTableName=identifier ; relationPrimary : multipartIdentifier optScanParams? materializedViewName? tableSnapshot? specifiedPartition? - tabletList? tableAlias sample? relationHint? sysTableParams? lateralView* #tableName + tabletList? tableAlias sample? relationHint? metaTableParams? 
lateralView* #tableName | LEFT_PAREN query RIGHT_PAREN tableAlias lateralView* #aliasedQuery | tvfName=identifier LEFT_PAREN (properties=propertyItemList)? diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PartitionValues.java index 6b88c8d93c17d21..d206c764c0675ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PartitionValues.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/PartitionValues.java @@ -22,7 +22,7 @@ import org.apache.doris.nereids.trees.expressions.Properties; import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; import org.apache.doris.nereids.types.coercion.AnyDataType; -import org.apache.doris.tablefunction.PartitionsTableValuedFunction; +import org.apache.doris.tablefunction.PartitionValuesTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionIf; import java.util.Map; @@ -44,7 +44,7 @@ public FunctionSignature customSignature() { protected TableValuedFunctionIf toCatalogFunction() { try { Map arguments = getTVFProperties().getMap(); - return new PartitionsTableValuedFunction(arguments); + return new PartitionValuesTableValuedFunction(arguments); } catch (Throwable t) { throw new AnalysisException("Can not build PartitionsTableValuedFunction by " + this + ": " + t.getMessage(), t); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 4d07d219dc4882e..88e7c3be45a2f50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.SchemaTable; import 
org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.TableProperty; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; @@ -42,11 +43,13 @@ import org.apache.doris.common.proc.PartitionsProcDir; import org.apache.doris.common.util.NetUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalMetaCacheMgr; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; @@ -78,6 +81,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPartitionValuesMetadataParams; import org.apache.doris.thrift.TPartitionsMetadataParams; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TRow; @@ -204,7 +208,8 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData } TFetchSchemaTableDataResult result; TMetadataTableRequestParams params = request.getMetadaTableParams(); - switch (request.getMetadaTableParams().getMetadataType()) { + TMetadataType metadataType = request.getMetadaTableParams().getMetadataType(); + switch (metadataType) { case ICEBERG: result = icebergMetadataResult(params); break; @@ -232,11 +237,17 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData case TASKS: result = taskMetadataResult(params); break; + case 
PARTITION_VALUES: + result = partitionValuesMetadataResult(params); + break; default: return errorResult("Metadata table params is not set."); } if (result.getStatus().getStatusCode() == TStatusCode.OK) { - filterColumns(result, params.getColumnsName(), params.getMetadataType(), params); + if (metadataType != TMetadataType.PARTITION_VALUES) { + // partition_values' result already sorted by column names + filterColumns(result, params.getColumnsName(), params.getMetadataType(), params); + } } if (LOG.isDebugEnabled()) { LOG.debug("getMetadataTable() end."); @@ -1459,4 +1470,72 @@ private static void fillBatch(List dataBatch, Map dataBatch; + try { + TableIf table = PartitionValuesTableValuedFunction.analyzeAndGetTable(ctlName, dbName, tblName); + TableType tableType = table.getType(); + switch (tableType) { + case HMS_EXTERNAL_TABLE: + dataBatch = partitionValuesMetadataResultForHmsTable((HMSExternalTable) table, + params.getColumnsName()); + break; + default: + return errorResult("not support table type " + tableType.name()); + } + TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult(); + result.setDataBatch(dataBatch); + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } catch (Throwable t) { + LOG.warn("error when get partition values metadata. 
{}.{}.{}", ctlName, dbName, tblName, t); + return errorResult("error when get partition values metadata: " + Util.getRootCauseMessage(t)); + } + } + + private static List partitionValuesMetadataResultForHmsTable(HMSExternalTable tbl, List colNames) + throws AnalysisException { + List partitionCols = tbl.getPartitionColumns(); + List colIdxs = Lists.newArrayList(); + for (String colName : colNames) { + for (int i = 0; i < partitionCols.size(); ++i) { + if (partitionCols.get(i).getName().equalsIgnoreCase(colName)) { + colIdxs.add(i); + } + } + } + if (colIdxs.size() != colNames.size()) { + throw new AnalysisException( + "column " + colNames + " does not match partition columns of table " + tbl.getName()); + } + + HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() + .getMetaStoreCache((HMSExternalCatalog) tbl.getCatalog()); + HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( + tbl.getDbName(), tbl.getName(), tbl.getPartitionColumnTypes()); + Map> valuesMap = hivePartitionValues.getPartitionValuesMap(); + List dataBatch = Lists.newArrayList(); + for (Map.Entry> entry : valuesMap.entrySet()) { + TRow trow = new TRow(); + List values = entry.getValue(); + if (values.size() != partitionCols.size()) { + continue; + } + for (Integer idx : colIdxs) { + trow.addToColumnValue(new TCell().setStringVal(values.get(idx))); // COLUMN_VALUE + } + dataBatch.add(trow); + } + return dataBatch; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java index f6b6bcb667fe786..a7e25bc7f824453 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java @@ -50,8 +50,6 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co return 
JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params); case TASKS: return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params); - case PARTITION_VALUES: - return PartitionValuesTableValuedFunction.getColumnIndexFromColumnName(columnName, params); default: throw new AnalysisException("Unknown Metadata TableValuedFunction type"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java index b42e1b781211a43..b8712e668accd0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java @@ -20,14 +20,12 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; @@ -35,12 +33,11 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TMetaScanRange; -import org.apache.doris.thrift.TMetadataTableRequestParams; import org.apache.doris.thrift.TMetadataType; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -65,51 +62,11 @@ public class PartitionValuesTableValuedFunction extends MetadataTableValuedFunct private static final ImmutableSet PROPERTIES_SET = ImmutableSet.of(CATALOG, DB, TABLE); - private static final ImmutableList SCHEMA_FOR_OLAP_TABLE = ImmutableList.of( - new Column("PartitionId", ScalarType.createType(PrimitiveType.BIGINT)), - new Column("PartitionName", ScalarType.createStringType()), - new Column("VisibleVersion", ScalarType.createType(PrimitiveType.BIGINT)), - new Column("VisibleVersionTime", ScalarType.createStringType()), - new Column("State", ScalarType.createStringType()), - new Column("PartitionKey", ScalarType.createStringType()), - new Column("Range", ScalarType.createStringType()), - new Column("DistributionKey", ScalarType.createStringType()), - new Column("Buckets", ScalarType.createType(PrimitiveType.INT)), - new Column("ReplicationNum", ScalarType.createType(PrimitiveType.INT)), - new Column("StorageMedium", ScalarType.createStringType()), - new Column("CooldownTime", ScalarType.createStringType()), - new Column("RemoteStoragePolicy", ScalarType.createStringType()), - new Column("LastConsistencyCheckTime", ScalarType.createStringType()), - new Column("DataSize", ScalarType.createStringType()), - new Column("IsInMemory", ScalarType.createType(PrimitiveType.BOOLEAN)), - new Column("ReplicaAllocation", ScalarType.createStringType()), - new Column("IsMutable", ScalarType.createType(PrimitiveType.BOOLEAN)), - new Column("SyncWithBaseTables", ScalarType.createType(PrimitiveType.BOOLEAN)), - new Column("UnsyncTables", ScalarType.createStringType())); - - private List schema; - - public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params) - throws org.apache.doris.common.AnalysisException { - if 
(!params.isSetPartitionValuesMetadataParams()) { - throw new org.apache.doris.common.AnalysisException("PartitionValues metadata params is not set."); - } - TPartitionValuesMetadataParams partitionValuesMetadataParams = params.getPartitionValuesMetadataParams(); - String catalogName = partitionValuesMetadataParams.getCatalog(); - String dbName = partitionValuesMetadataParams.getDatabase(); - String tblName = partitionValuesMetadataParams.getTable(); - TableIf table = analyzeAndGetTable(catalogName, dbName, tblName); - if (InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalogName)) { - return OLAP_TABLE_COLUMN_TO_INDEX.get(columnName.toLowerCase()); - } else { - return EXTERNAL_TABLE_COLUMN_TO_INDEX.get(columnName.toLowerCase()); - } - } - private final String catalogName; private final String databaseName; private final String tableName; private TableIf table; + private List schema; public PartitionValuesTableValuedFunction(Map params) throws AnalysisException { Map validParams = Maps.newHashMap(); @@ -135,7 +92,7 @@ public PartitionValuesTableValuedFunction(Map params) throws Ana } } - private static TableIf analyzeAndGetTable(String catalogName, String dbName, String tableName) { + public static TableIf analyzeAndGetTable(String catalogName, String dbName, String tableName) { if (!Env.getCurrentEnv().getAccessManager() .checkTblPriv(ConnectContext.get(), catalogName, dbName, tableName, PrivPredicate.SHOW)) { @@ -207,7 +164,11 @@ public List getTableColumns() throws AnalysisException { Preconditions.checkNotNull(table); // TODO： support other type of tables if (schema == null) { - schema = ((HMSExternalTable) table).getPartitionColumns(); + List partitionColumns = ((HMSExternalTable) table).getPartitionColumns(); + schema = Lists.newArrayList(); + for (Column column : partitionColumns) { + schema.add(new Column(column.getName(), ScalarType.createStringType())); + } } return schema; } diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 6b6fda088a8a041..d4faa46019541c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -77,6 +77,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map