Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Sep 12, 2024
1 parent e3e5ca1 commit 04b2dfd
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 57 deletions.
22 changes: 22 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
case TMetadataType::TASKS:
RETURN_IF_ERROR(_build_tasks_metadata_request(meta_scan_range, &request));
break;
case TMetadataType::PARTITION_VALUES:
RETURN_IF_ERROR(_build_partition_values_metadata_request(meta_scan_range, &request));
break;
default:
_meta_eos = true;
return Status::OK();
Expand Down Expand Up @@ -461,6 +464,25 @@ Status VMetaScanner::_build_tasks_metadata_request(const TMetaScanRange& meta_sc
return Status::OK();
}

// Fills `request` so that it asks the FE for the rows of the PARTITION_VALUES
// metadata table.
//
// @param meta_scan_range  scan range of this scanner; its optional
//                         `partition_values_params` field must be set.
// @param request          output; receives the METADATA_TABLE schema table name
//                         and a TMetadataTableRequestParams of type
//                         PARTITION_VALUES carrying the partition values params.
// @return Status::OK() on success, Status::InternalError when
//         `partition_values_params` is not set on `meta_scan_range`.
Status VMetaScanner::_build_partition_values_metadata_request(
        const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* request) {
    VLOG_CRITICAL << "VMetaScanner::_build_partition_values_metadata_request";
    if (!meta_scan_range.__isset.partition_values_params) {
        return Status::InternalError(
                "Can not find TPartitionValuesMetadataParams from meta_scan_range.");
    }

    // create request
    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);

    // create TMetadataTableRequestParams
    TMetadataTableRequestParams metadata_table_params;
    metadata_table_params.__set_metadata_type(TMetadataType::PARTITION_VALUES);
    // The field in TMetadataTableRequestParams is declared as
    // `partitions_values_metadata_params` (plural "partitions"), and the FE
    // checks it with isSetPartitionsValuesMetadataParams(); the generated
    // setter therefore carries the same plural spelling.
    metadata_table_params.__set_partitions_values_metadata_params(
            meta_scan_range.partition_values_params);

    request->__set_metada_table_params(metadata_table_params);
    return Status::OK();
}

Status VMetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "VMetaScanner::close";
RETURN_IF_ERROR(VScanner::close(state));
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vmeta_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class VMetaScanner : public VScanner {
TFetchSchemaTableDataRequest* request);
Status _build_queries_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
Status _build_partition_values_metadata_request(const TMetaScanRange& meta_scan_range,
TFetchSchemaTableDataRequest* request);
bool _meta_eos;
TupleId _tuple_id;
TUserIdentity _user_identity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,13 +1265,13 @@ optScanParams
: ATSIGN funcName=identifier LEFT_PAREN (properties=propertyItemList)? RIGHT_PAREN
;

sysTableParams
: DOLLARSIGN sysTableName=identifier
metaTableParams
: DOLLARSIGN metaTableName=identifier
;

relationPrimary
: multipartIdentifier optScanParams? materializedViewName? tableSnapshot? specifiedPartition?
tabletList? tableAlias sample? relationHint? sysTableParams? lateralView* #tableName
tabletList? tableAlias sample? relationHint? metaTableParams? lateralView* #tableName
| LEFT_PAREN query RIGHT_PAREN tableAlias lateralView* #aliasedQuery
| tvfName=identifier LEFT_PAREN
(properties=propertyItemList)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.PartitionsTableValuedFunction;
import org.apache.doris.tablefunction.PartitionValuesTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;

