diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
new file mode 100644
index 000000000..4d7d656ad
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.persistence;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityCore;
+import org.apache.polaris.core.entity.PolarisEntityId;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.PolarisPrivilege;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wraps an existing impl of PolarisMetaStoreManager and delegates expected "read" operations
+ * through to the wrapped instance while throwing errors on unexpected operations or enqueuing
+ * expected write operations into a collection to be committed as a single atomic unit.
+ *
+ *
+ * <p>Note that as long as the server-side multi-commit transaction semantics are effectively only
+ * SERIALIZABLE isolation (i.e. if we can resolve all UpdateRequirements "statically" before the set
+ * of commits and translate these into an atomic collection of compare-and-swap operations to apply
+ * the transaction), this workspace should also reject readEntity/loadEntity operations to avoid
+ * implying that any reads from this transaction workspace include writes performed into this
+ * transaction workspace that haven't yet been committed.
+ *
+ *
+ * <p>Not thread-safe; instances should only be used within a single request context and should not
+ * be reused between requests.
+ */
+public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreManager {
+ private final PolarisMetaStoreManager delegate;
+
+ // TODO: If we want to support the semantic of opening a transaction in which multiple
+ // reads and writes occur on the same entities, where the reads are expected to see the writes
+ // within the transaction workspace that haven't actually been committed, we can augment this
+ // class by allowing these pendingUpdates to represent the latest state of the entity if we
+ // also increment entityVersion. We'd need to store both a "latest view" of all updated entities
+ // to serve reads within the same transaction while also storing the ordered list of
+ // pendingUpdates that ultimately need to be applied in order within the real MetaStoreManager.
+ private final List<EntityWithPath> pendingUpdates = new ArrayList<>();
+
+ public TransactionWorkspaceMetaStoreManager(PolarisMetaStoreManager delegate) {
+ this.delegate = delegate;
+ }
+
+ public List<EntityWithPath> getPendingUpdates() {
+ return ImmutableList.copyOf(pendingUpdates);
+ }
+
+ @Override
+ public BaseResult bootstrapPolarisService(@NotNull PolarisCallContext callCtx) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "bootstrapPolarisService");
+ return null;
+ }
+
+ @Override
+ public BaseResult purge(@NotNull PolarisCallContext callCtx) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "purge");
+ return null;
+ }
+
+ @Override
+ public PolarisMetaStoreManager.EntityResult readEntityByName(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisEntityType entityType,
+ @NotNull PolarisEntitySubType entitySubType,
+ @NotNull String name) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "readEntityByName");
+ return null;
+ }
+
+ @Override
+ public ListEntitiesResult listEntities(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisEntityType entityType,
+ @NotNull PolarisEntitySubType entitySubType) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "listEntities");
+ return null;
+ }
+
+ @Override
+ public GenerateEntityIdResult generateNewEntityId(@NotNull PolarisCallContext callCtx) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "generateNewEntityId");
+ return null;
+ }
+
+ @Override
+ public CreatePrincipalResult createPrincipal(
+ @NotNull PolarisCallContext callCtx, @NotNull PolarisBaseEntity principal) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "createPrincipal");
+ return null;
+ }
+
+ @Override
+ public PrincipalSecretsResult loadPrincipalSecrets(
+ @NotNull PolarisCallContext callCtx, @NotNull String clientId) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "loadPrincipalSecrets");
+ return null;
+ }
+
+ @Override
+ public PrincipalSecretsResult rotatePrincipalSecrets(
+ @NotNull PolarisCallContext callCtx,
+ @NotNull String clientId,
+ long principalId,
+ @NotNull String mainSecret,
+ boolean reset) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "rotatePrincipalSecrets");
+ return null;
+ }
+
+ @Override
+ public CreateCatalogResult createCatalog(
+ @NotNull PolarisCallContext callCtx,
+ @NotNull PolarisBaseEntity catalog,
+ @NotNull List<PolarisEntityCore> principalRoles) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "createCatalog");
+ return null;
+ }
+
+ @Override
+ public EntityResult createEntityIfNotExists(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisBaseEntity entity) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "createEntityIfNotExists");
+ return null;
+ }
+
+ @Override
+ public EntitiesResult createEntitiesIfNotExist(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull List<? extends PolarisBaseEntity> entities) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "createEntitiesIfNotExist");
+ return null;
+ }
+
+ @Override
+ public EntityResult updateEntityPropertiesIfNotChanged(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisBaseEntity entity) {
+ pendingUpdates.add(new EntityWithPath(catalogPath, entity));
+ return new EntityResult(entity);
+ }
+
+ @Override
+ public EntitiesResult updateEntitiesPropertiesIfNotChanged(
+ @NotNull PolarisCallContext callCtx, @NotNull List<EntityWithPath> entities) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "updateEntitiesPropertiesIfNotChanged");
+ return null;
+ }
+
+ @Override
+ public EntityResult renameEntity(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisEntityCore entityToRename,
+ @Nullable List<PolarisEntityCore> newCatalogPath,
+ @NotNull PolarisEntity renamedEntity) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "renameEntity");
+ return null;
+ }
+
+ @Override
+ public DropEntityResult dropEntityIfExists(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisEntityCore entityToDrop,
+ @Nullable Map<String, String> cleanupProperties,
+ boolean cleanup) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "dropEntityIfExists");
+ return null;
+ }
+
+ @Override
+ public PrivilegeResult grantUsageOnRoleToGrantee(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable PolarisEntityCore catalog,
+ @NotNull PolarisEntityCore role,
+ @NotNull PolarisEntityCore grantee) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "grantUsageOnRoleToGrantee");
+ return null;
+ }
+
+ @Override
+ public PrivilegeResult revokeUsageOnRoleFromGrantee(
+ @NotNull PolarisCallContext callCtx,
+ @Nullable PolarisEntityCore catalog,
+ @NotNull PolarisEntityCore role,
+ @NotNull PolarisEntityCore grantee) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "revokeUsageOnRoleFromGrantee");
+ return null;
+ }
+
+ @Override
+ public PrivilegeResult grantPrivilegeOnSecurableToRole(
+ @NotNull PolarisCallContext callCtx,
+ @NotNull PolarisEntityCore grantee,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisEntityCore securable,
+ @NotNull PolarisPrivilege privilege) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "grantPrivilegeOnSecurableToRole");
+ return null;
+ }
+
+ @Override
+ public PrivilegeResult revokePrivilegeOnSecurableFromRole(
+ @NotNull PolarisCallContext callCtx,
+ @NotNull PolarisEntityCore grantee,
+ @Nullable List<PolarisEntityCore> catalogPath,
+ @NotNull PolarisEntityCore securable,
+ @NotNull PolarisPrivilege privilege) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "revokePrivilegeOnSecurableFromRole");
+ return null;
+ }
+
+ @Override
+ public LoadGrantsResult loadGrantsOnSecurable(
+ @NotNull PolarisCallContext callCtx, long securableCatalogId, long securableId) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "loadGrantsOnSecurable");
+ return null;
+ }
+
+ @Override
+ public LoadGrantsResult loadGrantsToGrantee(
+ PolarisCallContext callCtx, long granteeCatalogId, long granteeId) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "loadGrantsToGrantee");
+ return null;
+ }
+
+ @Override
+ public ChangeTrackingResult loadEntitiesChangeTracking(
+ @NotNull PolarisCallContext callCtx, @NotNull List<PolarisEntityId> entityIds) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "loadEntitiesChangeTracking");
+ return null;
+ }
+
+ @Override
+ public EntityResult loadEntity(
+ @NotNull PolarisCallContext callCtx, long entityCatalogId, long entityId) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadEntity");
+ return null;
+ }
+
+ @Override
+ public EntitiesResult loadTasks(
+ @NotNull PolarisCallContext callCtx, String executorId, int limit) {
+ callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadTasks");
+ return null;
+ }
+
+ @Override
+ public ScopedCredentialsResult getSubscopedCredsForEntity(
+ @NotNull PolarisCallContext callCtx,
+ long catalogId,
+ long entityId,
+ boolean allowListOperation,
+ @NotNull Set<String> allowedReadLocations,
+ @NotNull Set<String> allowedWriteLocations) {
+ return delegate.getSubscopedCredsForEntity(
+ callCtx,
+ catalogId,
+ entityId,
+ allowListOperation,
+ allowedReadLocations,
+ allowedWriteLocations);
+ }
+
+ @Override
+ public ValidateAccessResult validateAccessToLocations(
+ @NotNull PolarisCallContext callCtx,
+ long catalogId,
+ long entityId,
+ @NotNull Set<PolarisStorageActions> actions,
+ @NotNull Set<String> locations) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "validateAccessToLocations");
+ return null;
+ }
+
+ @Override
+ public PolarisMetaStoreManager.CachedEntryResult loadCachedEntryById(
+ @NotNull PolarisCallContext callCtx, long entityCatalogId, long entityId) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "loadCachedEntryById");
+ return null;
+ }
+
+ @Override
+ public PolarisMetaStoreManager.CachedEntryResult loadCachedEntryByName(
+ @NotNull PolarisCallContext callCtx,
+ long entityCatalogId,
+ long parentId,
+ @NotNull PolarisEntityType entityType,
+ @NotNull String entityName) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "loadCachedEntryByName");
+ return null;
+ }
+
+ @Override
+ public PolarisMetaStoreManager.CachedEntryResult refreshCachedEntity(
+ @NotNull PolarisCallContext callCtx,
+ int entityVersion,
+ int entityGrantRecordsVersion,
+ @NotNull PolarisEntityType entityType,
+ long entityCatalogId,
+ long entityId) {
+ callCtx
+ .getDiagServices()
+ .fail("illegal_method_in_transaction_workspace", "refreshCachedEntity");
+ return null;
+ }
+}
diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index 5bd51be36..0df8717ff 100644
--- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -171,6 +171,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
private Map catalogProperties;
private Map tableDefaultProperties;
private final FileIOFactory fileIOFactory;
+ private PolarisMetaStoreManager metaStoreManager;
/**
* @param entityManager provides handle to underlying PolarisMetaStoreManager with which to
@@ -197,6 +198,7 @@ public BasePolarisCatalog(
this.catalogId = catalogEntity.getId();
this.catalogName = catalogEntity.getName();
this.fileIOFactory = fileIOFactory;
+ this.metaStoreManager = entityManager.getMetaStoreManager();
}
@Override
@@ -275,6 +277,10 @@ public void initialize(String name, Map properties) {
}
}
+ public void setMetaStoreManager(PolarisMetaStoreManager newMetaStoreManager) {
+ this.metaStoreManager = newMetaStoreManager;
+ }
+
@Override
protected Map properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
@@ -489,11 +495,7 @@ private void createNamespaceInternal(
NamespaceEntity entity =
new NamespaceEntity.Builder(namespace)
.setCatalogId(getCatalogId())
- .setId(
- entityManager
- .getMetaStoreManager()
- .generateNewEntityId(getCurrentPolarisContext())
- .getId())
+ .setId(getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.setParentId(resolvedParent.getRawLeafEntity().getId())
.setProperties(metadata)
.setCreateTimestamp(System.currentTimeMillis())
@@ -513,8 +515,7 @@ private void createNamespaceInternal(
}
PolarisEntity returnedEntity =
PolarisEntity.of(
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.createEntityIfNotExists(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(resolvedParent.getRawFullPath()),
@@ -610,8 +611,7 @@ public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyExcept
// drop if exists and is empty
PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();
PolarisMetaStoreManager.DropEntityResult dropEntityResult =
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.dropEntityIfExists(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(catalogPath),
@@ -663,8 +663,7 @@ public boolean setProperties(Namespace namespace, Map properties
List parentPath = resolvedEntities.getRawFullPath();
PolarisEntity returnedEntity =
Optional.ofNullable(
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.updateEntityPropertiesIfNotChanged(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(parentPath),
@@ -696,8 +695,7 @@ public boolean removeProperties(Namespace namespace, Set properties)
List parentPath = resolvedEntities.getRawFullPath();
PolarisEntity returnedEntity =
Optional.ofNullable(
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.updateEntityPropertiesIfNotChanged(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(parentPath),
@@ -743,8 +741,7 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac
List catalogPath = resolvedEntities.getRawFullPath();
List entities =
PolarisEntity.toNameAndIdList(
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.listEntities(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(catalogPath),
@@ -885,7 +882,7 @@ private Map refreshCredentials(
entityManager
.getCredentialCache()
.getOrGenerateSubScopeCreds(
- entityManager.getMetaStoreManager(),
+ getMetaStoreManager(),
callContext.getPolarisCallContext(),
entity,
allowList,
@@ -977,7 +974,7 @@ private void validateLocationsForTableLike(
// implementation, then the validation should go through that API instead as follows:
//
// PolarisMetaStoreManager.ValidateAccessResult validateResult =
- // entityManager.getMetaStoreManager().validateAccessToLocations(
+ // getMetaStoreManager().validateAccessToLocations(
// getCurrentPolarisContext(),
// storageInfoHolderEntity.getCatalogId(),
// storageInfoHolderEntity.getId(),
@@ -1036,8 +1033,7 @@ private void validateNoLocationOverlap(
private void validateNoLocationOverlap(
String location, List parentPath, String name) {
PolarisMetaStoreManager.ListEntitiesResult siblingNamespacesResult =
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.listEntities(
callContext.getPolarisCallContext(),
parentPath.stream().map(PolarisEntity::toCore).collect(Collectors.toList()),
@@ -1060,8 +1056,7 @@ private void validateNoLocationOverlap(
.map(
ns -> {
PolarisMetaStoreManager.ListEntitiesResult siblingTablesResult =
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.listEntities(
callContext.getPolarisCallContext(),
parentPath.stream()
@@ -1335,10 +1330,7 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
.setSubType(PolarisEntitySubType.TABLE)
.setBaseLocation(metadata.location())
.setId(
- entityManager
- .getMetaStoreManager()
- .generateNewEntityId(getCurrentPolarisContext())
- .getId())
+ getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.build();
} else {
existingLocation = entity.getMetadataLocation();
@@ -1539,10 +1531,7 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
.setCatalogId(getCatalogId())
.setSubType(PolarisEntitySubType.VIEW)
.setId(
- entityManager
- .getMetaStoreManager()
- .generateNewEntityId(getCurrentPolarisContext())
- .getId())
+ getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.build();
} else {
existingLocation = entity.getMetadataLocation();
@@ -1610,6 +1599,10 @@ private PolarisCallContext getCurrentPolarisContext() {
return callContext.getPolarisCallContext();
}
+ private PolarisMetaStoreManager getMetaStoreManager() {
+ return metaStoreManager;
+ }
+
@VisibleForTesting
long getCatalogId() {
// TODO: Properly handle initialization
@@ -1660,8 +1653,7 @@ private void renameTableLike(
// rename the entity now
PolarisMetaStoreManager.EntityResult returnedEntityResult =
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.renameEntity(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(catalogPath),
@@ -1724,8 +1716,7 @@ private void renameTableLike(
.addKeyValue("toEntity.getTableIdentifier()", toEntity.getTableIdentifier())
.addKeyValue("returnedEntity.getTableIdentifier()", returnedEntity.getTableIdentifier())
.log("Returned entity identifier doesn't match toEntity identifier");
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.updateEntityPropertiesIfNotChanged(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(newCatalogPath),
@@ -1771,8 +1762,7 @@ private void createTableLike(
PolarisEntity returnedEntity =
PolarisEntity.of(
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.createEntityIfNotExists(
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity));
LOGGER.debug("Created TableLike entity {} with TableIdentifier {}", entity, identifier);
@@ -1801,8 +1791,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
List catalogPath = resolvedEntities.getRawParentPath();
PolarisEntity returnedEntity =
Optional.ofNullable(
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.updateEntityPropertiesIfNotChanged(
getCurrentPolarisContext(), PolarisEntity.toCoreList(catalogPath), entity)
.getEntity())
@@ -1851,8 +1840,7 @@ private void updateTableLike(TableIdentifier identifier, PolarisEntity entity) {
}
}
- return entityManager
- .getMetaStoreManager()
+ return getMetaStoreManager()
.dropEntityIfExists(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(catalogPath),
@@ -1895,10 +1883,7 @@ private boolean sendNotificationForTableLike(
.setCatalogId(getCatalogId())
.setSubType(PolarisEntitySubType.TABLE)
.setId(
- entityManager
- .getMetaStoreManager()
- .generateNewEntityId(getCurrentPolarisContext())
- .getId())
+ getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
.setLastNotificationTimestamp(request.getPayload().getTimestamp())
.build();
} else {
@@ -1984,8 +1969,7 @@ private List listTableLike(PolarisEntitySubType subType, Namesp
List catalogPath = resolvedEntities.getRawFullPath();
List entities =
PolarisEntity.toNameAndIdList(
- entityManager
- .getMetaStoreManager()
+ getMetaStoreManager()
.listEntities(
getCurrentPolarisContext(),
PolarisEntity.toCoreList(catalogPath),
diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
index 53cc053b8..5488f820a 100644
--- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
+++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
@@ -27,11 +27,8 @@
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.EnumSet;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,7 +44,6 @@
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
-import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
import org.apache.polaris.core.auth.PolarisAuthorizer;
@@ -308,7 +304,7 @@ public Response updateTable(
Namespace ns = decodeNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of(ns, RESTUtil.decodeString(table));
- if (isCreate(commitTableRequest)) {
+ if (PolarisCatalogHandlerWrapper.isCreate(commitTableRequest)) {
return Response.ok(
newHandlerWrapper(securityContext, prefix)
.updateTableForStagedCreate(tableIdentifier, commitTableRequest))
@@ -321,27 +317,6 @@ public Response updateTable(
}
}
- /**
- * TODO: Make the helper in org.apache.iceberg.rest.CatalogHandlers public instead of needing to
- * copy/pastehere.
- */
- private static boolean isCreate(UpdateTableRequest request) {
- boolean isCreate =
- request.requirements().stream()
- .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
-
- if (isCreate) {
- List invalidRequirements =
- request.requirements().stream()
- .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist))
- .collect(Collectors.toList());
- Preconditions.checkArgument(
- invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements);
- }
-
- return isCreate;
- }
-
@Override
public Response createView(
String prefix,
diff --git a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
index 78e3935d8..29aa328c7 100644
--- a/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
+++ b/polaris-service/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
@@ -18,6 +18,7 @@
*/
package org.apache.polaris.service.catalog;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.Closeable;
import java.io.IOException;
@@ -31,6 +32,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.MetadataUpdate;
@@ -38,8 +40,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.Transaction;
-import org.apache.iceberg.Transactions;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -48,6 +49,7 @@
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
@@ -68,6 +70,7 @@
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.polaris.core.PolarisConfiguration;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
import org.apache.polaris.core.auth.PolarisAuthorizer;
@@ -77,7 +80,9 @@
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.core.persistence.resolver.ResolverPath;
import org.apache.polaris.core.persistence.resolver.ResolverStatus;
@@ -136,6 +141,27 @@ public PolarisCatalogHandlerWrapper(
this.catalogFactory = catalogFactory;
}
+ /**
+ * TODO: Make the helper in org.apache.iceberg.rest.CatalogHandlers public instead of needing to
+ * copy/paste here.
+ */
+ public static boolean isCreate(UpdateTableRequest request) {
+ boolean isCreate =
+ request.requirements().stream()
+ .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
+
+ if (isCreate) {
+ List<UpdateRequirement> invalidRequirements =
+ request.requirements().stream()
+ .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist))
+ .collect(Collectors.toList());
+ Preconditions.checkArgument(
+ invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements);
+ }
+
+ return isCreate;
+ }
+
private void initializeCatalog() {
this.baseCatalog =
catalogFactory.createCallContextCatalog(
@@ -954,25 +980,91 @@ public void commitTransaction(CommitTransactionRequest commitTransactionRequest)
throw new BadRequestException("Cannot update table on external catalogs.");
}
- // TODO: Implement this properly
- List transactions =
- commitTransactionRequest.tableChanges().stream()
- .map(
- change -> {
- Table table = baseCatalog.loadTable(change.identifier());
- if (!(table instanceof BaseTable)) {
- throw new IllegalStateException(
- "Cannot wrap catalog that does not produce BaseTable");
- }
- Transaction transaction =
- Transactions.newTransaction(
- change.identifier().toString(), ((BaseTable) table).operations());
- CatalogHandlers.updateTable(baseCatalog, change.identifier(), change);
- return transaction;
- })
- .toList();
+ if (!(baseCatalog instanceof BasePolarisCatalog)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTransaction with baseCatalog type: %s",
+ baseCatalog.getClass().getName());
+ }
- transactions.forEach(Transaction::commitTransaction);
+ // Swap in TransactionWorkspaceMetaStoreManager for all mutations made by this baseCatalog to
+ // only go into an in-memory collection that we can commit as a single atomic unit after all
+ // validations.
+ TransactionWorkspaceMetaStoreManager transactionMetaStoreManager =
+ new TransactionWorkspaceMetaStoreManager(entityManager.getMetaStoreManager());
+ ((BasePolarisCatalog) baseCatalog).setMetaStoreManager(transactionMetaStoreManager);
+
+ commitTransactionRequest.tableChanges().stream()
+ .forEach(
+ change -> {
+ Table table = baseCatalog.loadTable(change.identifier());
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException(
+ "Cannot wrap catalog that does not produce BaseTable");
+ }
+ if (isCreate(change)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTransaction with updateForStagedCreate: %s",
+ change);
+ }
+
+ TableOperations tableOps = ((BaseTable) table).operations();
+ TableMetadata currentMetadata = tableOps.current();
+
+ // Validate requirements; any CommitFailedExceptions will fail the overall request
+ change.requirements().forEach(requirement -> requirement.validate(currentMetadata));
+
+ // Apply changes
+ TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata);
+ change.updates().stream()
+ .forEach(
+ singleUpdate -> {
+ // Note: If location-overlap checking is refactored to be atomic, we could
+ // support validation within a single multi-table transaction as well, but
+ // will need to update the TransactionWorkspaceMetaStoreManager to better
+ // expose the concept of being able to read uncommitted updates.
+ if (singleUpdate instanceof MetadataUpdate.SetLocation) {
+ if (!currentMetadata
+ .location()
+ .equals(((MetadataUpdate.SetLocation) singleUpdate).location())
+ && !callContext
+ .getPolarisCallContext()
+ .getConfigurationStore()
+ .getConfiguration(
+ callContext.getPolarisCallContext(),
+ PolarisConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
+ throw new BadRequestException(
+ "Unsupported operation: commitTransaction containing SetLocation"
+ + " for table '%s' and new location '%s'",
+ change.identifier(),
+ ((MetadataUpdate.SetLocation) singleUpdate).location());
+ }
+ }
+
+ // Apply updates to builder
+ singleUpdate.applyTo(metadataBuilder);
+ });
+
+ // Commit into transaction workspace we swapped the baseCatalog to use
+ TableMetadata updatedMetadata = metadataBuilder.build();
+ if (!updatedMetadata.changes().isEmpty()) {
+ tableOps.commit(currentMetadata, updatedMetadata);
+ }
+ });
+
+ // Commit the collected updates in a single atomic operation
+ List<PolarisMetaStoreManager.EntityWithPath> pendingUpdates =
+ transactionMetaStoreManager.getPendingUpdates();
+ PolarisMetaStoreManager.EntitiesResult result =
+ entityManager
+ .getMetaStoreManager()
+ .updateEntitiesPropertiesIfNotChanged(
+ callContext.getPolarisCallContext(), pendingUpdates);
+ if (!result.isSuccess()) {
+ // TODO: Retries and server-side cleanup on failure
+ throw new CommitFailedException(
+ "Transaction commit failed with status: %s, extraInfo: %s",
+ result.getReturnStatus(), result.getExtraInformation());
+ }
}
public ListTablesResponse listViews(Namespace namespace) {
diff --git a/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java b/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java
index 5c858ccf9..7b937fdce 100644
--- a/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java
+++ b/polaris-service/src/test/java/org/apache/polaris/service/catalog/PolarisRestCatalogIntegrationTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.polaris.service.catalog;
-import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.polaris.service.context.DefaultContextResolver.REALM_PROPERTY_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -37,14 +36,22 @@
import java.util.Optional;
import java.util.UUID;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.auth.OAuth2Properties;
@@ -116,7 +123,6 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests
ConfigOverride.config(
"server.adminConnectors[0].port", "0")); // Bind to random port to support parallelism
- protected static final Schema SCHEMA = new Schema(required(4, "data", Types.StringType.get()));
protected static final String VIEW_QUERY = "select * from ns1.layer1_table";
private RESTCatalog restCatalog;
@@ -769,4 +775,191 @@ public void testSendNotificationInternalCatalog() {
.returns("Cannot update internal catalog via notifications", ErrorResponse::message);
}
}
+
+ // Test copied from iceberg/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+ // TODO: If TestRESTCatalog can be refactored to be more usable as a shared base test class,
+ // just inherit these test cases from that instead of copying them here.
+ @Test
+ public void diffAgainstSingleTable() {
+ Namespace namespace = Namespace.of("namespace");
+ TableIdentifier identifier = TableIdentifier.of(namespace, "multipleDiffsAgainstSingleTable");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(namespace);
+ }
+
+ // Stage two distinct updates (schema + partition spec) on a local transaction without
+ // committing either to the catalog; apply() captures the expected post-commit state.
+ // NOTE(review): SCHEMA and the "id"/"data" columns appear to come from the CatalogTests
+ // base class now that the local SCHEMA constant was removed — confirm.
+ Table table = catalog().buildTable(identifier, SCHEMA).create();
+ Transaction transaction = table.newTransaction();
+
+ UpdateSchema updateSchema =
+ transaction.updateSchema().addColumn("new_col", Types.LongType.get());
+ Schema expectedSchema = updateSchema.apply();
+ updateSchema.commit();
+
+ UpdatePartitionSpec updateSpec =
+ transaction.updateSpec().addField("shard", Expressions.bucket("id", 16));
+ PartitionSpec expectedSpec = updateSpec.apply();
+ updateSpec.commit();
+
+ // Bundle the whole start->current metadata diff into a single TableCommit and send it
+ // through the transactional commit endpoint.
+ TableCommit tableCommit =
+ TableCommit.create(
+ identifier,
+ ((BaseTransaction) transaction).startMetadata(),
+ ((BaseTransaction) transaction).currentMetadata());
+
+ restCatalog.commitTransaction(tableCommit);
+
+ // Both staged updates must be visible after the single commit.
+ Table loaded = catalog().loadTable(identifier);
+ assertThat(loaded.schema().asStruct()).isEqualTo(expectedSchema.asStruct());
+ assertThat(loaded.spec().fields()).isEqualTo(expectedSpec.fields());
+ }
+
+ // Test copied from iceberg/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+ // TODO: If TestRESTCatalog can be refactored to be more usable as a shared base test class,
+ // just inherit these test cases from that instead of copying them here.
+ @Test
+ public void multipleDiffsAgainstMultipleTables() {
+ Namespace namespace = Namespace.of("multiDiffNamespace");
+ TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1");
+ TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(namespace);
+ }
+
+ // Stage an independent schema change on each table via its own local transaction;
+ // apply() captures each table's expected post-commit schema.
+ Table table1 = catalog().buildTable(identifier1, SCHEMA).create();
+ Table table2 = catalog().buildTable(identifier2, SCHEMA).create();
+ Transaction t1Transaction = table1.newTransaction();
+ Transaction t2Transaction = table2.newTransaction();
+
+ UpdateSchema updateSchema =
+ t1Transaction.updateSchema().addColumn("new_col", Types.LongType.get());
+ Schema expectedSchema = updateSchema.apply();
+ updateSchema.commit();
+
+ UpdateSchema updateSchema2 =
+ t2Transaction.updateSchema().addColumn("new_col2", Types.LongType.get());
+ Schema expectedSchema2 = updateSchema2.apply();
+ updateSchema2.commit();
+
+ TableCommit tableCommit1 =
+ TableCommit.create(
+ identifier1,
+ ((BaseTransaction) t1Transaction).startMetadata(),
+ ((BaseTransaction) t1Transaction).currentMetadata());
+
+ TableCommit tableCommit2 =
+ TableCommit.create(
+ identifier2,
+ ((BaseTransaction) t2Transaction).startMetadata(),
+ ((BaseTransaction) t2Transaction).currentMetadata());
+
+ // Commit both tables' diffs in one multi-table transactional request.
+ restCatalog.commitTransaction(tableCommit1, tableCommit2);
+
+ // Each table reflects its own staged change after the joint commit.
+ assertThat(catalog().loadTable(identifier1).schema().asStruct())
+ .isEqualTo(expectedSchema.asStruct());
+
+ assertThat(catalog().loadTable(identifier2).schema().asStruct())
+ .isEqualTo(expectedSchema2.asStruct());
+ }
+
+ // Test copied from iceberg/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
+ // TODO: If TestRESTCatalog can be refactored to be more usable as a shared base test class,
+ // just inherit these test cases from that instead of copying them here.
+ @Test
+ public void multipleDiffsAgainstMultipleTablesLastFails() {
+ Namespace namespace = Namespace.of("multiDiffNamespace");
+ TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1");
+ TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(namespace);
+ }
+
+ catalog().createTable(identifier1, SCHEMA);
+ catalog().createTable(identifier2, SCHEMA);
+
+ Table table1 = catalog().loadTable(identifier1);
+ Table table2 = catalog().loadTable(identifier2);
+ Schema originalSchemaOne = table1.schema();
+
+ // Stage a valid change on table1 and a rename of "data" on table2, both in local
+ // transactions that have not yet touched the catalog.
+ Transaction t1Transaction = catalog().loadTable(identifier1).newTransaction();
+ t1Transaction.updateSchema().addColumn("new_col1", Types.LongType.get()).commit();
+
+ Transaction t2Transaction = catalog().loadTable(identifier2).newTransaction();
+ t2Transaction.updateSchema().renameColumn("data", "new-column").commit();
+
+ // delete the column that is being renamed in the above TX to cause a conflict
+ table2.updateSchema().deleteColumn("data").commit();
+ Schema updatedSchemaTwo = table2.schema();
+
+ TableCommit tableCommit1 =
+ TableCommit.create(
+ identifier1,
+ ((BaseTransaction) t1Transaction).startMetadata(),
+ ((BaseTransaction) t1Transaction).currentMetadata());
+
+ TableCommit tableCommit2 =
+ TableCommit.create(
+ identifier2,
+ ((BaseTransaction) t2Transaction).startMetadata(),
+ ((BaseTransaction) t2Transaction).currentMetadata());
+
+ // table2's update requirement (schema unchanged since start) now fails, which must
+ // fail the whole multi-table transaction.
+ assertThatThrownBy(() -> restCatalog.commitTransaction(tableCommit1, tableCommit2))
+ .isInstanceOf(CommitFailedException.class)
+ .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+
+ // Atomicity: table1's otherwise-valid change must NOT have been applied.
+ Schema schema1 = catalog().loadTable(identifier1).schema();
+ assertThat(schema1.asStruct()).isEqualTo(originalSchemaOne.asStruct());
+
+ // table2 keeps only the out-of-band delete; the transactional rename never landed.
+ Schema schema2 = catalog().loadTable(identifier2).schema();
+ assertThat(schema2.asStruct()).isEqualTo(updatedSchemaTwo.asStruct());
+ assertThat(schema2.findField("data")).isNull();
+ assertThat(schema2.findField("new-column")).isNull();
+ assertThat(schema2.columns()).hasSize(1);
+ }
+
+ @Test
+ // Verifies atomicity when a single commitTransaction request contains two commits against
+ // the SAME table whose requirements are individually satisfiable up-front but conflict
+ // with each other once the first one is applied.
+ public void testMultipleConflictingCommitsToSingleTableInTransaction() {
+ Namespace namespace = Namespace.of("ns1");
+ TableIdentifier identifier =
+ TableIdentifier.of(namespace, "multipleConflictingCommitsAgainstSingleTable");
+
+ if (requiresNamespaceCreate()) {
+ catalog().createNamespace(namespace);
+ }
+
+ // Start two independent transactions on the same base table.
+ Table table = catalog().buildTable(identifier, SCHEMA).create();
+ Schema originalSchema = catalog().loadTable(identifier).schema();
+ Transaction transaction1 = table.newTransaction();
+ Transaction transaction2 = table.newTransaction();
+
+ // Both rename the same source column, so whichever commit applies first invalidates
+ // the other's requirements.
+ transaction1.updateSchema().renameColumn("data", "new-column1").commit();
+ transaction2.updateSchema().renameColumn("data", "new-column2").commit();
+
+ TableCommit tableCommit1 =
+ TableCommit.create(
+ identifier,
+ ((BaseTransaction) transaction1).startMetadata(),
+ ((BaseTransaction) transaction1).currentMetadata());
+ TableCommit tableCommit2 =
+ TableCommit.create(
+ identifier,
+ ((BaseTransaction) transaction2).startMetadata(),
+ ((BaseTransaction) transaction2).currentMetadata());
+
+ // "Initial" commit requirements will succeed for both commits being based on the original
+ // table but should fail the entire transaction on the second commit.
+ assertThatThrownBy(() -> restCatalog.commitTransaction(tableCommit1, tableCommit2))
+ .isInstanceOf(CommitFailedException.class);
+
+ // If an implementation validates all UpdateRequirements up-front, then it might pass
+ // tests where the UpdateRequirement fails up-front without being atomic. Here we can
+ // catch such scenarios where update requirements appear to be fine up-front but will
+ // fail when trying to commit the second update, and verify that nothing was actually
+ // committed in the end.
+ Schema latestCommittedSchema = catalog().loadTable(identifier).schema();
+ assertThat(latestCommittedSchema.asStruct()).isEqualTo(originalSchema.asStruct());
+ }
}