Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement atomic multi-table transactions #238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,372 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.persistence;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntityCore;
import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisPrivilege;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Wraps an existing impl of PolarisMetaStoreManager and delegates expected "read" operations
* through to the wrapped instance while throwing errors on unexpected operations or enqueuing
* expected write operations into a collection to be committed as a single atomic unit.
*
* <p>Note that as long as the server-side multi-commit transaction semantics are effectively only
* SERIALIZABLE isolation (i.e. if we can resolve all UpdateRequirements "statically" before the set
* of commits and translate these into an atomic collection of compare-and-swap operations to apply
* the transaction), this workspace should also reject readEntity/loadEntity operations to avoid
* implying that any reads from this transaction workspace include writes performed into this
* transaction workspace that haven't yet been committed.
*
* <p>Not thread-safe; instances should only be used within a single request context and should not
* be reused between requests.
*/
public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like an awfully restrictive way to support transactional writes. I get the reasoning and the chances someone might expect reads to reflect the uncommitted writes is high. On the other hand, this seems extremely inflexible and I have a hard time seeing how this would allow future work that might need to do lookups after the initial entity resolution.

On the other hand, we don't really use the Transaction interface, which I think could be used to hold the entity updates prior to commit. It might be possible to move the entity persistence into the Transaction interface and support a multi-layer transaction so that a global transaction applies all of the underlying transaction entity updates atomically, whereas a single-layer transcation can just apply immediately on commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I was actually adding this specifically to be the more flexible option vs injecting only an updates-collector. Right now this may seem rigid by not supporting reads, but the idea would be that the transaction workspace represents the mutated state within the transaction, so if reads need to happen in the workspace in the future, we can easily intercept those reads and return the state after the prior uncommitted writes in the same transaction or else read-through to the real persistence layer or require reading a statically resolved entity state from the beginning of the transaction (if we're sticking to SNAPSHOT isolation semantics); subsequent writes would be able to still condition on the entityVersion from those reads.

It seems the current Transaction interface in Iceberg is somewhat specific to single-table transactions and is geared towards packing different table-update types into the common interface, whereas for our server-side commitTransaction we already receive nicely packed TableUpdates that we don't need to manually multiplex back out into the different update types.

By putting our transaction container here in the PolarisMetaStoreManager layer, we don't need to care about whether it's an Iceberg table transaction or something non-Iceberg entirely (e.g. a multi-PrincipalRole transaction that would atomically update multiple PrincipalRoles). Anything which knows how to write to a PolarisMetaStoreManager doesn't need to be aware of transactions happening at all -- a BEGIN TRANSACTION just means we inject one of these TransactionWorkspaceMetaStoreManager instances as the impl, the core logic happily performs its mutations into it as needed, and then the outer layer gets to commit all the queued updates atomically.

Granted, to get to that point we need some additional features in here regarding the tracking and overriding of reads of entities that have been modified in the same transaction, and some refactoring of the PolarisEntityManager/PolarisResolutionManifest, but then we can add options in here to configure the desired isolation semantics of the reads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay. This makes a ton of sense - I hadn't really been thinking about non-Iceberg entities, so supporting updates for Principals, etc. makes sense. I do like the premise that we can update this tx workspace to support uncommitted reads later on (maybe worth including as a comment in the code?).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment with a rough sketch of how this class would evolve to support more advanced transaction scenarios.

private final PolarisMetaStoreManager delegate;

// TODO: If we want to support the semantic of opening a transaction in which multiple
// reads and writes occur on the same entities, where the reads are expected to see the writes
// within the transaction workspace that haven't actually been committed, we can augment this
// class by allowing these pendingUpdates to represent the latest state of the entity if we
// also increment entityVersion. We'd need to store both a "latest view" of all updated entities
// to serve reads within the same transaction while also storing the ordered list of
// pendingUpdates that ultimately need to be applied in order within the real MetaStoreManager.
private final List<EntityWithPath> pendingUpdates = new ArrayList<>();

public TransactionWorkspaceMetaStoreManager(PolarisMetaStoreManager delegate) {
this.delegate = delegate;
}

public List<EntityWithPath> getPendingUpdates() {
return ImmutableList.copyOf(pendingUpdates);
}

@Override
public BaseResult bootstrapPolarisService(@NotNull PolarisCallContext callCtx) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "bootstrapPolarisService");
return null;
}

@Override
public BaseResult purge(@NotNull PolarisCallContext callCtx) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "purge");
return null;
}

@Override
public PolarisMetaStoreManager.EntityResult readEntityByName(
@NotNull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisEntityType entityType,
@NotNull PolarisEntitySubType entitySubType,
@NotNull String name) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "readEntityByName");
return null;
}

@Override
public ListEntitiesResult listEntities(
@NotNull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisEntityType entityType,
@NotNull PolarisEntitySubType entitySubType) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "listEntities");
return null;
}

@Override
public GenerateEntityIdResult generateNewEntityId(@NotNull PolarisCallContext callCtx) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "generateNewEntityId");
return null;
}

@Override
public CreatePrincipalResult createPrincipal(
@NotNull PolarisCallContext callCtx, @NotNull PolarisBaseEntity principal) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "createPrincipal");
return null;
}

@Override
public PrincipalSecretsResult loadPrincipalSecrets(
@NotNull PolarisCallContext callCtx, @NotNull String clientId) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "loadPrincipalSecrets");
return null;
}

@Override
public PrincipalSecretsResult rotatePrincipalSecrets(
@NotNull PolarisCallContext callCtx,
@NotNull String clientId,
long principalId,
@NotNull String mainSecret,
boolean reset) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "rotatePrincipalSecrets");
return null;
}

