Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support persisting TableMetadata in the metastore #433

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,12 @@ public static <T> Builder<T> builder() {
"If set to true, allows tables to be dropped with the purge parameter set to true.")
.defaultValue(true)
.build();

/**
 * Boolean feature flag, keyed {@code METADATA_CACHE_ENABLED}, that controls whether table
 * metadata may be served without reading the {@code metadata.json} file. Defaults to {@code
 * false}.
 */
public static final PolarisConfiguration<Boolean> METADATA_CACHE_ENABLED =
PolarisConfiguration.<Boolean>builder()
.key("METADATA_CACHE_ENABLED")
.description(
"If set to true, support serving table metadata without reading the metadata.json file.")
.defaultValue(false)
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public enum PolarisEntityType {
// generic table is either a view or a real table
TABLE_LIKE(7, NAMESPACE, false, false),
TASK(8, ROOT, false, false),
FILE(9, TABLE_LIKE, false, false);
FILE(9, TABLE_LIKE, false, false),
TABLE_METADATA(10, TABLE_LIKE, false, false);

// to efficiently map a code to its corresponding entity type, use a reverse array which
// is initialized below
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.entity;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
* A {@link PolarisEntity} for storing table metadata. This can contain the raw content of the
* `metadata.json` or more granular information
*/
public class TableMetadataEntity extends PolarisEntity {
private static final String CONTENT_KEY = "content";
private static final String METADATA_LOCATION_KEY = "metadata_location";

public TableMetadataEntity(PolarisBaseEntity sourceEntity) {
super(sourceEntity);
}

public static TableMetadataEntity of(PolarisBaseEntity sourceEntity) {
if (sourceEntity != null) {
return new TableMetadataEntity(sourceEntity);
}
return null;
}

@JsonIgnore
public String getContent() {
return getInternalPropertiesAsMap().get(CONTENT_KEY);
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
}

@JsonIgnore
public String getMetadataLocation() {
return getInternalPropertiesAsMap().get(METADATA_LOCATION_KEY);
}

public static class Builder
extends PolarisEntity.BaseBuilder<TableMetadataEntity, TableMetadataEntity.Builder> {
public Builder() {
super();
setType(PolarisEntityType.TABLE_METADATA);
}

@Override
public TableMetadataEntity build() {
return new TableMetadataEntity(buildBase());
}

public TableMetadataEntity.Builder setContent(String content) {
this.internalProperties.put(CONTENT_KEY, content);
return this;
}

public TableMetadataEntity.Builder setMetadataLocation(String metadataLocation) {
this.internalProperties.put(METADATA_LOCATION_KEY, metadataLocation);
this.setName(metadataLocation);
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public EntityCacheTest() {
callCtx = new PolarisCallContext(metaStore, diagServices);
metaStoreManager = new PolarisMetaStoreManagerImpl();

// bootstrap the mata store with our test schema
// bootstrap the metastore with our test schema
tm = new PolarisTestMetaStoreManager(metaStoreManager, callCtx);
tm.testCreateTestCatalog();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -102,6 +103,7 @@
import org.apache.polaris.core.storage.aws.PolarisS3FileIOClientFactory;
import org.apache.polaris.service.catalog.io.FileIOFactory;
import org.apache.polaris.service.exception.IcebergExceptionMapper;
import org.apache.polaris.service.persistence.MetadataCacheManager;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.NotificationRequest;
import org.apache.polaris.service.types.NotificationType;
Expand Down Expand Up @@ -820,6 +822,33 @@ public Map<String, String> getCredentialConfig(
storageInfo.get());
}

/**
 * Loads the {@link TableMetadata} for a table, going through {@link MetadataCacheManager} when
 * {@code METADATA_CACHE_ENABLED} is set and loading the table directly otherwise.
 *
 * @param identifier the table whose metadata should be loaded
 * @return the table's metadata
 */
public TableMetadata loadTableMetadata(TableIdentifier identifier) {
  // The direct load path; also handed to the cache manager as its fallback.
  Supplier<TableMetadata> loadDirectly = () -> loadTableMetadata(loadTable(identifier));

  boolean metadataCacheEnabled =
      callContext
          .getPolarisCallContext()
          .getConfigurationStore()
          .getConfiguration(
              callContext.getPolarisCallContext(), PolarisConfiguration.METADATA_CACHE_ENABLED);

  if (metadataCacheEnabled) {
    return MetadataCacheManager.loadTableMetadata(
        identifier,
        callContext.getPolarisCallContext(),
        entityManager,
        resolvedEntityView,
        loadDirectly);
  }
  return loadDirectly.get();
}

/**
 * Extracts the current metadata from a loaded table.
 *
 * @param table the table to read metadata from
 * @return the result of {@code operations().current()} on the table
 * @throws IllegalArgumentException if {@code table} is not a {@link BaseTable}
 */
private static TableMetadata loadTableMetadata(Table table) {
  if (!(table instanceof BaseTable baseTable)) {
    throw new IllegalArgumentException("Cannot load metadata for " + table.name());
  }
  return baseTable.operations().current();
}

/**
* Based on configuration settings, for callsites that need to handle potentially setting a new
* base location for a TableLike entity, produces the transformed location if applicable, or else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,16 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_TABLE;
authorizeBasicTableLikeOperationOrThrow(op, PolarisEntitySubType.TABLE, tableIdentifier);

return doCatalogOperation(() -> CatalogHandlers.loadTable(baseCatalog, tableIdentifier));
return doCatalogOperation(
() -> {
if (baseCatalog instanceof BasePolarisCatalog basePolarisCatalog) {
return LoadTableResponse.builder()
.withTableMetadata(basePolarisCatalog.loadTableMetadata(tableIdentifier))
.build();
}

return CatalogHandlers.loadTable(baseCatalog, tableIdentifier);
});
}

public LoadTableResponse loadTableWithAccessDelegation(
Expand Down Expand Up @@ -844,29 +853,31 @@ public LoadTableResponse loadTableWithAccessDelegation(
// when data-access is specified but access delegation grants are not found.
return doCatalogOperation(
() -> {
Table table = baseCatalog.loadTable(tableIdentifier);
TableMetadata tableMetadata = null;
if (baseCatalog instanceof BasePolarisCatalog basePolarisCatalog) {
tableMetadata = basePolarisCatalog.loadTableMetadata(tableIdentifier);
}

if (table instanceof BaseTable baseTable) {
TableMetadata tableMetadata = baseTable.operations().current();
// The metadata failed to load
if (tableMetadata == null) {
throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString());
}

if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
LOGGER
.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocation", tableMetadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
tableIdentifier, tableMetadata, actionsRequested));
}
LOGGER
.atDebug()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableLocation", tableMetadata.location())
.log("Fetching client credentials for table");
responseBuilder.addAllConfig(
credentialDelegation.getCredentialConfig(
tableIdentifier, tableMetadata, actionsRequested));
return responseBuilder.build();
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return NoSuchTableException for now
throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString());
} else {
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
}

throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.persistence;

import java.util.Collection;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.TableLikeEntity;
import org.apache.polaris.core.entity.TableMetadataEntity;
import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataCacheManager {
private static final Logger LOGGER = LoggerFactory.getLogger(MetadataCacheManager.class);

/** Load the cached {@link Table} or fall back to `fallback` if one doesn't exist */
public static TableMetadata loadTableMetadata(
TableIdentifier tableIdentifier,
PolarisCallContext callContext,
PolarisEntityManager entityManager,
PolarisResolutionManifestCatalogView resolvedEntityView,
Supplier<TableMetadata> fallback) {
LOGGER.debug(String.format("Loading cached metadata for %s", tableIdentifier));
Optional<TableMetadata> cachedMetadata =
loadCachedTableMetadata(tableIdentifier, callContext, entityManager, resolvedEntityView);
if (cachedMetadata.isPresent()) {
LOGGER.debug(String.format("Using cached metadata for %s", tableIdentifier));
return cachedMetadata.get();
} else {
TableMetadata metadata = fallback.get();
PolarisMetaStoreManager.EntityResult cacheResult =
cacheTableMetadata(
tableIdentifier, metadata, callContext, entityManager, resolvedEntityView);
if (!cacheResult.isSuccess()) {
LOGGER.debug(String.format("Failed to cache metadata for %s", tableIdentifier));
}
return metadata;
}
}

/**
* Attempt to add table metadata to the cache
*
* @return The result of trying to cache the metadata
*/
private static PolarisMetaStoreManager.EntityResult cacheTableMetadata(
TableIdentifier tableIdentifier,
TableMetadata metadata,
PolarisCallContext callContext,
PolarisEntityManager entityManager,
PolarisResolutionManifestCatalogView resolvedEntityView) {
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, PolarisEntitySubType.TABLE);
if (resolvedEntities == null) {
return new PolarisMetaStoreManager.EntityResult(
PolarisMetaStoreManager.ReturnStatus.ENTITY_NOT_FOUND, null);
} else {
TableLikeEntity tableEntity = TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
TableMetadataEntity tableMetadataEntity =
new TableMetadataEntity.Builder()
.setCatalogId(tableEntity.getCatalogId())
.setParentId(tableEntity.getId())
.setId(entityManager.getMetaStoreManager().generateNewEntityId(callContext).getId())
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
.setCreateTimestamp(System.currentTimeMillis())
.setMetadataLocation(metadata.metadataFileLocation())
.setContent(TableMetadataParser.toJson(metadata))
.build();
try {
return entityManager
.getMetaStoreManager()
.createEntityIfNotExists(
callContext,
PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
tableMetadataEntity);
} catch (RuntimeException e) {
// PersistenceException (& other extension-specific exceptions) may not be in scope,
// but we can make a best-effort attempt to swallow it and just forego caching
if (e.toString().contains("PersistenceException")) {
return new PolarisMetaStoreManager.EntityResult(
PolarisMetaStoreManager.ReturnStatus.UNEXPECTED_ERROR_SIGNALED, e.getMessage());
} else {
throw e;
}
}
}
}

/** Return the cached {@link Table} entity, if one exists */
private static @NotNull Optional<TableMetadata> loadCachedTableMetadata(
TableIdentifier tableIdentifier,
PolarisCallContext callContext,
PolarisEntityManager entityManager,
PolarisResolutionManifestCatalogView resolvedEntityView) {
PolarisResolvedPathWrapper resolvedEntities =
resolvedEntityView.getPassthroughResolvedPath(tableIdentifier, PolarisEntitySubType.TABLE);
if (resolvedEntities == null) {
return Optional.empty();
} else {
TableLikeEntity entity = TableLikeEntity.of(resolvedEntities.getRawLeafEntity());
String metadataLocation = entity.getMetadataLocation();
PolarisMetaStoreManager.ListEntitiesResult metadataResult =
entityManager
.getMetaStoreManager()
.listEntities(
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
callContext,
PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
PolarisEntityType.TABLE_METADATA,
PolarisEntitySubType.ANY_SUBTYPE);
return Optional.ofNullable(metadataResult.getEntities()).stream()
.flatMap(Collection::stream)
.flatMap(
result -> {
PolarisMetaStoreManager.EntityResult metadataEntityResult =
entityManager
.getMetaStoreManager()
.loadEntity(callContext, result.getCatalogId(), result.getId());
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
return Optional.ofNullable(metadataEntityResult.getEntity())
.map(TableMetadataEntity::of)
.stream();
})
.filter(
metadata -> {
if (metadata.getMetadataLocation().equals(metadataLocation)) {
return true;
} else {
LOGGER.debug(
String.format("Deleting old entry for %s", metadata.getMetadataLocation()));
entityManager
.getMetaStoreManager()
.dropEntityIfExists(
eric-maynard marked this conversation as resolved.
Show resolved Hide resolved
callContext,
PolarisEntity.toCoreList(resolvedEntities.getRawFullPath()),
metadata,
null,
/* purge= */ false);
return false;
}
})
.findFirst()
.map(metadataEntity -> TableMetadataParser.fromJson(metadataEntity.getContent()));
}
}
}
Loading
Loading