import java.util.Map;
Expand All @@ -44,7 +44,7 @@ public FunctionSignature customSignature() {
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return new PartitionsTableValuedFunction(arguments);
return new PartitionValuesTableValuedFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build PartitionsTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.TableProperty;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
Expand All @@ -42,11 +43,13 @@
import org.apache.doris.common.proc.PartitionsProcDir;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
Expand Down Expand Up @@ -78,6 +81,7 @@
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPartitionValuesMetadataParams;
import org.apache.doris.thrift.TPartitionsMetadataParams;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TRow;
Expand Down Expand Up @@ -204,7 +208,8 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData
}
TFetchSchemaTableDataResult result;
TMetadataTableRequestParams params = request.getMetadaTableParams();
switch (request.getMetadaTableParams().getMetadataType()) {
TMetadataType metadataType = request.getMetadaTableParams().getMetadataType();
switch (metadataType) {
case ICEBERG:
result = icebergMetadataResult(params);
break;
Expand Down Expand Up @@ -232,11 +237,17 @@ public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableData
case TASKS:
result = taskMetadataResult(params);
break;
case PARTITION_VALUES:
result = partitionValuesMetadataResult(params);
break;
default:
return errorResult("Metadata table params is not set.");
}
if (result.getStatus().getStatusCode() == TStatusCode.OK) {
filterColumns(result, params.getColumnsName(), params.getMetadataType(), params);
if (metadataType != TMetadataType.PARTITION_VALUES) {
// partition_values' result already sorted by column names
filterColumns(result, params.getColumnsName(), params.getMetadataType(), params);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("getMetadataTable() end.");
Expand Down Expand Up @@ -1459,4 +1470,72 @@ private static void fillBatch(List<TRow> dataBatch, Map<String, Map<String, Stri
}
}
}

/**
 * Produces the result of a PARTITION_VALUES metadata table request.
 *
 * <p>The target table is resolved from the catalog / database / table names carried in the
 * partition values params. Only HMS external tables are handled; any other table type yields
 * an error result. Any throwable raised while resolving the table or collecting the rows is
 * logged and converted into an error result instead of being propagated.
 *
 * @param params metadata table request params; must have the partition values params set
 * @return a result with status OK and the data batch, or an error result
 */
private static TFetchSchemaTableDataResult partitionValuesMetadataResult(TMetadataTableRequestParams params) {
    if (!params.isSetPartitionsValuesMetadataParams()) {
        return errorResult("partition values metadata params is not set.");
    }

    final TPartitionValuesMetadataParams pvParams = params.getPartitionsValuesMetadataParams();
    final String ctlName = pvParams.getCatalog();
    final String dbName = pvParams.getDatabase();
    final String tblName = pvParams.getTable();
    try {
        TableIf target = PartitionValuesTableValuedFunction.analyzeAndGetTable(ctlName, dbName, tblName);
        TableType targetType = target.getType();
        List<TRow> rows;
        switch (targetType) {
            case HMS_EXTERNAL_TABLE:
                rows = partitionValuesMetadataResultForHmsTable((HMSExternalTable) target,
                        params.getColumnsName());
                break;
            default:
                // every other table type is rejected for now
                return errorResult("not support table type " + targetType.name());
        }
        TFetchSchemaTableDataResult okResult = new TFetchSchemaTableDataResult();
        okResult.setStatus(new TStatus(TStatusCode.OK));
        okResult.setDataBatch(rows);
        return okResult;
    } catch (Throwable t) {
        LOG.warn("error when get partition values metadata. {}.{}.{}", ctlName, dbName, tblName, t);
        return errorResult("error when get partition values metadata: " + Util.getRootCauseMessage(t));
    }
}

/**
 * Collects the partition values of an HMS external table as rows of string cells.
 *
 * <p>Each requested column name is matched case-insensitively against the table's partition
 * columns; the resulting indexes decide which values are emitted, in the order of
 * {@code colNames}. Partitions whose value list does not have one entry per partition column
 * are left out of the result.
 *
 * @param tbl the HMS external table to read partition values from
 * @param colNames names of the requested columns
 * @return one row per partition, holding the values of the requested columns
 * @throws AnalysisException if the number of matched partition columns differs from the
 *         number of requested column names
 */
private static List<TRow> partitionValuesMetadataResultForHmsTable(HMSExternalTable tbl, List<String> colNames)
        throws AnalysisException {
    final List<Column> partCols = tbl.getPartitionColumns();

    // Map every requested name to the position(s) of the matching partition column.
    List<Integer> selectedIdxs = Lists.newArrayList();
    for (String requested : colNames) {
        int pos = 0;
        for (Column partCol : partCols) {
            if (partCol.getName().equalsIgnoreCase(requested)) {
                selectedIdxs.add(pos);
            }
            pos++;
        }
    }
    if (selectedIdxs.size() != colNames.size()) {
        throw new AnalysisException(
                "column " + colNames + " does not match partition columns of table " + tbl.getName());
    }

    HiveMetaStoreCache metaStoreCache = Env.getCurrentEnv().getExtMetaCacheMgr()
            .getMetaStoreCache((HMSExternalCatalog) tbl.getCatalog());
    HiveMetaStoreCache.HivePartitionValues cachedValues = metaStoreCache.getPartitionValues(
            tbl.getDbName(), tbl.getName(), tbl.getPartitionColumnTypes());
    Map<Long, List<String>> valuesById = cachedValues.getPartitionValuesMap();

    List<TRow> rows = Lists.newArrayList();
    for (List<String> partValues : valuesById.values()) {
        // Only partitions with exactly one value per partition column are emitted.
        if (partValues.size() == partCols.size()) {
            TRow row = new TRow();
            for (Integer idx : selectedIdxs) {
                row.addToColumnValue(new TCell().setStringVal(partValues.get(idx)));
            }
            rows.add(row);
        }
    }
    return rows;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public static Integer getColumnIndexFromColumnName(TMetadataType type, String co
return JobsTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case TASKS:
return TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
case PARTITION_VALUES:
return PartitionValuesTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
default:
throw new AnalysisException("Unknown Metadata TableValuedFunction type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,24 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
Expand All @@ -65,51 +62,11 @@ public class PartitionValuesTableValuedFunction extends MetadataTableValuedFunct

private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(CATALOG, DB, TABLE);

private static final ImmutableList<Column> SCHEMA_FOR_OLAP_TABLE = ImmutableList.of(
new Column("PartitionId", ScalarType.createType(PrimitiveType.BIGINT)),
new Column("PartitionName", ScalarType.createStringType()),
new Column("VisibleVersion", ScalarType.createType(PrimitiveType.BIGINT)),
new Column("VisibleVersionTime", ScalarType.createStringType()),
new Column("State", ScalarType.createStringType()),
new Column("PartitionKey", ScalarType.createStringType()),
new Column("Range", ScalarType.createStringType()),
new Column("DistributionKey", ScalarType.createStringType()),
new Column("Buckets", ScalarType.createType(PrimitiveType.INT)),
new Column("ReplicationNum", ScalarType.createType(PrimitiveType.INT)),
new Column("StorageMedium", ScalarType.createStringType()),
new Column("CooldownTime", ScalarType.createStringType()),
new Column("RemoteStoragePolicy", ScalarType.createStringType()),
new Column("LastConsistencyCheckTime", ScalarType.createStringType()),
new Column("DataSize", ScalarType.createStringType()),
new Column("IsInMemory", ScalarType.createType(PrimitiveType.BOOLEAN)),
new Column("ReplicaAllocation", ScalarType.createStringType()),
new Column("IsMutable", ScalarType.createType(PrimitiveType.BOOLEAN)),
new Column("SyncWithBaseTables", ScalarType.createType(PrimitiveType.BOOLEAN)),
new Column("UnsyncTables", ScalarType.createStringType()));

private List<Column> schema;

/**
 * Looks up the index of a column of the partition values metadata table.
 *
 * <p>The table named by the partition values params is resolved through
 * {@code analyzeAndGetTable} before the lookup. The lower-cased column name is then looked up
 * in the OLAP column map when the catalog is the internal catalog, and in the external table
 * column map otherwise.
 *
 * @param columnName name of the column to look up
 * @param params metadata table request params; must have the partition values params set
 * @return the value stored for the lower-cased column name in the selected map
 * @throws org.apache.doris.common.AnalysisException if the partition values params are not set
 */
public static Integer getColumnIndexFromColumnName(String columnName, TMetadataTableRequestParams params)
        throws org.apache.doris.common.AnalysisException {
    if (!params.isSetPartitionValuesMetadataParams()) {
        throw new org.apache.doris.common.AnalysisException("PartitionValues metadata params is not set.");
    }
    TPartitionValuesMetadataParams pvParams = params.getPartitionValuesMetadataParams();
    String catalogName = pvParams.getCatalog();
    String dbName = pvParams.getDatabase();
    String tblName = pvParams.getTable();
    // The returned table is not needed here; the call is kept for whatever checks it performs.
    analyzeAndGetTable(catalogName, dbName, tblName);

    boolean isInternalCatalog = InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalogName);
    if (!isInternalCatalog) {
        return EXTERNAL_TABLE_COLUMN_TO_INDEX.get(columnName.toLowerCase());
    }
    return OLAP_TABLE_COLUMN_TO_INDEX.get(columnName.toLowerCase());
}

private final String catalogName;
private final String databaseName;
private final String tableName;
private TableIf table;
private List<Column> schema;

public PartitionValuesTableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> validParams = Maps.newHashMap();
Expand All @@ -135,7 +92,7 @@ public PartitionValuesTableValuedFunction(Map<String, String> params) throws Ana
}
}

private static TableIf analyzeAndGetTable(String catalogName, String dbName, String tableName) {
public static TableIf analyzeAndGetTable(String catalogName, String dbName, String tableName) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), catalogName, dbName,
tableName, PrivPredicate.SHOW)) {
Expand Down Expand Up @@ -207,7 +164,11 @@ public List<Column> getTableColumns() throws AnalysisException {
Preconditions.checkNotNull(table);
// TODO: support other type of tables
if (schema == null) {
schema = ((HMSExternalTable) table).getPartitionColumns();
List<Column> partitionColumns = ((HMSExternalTable) table).getPartitionColumns();
schema = Lists.newArrayList();
for (Column column : partitionColumns) {
schema.add(new Column(column.getName(), ScalarType.createStringType()));
}
}
return schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public static TableValuedFunctionIf getTableFunction(String funcName, Map<String
return new GroupCommitTableValuedFunction(params);
case QueryTableValueFunction.NAME:
return QueryTableValueFunction.createQueryTableValueFunction(params);
case PartitionValuesTableValuedFunction.NAME:
return new PartitionValuesTableValuedFunction(params);
default:
throw new AnalysisException("Could not find table function " + funcName);
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,7 @@ struct TMetadataTableRequestParams {
10: optional PlanNodes.TTasksMetadataParams tasks_metadata_params
11: optional PlanNodes.TPartitionsMetadataParams partitions_metadata_params
12: optional PlanNodes.TMetaCacheStatsParams meta_cache_stats_params
13: optional PlanNodes.TPartitionValuesMetadataParams partitions_values_metadata_params
}

struct TSchemaTableRequestParams {
Expand Down
8 changes: 8 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,12 @@ struct TPartitionsMetadataParams {
3: optional string table
}

// Parameters of a PARTITION_VALUES metadata scan: the names identifying the
// table whose partition values are requested.
struct TPartitionValuesMetadataParams {
// name of the catalog containing the table
1: optional string catalog
// name of the database containing the table
2: optional string database
// name of the table
3: optional string table
}

struct TJobsMetadataParams {
1: optional string type
2: optional Types.TUserIdentity current_user_ident
Expand All @@ -555,6 +561,7 @@ struct TQueriesMetadataParams {
4: optional TJobsMetadataParams jobs_params
5: optional TTasksMetadataParams tasks_params
6: optional TPartitionsMetadataParams partitions_params
7: optional TPartitionValuesMetadataParams partition_values_params
}

struct TMetaCacheStatsParams {
Expand All @@ -571,6 +578,7 @@ struct TMetaScanRange {
8: optional TTasksMetadataParams tasks_params
9: optional TPartitionsMetadataParams partitions_params
10: optional TMetaCacheStatsParams meta_cache_stats_params
11: optional TPartitionValuesMetadataParams partition_values_params
}

// Specification of an individual data range which is held in its entirety
Expand Down
3 changes: 2 additions & 1 deletion gensrc/thrift/Types.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,8 @@ enum TMetadataType {
JOBS,
TASKS,
WORKLOAD_SCHED_POLICY,
PARTITIONS;
PARTITIONS,
PARTITION_VALUES;
}

enum TIcebergQueryType {
Expand Down

0 comments on commit 04b2dfd

Please sign in to comment.