@Override
public CreateCatalogResult createCatalog(
@NotNull PolarisCallContext callCtx,
@NotNull PolarisBaseEntity catalog,
@NotNull List<PolarisEntityCore> principalRoles) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "createCatalog");
return null;
}

@Override
public EntityResult createEntityIfNotExists(
@NotNull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisBaseEntity entity) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "createEntityIfNotExists");
return null;
}

@Override
public EntitiesResult createEntitiesIfNotExist(
@NotNull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull List<? extends PolarisBaseEntity> entities) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "createEntitiesIfNotExist");
return null;
}

@Override
public EntityResult updateEntityPropertiesIfNotChanged(
@NotNull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisBaseEntity entity) {
pendingUpdates.add(new EntityWithPath(catalogPath, entity));
return new EntityResult(entity);
}

@Override
public EntitiesResult updateEntitiesPropertiesIfNotChanged(
@NotNull PolarisCallContext callCtx, @NotNull List<EntityWithPath> entities) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "updateEntitiesPropertiesIfNotChanged");
return null;
}

@Override
public EntityResult renameEntity(
@NotNull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisEntityCore entityToRename,
@Nullable List<PolarisEntityCore> newCatalogPath,
@NotNull PolarisEntity renamedEntity) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "renameEntity");
return null;
}

@Override
public DropEntityResult dropEntityIfExists(
@NotNull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisEntityCore entityToDrop,
@Nullable Map<String, String> cleanupProperties,
boolean cleanup) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "dropEntityIfExists");
return null;
}

@Override
public PrivilegeResult grantUsageOnRoleToGrantee(
@NotNull PolarisCallContext callCtx,
@Nullable PolarisEntityCore catalog,
@NotNull PolarisEntityCore role,
@NotNull PolarisEntityCore grantee) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "grantUsageOnRoleToGrantee");
return null;
}

@Override
public PrivilegeResult revokeUsageOnRoleFromGrantee(
@NotNull PolarisCallContext callCtx,
@Nullable PolarisEntityCore catalog,
@NotNull PolarisEntityCore role,
@NotNull PolarisEntityCore grantee) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "revokeUsageOnRoleFromGrantee");
return null;
}

@Override
public PrivilegeResult grantPrivilegeOnSecurableToRole(
@NotNull PolarisCallContext callCtx,
@NotNull PolarisEntityCore grantee,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisEntityCore securable,
@NotNull PolarisPrivilege privilege) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "grantPrivilegeOnSecurableToRole");
return null;
}

@Override
public PrivilegeResult revokePrivilegeOnSecurableFromRole(
@NotNull PolarisCallContext callCtx,
@NotNull PolarisEntityCore grantee,
@Nullable List<PolarisEntityCore> catalogPath,
@NotNull PolarisEntityCore securable,
@NotNull PolarisPrivilege privilege) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "revokePrivilegeOnSecurableFromRole");
return null;
}

@Override
public LoadGrantsResult loadGrantsOnSecurable(
@NotNull PolarisCallContext callCtx, long securableCatalogId, long securableId) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "loadGrantsOnSecurable");
return null;
}

@Override
public LoadGrantsResult loadGrantsToGrantee(
PolarisCallContext callCtx, long granteeCatalogId, long granteeId) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "loadGrantsToGrantee");
return null;
}

@Override
public ChangeTrackingResult loadEntitiesChangeTracking(
@NotNull PolarisCallContext callCtx, @NotNull List<PolarisEntityId> entityIds) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "loadEntitiesChangeTracking");
return null;
}

@Override
public EntityResult loadEntity(
@NotNull PolarisCallContext callCtx, long entityCatalogId, long entityId) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadEntity");
return null;
}

@Override
public EntitiesResult loadTasks(
@NotNull PolarisCallContext callCtx, String executorId, int limit) {
callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadTasks");
return null;
}

@Override
public ScopedCredentialsResult getSubscopedCredsForEntity(
@NotNull PolarisCallContext callCtx,
long catalogId,
long entityId,
boolean allowListOperation,
@NotNull Set<String> allowedReadLocations,
@NotNull Set<String> allowedWriteLocations) {
return delegate.getSubscopedCredsForEntity(
callCtx,
catalogId,
entityId,
allowListOperation,
allowedReadLocations,
allowedWriteLocations);
}

@Override
public ValidateAccessResult validateAccessToLocations(
@NotNull PolarisCallContext callCtx,
long catalogId,
long entityId,
@NotNull Set<PolarisStorageActions> actions,
@NotNull Set<String> locations) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "validateAccessToLocations");
return null;
}

@Override
public PolarisMetaStoreManager.CachedEntryResult loadCachedEntryById(
@NotNull PolarisCallContext callCtx, long entityCatalogId, long entityId) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "loadCachedEntryById");
return null;
}

@Override
public PolarisMetaStoreManager.CachedEntryResult loadCachedEntryByName(
@NotNull PolarisCallContext callCtx,
long entityCatalogId,
long parentId,
@NotNull PolarisEntityType entityType,
@NotNull String entityName) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "loadCachedEntryByName");
return null;
}

@Override
public PolarisMetaStoreManager.CachedEntryResult refreshCachedEntity(
@NotNull PolarisCallContext callCtx,
int entityVersion,
int entityGrantRecordsVersion,
@NotNull PolarisEntityType entityType,
long entityCatalogId,
long entityId) {
callCtx
.getDiagServices()
.fail("illegal_method_in_transaction_workspace", "refreshCachedEntity");
return null;
}
}
Loading