diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 6a3dc23fd4c..2b4e0599df3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -18,6 +18,7 @@ import static java.lang.String.format; import io.delta.kernel.exceptions.*; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.types.DataType; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.DataFileStatus; @@ -274,6 +275,24 @@ public static KernelException invalidConfigurationValueException( return new InvalidConfigurationValueException(key, value, helpMessage); } + public static KernelException domainMetadataUnsupported() { + String message = + "Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' " + + "is not supported on this table."; + return new KernelException(message); + } + + public static ConcurrentWriteException concurrentDomainMetadataAction( + DomainMetadata domainMetadataAttempt, DomainMetadata winningDomainMetadata) { + String message = + String.format( + "A concurrent writer added a domainMetadata action for the same domain: %s. " + + "No domain-specific conflict resolution is available for this domain. " + + "Attempted domainMetadata: %s. Winning domainMetadata: %s", + domainMetadataAttempt.getDomain(), domainMetadataAttempt, winningDomainMetadata); + return new ConcurrentWriteException(message); + } + /* ------------------------ HELPER METHODS ----------------------------- */ private static String formatTimestamp(long millisSinceEpochUTC) { return new Timestamp(millisSinceEpochUTC).toInstant().toString(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 67dca98e89a..5dd6cd2d2d4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -23,6 +23,7 @@ import io.delta.kernel.engine.CommitCoordinatorClientHandler; import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.actions.CommitInfo; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; @@ -31,6 +32,7 @@ import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler; import io.delta.kernel.types.StructType; +import java.util.Map; import java.util.Optional; /** Implementation of {@link Snapshot}. */ @@ -83,6 +85,17 @@ public Protocol getProtocol() { return protocol; } + /** + * Get the domain metadata map from the log replay, which lazily loads and replays a history of + * domain metadata actions, resolving them to produce the current state of the domain metadata. + * + * @return A map where the keys are domain names and the values are {@link DomainMetadata} + * objects. 
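+ * <p>For illustration, a caller might read the resolved domain metadata roughly as follows (an
+ * illustrative sketch; {@code engine} and {@code tablePath} are assumed to be in scope):
+ * <pre>{@code
+ * SnapshotImpl snapshot =
+ *     (SnapshotImpl) Table.forPath(engine, tablePath).getLatestSnapshot(engine);
+ * Map<String, DomainMetadata> domainMetadataMap = snapshot.getDomainMetadataMap();
+ * // A removed domain is kept in the map as a tombstone entry with isRemoved() == true.
+ * DomainMetadata dm = domainMetadataMap.get("someDomain"); // null if the domain was never set
+ * }</pre>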
+ */ + public Map getDomainMetadataMap() { + return logReplay.getDomainMetadataMap(); + } + public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { long minFileRetentionTimestampMillis = System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index eeeffc7cc7d..88fbabbd0d2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -39,6 +39,7 @@ public class TableFeatures { add("columnMapping"); add("typeWidening-preview"); add("typeWidening"); + add(DOMAIN_METADATA_FEATURE_NAME); } }); @@ -57,6 +58,12 @@ public class TableFeatures { } }); + /** The feature name for domain metadata. */ + public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata"; + + /** The minimum writer version required to support table features. */ + public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7; + //////////////////// // Helper Methods // //////////////////// @@ -93,7 +100,7 @@ public static void validateReadSupportedTable( *
  • protocol writer version 1. *
  • protocol writer version 2 only with appendOnly feature enabled. *
  • protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code - * columnMapping}, {@code typeWidening} feature enabled. + * columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled. * * * @param protocol Table protocol @@ -125,20 +132,8 @@ public static void validateWriteSupportedTable( throw unsupportedWriterProtocol(tablePath, minWriterVersion); case 7: for (String writerFeature : protocol.getWriterFeatures()) { - switch (writerFeature) { - // Only supported writer features as of today in Kernel - case "appendOnly": - break; - case "inCommitTimestamp": - break; - case "columnMapping": - break; - case "typeWidening-preview": - break; - case "typeWidening": - break; - default: - throw unsupportedWriterFeature(tablePath, writerFeature); + if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) { + throw unsupportedWriterFeature(tablePath, writerFeature); } } break; @@ -187,6 +182,21 @@ public static Set extractAutomaticallyEnabledWriterFeatures( .collect(Collectors.toSet()); } + /** + * Checks if the table protocol supports the "domainMetadata" writer feature. + * + * @param protocol the protocol to check + * @return true if the "domainMetadata" feature is supported, false otherwise + */ + public static boolean isDomainMetadataSupported(Protocol protocol) { + List writerFeatures = protocol.getWriterFeatures(); + if (writerFeatures == null) { + return false; + } + return writerFeatures.contains(DOMAIN_METADATA_FEATURE_NAME) + && protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION; + } + /** * Get the minimum reader version required for a feature. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 7b90d2be1d8..267517651c7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -32,11 +32,7 @@ import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.replay.ConflictChecker; import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; -import io.delta.kernel.internal.util.Clock; -import io.delta.kernel.internal.util.ColumnMapping; -import io.delta.kernel.internal.util.FileNames; -import io.delta.kernel.internal.util.InCommitTimestampUtils; -import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.internal.util.*; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterable; import io.delta.kernel.utils.CloseableIterator; @@ -73,6 +69,7 @@ public class TransactionImpl implements Transaction { private final Optional setTxnOpt; private final boolean shouldUpdateProtocol; private final Clock clock; + private final List domainMetadatas = new ArrayList<>(); private Metadata metadata; private boolean shouldUpdateMetadata; @@ -120,6 +117,23 @@ public StructType getSchema(Engine engine) { return readSnapshot.getSchema(engine); } + public Optional getSetTxnOpt() { + return setTxnOpt; + } + + /** + * Internal API to add domain metadata actions for this transaction. Visible for testing. + * + * @param domainMetadatas List of domain metadata to be added to the transaction. 
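+ * <p>For illustration, a test might stage and commit a single domain metadata action roughly as
+ * follows (an illustrative sketch; {@code engine} and {@code txnBuilder} are assumed to be in
+ * scope, and the table must already support the {@code domainMetadata} writer feature):
+ * <pre>{@code
+ * TransactionImpl txn = (TransactionImpl) txnBuilder.build(engine);
+ * txn.addDomainMetadatas(
+ *     Collections.singletonList(new DomainMetadata("someDomain", "{\"key\":\"value\"}", false)));
+ * txn.commit(engine, CloseableIterable.emptyIterable());
+ * }</pre>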
+ */ + public void addDomainMetadatas(List domainMetadatas) { + this.domainMetadatas.addAll(domainMetadatas); + } + + public List getDomainMetadatas() { + return domainMetadatas; + } + @Override public TransactionCommitResult commit(Engine engine, CloseableIterable dataActions) throws ConcurrentWriteException { @@ -221,6 +235,12 @@ private TransactionCommitResult doCommit( } setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow()))); + // Check for duplicate domain metadata and if the protocol supports + DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol); + + domainMetadatas.forEach( + dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow()))); + try (CloseableIterator stageDataIter = dataActions.iterator()) { // Create a new CloseableIterator that will return the metadata actions followed by the // data actions. @@ -265,10 +285,6 @@ public boolean isBlindAppend() { return true; } - public Optional getSetTxnOpt() { - return setTxnOpt; - } - /** * Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can * result in an additional file read and that this will only happen if ICT is enabled. diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java new file mode 100644 index 00000000000..073fd2956df --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java @@ -0,0 +1,141 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.actions; + +import static io.delta.kernel.internal.util.InternalUtils.requireNonNull; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.Row; +import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructType; +import java.util.HashMap; +import java.util.Map; + +/** Delta log action representing an `DomainMetadata` action */ +public class DomainMetadata { + /** Full schema of the {@link DomainMetadata} action in the Delta Log. 
*/ + public static final StructType FULL_SCHEMA = + new StructType() + .add("domain", StringType.STRING, false /* nullable */) + .add("configuration", StringType.STRING, false /* nullable */) + .add("removed", BooleanType.BOOLEAN, false /* nullable */); + + public static DomainMetadata fromColumnVector(ColumnVector vector, int rowId) { + if (vector.isNullAt(rowId)) { + return null; + } + return new DomainMetadata( + requireNonNull(vector.getChild(0), rowId, "domain").getString(rowId), + requireNonNull(vector.getChild(1), rowId, "configuration").getString(rowId), + requireNonNull(vector.getChild(2), rowId, "removed").getBoolean(rowId)); + } + + /** + * Creates a {@link DomainMetadata} instance from a Row with the schema being {@link + * DomainMetadata#FULL_SCHEMA}. + * + * @param row the Row object containing the DomainMetadata action + * @return a DomainMetadata instance or null if the row is null + * @throws IllegalArgumentException if the schema of the row does not match {@link + * DomainMetadata#FULL_SCHEMA} + */ + public static DomainMetadata fromRow(Row row) { + if (row == null) { + return null; + } + checkArgument( + row.getSchema().equals(FULL_SCHEMA), + "Expected schema: %s, found: %s", + FULL_SCHEMA, + row.getSchema()); + return new DomainMetadata( + requireNonNull(row, 0, "domain").getString(0), + requireNonNull(row, 1, "configuration").getString(1), + requireNonNull(row, 2, "removed").getBoolean(2)); + } + + private final String domain; + private final String configuration; + private final boolean removed; + + /** + * The domain metadata action contains a configuration string for a named metadata domain. Two + * overlapping transactions conflict if they both contain a domain metadata action for the same + * metadata domain. Per-domain conflict resolution logic can be implemented. + * + * @param domain A string used to identify a specific domain. + * @param configuration A string containing configuration for the metadata domain. + * @param removed If it is true it serves as a tombstone to logically delete a {@link + * DomainMetadata} action. + */ + public DomainMetadata(String domain, String configuration, boolean removed) { + this.domain = requireNonNull(domain, "domain is null"); + this.configuration = requireNonNull(configuration, "configuration is null"); + this.removed = removed; + } + + public String getDomain() { + return domain; + } + + public String getConfiguration() { + return configuration; + } + + public boolean isRemoved() { + return removed; + } + + /** + * Encode as a {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}. 
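+ * <p>For example (an illustrative round trip):
+ * <pre>{@code
+ * DomainMetadata dm = new DomainMetadata("someDomain", "{}", false);
+ * Row row = dm.toRow();
+ * // fromRow reads back the same three fields, so the decoded action equals the original.
+ * assert dm.equals(DomainMetadata.fromRow(row));
+ * }</pre>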
+ * + * @return {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA} + */ + public Row toRow() { + Map domainMetadataMap = new HashMap<>(); + domainMetadataMap.put(0, domain); + domainMetadataMap.put(1, configuration); + domainMetadataMap.put(2, removed); + + return new GenericRow(DomainMetadata.FULL_SCHEMA, domainMetadataMap); + } + + @Override + public String toString() { + return String.format( + "DomainMetadata{domain='%s', configuration='%s', removed='%s'}", + domain, configuration, removed); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + DomainMetadata that = (DomainMetadata) obj; + return removed == that.removed + && domain.equals(that.domain) + && configuration.equals(that.configuration); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(domain, configuration, removed); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java index 94626ea5f7d..1f32226a0ec 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java @@ -18,6 +18,7 @@ import io.delta.kernel.data.Row; import io.delta.kernel.internal.data.GenericRow; import io.delta.kernel.types.StructType; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,7 +33,9 @@ public class SingleAction { .add("add", AddFile.FULL_SCHEMA) .add("remove", RemoveFile.FULL_SCHEMA) .add("metaData", Metadata.FULL_SCHEMA) - .add("protocol", Protocol.FULL_SCHEMA); + .add("protocol", Protocol.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); + // Once we start supporting updating CDC or domain metadata enabled tables, we should add the // schema for those fields here. @@ -48,7 +51,9 @@ public class SingleAction { // .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends .add("metaData", Metadata.FULL_SCHEMA) .add("protocol", Protocol.FULL_SCHEMA) - .add("commitInfo", CommitInfo.FULL_SCHEMA); + .add("commitInfo", CommitInfo.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); + // Once we start supporting domain metadata/row tracking enabled tables, we should add the // schema for domain metadata fields here. @@ -61,7 +66,8 @@ public class SingleAction { .add("metaData", Metadata.FULL_SCHEMA) .add("protocol", Protocol.FULL_SCHEMA) .add("cdc", new StructType()) - .add("commitInfo", CommitInfo.FULL_SCHEMA); + .add("commitInfo", CommitInfo.FULL_SCHEMA) + .add("domainMetadata", DomainMetadata.FULL_SCHEMA); // Once we start supporting updating CDC or domain metadata enabled tables, we should add the // schema for those fields here. 
@@ -71,6 +77,7 @@ public class SingleAction { private static final int METADATA_ORDINAL = FULL_SCHEMA.indexOf("metaData"); private static final int PROTOCOL_ORDINAL = FULL_SCHEMA.indexOf("protocol"); private static final int COMMIT_INFO_ORDINAL = FULL_SCHEMA.indexOf("commitInfo"); + private static final int DOMAIN_METADATA_ORDINAL = FULL_SCHEMA.indexOf("domainMetadata"); public static Row createAddFileSingleAction(Row addFile) { Map singleActionValueMap = new HashMap<>(); @@ -102,6 +109,11 @@ public static Row createCommitInfoSingleAction(Row commitInfo) { return new GenericRow(FULL_SCHEMA, singleActionValueMap); } + public static Row createDomainMetadataSingleAction(Row domainMetadata) { + return new GenericRow( + FULL_SCHEMA, Collections.singletonMap(DOMAIN_METADATA_ORDINAL, domainMetadata)); + } + public static Row createTxnSingleAction(Row txn) { Map singleActionValueMap = new HashMap<>(); singleActionValueMap.put(TXN_ORDINAL, txn); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java index 1846fd22590..33808db8be4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java @@ -15,9 +15,10 @@ */ package io.delta.kernel.internal.replay; +import static io.delta.kernel.internal.DeltaErrors.concurrentDomainMetadataAction; import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; -import static io.delta.kernel.internal.actions.SingleAction.CONFLICT_RESOLUTION_SCHEMA; +import static io.delta.kernel.internal.actions.SingleAction.*; import static io.delta.kernel.internal.util.FileNames.deltaFile; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static io.delta.kernel.internal.util.Preconditions.checkState; @@ -29,7 +30,9 @@ import io.delta.kernel.exceptions.ConcurrentWriteException; import io.delta.kernel.internal.*; import io.delta.kernel.internal.actions.CommitInfo; +import io.delta.kernel.internal.actions.DomainMetadata; import io.delta.kernel.internal.actions.SetTransaction; +import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; @@ -48,6 +51,8 @@ public class ConflictChecker { private static final int METADATA_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("metaData"); private static final int TXN_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("txn"); private static final int COMMITINFO_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("commitInfo"); + private static final int DOMAIN_METADATA_ORDINAL = + CONFLICT_RESOLUTION_SCHEMA.indexOf("domainMetadata"); // Snapshot of the table read by the transaction that encountered the conflict // (a.k.a the losing transaction) @@ -109,6 +114,7 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW handleProtocol(batch.getColumnVector(PROTOCOL_ORDINAL)); handleMetadata(batch.getColumnVector(METADATA_ORDINAL)); handleTxn(batch.getColumnVector(TXN_ORDINAL)); + handleDomainMetadata(batch.getColumnVector(DOMAIN_METADATA_ORDINAL)); }); } catch (IOException ioe) { throw new UncheckedIOException("Error reading actions from winning commits.", ioe); @@ -191,6 +197,39 @@ private void handleMetadata(ColumnVector 
metadataVector) { } } + /** + * Checks whether any of the current transaction's {@link DomainMetadata} actions conflict with + * the domain metadata committed by the winning transactions. + * + *
      + *
    1. Accept the current transaction if its set of metadata domains does not overlap with the + * winning transaction's set of metadata domains. + *
    2. Otherwise, fail the current transaction unless each conflicting domain is associated with + * a domain-specific way of resolving the conflict. + *
    + * + * @param domainMetadataVector domainMetadata rows from the winning transactions + */ + private void handleDomainMetadata(ColumnVector domainMetadataVector) { + // Build a domain metadata map from the winning transaction. + Map winningTxnDomainMetadataMap = new HashMap<>(); + DomainMetadataUtils.populateDomainMetadataMap( + domainMetadataVector, winningTxnDomainMetadataMap); + + for (DomainMetadata currentTxnDM : this.transaction.getDomainMetadatas()) { + // For each domain metadata action in the current transaction, check if it has a conflict with + // the winning transaction. + String domainName = currentTxnDM.getDomain(); + DomainMetadata winningTxnDM = winningTxnDomainMetadataMap.get(domainName); + if (winningTxnDM != null) { + // Conflict - check if the conflict can be resolved. + // Currently, we don't have any domain-specific way of resolving the conflict. + // Domain-specific ways of resolving the conflict can be added here (e.g. for Row Tracking). + throw concurrentDomainMetadataAction(currentTxnDM, winningTxnDM); + } + } + } + /** * Get the commit info from the winning transactions. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java index d1b3d55ee72..11ff3879aa1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java @@ -78,6 +78,8 @@ public class CreateCheckpointIterator implements CloseableIterator txnAppIdToVersion = new HashMap<>(); + // Current state of all domains we have seen in {@link DomainMetadata} during the log replay. We + // traverse the log in reverse, so remembering the domains we have seen is enough for creating a + // checkpoint. + private final Set domainSeen = new HashSet<>(); + // Metadata about the checkpoint to store in `_last_checkpoint` file private long numberOfAddActions = 0; // final number of add actions survived in the checkpoint @@ -234,6 +241,11 @@ private boolean prepareNext() { final ColumnVector txnVector = getVector(actionsBatch, TXN_ORDINAL); processTxn(txnVector, selectionVectorBuffer); + // Step 5: Process the domain metadata + final ColumnVector domainMetadataDomainNameVector = + getVector(actionsBatch, DOMAIN_METADATA_DOMAIN_NAME_ORDINAL); + processDomainMetadata(domainMetadataDomainNameVector, selectionVectorBuffer); + Optional selectionVector = Optional.of(createSelectionVector(selectionVectorBuffer, actionsBatch.getSize())); toReturnNext = Optional.of(new FilteredColumnarBatch(actionsBatch, selectionVector)); @@ -352,6 +364,37 @@ private void processTxn(ColumnVector txnVector, boolean[] selectionVectorBuffer) } } + /** + * Processes domain metadata actions during checkpoint creation. During the reverse log replay, + * for each domain, we only keep the first (latest) domain metadata action encountered by + * selecting them in the selection vector, and ignore any older ones for the same domain by + * unselecting them. + * + * @param domainMetadataVector Column vector containing domain names of domain metadata actions. + * @param selectionVectorBuffer The selection vector to attach to the batch to indicate which + * records to write to the checkpoint and which ones not to. 
+ */ + private void processDomainMetadata( + ColumnVector domainMetadataVector, boolean[] selectionVectorBuffer) { + final int vectorSize = domainMetadataVector.getSize(); + for (int rowId = 0; rowId < vectorSize; rowId++) { + if (domainMetadataVector.isNullAt(rowId)) { + continue; // selectionVector will be `false` at rowId by default + } + + final String domain = domainMetadataVector.getString(rowId); + if (domainSeen.contains(domain)) { + // We do a reverse log replay. The latest domainMetadata seen for a given domain wins and + // should be written to the checkpoint. Anything after the first one shouldn't be in + // checkpoint. + unselect(selectionVectorBuffer, rowId); + } else { + select(selectionVectorBuffer, rowId); + domainSeen.add(domain); + } + } + } + private void unselect(boolean[] selectionVectorBuffer, int rowId) { // Just use the java assert (which are enabled in tests) for sanity checks. This should // never happen. Given this is going to be on the hot path, we want to avoid cost in diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index 170dad9ac1f..f81ce16c99b 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -27,13 +27,18 @@ import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.checkpoints.SidecarFile; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; +import io.delta.kernel.internal.util.DomainMetadataUtils; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StringType; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; /** @@ -74,6 +79,10 @@ private static StructType getAddSchema(boolean shouldReadStats) { return shouldReadStats ? 
AddFile.SCHEMA_WITH_STATS : AddFile.SCHEMA_WITHOUT_STATS; } + /** Read schema when searching for just the domain metadata */ + public static final StructType DOMAIN_METADATA_READ_SCHEMA = + new StructType().add("domainMetadata", DomainMetadata.FULL_SCHEMA); + public static String SIDECAR_FIELD_NAME = "sidecar"; public static String ADDFILE_FIELD_NAME = "add"; public static String REMOVEFILE_FIELD_NAME = "remove"; @@ -109,6 +118,7 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) { private final Path dataPath; private final LogSegment logSegment; private final Tuple2 protocolAndMetadata; + private final Lazy> domainMetadataMap; public LogReplay( Path logPath, @@ -122,6 +132,8 @@ public LogReplay( this.dataPath = dataPath; this.logSegment = logSegment; this.protocolAndMetadata = loadTableProtocolAndMetadata(engine, snapshotHint, snapshotVersion); + // Lazy loading of domain metadata only when needed + this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine)); } ///////////////// @@ -140,6 +152,10 @@ public Optional getLatestTransactionIdentifier(Engine engine, String appli return loadLatestTransactionVersion(engine, applicationId); } + public Map getDomainMetadataMap() { + return domainMetadataMap.get(); + } + /** * Returns an iterator of {@link FilteredColumnarBatch} representing all the active AddFiles in * the table. @@ -296,4 +312,40 @@ private Optional loadLatestTransactionVersion(Engine engine, String applic return Optional.empty(); } + + /** + * Retrieves a map of domainName to {@link DomainMetadata} from the log files. + * + *

    Loading domain metadata requires an additional round of log replay so this is done lazily + * only when domain metadata is requested. We might want to merge this into {@link + * #loadTableProtocolAndMetadata}. + * + * @param engine The engine used to process the log files. + * @return A map where the keys are domain names and the values are the corresponding {@link + * DomainMetadata} objects. + * @throws UncheckedIOException if an I/O error occurs while closing the iterator. + */ + private Map loadDomainMetadataMap(Engine engine) { + try (CloseableIterator reverseIter = + new ActionsIterator( + engine, + logSegment.allLogFilesReversed(), + DOMAIN_METADATA_READ_SCHEMA, + Optional.empty() /* checkpointPredicate */)) { + Map domainMetadataMap = new HashMap<>(); + while (reverseIter.hasNext()) { + final ColumnarBatch columnarBatch = reverseIter.next().getColumnarBatch(); + assert (columnarBatch.getSchema().equals(DOMAIN_METADATA_READ_SCHEMA)); + + final ColumnVector dmVector = columnarBatch.getColumnVector(0); + + // We are performing a reverse log replay. This function ensures that only the first + // encountered domain metadata for each domain is added to the map. + DomainMetadataUtils.populateDomainMetadataMap(dmVector, domainMetadataMap); + } + return domainMetadataMap; + } catch (IOException ex) { + throw new UncheckedIOException("Could not close iterator", ex); + } + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java new file mode 100644 index 00000000000..454d1a2b33a --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/DomainMetadataUtils.java @@ -0,0 +1,91 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.internal.util; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.internal.DeltaErrors; +import io.delta.kernel.internal.TableFeatures; +import io.delta.kernel.internal.actions.DomainMetadata; +import io.delta.kernel.internal.actions.Protocol; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DomainMetadataUtils { + + private DomainMetadataUtils() { + // Empty private constructor to prevent instantiation + } + + /** + * Populate the map of domain metadata from actions. When encountering duplicate domain metadata + * actions for the same domain, this method preserves the first seen entry and skips subsequent + * entries. This behavior is especially useful for log replay as we want to ensure that earlier + * domain metadata entries take precedence over later ones. 
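+ * <p>For example (illustrative; the vector names are hypothetical), when batches are visited from
+ * the newest commit to the oldest during reverse log replay:
+ * <pre>{@code
+ * Map<String, DomainMetadata> domainMetadataMap = new HashMap<>();
+ * // The newest action for each domain is inserted first ...
+ * DomainMetadataUtils.populateDomainMetadataMap(newerCommitVector, domainMetadataMap);
+ * // ... and older actions for the same domains are then skipped as duplicates.
+ * DomainMetadataUtils.populateDomainMetadataMap(olderCommitVector, domainMetadataMap);
+ * }</pre>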
+ * + * @param domainMetadataActionVector A {@link ColumnVector} containing the domain metadata rows + * @param domainMetadataMap The existing map to be populated with domain metadata entries, where + * the key is the domain name and the value is the domain metadata + */ + public static void populateDomainMetadataMap( + ColumnVector domainMetadataActionVector, Map domainMetadataMap) { + final int vectorSize = domainMetadataActionVector.getSize(); + for (int rowId = 0; rowId < vectorSize; rowId++) { + DomainMetadata dm = DomainMetadata.fromColumnVector(domainMetadataActionVector, rowId); + if (dm != null && !domainMetadataMap.containsKey(dm.getDomain())) { + // We only add the domain metadata if its domain name not already present in the map + domainMetadataMap.put(dm.getDomain(), dm); + } + } + } + + /** + * Validates the list of domain metadata actions before committing them. It ensures that + * + *

      + *
    1. domain metadata actions are only present when supported by the table protocol + *
    2. there are no duplicate domain metadata actions for the same domain in the provided + * actions. + *
    + * + * @param domainMetadataActions The list of domain metadata actions to validate + * @param protocol The protocol to check for domain metadata support + */ + public static void validateDomainMetadatas( + List domainMetadataActions, Protocol protocol) { + if (domainMetadataActions.isEmpty()) return; + + // The list of domain metadata is non-empty, so the protocol must support domain metadata + if (!TableFeatures.isDomainMetadataSupported(protocol)) { + throw DeltaErrors.domainMetadataUnsupported(); + } + + Map domainMetadataMap = new HashMap<>(); + for (DomainMetadata domainMetadata : domainMetadataActions) { + String domain = domainMetadata.getDomain(); + if (domainMetadataMap.containsKey(domain)) { + String message = + String.format( + "Multiple actions detected for domain '%s' in single transaction: '%s' and '%s'. " + + "Only one action per domain is allowed.", + domain, domainMetadataMap.get(domain).toString(), domainMetadata.toString()); + throw new IllegalArgumentException(message); + } + domainMetadataMap.put(domain, domainMetadata); + } + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala index 2dfb034ea99..a98d39840d0 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableFeaturesSuite.scala @@ -68,7 +68,8 @@ class TableFeaturesSuite extends AnyFunSuite { checkSupported(createTestProtocol(minWriterVersion = 7)) } - Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening") + Seq("appendOnly", "inCommitTimestamp", "columnMapping", "typeWidening-preview", "typeWidening", + "domainMetadata") .foreach { supportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $supportedWriterFeature") { checkSupported(createTestProtocol(minWriterVersion = 7, supportedWriterFeature)) @@ -77,7 +78,7 @@ class TableFeaturesSuite extends AnyFunSuite { Seq("invariants", "checkConstraints", "generatedColumns", "allowColumnDefaults", "changeDataFeed", "identityColumns", "deletionVectors", "rowTracking", "timestampNtz", - "domainMetadata", "v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", + "v2Checkpoint", "icebergCompatV1", "icebergCompatV2", "clustering", "vacuumProtocolCheck").foreach { unsupportedWriterFeature => test(s"validateWriteSupported: protocol 7 with $unsupportedWriterFeature") { checkUnsupported(createTestProtocol(minWriterVersion = 7, unsupportedWriterFeature)) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala new file mode 100644 index 00000000000..ff097c7c5da --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala @@ -0,0 +1,543 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults + +import io.delta.kernel._ +import io.delta.kernel.defaults.internal.parquet.ParquetSuiteBase +import io.delta.kernel.engine.Engine +import io.delta.kernel.exceptions._ +import io.delta.kernel.expressions.Literal +import io.delta.kernel.internal.{SnapshotImpl, TableImpl, TransactionBuilderImpl, TransactionImpl} +import io.delta.kernel.internal.actions.{DomainMetadata, Protocol, SingleAction} +import io.delta.kernel.internal.util.Utils.toCloseableIterator +import io.delta.kernel.internal.TableConfig.CHECKPOINT_INTERVAL +import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.actions.{DomainMetadata => SparkDomainMetadata} +import org.apache.spark.sql.delta.test.DeltaTestImplicits.OptimisticTxnTestHelper + +import java.util.Collections +import scala.collection.JavaConverters._ +import scala.collection.immutable.Seq + +class DomainMetadataSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBase { + + private def assertDomainMetadata( + snapshot: SnapshotImpl, + expectedValue: Map[String, DomainMetadata]): Unit = { + assert(expectedValue === snapshot.getDomainMetadataMap.asScala) + } + + private def assertDomainMetadata( + table: Table, + engine: Engine, + expectedValue: Map[String, DomainMetadata]): Unit = { + // Get the latest snapshot of the table + val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + assertDomainMetadata(snapshot, expectedValue) + } + + private def createTxnWithDomainMetadatas( + engine: Engine, + tablePath: String, + domainMetadatas: Seq[DomainMetadata]): Transaction = { + + val txnBuilder = createWriteTxnBuilder(TableImpl.forPath(engine, tablePath)) + .asInstanceOf[TransactionBuilderImpl] + + val txn = txnBuilder.build(engine).asInstanceOf[TransactionImpl] + txn.addDomainMetadatas(domainMetadatas.asJava) + txn + } + + private def commitDomainMetadataAndVerify( + engine: Engine, + tablePath: String, + domainMetadatas: Seq[DomainMetadata], + expectedValue: Map[String, DomainMetadata]): Unit = { + // Create the transaction with domain metadata and commit + val txn = createTxnWithDomainMetadatas(engine, tablePath, domainMetadatas) + txn.commit(engine, emptyIterable()) + + // Verify the final state includes the expected domain metadata + val table = Table.forPath(engine, tablePath) + assertDomainMetadata(table, engine, expectedValue) + } + + private def setDomainMetadataSupport(engine: Engine, tablePath: String): Unit = { + val protocol = new Protocol( + 3, // minReaderVersion + 7, // minWriterVersion + Collections.emptyList(), // readerFeatures + Seq("domainMetadata").asJava // writerFeatures + ) + + val protocolAction = SingleAction.createProtocolSingleAction(protocol.toRow) + val txn = createTxn(engine, tablePath, isNewTable = false, testSchema, Seq.empty) + txn.commit(engine, inMemoryIterable(toCloseableIterator(Seq(protocolAction).asJava.iterator()))) + } + + private def createTableWithDomainMetadataSupported(engine: Engine, tablePath: String): Unit = { + // Create an empty table + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + // Set writer version and writer feature to support domain metadata + setDomainMetadataSupport(engine, tablePath) + } + + private def validateDomainMetadataConflictResolution( + 
engine: Engine, + tablePath: String, + currentTxn1DomainMetadatas: Seq[DomainMetadata], + winningTxn2DomainMetadatas: Seq[DomainMetadata], + winningTxn3DomainMetadatas: Seq[DomainMetadata], + expectedConflict: Boolean): Unit = { + // Create table with domain metadata support + createTableWithDomainMetadataSupported(engine, tablePath) + val table = Table.forPath(engine, tablePath) + + /** + * Txn1: i.e. the current transaction that comes later than winning transactions. + * Txn2: i.e. the winning transaction that was committed first. + * Txn3: i.e. the winning transaction that was committed secondly. + * + * Note tx is the timestamp. + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS or FAIL). + */ + val txn1 = createTxnWithDomainMetadatas(engine, tablePath, currentTxn1DomainMetadatas) + + val txn2 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn2DomainMetadatas) + txn2.commit(engine, emptyIterable()) + + val txn3 = createTxnWithDomainMetadatas(engine, tablePath, winningTxn3DomainMetadatas) + txn3.commit(engine, emptyIterable()) + + if (expectedConflict) { + // We expect the commit of txn1 to fail because of the conflicting DM actions + val ex = intercept[KernelException] { + txn1.commit(engine, emptyIterable()) + } + assert( + ex.getMessage.contains( + "A concurrent writer added a domainMetadata action for the same domain" + ) + ) + } else { + // We expect the commit of txn1 to succeed + txn1.commit(engine, emptyIterable()) + // Verify the final state includes merged domain metadata + val expectedMetadata = + (winningTxn2DomainMetadatas ++ winningTxn3DomainMetadatas ++ currentTxn1DomainMetadatas) + .groupBy(_.getDomain) + .mapValues(_.last) + assertDomainMetadata(table, engine, expectedMetadata) + } + } + + test("create table w/o domain metadata") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + + // Create an empty table + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + // Verify that the table doesn't have any domain metadata + assertDomainMetadata(table, engine, Map.empty) + } + } + + test("table w/o domain metadata support fails domain metadata commits") { + withTempDirAndEngine { (tablePath, engine) => + // Create an empty table + // Its minWriterVersion is 2 and doesn't have 'domainMetadata' in its writerFeatures + createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) + .commit(engine, emptyIterable()) + + val dm1 = new DomainMetadata("domain1", "", false) + val txn1 = createTxnWithDomainMetadatas(engine, tablePath, List(dm1)) + + // We expect the commit to fail because the table doesn't support domain metadata + val e = intercept[KernelException] { + txn1.commit(engine, emptyIterable()) + } + assert( + e.getMessage + .contains( + "Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' " + + "is not supported on this table." 
+ ) + ) + + // Set writer version and writer feature to support domain metadata + setDomainMetadataSupport(engine, tablePath) + + // Commit domain metadata again and expect success + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm1), + expectedValue = Map("domain1" -> dm1) + ) + } + } + + test("multiple DomainMetadatas for the same domain should fail in single transaction") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1_1 = new DomainMetadata("domain1", """{"key1":"1"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}"""", false) + + val txn = createTxnWithDomainMetadatas(engine, tablePath, List(dm1_1, dm2, dm1_2)) + + val e = intercept[IllegalArgumentException] { + txn.commit(engine, emptyIterable()) + } + assert( + e.getMessage.contains( + "Multiple actions detected for domain 'domain1' in single transaction" + ) + ) + } + } + + test("latest domain metadata overwriting existing ones") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", """{"key3":"3"}""", false) + + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}""", false) + val dm3_2 = new DomainMetadata("domain3", """{"key3":"30"}""", false) + + Seq( + (Seq(dm1), Map("domain1" -> dm1)), + (Seq(dm2, dm3, dm1_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3)), + (Seq(dm3_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2)) + ).foreach { + case (domainMetadatas, expectedValue) => + commitDomainMetadataAndVerify(engine, tablePath, domainMetadatas, expectedValue) + } + } + } + + test("domain metadata persistence across log replay") { + withTempDirAndEngine { (tablePath, engine) => + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm1, dm2), + expectedValue = Map("domain1" -> dm1, "domain2" -> dm2) + ) + + // Restart the table and verify the domain metadata + val table2 = Table.forPath(engine, tablePath) + assertDomainMetadata(table2, engine, Map("domain1" -> dm1, "domain2" -> dm2)) + } + } + + test("only the latest domain metadata per domain is stored in checkpoints") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + createTableWithDomainMetadataSupported(engine, tablePath) + + val dm1 = new DomainMetadata("domain1", """{"key1":"1"}, {"key2":"2"}""", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", """{"key3":"3"}""", false) + val dm1_2 = new DomainMetadata("domain1", """{"key1":"10"}""", false) + val dm3_2 = new DomainMetadata("domain3", """{"key3":"30"}""", true) + + Seq( + (Seq(dm1), Map("domain1" -> dm1)), + (Seq(dm2), Map("domain1" -> dm1, "domain2" -> dm2)), + (Seq(dm3), Map("domain1" -> dm1, "domain2" -> dm2, "domain3" -> dm3)), + (Seq(dm1_2, dm3_2), Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2)) + ).foreach { + case (domainMetadatas, expectedValue) => + commitDomainMetadataAndVerify(engine, tablePath, domainMetadatas, expectedValue) + } + 
+ // Checkpoint the table + val latestVersion = table.getLatestSnapshot(engine).getVersion(engine) + table.checkpoint(engine, latestVersion) + + // Verify that only the latest domain metadata is persisted in the checkpoint + val table2 = Table.forPath(engine, tablePath) + assertDomainMetadata( + table2, + engine, + Map("domain1" -> dm1_2, "domain2" -> dm2, "domain3" -> dm3_2) + ) + } + } + + test("Conflict resolution - one of three concurrent txns has DomainMetadata") { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action. + * Txn2: does NOT include DomainMetadata action. + * Txn3: does NOT include DomainMetadata action. + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS). + */ + val dm1 = new DomainMetadata("domain1", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq.empty, + winningTxn3DomainMetadatas = Seq.empty, + expectedConflict = false + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/o conflicting domains" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain2". + * Txn3: include DomainMetadata action for "domain3". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (SUCCESS). + */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain3", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = false + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/ conflicting domains" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain2". + * Txn3: include DomainMetadata action for "domain1". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. + * t6 ------------------------ Txn1 commits (FAIL). + */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain2", "", false) + val dm3 = new DomainMetadata("domain1", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = true + ) + } + } + + test( + "Conflict resolution - three concurrent txns have DomainMetadata w/ conflict domains - 2" + ) { + withTempDirAndEngine { (tablePath, engine) => + /** + * Txn1: include DomainMetadata action for "domain1". + * Txn2: include DomainMetadata action for "domain1". + * Txn3: include DomainMetadata action for "domain2". + * + * t1 ------------------------ Txn1 starts. + * t2 ------- Txn2 starts. + * t3 ------- Txn2 commits. + * t4 ------- Txn3 starts. + * t5 ------- Txn3 commits. 
+ * t6 ------------------------ Txn1 commits (FAIL). + */ + val dm1 = new DomainMetadata("domain1", "", false) + val dm2 = new DomainMetadata("domain1", "", false) + val dm3 = new DomainMetadata("domain2", "", false) + + validateDomainMetadataConflictResolution( + engine, + tablePath, + currentTxn1DomainMetadatas = Seq(dm1), + winningTxn2DomainMetadatas = Seq(dm2), + winningTxn3DomainMetadatas = Seq(dm3), + expectedConflict = true + ) + } + } + + test("Integration test - create a table with Spark and read its domain metadata using Kernel") { + withTempDir(dir => { + val tbl = "tbl" + withTable(tbl) { + val tablePath = dir.getCanonicalPath + // Create table with domain metadata enabled + spark.sql(s"CREATE TABLE $tbl (id LONG) USING delta LOCATION '$tablePath'") + spark.sql( + s"ALTER TABLE $tbl SET TBLPROPERTIES(" + + s"'delta.feature.domainMetadata' = 'enabled'," + + s"'delta.checkpointInterval' = '3')" + ) + + // Manually commit domain metadata actions. This will create 02.json + val deltaLog = DeltaLog.forTable(spark, new Path(tablePath)) + deltaLog + .startTransaction() + .commitManually( + List( + SparkDomainMetadata("testDomain1", "{\"key1\":\"1\"}", removed = false), + SparkDomainMetadata("testDomain2", "", removed = false), + SparkDomainMetadata("testDomain3", "", removed = false) + ): _* + ) + + // This will create 03.json and 03.checkpoint + spark.range(0, 2).write.format("delta").mode("append").save(tablePath) + + // Manually commit domain metadata actions. This will create 04.json + deltaLog + .startTransaction() + .commitManually( + List( + SparkDomainMetadata("testDomain1", "{\"key1\":\"10\"}", removed = false), + SparkDomainMetadata("testDomain2", "", removed = true) + ): _* + ) + + // Use Delta Kernel to read the table's domain metadata and verify the result. + // We will need to read 1 checkpoint file and 1 log file to replay the table. 
+ // The state of the domain metadata should be: + // testDomain1: "{\"key1\":\"10\"}", removed = false (from 03.checkpoint) + // testDomain2: "", removed = true (from 03.checkpoint) + // testDomain3: "", removed = false (from 04.json) + + val dm1 = new DomainMetadata("testDomain1", """{"key1":"10"}""", false) + val dm2 = new DomainMetadata("testDomain2", "", true) + val dm3 = new DomainMetadata("testDomain3", "", false) + + val snapshot = latestSnapshot(tablePath).asInstanceOf[SnapshotImpl] + assertDomainMetadata( + snapshot, + Map("testDomain1" -> dm1, "testDomain2" -> dm2, "testDomain3" -> dm3) + ) + } + }) + } + + test("Integration test - create a table using Kernel and read its domain metadata using Spark") { + withTempDirAndEngine { (tablePath, engine) => + val tbl = "tbl" + withTable(tbl) { + // Create table with domain metadata enabled + createTableWithDomainMetadataSupported(engine, tablePath) + + // Manually commit three domain metadata actions + val dm1 = new DomainMetadata("testDomain1", """{"key1":"1"}""", false) + val dm2 = new DomainMetadata("testDomain2", "", false) + val dm3 = new DomainMetadata("testDomain3", "", false) + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm1, dm2, dm3), + expectedValue = Map("testDomain1" -> dm1, "testDomain2" -> dm2, "testDomain3" -> dm3) + ) + + appendData( + engine, + tablePath, + data = Seq(Map.empty[String, Literal] -> dataBatches1) + ) + + // Checkpoint the table so domain metadata is distributed to both checkpoint and log files + val table = Table.forPath(engine, tablePath) + val latestVersion = table.getLatestSnapshot(engine).getVersion(engine) + table.checkpoint(engine, latestVersion) + + // Manually commit two domain metadata actions + val dm1_2 = new DomainMetadata("testDomain1", """{"key1":"10"}""", false) + val dm2_2 = new DomainMetadata("testDomain2", "", true) + commitDomainMetadataAndVerify( + engine, + tablePath, + domainMetadatas = Seq(dm1_2, dm2_2), + expectedValue = Map("testDomain1" -> dm1_2, "testDomain2" -> dm2_2, "testDomain3" -> dm3) + ) + + // Use Spark to read the table's domain metadata and verify the result + val deltaLog = DeltaLog.forTable(spark, new Path(tablePath)) + val domainMetadata = deltaLog.snapshot.domainMetadata.groupBy(_.domain).map { + case (name, domains) => + assert(domains.size == 1) + name -> domains.head + } + // Note that in Delta-Spark, the deltaLog.snapshot.domainMetadata does not include + // domain metadata that are removed. + assert( + domainMetadata === Map( + "testDomain1" -> SparkDomainMetadata( + "testDomain1", + """{"key1":"10"}""", + removed = false + ), + "testDomain3" -> SparkDomainMetadata("testDomain3", "", removed = false) + ) + ) + } + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala index 3ce34d0f458..fccdfdb8e3b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala @@ -619,6 +619,17 @@ object Checkpoints if (spark.conf.get(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED)) { snapshot.validateChecksum(Map("context" -> "writeCheckpoint")) } + // Verify allFiles in checksum during checkpoint if we are not doing so already on every + // commit. 
+ val allFilesInCRCEnabled = Snapshot.allFilesInCrcWritePathEnabled(spark, snapshot) + val shouldVerifyAllFilesInCRCEveryCommit = + Snapshot.allFilesInCrcVerificationEnabled(spark, snapshot) + if (allFilesInCRCEnabled && !shouldVerifyAllFilesInCRCEveryCommit) { + snapshot.checksumOpt.foreach { checksum => + snapshot.validateFileListAgainstCRC( + checksum, contextOpt = Some("triggeredFromCheckpoint")) + } + } val hadoopConf = deltaLog.newDeltaHadoopConf() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index 2748153eaae..150b57c0376 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta import java.io.FileNotFoundException import java.nio.charset.StandardCharsets.UTF_8 +import java.util.TimeZone // scalastyle:off import.ordering.noEmptyLine import scala.collection.immutable.ListMap @@ -78,9 +79,7 @@ trait RecordChecksum extends DeltaLogging { private lazy val writer = CheckpointFileManager.create(deltaLog.logPath, deltaLog.newDeltaHadoopConf()) - private def getChecksum(snapshot: Snapshot): VersionChecksum = { - snapshot.checksumOpt.getOrElse(snapshot.computeChecksum) - } + private def getChecksum(snapshot: Snapshot): VersionChecksum = snapshot.computeChecksum protected def writeChecksumFile(txnId: String, snapshot: Snapshot): Unit = { if (!spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED)) { @@ -137,6 +136,7 @@ trait RecordChecksum extends DeltaLogging { * `versionToCompute - 1` or a snapshot. Note that the snapshot may * belong to any version and this method will only use the snapshot if * it corresponds to `versionToCompute - 1`. + * @param includeAddFilesInCrc True if the new checksum should include a [[AddFile]]s. * @return Either the new checksum or an error code string if the checksum could not be computed. 
*/ // scalastyle:off argcount @@ -149,7 +149,8 @@ trait RecordChecksum extends DeltaLogging { protocol: Protocol, operationName: String, txnIdOpt: Option[String], - previousVersionState: Either[Snapshot, VersionChecksum] + previousVersionState: Either[Snapshot, VersionChecksum], + includeAddFilesInCrc: Boolean ): Either[String, VersionChecksum] = { // scalastyle:on argcount if (!deltaLog.incrementalCommitEnabled) { @@ -209,7 +210,8 @@ trait RecordChecksum extends DeltaLogging { oldVersionChecksum, oldSnapshot, actions, - ignoreAddFilesInOperation + ignoreAddFilesInOperation, + includeAddFilesInCrc ) } @@ -236,7 +238,8 @@ trait RecordChecksum extends DeltaLogging { oldVersionChecksum: VersionChecksum, oldSnapshot: Option[Snapshot], actions: Seq[Action], - ignoreAddFiles: Boolean + ignoreAddFiles: Boolean, + includeAllFilesInCRC: Boolean ) : Either[String, VersionChecksum] = { // scalastyle:on argcount oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1))) @@ -277,6 +280,81 @@ trait RecordChecksum extends DeltaLogging { case _ => } + val setTransactions = incrementallyComputeSetTransactions( + oldSnapshot, oldVersionChecksum, attemptVersion, actions) + + val domainMetadata = incrementallyComputeDomainMetadatas( + oldSnapshot, oldVersionChecksum, attemptVersion, actions) + + val computeAddFiles = if (includeAllFilesInCRC) { + incrementallyComputeAddFiles( + oldSnapshot = oldSnapshot, + oldVersionChecksum = oldVersionChecksum, + attemptVersion = attemptVersion, + numFilesAfterCommit = numFiles, + actionsToCommit = actions) + } else if (numFiles == 0) { + // If the table becomes empty after the commit, addFiles should be empty. + Option(Nil) + } else { + None + } + + val allFiles = computeAddFiles.filter { files => + val computedNumFiles = files.size + val computedTableSizeBytes = files.map(_.size).sum + // Validate checksum of Incrementally computed files against the computed checksum from + // incremental commits. + if (computedNumFiles != numFiles || computedTableSizeBytes != tableSizeBytes) { + val filePathsFromPreviousVersion = oldVersionChecksum.allFiles + .orElse { + recordFrameProfile("Delta", "VersionChecksum.computeNewChecksum.allFiles") { + oldSnapshot.map(_.allFiles.collect().toSeq) + } + } + .getOrElse(Seq.empty) + .map(_.path) + val addFilePathsInThisCommit = actions.collect { case af: AddFile => af.path } + val removeFilePathsInThisCommit = actions.collect { case rf: RemoveFile => rf.path } + logWarning(log"Incrementally computed files does not match the incremental checksum " + + log"for commit attempt: ${MDC(DeltaLogKeys.VERSION, attemptVersion)}. 
" + + log"addFilePathsInThisCommit: [${MDC(DeltaLogKeys.PATHS, + addFilePathsInThisCommit.mkString(","))}], " + + log"removeFilePathsInThisCommit: [${MDC(DeltaLogKeys.PATHS2, + removeFilePathsInThisCommit.mkString(","))}], " + + log"filePathsFromPreviousVersion: [${MDC(DeltaLogKeys.PATHS3, + filePathsFromPreviousVersion.mkString(","))}], " + + log"computedFiles: [${MDC(DeltaLogKeys.PATHS4, + files.map(_.path).mkString(","))}]") + val eventData = Map( + "attemptVersion" -> attemptVersion, + "expectedNumFiles" -> numFiles, + "expectedTableSizeBytes" -> tableSizeBytes, + "computedNumFiles" -> computedNumFiles, + "computedTableSizeBytes" -> computedTableSizeBytes, + "numAddFilePathsInThisCommit" -> addFilePathsInThisCommit.size, + "numRemoveFilePathsInThisCommit" -> removeFilePathsInThisCommit.size, + "numFilesInPreviousVersion" -> filePathsFromPreviousVersion.size, + "operationName" -> operationName, + "addFilePathsInThisCommit" -> JsonUtils.toJson(addFilePathsInThisCommit.take(10)), + "removeFilePathsInThisCommit" -> JsonUtils.toJson(removeFilePathsInThisCommit.take(10)), + "filePathsFromPreviousVersion" -> JsonUtils.toJson(filePathsFromPreviousVersion.take(10)), + "computedFiles" -> JsonUtils.toJson(files.take(10)) + ) + recordDeltaEvent( + deltaLog, + opType = "delta.allFilesInCrc.checksumMismatch.aggregated", + data = eventData) + if (Utils.isTesting) { + throw new IllegalStateException("Incrementally Computed State failed checksum check" + + s" for commit $attemptVersion [$eventData]") + } + false + } else { + true + } + } + Right(VersionChecksum( txnId = txnIdOpt, tableSizeBytes = tableSizeBytes, @@ -286,13 +364,162 @@ trait RecordChecksum extends DeltaLogging { inCommitTimestampOpt = inCommitTimestamp, metadata = metadata, protocol = protocol, - setTransactions = None, - domainMetadata = None, - histogramOpt = None, - allFiles = None + setTransactions = setTransactions, + domainMetadata = domainMetadata, + allFiles = allFiles, + histogramOpt = None )) } + /** + * Incrementally compute [[Snapshot.setTransactions]] for the commit `attemptVersion`. + * + * @param oldSnapshot - snapshot corresponding to `attemptVersion` - 1 + * @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1 + * @param attemptVersion - version which we want to commit + * @param actionsToCommit - actions for commit `attemptVersion` + * @return Optional sequence of incrementally computed [[SetTransaction]]s for commit + * `attemptVersion`. + */ + private def incrementallyComputeSetTransactions( + oldSnapshot: Option[Snapshot], + oldVersionChecksum: VersionChecksum, + attemptVersion: Long, + actionsToCommit: Seq[Action]): Option[Seq[SetTransaction]] = { + // Check-1: check conf + if (!spark.conf.get(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC)) { + return None + } + + // Check-2: check `minSetTransactionRetentionTimestamp` is not set + val newMetadataToCommit = actionsToCommit.collectFirst { case m: Metadata => m } + // TODO: Add support for incrementally computing [[SetTransaction]]s even when + // `minSetTransactionRetentionTimestamp` is set. + // We don't incrementally compute [[SetTransaction]]s when user has configured + // `minSetTransactionRetentionTimestamp` as it makes verification non-deterministic. + // Check all places to figure out whether `minSetTransactionRetentionTimestamp` is set: + // 1. oldSnapshot corresponding to `attemptVersion - 1` + // 2. old VersionChecksum's MetaData (corresponding to `attemptVersion-1`) + // 3. 
new VersionChecksum's MetaData (corresponding to `attemptVersion`) + val setTransactionRetentionTimestampConfigured = + (oldSnapshot.map(_.metadata) ++ Option(oldVersionChecksum.metadata) ++ newMetadataToCommit) + .exists(DeltaLog.minSetTransactionRetentionInterval(_).nonEmpty) + if (setTransactionRetentionTimestampConfigured) return None + + // Check-3: Check old setTransactions are available so that we can incrementally compute new. + val oldSetTransactions = oldVersionChecksum.setTransactions + .getOrElse { return None } + + // Check-4: old/new setTransactions are within the threshold. + val setTransactionsToCommit = actionsToCommit.filter(_.isInstanceOf[SetTransaction]) + val threshold = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_SET_TRANSACTIONS_IN_CRC) + if (Math.max(setTransactionsToCommit.size, oldSetTransactions.size) > threshold) return None + + // We currently don't attempt incremental [[SetTransaction]] when + // `minSetTransactionRetentionTimestamp` is set. So passing this as None here explicitly. + // We can also ignore file retention because that only affects [[RemoveFile]] actions. + val logReplay = new InMemoryLogReplay( + minFileRetentionTimestamp = 0, + minSetTransactionRetentionTimestamp = None) + + logReplay.append(attemptVersion - 1, oldSetTransactions.toIterator) + logReplay.append(attemptVersion, setTransactionsToCommit.toIterator) + Some(logReplay.getTransactions.toSeq).filter(_.size <= threshold) + } + + /** + * Incrementally compute [[Snapshot.domainMetadata]] for the commit `attemptVersion`. + * + * @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1 + * @param attemptVersion - version which we want to commit + * @param actionsToCommit - actions for commit `attemptVersion` + * @return Sequence of incrementally computed [[DomainMetadata]]s for commit + * `attemptVersion`. + */ + private def incrementallyComputeDomainMetadatas( + oldSnapshot: Option[Snapshot], + oldVersionChecksum: VersionChecksum, + attemptVersion: Long, + actionsToCommit: Seq[Action]): Option[Seq[DomainMetadata]] = { + // Check old DomainMetadatas are available so that we can incrementally compute new. + val oldDomainMetadatas = oldVersionChecksum.domainMetadata + .getOrElse { return None } + val newDomainMetadatas = actionsToCommit.filter(_.isInstanceOf[DomainMetadata]) + + // We only work with DomainMetadata, so RemoveFile and SetTransaction retention don't matter. + val logReplay = new InMemoryLogReplay( + minFileRetentionTimestamp = 0, + minSetTransactionRetentionTimestamp = None) + + val threshold = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_DOMAIN_METADATAS_IN_CRC) + + logReplay.append(attemptVersion - 1, oldDomainMetadatas.iterator) + logReplay.append(attemptVersion, newDomainMetadatas.iterator) + // We don't truncate the set of DomainMetadata actions. Instead, we either store all of them or + // none of them. The advantage of this is that you can then determine presence based on the + // checksum, i.e. if the checksum contains domain metadatas but it doesn't contain the one you + // are looking for, then it's not there. + // + // It's also worth noting that we can distinguish "no domain metadatas" versus + // "domain metadatas not stored" as [[Some]] vs. [[None]]. + Some(logReplay.getDomainMetadatas.toSeq).filter(_.size <= threshold) + } + + /** + * Incrementally compute [[Snapshot.allFiles]] for the commit `attemptVersion`. 
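+ * The previous version's file list (taken from the old CRC, or from the old snapshot as a
+ * fallback) is replayed together with this commit's actions through [[InMemoryLogReplay]] to
+ * produce the new file list.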
+ * + * @param oldSnapshot - snapshot corresponding to `attemptVersion` - 1 + * @param oldVersionChecksum - [[VersionChecksum]] corresponding to `attemptVersion` - 1 + * @param attemptVersion - version which we want to commit + * @param numFilesAfterCommit - number of files in the table after the attemptVersion commit. + * @param actionsToCommit - actions for commit `attemptVersion` + * @return Optional sequence of AddFiles which represents the incrementally computed state for + * commit `attemptVersion` + */ + private def incrementallyComputeAddFiles( + oldSnapshot: Option[Snapshot], + oldVersionChecksum: VersionChecksum, + attemptVersion: Long, + numFilesAfterCommit: Long, + actionsToCommit: Seq[Action]): Option[Seq[AddFile]] = { + + // We must enumerate both the pre- and post-commit file lists; give up if they are too big. + val incrementalAllFilesThreshold = + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES) + val numFilesBeforeCommit = oldVersionChecksum.numFiles + if (Math.max(numFilesAfterCommit, numFilesBeforeCommit) > incrementalAllFilesThreshold) { + return None + } + + // We try to get files for `attemptVersion - 1` from the old CRC first. If the old CRC doesn't + // have those files, then we will try to get that info from the oldSnapshot (corresponding to + // attemptVersion - 1). Note that oldSnapshot might not be present if another concurrent commits + // have happened in between. In this case we return and not store incrementally computed state + // to crc. + val oldAllFiles = oldVersionChecksum.allFiles + .orElse { + recordFrameProfile("Delta", "VersionChecksum.incrementallyComputeAddFiles") { + oldSnapshot.map(_.allFiles.collect().toSeq) + } + } + .getOrElse { return None } + + val canonicalPath = new DeltaLog.CanonicalPathFunction(() => deltaLog.newDeltaHadoopConf()) + def normalizePath(action: Action): Action = action match { + case af: AddFile => af.copy(path = canonicalPath(af.path)) + case rf: RemoveFile => rf.copy(path = canonicalPath(rf.path)) + case others => others + } + + // We only work with AddFile, so RemoveFile and SetTransaction retention don't matter. + val logReplay = new InMemoryLogReplay( + minFileRetentionTimestamp = 0, + minSetTransactionRetentionTimestamp = None) + + logReplay.append(attemptVersion - 1, oldAllFiles.map(normalizePath).toIterator) + logReplay.append(attemptVersion, actionsToCommit.map(normalizePath).toIterator) + Some(logReplay.allFiles) + } } object RecordChecksum { @@ -426,6 +653,76 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot => false } + /** + * Validate [[Snapshot.allFiles]] against given checksum.allFiles. + * Returns true if validation succeeds, else return false. + * In Unit Tests, this method throws [[IllegalStateException]] so that issues can be caught during + * development. 
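+ * On mismatch, a `delta.allFilesInCrc.checksumMismatch.differentAllFiles` usage event is logged
+ * with details of both file lists, and the regular incremental-commit checksum validation is
+ * also run to help narrow down the cause.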
+ */ + def validateFileListAgainstCRC(checksum: VersionChecksum, contextOpt: Option[String]): Boolean = { + val fileSortKey = (f: AddFile) => (f.path, f.modificationTime, f.size) + val filesFromCrc = checksum.allFiles.map(_.sortBy(fileSortKey)).getOrElse { return true } + val filesFromStateReconstruction = recordFrameProfile("Delta", "snapshot.allFiles") { + allFilesViaStateReconstruction.collect().toSeq.sortBy(fileSortKey) + } + if (filesFromCrc == filesFromStateReconstruction) return true + + val filesFromCrcWithoutStats = filesFromCrc.map(_.copy(stats = "")) + val filesFromStateReconstructionWithoutStats = + filesFromStateReconstruction.map(_.copy(stats = "")) + val mismatchWithStatsOnly = + filesFromCrcWithoutStats == filesFromStateReconstructionWithoutStats + + if (mismatchWithStatsOnly) { + // Normalize stats in CRC as per the table schema + val filesFromStateReconstructionMap = + filesFromStateReconstruction.map(af => (af.path, af)).toMap + val parser = DeltaFileProviderUtils.createJsonStatsParser(statsSchema) + var normalizedStatsDiffer = false + filesFromCrc.foreach { addFile => + val statsFromSR = filesFromStateReconstructionMap(addFile.path).stats + val statsFromSRParsed = parser(statsFromSR) + val statsFromCrcParsed = parser(addFile.stats) + if (statsFromSRParsed != statsFromCrcParsed) { + normalizedStatsDiffer = true + } + } + if (!normalizedStatsDiffer) return true + } + // If incremental all-files-in-crc validation fails, then there is a possibility that the + // issue is not just with incremental all-files-in-crc computation but with overall incremental + // commits. So run the incremental commit crc validation and find out whether that is also + // failing. + val contextForIncrementalCommitCheck = contextOpt.map(c => s"$c.").getOrElse("") + + "delta.allFilesInCrc.checksumMismatch.validateFileListAgainstCRC" + var errorForIncrementalCommitCrcValidation = "" + val incrementalCommitCrcValidationPassed = try { + validateChecksum(Map("context" -> contextForIncrementalCommitCheck)) + } catch { + case NonFatal(e) => + errorForIncrementalCommitCrcValidation += e.getMessage + false + } + val eventData = Map( + "version" -> version, + "mismatchWithStatsOnly" -> mismatchWithStatsOnly, + "filesCountFromCrc" -> filesFromCrc.size, + "filesCountFromStateReconstruction" -> filesFromStateReconstruction.size, + "filesFromCrc" -> JsonUtils.toJson(filesFromCrc), + "incrementalCommitCrcValidationPassed" -> incrementalCommitCrcValidationPassed, + "errorForIncrementalCommitCrcValidation" -> errorForIncrementalCommitCrcValidation, + "context" -> contextOpt.getOrElse("") + ) + val message = s"Incremental state reconstruction validation failed for version " + + s"$version [${eventData.mkString(",")}]" + logInfo(message) + recordDeltaEvent( + this.deltaLog, + opType = "delta.allFilesInCrc.checksumMismatch.differentAllFiles", + data = eventData) + if (Utils.isTesting) throw new IllegalStateException(message) + false + } /** * Validates the given `checksum` against [[Snapshot.computedState]]. 
* Returns an tuple of Maps: diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 84c3efb4d62..dfb23a8e0fb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -2535,6 +2535,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite protected def incrementallyDeriveChecksum( attemptVersion: Long, currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum] = { + // Don't include [[AddFile]]s in CRC if this commit is modifying the schema of table in some + // way. This is to make sure we don't carry any DROPPED column from previous CRC to this CRC + // forever and can start fresh from next commit. + // If the oldSnapshot itself is missing, we don't incrementally compute the checksum. + val allFilesInCrcWritePathEnabled = + Snapshot.allFilesInCrcWritePathEnabled(spark, snapshot) && + (snapshot.version == -1 || snapshot.metadata.schema == metadata.schema) incrementallyDeriveChecksum( spark, @@ -2545,7 +2552,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite protocol = currentTransactionInfo.protocol, operationName = currentTransactionInfo.op.name, txnIdOpt = Some(currentTransactionInfo.txnId), - previousVersionState = scala.Left(snapshot) + previousVersionState = scala.Left(snapshot), + includeAddFilesInCrc = allFilesInCrcWritePathEnabled ).toOption } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 2b85b2c73de..8340b4aaab7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine +import java.util.{Locale, TimeZone} + import scala.collection.mutable import org.apache.spark.sql.delta.actions._ @@ -513,17 +515,29 @@ class Snapshot( */ def computeChecksum: VersionChecksum = VersionChecksum( txnId = None, - tableSizeBytes = sizeInBytes, - numFiles = numOfFiles, - numMetadata = numOfMetadata, - numProtocol = numOfProtocol, inCommitTimestampOpt = getInCommitTimestampOpt, - setTransactions = checksumOpt.flatMap(_.setTransactions), - domainMetadata = checksumOpt.flatMap(_.domainMetadata), metadata = metadata, protocol = protocol, - histogramOpt = checksumOpt.flatMap(_.histogramOpt), - allFiles = checksumOpt.flatMap(_.allFiles)) + allFiles = checksumOpt.flatMap(_.allFiles), + tableSizeBytes = checksumOpt.map(_.tableSizeBytes).getOrElse(sizeInBytes), + numFiles = checksumOpt.map(_.numFiles).getOrElse(numOfFiles), + numMetadata = checksumOpt.map(_.numMetadata).getOrElse(numOfMetadata), + numProtocol = checksumOpt.map(_.numProtocol).getOrElse(numOfProtocol), + // Only return setTransactions and domainMetadata if they are either already present + // in the checksum or if they have already been computed in the current snapshot. + setTransactions = checksumOpt.flatMap(_.setTransactions) + .orElse { + Option.when(_computedStateTriggered && + // Only extract it from the current snapshot if set transaction + // writes are enabled. 
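+ // Checking _computedStateTriggered ensures that populating the checksum never forces a
+ // state reconstruction on its own.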
+ spark.conf.get(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC)) { + setTransactions + } + }, + domainMetadata = checksumOpt.flatMap(_.domainMetadata) + .orElse(Option.when(_computedStateTriggered)(domainMetadata)), + histogramOpt = checksumOpt.flatMap(_.histogramOpt) + ) /** Returns the data schema of the table, used for reading stats */ def tableSchema: StructType = metadata.dataSchema @@ -651,6 +665,76 @@ object Snapshot extends DeltaLogging { } } } + + /** Whether to write allFiles in [[VersionChecksum.allFiles]] */ + private[delta] def allFilesInCrcWritePathEnabled( + spark: SparkSession, + snapshot: Snapshot): Boolean = { + // disable if config is off. + if (!spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED)) return false + + // Also disable if all stats (structs/json) are disabled in checkpoints. + // When checkpoint stats are disabled (both in terms of structs/json), then the + // snapshot.allFiles from state reconstruction may/may not have stats (files coming from + // checkpoint won't have stats and files coming from deltas will have stats). + // But CRC.allFiles will have stats as VersionChecksum.allFiles is created + // incrementally using each commit. To prevent this inconsistency, we disable the feature when + // both json/struct stats are disabled for checkpoint. + if (!Checkpoints.shouldWriteStatsAsJson(snapshot) && + !Checkpoints.shouldWriteStatsAsStruct(spark.sessionState.conf, snapshot)) { + return false + } + + // Disable if table is configured to collect stats on more than the default number of columns + // to avoid bloating the .crc file. + val numIndexedColsThreshold = spark.sessionState.conf + .getConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_INDEXED_COLS) + .getOrElse(DataSkippingReader.DATA_SKIPPING_NUM_INDEXED_COLS_DEFAULT_VALUE) + val configuredNumIndexCols = + DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.fromMetaData(snapshot.metadata) + if (configuredNumIndexCols > numIndexedColsThreshold) return false + + true + } + + /** + * If true, force a verification of [[VersionChecksum.allFiles]] irrespective of the value of + * DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED flag (if they're written). + */ + private[delta] def allFilesInCrcVerificationForceEnabled( + spark: SparkSession): Boolean = { + val forceVerificationForNonUTCEnabled = spark.sessionState.conf.getConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED) + if (!forceVerificationForNonUTCEnabled) return false + + // This is necessary because timestamps for older dates (pre-1883) are not correctly serialized + // in non-UTC timezones due to unusual historical offsets (e.g. -07:52:58 for LA). + // These serialization discrepancies can lead to spurious CRC verification failures. + // By forcing verification of all files in non-UTC environments, we can continue to detect and + // work towards fixing this issues. + // Note: Display Name for UTC is Etc/UTC, so we check for UTC substring in the timezone. + val sparkSessionTimeZone = spark.sessionState.conf.sessionLocalTimeZone + val defaultJVMTimeZone = TimeZone.getDefault.getID + val systemTimeZone = System.getProperty("user.timezone", "Etc/UTC") + + val isNonUtcTimeZone = List(sparkSessionTimeZone, defaultJVMTimeZone, systemTimeZone) + .exists(!_.toLowerCase(Locale.ROOT).contains("utc")) + + isNonUtcTimeZone + } + + /** + * If true, do verification of [[VersionChecksum.allFiles]] computed by incremental commit CRC + * by doing state-reconstruction. 
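+ * Verification runs only when the write path is enabled, and when either
+ * [[DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED]] is set or verification is
+ * force-enabled for non-UTC sessions (see [[allFilesInCrcVerificationForceEnabled]]).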
+ */ + private[delta] def allFilesInCrcVerificationEnabled( + spark: SparkSession, + snapshot: Snapshot): Boolean = { + val verificationConfEnabled = spark.sessionState.conf.getConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED) + val shouldVerify = verificationConfEnabled || allFilesInCrcVerificationForceEnabled(spark) + allFilesInCrcWritePathEnabled(spark, snapshot) && shouldVerify + } } /** @@ -704,6 +788,7 @@ class DummySnapshot( override protected lazy val computedState: SnapshotState = initialState(metadata, protocol) override protected lazy val getInCommitTimestampOpt: Option[Long] = None + _computedStateTriggered = true // The [[InitialSnapshot]] is not backed by any external commit-coordinator. override val tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient] = None diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index 3340422f05d..569ea48ae77 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -1249,19 +1249,48 @@ trait SnapshotManagement { self: DeltaLog => committedVersion: Long): Snapshot = { logInfo( log"Creating a new snapshot v${MDC(DeltaLogKeys.VERSION, initSegment.version)} " + - log"for commit version ${MDC(DeltaLogKeys.VERSION2, committedVersion)}") + log"for commit version ${MDC(DeltaLogKeys.VERSION2, committedVersion)}") + // Guard against race condition when a txn commits after this txn but before + // reaching createLogSegment(...) above. + var checksumContext = "incrementalCommit" + val passedChecksumIsUsable = newChecksumOpt.isDefined && committedVersion == initSegment.version + val snapChecksumOpt = newChecksumOpt + .filter(_ => passedChecksumIsUsable) + .orElse { + checksumContext = "fallbackToReadChecksumFile" + readChecksum(initSegment.version) + } + + def createSnapshotWithCrc(checksumOpt: Option[VersionChecksum]): Snapshot = { + createSnapshot( + initSegment, + tableCommitCoordinatorClientOpt, + catalogTableOpt, + checksumOpt) + } + + var newSnapshot = createSnapshotWithCrc(snapChecksumOpt) + + // Skip validation in 0th commit when number of files in underlying snapshot is 0 in order to + // avoid state reconstruction - since there is nothing to verify from allFilesInCrc perspective. + val skipValidationForZerothCommit = committedVersion == 0 && newChecksumOpt.forall { crc => + crc.numFiles == 0 && crc.allFiles.forall(_.isEmpty) + } + if (passedChecksumIsUsable && !skipValidationForZerothCommit && + Snapshot.allFilesInCrcVerificationEnabled(spark, unsafeVolatileSnapshot)) { + snapChecksumOpt.collect { case crc if + !newSnapshot.validateFileListAgainstCRC(crc, contextOpt = Some("triggeredFromCommit")) => + // If the verification for [[VersionChecksum.allFiles]] failed, then strip off `allFiles` + // and create the snapshot again with new CRC (without addFiles in it). + newSnapshot = createSnapshotWithCrc(snapChecksumOpt.map(_.copy(allFiles = None))) + } + } - val newSnapshot = createSnapshot( - initSegment, - tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt, - catalogTableOpt = catalogTableOpt, - checksumOpt = newChecksumOpt - ) // Verify when enabled or when tests run to help future proof IC if (shouldVerifyIncrementalCommit) { val crcIsValid = try { // NOTE: Validation is a no-op with incremental commit disabled. 
- newSnapshot.validateChecksum(Map("context" -> "incrementalCommit")) + newSnapshot.validateChecksum(Map("context" -> checksumContext)) } catch { case _: IllegalStateException if !Utils.isTesting => false } @@ -1271,12 +1300,8 @@ trait SnapshotManagement { self: DeltaLog => // a checksum based on state reconstruction. Disable incremental commit to avoid // further error triggers in this session. spark.sessionState.conf.setConf(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED, false) - return createSnapshotAfterCommit( - initSegment, - newChecksumOpt = None, - tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt, - catalogTableOpt, - committedVersion) + spark.sessionState.conf.setConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC, false) + return createSnapshotWithCrc(checksumOpt = None) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala index 3a80bdb278f..33fb47993d4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala @@ -65,6 +65,9 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot => // For implicits which re-use Encoder: import implicits._ + /** Whether computedState is already computed or not */ + @volatile protected var _computedStateTriggered: Boolean = false + /** A map to look up transaction version by appId. */ lazy val transactions: Map[String, Long] = setTransactions.map(t => t.appId -> t.version).toMap @@ -114,6 +117,7 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot => throw DeltaErrors.actionNotFoundException("metadata", version) } + _computedStateTriggered = true _computedState } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala index ce8b41fdbc9..1b63dfdd39a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala @@ -102,6 +102,9 @@ trait DeltaLogKeysBase { case object PATH extends LogKeyShims case object PATH2 extends LogKeyShims case object PATHS extends LogKeyShims + case object PATHS2 extends LogKeyShims + case object PATHS3 extends LogKeyShims + case object PATHS4 extends LogKeyShims case object PROTOCOL extends LogKeyShims case object QUERY_ID extends LogKeyShims case object SCHEMA extends LogKeyShims diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index fffd59e9682..fc99ea8e2b4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1073,6 +1073,77 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_WRITE_SET_TRANSACTIONS_IN_CRC = + buildConf("setTransactionsInCrc.writeOnCommit") + .internal() + .doc("When enabled, each commit will incrementally compute and cache all SetTransaction" + + " actions in the .crc file. 
Note that this only happens when incremental commits" + + s" are enabled (${INCREMENTAL_COMMIT_ENABLED.key})") + .booleanConf + .createWithDefault(true) + + val DELTA_MAX_SET_TRANSACTIONS_IN_CRC = + buildConf("setTransactionsInCrc.maxAllowed") + .internal() + .doc("Threshold of the number of SetTransaction actions below which this optimization" + + " should be enabled") + .longConf + .createWithDefault(100) + + val DELTA_MAX_DOMAIN_METADATAS_IN_CRC = + buildConf("domainMetadatasInCrc.maxAllowed") + .internal() + .doc("Threshold of the number of DomainMetadata actions below which this optimization" + + " should be enabled") + .longConf + .createWithDefault(10) + + val DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES = + buildConf("allFilesInCrc.thresholdNumFiles") + .internal() + .doc("Threshold of the number of AddFiles below which AddFiles will be added to CRC.") + .intConf + .createWithDefault(50) + + val DELTA_ALL_FILES_IN_CRC_ENABLED = + buildConf("allFilesInCrc.enabled") + .internal() + .doc("When enabled, [[Snapshot.allFiles]] will be stored in the .crc file when the " + + "length is less than the threshold specified by " + + s"${DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key}. " + + "Note that this config only takes effect when incremental commits are enabled " + + s"(${INCREMENTAL_COMMIT_ENABLED.key})." + ) + .booleanConf + .createWithDefault(true) + + val DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED = + buildConf("allFilesInCrc.verificationMode.enabled") + .internal() + .doc(s"This will be effective only if ${DELTA_ALL_FILES_IN_CRC_ENABLED.key} is set. When" + + " enabled, We will have additional verification of the incrementally computed state by" + + " doing an actual state reconstruction on every commit.") + .booleanConf + .createWithDefault(false) + + val DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED = + buildConf("allFilesInCrc.verificationMode.forceOnNonUTC.enabled") + .internal() + .doc(s"This will be effective only if " + + s"${DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key} is not set. 
When enabled, we " + + s"will force verification of the incrementally computed state by doing an actual state " + + s"reconstruction on every commit for tables that are not using UTC timezone.") + .booleanConf + .createWithDefault(true) + + val DELTA_ALL_FILES_IN_CRC_THRESHOLD_INDEXED_COLS = + buildConf("allFilesInCrc.thresholdIndexedCols") + .internal() + .doc("If the delta table is configured to collect stats on more columns than this" + + " threshold, then disable storage of `[[Snapshot.allFiles]]` in the .crc file.") + .intConf + .createOptional + val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED = buildConf("checkpoint.exceptionThrowing.enabled") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala index 0dc94dda59d..5e4e9f2f62e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta import java.io.File +import java.util.TimeZone import com.databricks.spark.util.Log4jUsageLogger import org.apache.spark.sql.delta.DeltaTestUtils._ @@ -69,13 +70,22 @@ class ChecksumSuite testChecksumFile(writeChecksumEnabled = false) } + private def setTimeZone(timeZone: String): Unit = { + spark.sql(s"SET spark.sql.session.timeZone = $timeZone") + TimeZone.setDefault(TimeZone.getTimeZone(timeZone)) + } + test("Incremental checksums: post commit snapshot should have a checksum " + "without triggering state reconstruction") { for (incrementalCommitEnabled <- BOOLEAN_DOMAIN) { withSQLConf( DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false", - DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString) { + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString + ) { withTempDir { tempDir => + // Set the timezone to UTC to avoid triggering force verification of all files in CRC + // for non utc environments. + setTimeZone("UTC") val df = spark.range(1) df.write.format("delta").mode("append").save(tempDir.getCanonicalPath) val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) @@ -199,7 +209,8 @@ class ChecksumSuite withSQLConf( DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true", DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false", - DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true" + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false" ) { withTempDir { tempDir => import testImplicits._ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala new file mode 100644 index 00000000000..0f7ad328747 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala @@ -0,0 +1,507 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +// scalastyle:off import.ordering.noEmptyLine +import java.util.TimeZone +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.Duration + +import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord} +import org.apache.spark.sql.delta.DeltaTestUtils.{collectUsageLogs, BOOLEAN_DOMAIN} +import org.apache.spark.sql.delta.concurrency._ +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.ThreadUtils + +class DeltaAllFilesInCrcSuite + extends QueryTest + with SharedSparkSession + with TransactionExecutionTestMixin + with DeltaSQLCommandTest + with PhaseLockingTestMixin { + protected override def sparkConf = super.sparkConf + .set(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key, "true") + .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key, "true") + // Set the threshold to a very high number so that this test suite continues to use all files + // from CRC. + .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key, "10000") + // needed for DELTA_ALL_FILES_IN_CRC_ENABLED + .set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true") + // Turn on verification by default in the tests + .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key, "true") + // Turn off force verification for non-UTC timezones by default in the tests + .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED.key, + "false") + + private def setTimeZone(timeZone: String): Unit = { + spark.sql(s"SET spark.sql.session.timeZone = $timeZone") + TimeZone.setDefault(TimeZone.getTimeZone(timeZone)) + } + + /** Filter usage records for specific `opType` */ + protected def filterUsageRecords( + usageRecords: Seq[UsageRecord], + opType: String): Seq[UsageRecord] = { + usageRecords.filter { r => + r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType) + } + } + + /** Deletes all delta/crc/checkpoint files later that given `version` for the delta table */ + private def deleteDeltaFilesLaterThanVersion(deltaLog: DeltaLog, version: Long): Unit = { + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + deltaLog.listFrom(version + 1).filter { f => + FileNames.isDeltaFile(f) || FileNames.isChecksumFile(f) || FileNames.isCheckpointFile(f) + }.foreach(f => fs.delete(f.getPath, true)) + DeltaLog.clearCache() + assert(DeltaLog.forTable(spark, deltaLog.dataPath).update().version === version) + } + + test("allFiles are written to CRC and different threshold configs are respected") { + withTempDir { dir => + val path = dir.getCanonicalPath + + // Helper method to perform a commit with 10 AddFile actions to the table. 
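+ // The commit adds `newFilesToWrite` files; the helper then asserts that both the post-commit
+ // snapshot's checksum and the checksum file on disk record the expected number of AddFiles
+ // (or no AddFiles at all, when the optimization is expected to be disabled).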
+ def writeToTable( + version: Long, newFilesToWrite: Int, expectedFilesInCRCOption: Option[Long]): Unit = { + spark.range(start = 1, end = 100, step = 1, numPartitions = newFilesToWrite) + .toDF("c1") + .withColumn("c2", col("c1")).withColumn("c3", col("c1")) + .write.format("delta").mode("append").save(path) + assert(deltaLog.update().version === version) + assert(deltaLog.snapshot.checksumOpt.get.allFiles.map(_.size) === expectedFilesInCRCOption) + assert(deltaLog.readChecksum(version).get.allFiles.map(_.size) === expectedFilesInCRCOption) + } + + def deltaLog: DeltaLog = DeltaLog.forTable(spark, path) + + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key -> "55", + DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> "false") { + // Commit-0: Add 10 new files to table. Total files (10) is less than threshold. + writeToTable(version = 0, newFilesToWrite = 10, expectedFilesInCRCOption = Some(10)) + + withSQLConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false") { + // Commit-1: Add 10 more files to table. Total files (20) is less than threshold. + // Still these won't be written to CRC as the conf is explicitly disabled. + writeToTable(version = 1, newFilesToWrite = 10, expectedFilesInCRCOption = None) + } + // Commit-2: Add 20 more files to table. Total files (40) is less than threshold. + writeToTable(version = 2, newFilesToWrite = 20, expectedFilesInCRCOption = Some(40)) + // Commit-3: Add 13 more files to table. Total files (53) is less than threshold. + writeToTable(version = 3, newFilesToWrite = 13, expectedFilesInCRCOption = Some(53)) + // Commit-4: Add 7 more files to table. Total files (60) is greater than the threshold (55). + // So files won't be persisted to CRC. + writeToTable(version = 4, newFilesToWrite = 7, expectedFilesInCRCOption = None) + + // Commit-5: Delete all rows except with value=1. After this step, very few files will + // remain in table, still they won't be persisted to CRC as previous version had more than + // 55 files. We write files to CRC if both previous commit and this commit has files <= 55. + sql(s"DELETE FROM delta.`$path` WHERE c1 != 1").collect() + assert(deltaLog.update().version === 5) + assert(deltaLog.snapshot.checksumOpt.get.allFiles === None) + val fileCountAfterDeleteCommand = deltaLog.snapshot.checksumOpt.get.numFiles + assert(fileCountAfterDeleteCommand < 55) + + // Commit-6: Commit 1 new file again. Now previous-version also had < 55 files. This version + // also has < 55 files. + writeToTable(version = 6, newFilesToWrite = 1, + expectedFilesInCRCOption = Some(fileCountAfterDeleteCommand + 1)) + + withSQLConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_INDEXED_COLS.key -> "2") { + // Table collects stats on 3 cols (col1/col2/col3) which is more than threshold. + // So optimization should be disabled by default. 
+ writeToTable(version = 7, newFilesToWrite = 1, expectedFilesInCRCOption = None) + } + + writeToTable(version = 8, newFilesToWrite = 1, + expectedFilesInCRCOption = Some(fileCountAfterDeleteCommand + 3)) + + // Commit-7: Delete all rows from table + sql(s"DELETE FROM delta.`$path` WHERE c1 >= 0").collect() + assert(deltaLog.update().version === 9) + assert(deltaLog.snapshot.checksumOpt.get.allFiles === Some(Seq())) + } + } + } + + + test("test all-files-in-crc verification failure also triggers and logs" + + " incremental-commit verification result") { + withTempDir { tempDir => + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key -> "100", + // Disable incremental commit force verifications in UTs - to mimic prod behavior + DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key -> "false", + // Enable all-files-in-crc verification mode + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key -> "true", + DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false") { + + val df = spark.range(2).coalesce(1).toDF() + df.write.format("delta").save(tempDir.toString()) + val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + assert(deltaLog.update().allFiles.collect().map(_.numPhysicalRecords).forall(_.isEmpty)) + + val records = Log4jUsageLogger.track { + val executor = ThreadUtils.newDaemonSingleThreadExecutor(threadName = "executor-txn-A") + try { + val query = s"DELETE from delta.`${tempDir.getAbsolutePath}` WHERE id >= 0" + val (observer, future) = runQueryWithObserver(name = "A", executor, query) + observer.phases.initialPhase.entryBarrier.unblock() + observer.phases.preparePhase.entryBarrier.unblock() + // Make sure that delete query has run the actual computation and has reached + // the 'prepare commit' phase. i.e. it just wants to commit. + busyWaitFor(observer.phases.preparePhase.hasLeft, timeout) + // Now delete and recreate the complete table. + deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + .delete(deltaLog.dataPath, true) + spark.range(3, 4).coalesce(1).toDF().write.format("delta").save(tempDir.toString()) + + // Allow the delete query to commit. + unblockCommit(observer) + waitForCommit(observer) + // Query will fail due to incremental-state-reconstruction validation failure. + // Note that this failure happens only in test. In prod, this would have just logged + // the incremental-state-reconstruction failure and query would have passed. + val ex = intercept[SparkException] { ThreadUtils.awaitResult(future, Duration.Inf) } + val message = ex.getMessage + "\n" + ex.getCause.getMessage + assert(message.contains("Incremental state reconstruction validation failed")) + } finally { + executor.shutdownNow() + executor.awaitTermination(timeout.toMillis, TimeUnit.MILLISECONDS) + } + } + + // We will see all files in CRC verification failure. + // This will trigger the incremental commit verification which will fail. 
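+ // Both outcomes are captured in the same `differentAllFiles` usage record, which is asserted
+ // on below via `incrementalCommitCrcValidationPassed` and the validation error message.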
+ val allFilesInCrcValidationFailureRecords = + filterUsageRecords(records, "delta.allFilesInCrc.checksumMismatch.differentAllFiles") + assert(allFilesInCrcValidationFailureRecords.size === 1) + val eventData = + JsonUtils.fromJson[Map[String, String]](allFilesInCrcValidationFailureRecords.head.blob) + assert(eventData("version").toLong === 1L) + assert(eventData("mismatchWithStatsOnly").toBoolean === false) + val expectedFilesCountFromCrc = 1L + assert(eventData("filesCountFromCrc").toLong === expectedFilesCountFromCrc) + assert(eventData("filesCountFromStateReconstruction").toLong === + expectedFilesCountFromCrc + 1) + assert(eventData("incrementalCommitCrcValidationPassed").toBoolean === false) + val expectedValidationFailureMessage = "Number of files - Expected: 1 Computed: 2" + assert(eventData("errorForIncrementalCommitCrcValidation").contains( + expectedValidationFailureMessage)) + } + } + } + + test("schema changing metadata operations should disable putting AddFile" + + " actions in crc but other metadata operations should not") { + withTempDir { dir => + val path = dir.getCanonicalPath + spark.range(1, 5).toDF("c1").withColumn("c2", col("c1")) + .write.format("delta").mode("append").save(path) + + def deltaLog: DeltaLog = DeltaLog.forTable(spark, path) + assert(deltaLog.update().checksumOpt.get.allFiles.nonEmpty) + sql(s"ALTER TABLE delta.`$dir` CHANGE COLUMN c2 FIRST") + assert(deltaLog.update().checksumOpt.get.allFiles.isEmpty) + sql(s"ALTER TABLE delta.`$dir` SET TBLPROPERTIES ('a' = 'b')") + assert(deltaLog.update().checksumOpt.get.allFiles.nonEmpty) + } + } + + test("schema changing metadata operations on empty tables should not disable putting " + + "AddFile actions in crc") { + withTempDir { dir => + val path = dir.getCanonicalPath + def deltaLog: DeltaLog = DeltaLog.forTable(spark, path) + + def assertNoStateReconstructionTriggeredWhenPerfPackEnabled(f: => Unit): Unit = { + val oldSnapshot = deltaLog.update() + f + val newSnapshot = deltaLog.update() + } + + withSQLConf( + // Disable test flags to make the behaviors verified in this test close to prod + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key -> "false", + DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key -> "false" + ) { + assertNoStateReconstructionTriggeredWhenPerfPackEnabled { + // Create a table with an empty schema so that the next write will change the schema + sql(s"CREATE TABLE delta.`$path` USING delta LOCATION '$path'") + } + assert(deltaLog.update().checksumOpt.get.allFiles == Option(Nil)) + + assertNoStateReconstructionTriggeredWhenPerfPackEnabled { + // Write zero files but update the table schema + spark.range(1, 5).filter("false").write.format("delta") + .option("mergeSchema", "true").mode("append").save(path) + } + // Make sure writing zero files still make a Delta commit so that this test is valid + assert(deltaLog.update().version == 1) + assert(deltaLog.update().checksumOpt.get.allFiles == Option(Nil)) + + assertNoStateReconstructionTriggeredWhenPerfPackEnabled { + // Write some files to the table + spark.range(1, 5).write.format("delta").mode("append").save(path) + } + assert(deltaLog.update().checksumOpt.get.allFiles.nonEmpty) + assert(deltaLog.update().checksumOpt.get.allFiles.get.size > 0) + } + } + } + private def withCrcVerificationEnabled(testCode: => Unit): Unit = { + withSQLConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key -> "true") { + testCode + } + } + + private def withCrcVerificationDisabled(testCode: => Unit): Unit = { + 
withSQLConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key -> "false") { + testCode + } + } + + private def write(deltaLog: DeltaLog, numFiles: Int, expectedFilesInCrc: Option[Int]): Unit = { + spark + .range(1, 100, 1, numPartitions = numFiles) + .write + .format("delta") + .mode("append") + .save(deltaLog.dataPath.toString) + assert(deltaLog.snapshot.checksumOpt.get.allFiles.map(_.size) === expectedFilesInCrc) + } + + private def corruptCRCNumFiles(deltaLog: DeltaLog, version: Int): Unit = { + val crc = deltaLog.readChecksum(version).get + assert(crc.allFiles.nonEmpty) + val filesInCrc = crc.allFiles.get + + // Corrupt the CRC + val corruptedCrc = crc.copy(allFiles = + Some(filesInCrc.dropRight(1)), numFiles = crc.numFiles - 1) + val checksumFilePath = FileNames.checksumFile(deltaLog.logPath, version) + deltaLog.store.write( + checksumFilePath, + actions = Seq(JsonUtils.toJson(corruptedCrc)).toIterator, + overwrite = true, + hadoopConf = deltaLog.newDeltaHadoopConf()) + } + + private def corruptCRCAddFilesModificationTime(deltaLog: DeltaLog, version: Int): Unit = { + val crc = deltaLog.readChecksum(version).get + assert(crc.allFiles.nonEmpty) + val filesInCrc = crc.allFiles.get + + // Corrupt the CRC + val corruptedCrc = crc.copy(allFiles = Some(filesInCrc.map(_.copy(modificationTime = 23)))) + val checksumFilePath = FileNames.checksumFile(deltaLog.logPath, version) + deltaLog.store.write( + checksumFilePath, + actions = Seq(JsonUtils.toJson(corruptedCrc)).toIterator, + overwrite = true, + hadoopConf = deltaLog.newDeltaHadoopConf()) + } + + private def checkIfCrcModificationTimeCorrupted( + deltaLog: DeltaLog, + expectCorrupted: Boolean): Unit = { + val crc = deltaLog.readChecksum(deltaLog.update().version).get + assert(crc.allFiles.nonEmpty) + assert(crc.allFiles.get.count(_.modificationTime == 23L) > 0 === expectCorrupted) + } + + test("allFilesInCRC verification with flag manipulation for UTC timezone") { + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED.key -> + "true") { + setTimeZone("UTC") + withTempDir { dir => + var deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + + // Commit 0: Initial write with verification enabled + withCrcVerificationEnabled { + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(10)) + } + + // Corrupt the CRC at Version 0 + corruptCRCAddFilesModificationTime(deltaLog, version = 0) + DeltaLog.clearCache() + deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + + // Commit 1: Write with verification flag off and Verify Incremental CRC at version 1 is + // also corrupted + withCrcVerificationDisabled { + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(20)) + checkIfCrcModificationTimeCorrupted(deltaLog, expectCorrupted = true) + } + + // Commit 2: Write with verification flag on and it should fail because the AddFiles from + // base CRC at Version 1 are incorrect. + withCrcVerificationEnabled { + val usageRecords = + collectUsageLogs("delta.allFilesInCrc.checksumMismatch.differentAllFiles") { + intercept[IllegalStateException] { + write(deltaLog, numFiles = 10, expectedFilesInCrc = None) + } + } + assert(usageRecords.size === 1) + } + + // Commit 3: Write with verification flag on and it should pass since the base CRC is not + // corrupted anymore. 
+ withCrcVerificationEnabled { + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(40)) + checkIfCrcModificationTimeCorrupted(deltaLog, expectCorrupted = false) + } + } + } + } + + test("allFilesInCRC verification with flag manipulation for non-UTC timezone") { + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED.key -> + "true") { + setTimeZone("America/Los_Angeles") + withTempDir { dir => + var deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + + // Commit 0: Initial write with verification enabled + withCrcVerificationEnabled { + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(10)) + } + + // Corrupt the CRC at Version 0 + corruptCRCAddFilesModificationTime(deltaLog, version = 0) + DeltaLog.clearCache() + deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + + // Commit 1: Write with verification flag off and Verify Incremental CRC is still validated + // because timezone is non-UTC. + withCrcVerificationDisabled { + val usageRecords = + collectUsageLogs("delta.allFilesInCrc.checksumMismatch.differentAllFiles") { + intercept[IllegalStateException] { + write(deltaLog, numFiles = 10, expectedFilesInCrc = None) + } + } + assert(usageRecords.size === 1) + } + + // Commit 2: Write with verification flag on and it should pass since the base CRC is not + // corrupted anymore. + withCrcVerificationEnabled { + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(30)) + checkIfCrcModificationTimeCorrupted(deltaLog, expectCorrupted = false) + } + } + } + } + + test("Verify aggregate stats are matched even when allFilesInCrc " + + "verification is disabled") { + setTimeZone("UTC") + withSQLConf(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key -> "false") { + withCrcVerificationDisabled { + withTempDir { dir => + var deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + + // Commit 0 + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(10)) + + // Corrupt the CRC at Version 0 + corruptCRCNumFiles(deltaLog, version = 0) + DeltaLog.clearCache() + deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + + // Commit 1: Verify aggregate stats are matched even when verification is off + val usageRecords = collectUsageLogs("delta.allFilesInCrc.checksumMismatch.aggregated") { + intercept[IllegalStateException] { + write(deltaLog, numFiles = 10, expectedFilesInCrc = None) + } + } + assert(usageRecords.size === 1) + } + } + } + } + + test("allFilesInCRC validation during checkpoint must be opposite of per-commit " + + "validation") { + withTempDir { dir => + var deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + withCrcVerificationDisabled { + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(10)) + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(20)) + corruptCRCAddFilesModificationTime(deltaLog, version = 1) + + DeltaLog.clearCache() + deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath) + deltaLog.update() + + // Checkpoint should validate Checksum even when per-commit verification is disabled. + val usageRecords = + collectUsageLogs("delta.allFilesInCrc.checksumMismatch.differentAllFiles") { + intercept[IllegalStateException] { + deltaLog.checkpoint() + } + } + assert(usageRecords.size === 1) + assert(usageRecords.head.blob.contains("\"context\":" + "\"triggeredFromCheckpoint\""), + usageRecords.head) + + write(deltaLog, numFiles = 10, expectedFilesInCrc = Some(30)) + } + + // Checkpoint should not validate Checksum when per-commit verification is enabled. 
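+ // (the checkpoint-time validation of allFiles against the CRC only runs when per-commit
+ // verification is disabled, so enabling it here should suppress that validation).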
+ withCrcVerificationEnabled { + val usageRecords = + collectUsageLogs("delta.allFilesInCrc.checksumMismatch.differentAllFiles") { + deltaLog.checkpoint() + } + assert(usageRecords.isEmpty) + } + } + } + + test("allFilesInCrcVerificationForceEnabled works as expected") { + // Test with the non-UTC force verification conf enabled. + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED.key -> + "true") { + setTimeZone("UTC") + assert(!Snapshot.allFilesInCrcVerificationForceEnabled(spark)) + setTimeZone("America/Los_Angeles") + assert(Snapshot.allFilesInCrcVerificationForceEnabled(spark)) + } + // Test with the non-UTC force verification conf disabled. + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED.key -> + "false") { + assert(!Snapshot.allFilesInCrcVerificationForceEnabled(spark)) + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaIncrementalSetTransactionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaIncrementalSetTransactionsSuite.scala new file mode 100644 index 00000000000..c780f2bb5f5 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaIncrementalSetTransactionsSuite.scala @@ -0,0 +1,450 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import java.io.File +import java.util.UUID + +// scalastyle:off import.ordering.noEmptyLine +import com.databricks.spark.util.UsageRecord +import org.apache.spark.sql.delta.DeltaTestUtils.{collectUsageLogs, createTestAddFile, BOOLEAN_DOMAIN} +import org.apache.spark.sql.delta.actions.{AddFile, SetTransaction, SingleAction} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} + +import org.apache.spark.sql.{QueryTest, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.test.SharedSparkSession + +class DeltaIncrementalSetTransactionsSuite + extends QueryTest + with DeltaSQLCommandTest + with SharedSparkSession { + + protected override def sparkConf = super.sparkConf + .set(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key, "true") + .set(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key, "true") + // needed for DELTA_WRITE_SET_TRANSACTIONS_IN_CRC + .set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true") + // This test suite is sensitive to stateReconstruction we do at different places. So we disable + // [[INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS]] to simulate prod behaviour. + .set(DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key, "false") + + /** + * Validates the result of [[Snapshot.setTransactions]] API for the latest snapshot of this + * [[DeltaLog]]. 
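+ * When `viaCRC` is true, the [[SetTransaction]]s recorded in the snapshot's checksum must also
+ * be present and match `expectedTxns`. For example:
+ * {{{
+ *   assertSetTransactions(log, expectedTxns = Map("app-1" -> 1))
+ * }}}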
+ */ + private def assertSetTransactions( + deltaLog: DeltaLog, + expectedTxns: Map[String, Long], + viaCRC: Boolean = false + ): Unit = { + val snapshot = deltaLog.update() + if (viaCRC) { + assert(snapshot.checksumOpt.flatMap(_.setTransactions).isDefined) + snapshot.checksumOpt.flatMap(_.setTransactions).foreach { setTxns => + assert(setTxns.map(txn => (txn.appId, txn.version)).toMap === expectedTxns) + } + } + assert(snapshot.setTransactions.map(txn => (txn.appId, txn.version)).toMap === expectedTxns) + assert(snapshot.numOfSetTransactions === expectedTxns.size) + assert(expectedTxns === snapshot.transactions) + } + + + /** Commit given [[SetTransaction]] to `deltaLog`` */ + private def commitSetTxn( + deltaLog: DeltaLog, appId: String, version: Long, lastUpdated: Long): Unit = { + commitSetTxn(deltaLog, Seq(SetTransaction(appId, version, Some(lastUpdated)))) + } + + /** Commit given [[SetTransaction]]s to `deltaLog`` */ + private def commitSetTxn( + deltaLog: DeltaLog, + setTransactions: Seq[SetTransaction]): Unit = { + deltaLog.startTransaction().commit( + setTransactions :+ + AddFile( + s"file-${UUID.randomUUID().toString}", + partitionValues = Map.empty, + size = 1L, + modificationTime = 1L, + dataChange = true), + DeltaOperations.Write(SaveMode.Append) + ) + } + + test( + "set-transaction tracking starts from 0th commit in CRC" + ) { + withSQLConf( + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true" + ) { + val tbl = "test_table" + withTable(tbl) { + sql(s"CREATE TABLE $tbl USING delta as SELECT 1 as value") // 0th commit + val log = DeltaLog.forTable(spark, TableIdentifier(tbl)) + log.update() + // CRC for 0th commit has SetTransactions defined and are empty Seq. + assert(log.unsafeVolatileSnapshot.checksumOpt.flatMap(_.setTransactions).isDefined) + assert(log.unsafeVolatileSnapshot.checksumOpt.flatMap(_.setTransactions).get.isEmpty) + assertSetTransactions(log, expectedTxns = Map()) + + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) // 1st commit + assertSetTransactions(log, expectedTxns = Map("app-1" -> 1)) + commitSetTxn(log, "app-1", version = 3, lastUpdated = 2) // 2nd commit + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3)) + commitSetTxn(log, "app-2", version = 100, lastUpdated = 3) // 3rd commit + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3, "app-2" -> 100)) + commitSetTxn(log, "app-1", version = 4, lastUpdated = 4) // 4th commit + assertSetTransactions(log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100)) + + // 5th commit - Commit multiple [[SetTransaction]] in single commit + commitSetTxn( + log, + setTransactions = Seq( + SetTransaction("app-1", version = 100, lastUpdated = Some(4)), + SetTransaction("app-3", version = 300, lastUpdated = Some(4)) + )) + assertSetTransactions( + log, + expectedTxns = Map("app-1" -> 100, "app-2" -> 100, "app-3" -> 300)) + } + } + } + + test("set-transaction tracking starts for old tables after new commits") { + withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "false") { + val tbl = "test_table" + withTable(tbl) { + // Create a table with feature disabled. So 0th/1st commit won't do SetTransaction + // tracking in CRC. 
+ sql(s"CREATE TABLE $tbl USING delta as SELECT 1 as value") // 0th commit + + def deltaLog: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + + assert(deltaLog.update().checksumOpt.get.setTransactions.isEmpty) + commitSetTxn(deltaLog, "app-1", version = 1, lastUpdated = 1) // 1st commit + assert(deltaLog.update().checksumOpt.get.setTransactions.isEmpty) + + // Enable the SetTransaction tracking config and do more commits in the table. + withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + DeltaLog.clearCache() + commitSetTxn(deltaLog, "app-1", version = 2, lastUpdated = 2) // 2nd commit + // By default, commit doesn't trigger stateReconstruction and so the + // incremental CRC won't have setTransactions present until `setTransactions` API is + // explicitly invoked before the commit. + assert(deltaLog.update().checksumOpt.get.setTransactions.isEmpty) // crc has no set-txn + assertSetTransactions(deltaLog, expectedTxns = Map("app-1" -> 2), viaCRC = false) + DeltaLog.clearCache() + + // Do commit after forcing computeState. Now SetTransaction tracking will start. + deltaLog.snapshot.setTransactions // This triggers computeState. + commitSetTxn(deltaLog, "app-2", version = 100, lastUpdated = 3) // 3rd commit + assert(deltaLog.update().checksumOpt.get.setTransactions.nonEmpty) // crc has set-txn + } + } + } + } + + test("validate that crc doesn't contain SetTransaction when tracking is disabled") { + withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "false") { + val tbl = "test_table" + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + val log = DeltaLog.forTable(spark, TableIdentifier(tbl)) + // CRC for 0th commit should not have SetTransactions defined if conf is disabled. + assert(log.unsafeVolatileSnapshot.checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map(), viaCRC = false) + + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 1), viaCRC = false) + + commitSetTxn(log, "app-1", version = 3, lastUpdated = 2) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3), viaCRC = false) + + commitSetTxn(log, "app-2", version = 100, lastUpdated = 3) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3, "app-2" -> 100), viaCRC = false) + + commitSetTxn(log, "app-1", version = 4, lastUpdated = 4) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100), viaCRC = false) + } + } + } + + for(computeStatePreloaded <- BOOLEAN_DOMAIN) { + test("set-transaction tracking should start if computeState is pre-loaded before" + + s" commit [computeState preloaded: $computeStatePreloaded]") { + + // Enable INCREMENTAL COMMITS and disable verification - to make sure that we + // don't trigger state reconstruction after a commit. 
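+      // Without state reconstruction, incremental SetTransaction tracking can only start once
+      // the snapshot's computeState has been materialized; the two cases asserted below
+      // exercise both paths.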
+ withSQLConf( + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "false", + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "false" + ) { + val tbl = "test_table" + + def log: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + // After 0th commit - CRC shouldn't have SetTransactions as feature is disabled. + assertSetTransactions(log, expectedTxns = Map(), viaCRC = false) + + DeltaLog.clearCache() + withSQLConf(DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + // During 1st commit, the feature is enabled. But still the new commit crc shouldn't + // contain the [[SetTransaction]] actions as we don't have an estimate of how many + // [[SetTransaction]] actions might be already part of this table till now. + // So incremental computation of [[SetTransaction]] won't trigger. + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + + if (computeStatePreloaded) { + // Calling `validateChecksum` will pre-load the computeState + log.update().validateChecksum() + } + // During 2nd commit, we have following 2 cases: + // 1. If `computeStatePreloaded` is set, then the Snapshot has already calculated + // computeState and so we have estimate of number of SetTransactions till this point. + // So next commit will trigger incremental computation of [[SetTransaction]]. + // 2. If `computeStatePreloaded` is not set, then Snapshot doesn't have computeState + // pre-computed. So next commit will not trigger incremental computation of + // [[SetTransaction]]. + commitSetTxn(log, "app-1", version = 100, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).nonEmpty === + computeStatePreloaded) + } + } + } + } + } + + test("set-transaction tracking in CRC should stop once threshold is crossed") { + withSQLConf( + DeltaSQLConf.DELTA_MAX_SET_TRANSACTIONS_IN_CRC.key -> "2", + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + val tbl = "test_table" + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + def log: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + assertSetTransactions(log, expectedTxns = Map()) + + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 1)) + + commitSetTxn(log, "app-1", version = 3, lastUpdated = 2) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions(log, expectedTxns = Map("app-1" -> 3)) + + commitSetTxn(log, "app-2", version = 100, lastUpdated = 3) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions( + log, expectedTxns = Map("app-1" -> 3, "app-2" -> 100)) + + commitSetTxn(log, "app-1", version = 4, lastUpdated = 4) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + assertSetTransactions( + log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100)) + + commitSetTxn(log, "app-3", version = 1000, lastUpdated = 5) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + assertSetTransactions( + log, expectedTxns = Map("app-1" -> 4, "app-2" -> 100, "app-3" -> 1000), viaCRC = false) + } + } + } + + test("set-transaction tracking in CRC should stop once setTxn retention conf is set") { + withSQLConf( + 
DeltaSQLConf.DELTA_MAX_SET_TRANSACTIONS_IN_CRC.key -> "2", + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true") { + val tbl = "test_table" + withTable(tbl) { + sql(s"CREATE TABLE $tbl (value Int) USING delta") + def log: DeltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl)) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + + // Do 1 commit to table - set-transaction tracking continue to happen. + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + + // Set any random table property - set-transaction tracking continue to happen. + sql(s"ALTER TABLE $tbl SET TBLPROPERTIES ('randomProp1' = 'value1')") + assert(log.update().checksumOpt.flatMap(_.setTransactions).isDefined) + + // Set the `setTransactionRetentionDuration` table property - set-transaction tracking will + // stop. + sql(s"ALTER TABLE $tbl SET TBLPROPERTIES " + + s"('delta.setTransactionRetentionDuration' = 'interval 1 days')") + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + log.update().setTransactions + commitSetTxn(log, "app-1", version = 1, lastUpdated = 1) + assert(log.update().checksumOpt.flatMap(_.setTransactions).isEmpty) + + } + } + } + + for(checksumVerificationFailureIsFatal <- BOOLEAN_DOMAIN) { + // In this test we check that verification failed usage-logs are triggered when + // there is an issue in incremental computation and verification is explicitly enabled. + test("incremental set-transaction verification failures" + + s" [checksumVerificationFailureIsFatal: $checksumVerificationFailureIsFatal]") { + withSQLConf( + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + DeltaSQLConf.DELTA_WRITE_SET_TRANSACTIONS_IN_CRC.key -> "true", + // Enable verification explicitly as it is disabled by default. + DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> true.toString, + DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> s"$checksumVerificationFailureIsFatal" + ) { + withTempDir { tempDir => + // Procedure: + // 1. Populate the table with 2 [[SetTransaction]]s and create a checkpoint, validate that + // CRC has setTransactions present. + // 2. Intentionally corrupt the checkpoint - Remove one SetTransaction from it. + // 3. Clear the delta log cache so we pick up the checkpoint + // 4. Start a new transaction and attempt to commit the transaction + // a. Incremental SetTransaction verification should fail + // b. Post-commit snapshot should have checksumOpt with no [[SetTransaction]]s + + // Step-1 + val txn0 = SetTransaction("app-0", version = 1, lastUpdated = Some(1)) + val txn1 = SetTransaction("app-1", version = 888, lastUpdated = Some(2)) + + def log: DeltaLog = DeltaLog.forTable(spark, tempDir) + + // commit-0 + val actions0 = + (1 to 10).map(i => createTestAddFile(encodedPath = i.toString)) :+ txn0 + log.startTransaction().commitWriteAppend(actions0: _*) + // commit-1 + val actions1 = + (11 to 20).map(i => createTestAddFile(encodedPath = i.toString)) :+ txn1 + log.startTransaction().commitWriteAppend(actions1: _*) + assert(log.readChecksum(version = 1).get.setTransactions.nonEmpty) + log.checkpoint() + + // Step-2 + dropOneSetTransactionFromCheckpoint(log) + + // Step-3 + DeltaLog.clearCache() + assert(!log.update().logSegment.checkpointProvider.isEmpty) + + // Step-4 + // Create the txn with [[DELTA_CHECKSUM_MISMATCH_IS_FATAL]] as false so that pre-commit + // CRC validation doesn't fail. 
Our goal is to capture that post-commit verification + // catches any issues. + var txn: OptimisticTransactionImpl = null + withSQLConf(DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false") { + txn = log.startTransaction() + } + val Seq(corruptionReport) = collectSetTransactionCorruptionReport { + if (checksumVerificationFailureIsFatal) { + val e = intercept[DeltaIllegalStateException] { + withSQLConf(DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true") { + txn.commit(Seq(), DeltaOperations.Write(SaveMode.Append)) + } + } + assert(e.getMessage.contains("SetTransaction mismatch")) + } else { + txn.commit(Seq(), DeltaOperations.Write(SaveMode.Append)) + } + } + val eventData = JsonUtils.fromJson[Map[String, Any]](corruptionReport.blob) + + val expectedErrorEventData = Map( + "unmatchedSetTransactionsCRC" -> Seq(txn1), + "unmatchedSetTransactionsComputedState" -> Seq.empty, + "version" -> 2, + "minSetTransactionRetentionTimestamp" -> None, + "repeatedEntriesForSameAppId" -> Seq.empty, + "exactMatchFailed" -> true) + + val observedMismatchingFields = eventData("mismatchingFields").asInstanceOf[Seq[String]] + val observedErrorMessage = eventData("error").asInstanceOf[String] + val observedDetailedErrorMap = + eventData("detailedErrorMap").asInstanceOf[Map[String, String]] + assert(observedMismatchingFields === Seq("setTransactions")) + assert(observedErrorMessage.contains("SetTransaction mismatch")) + assert(observedDetailedErrorMap("setTransactions") === + JsonUtils.toJson(expectedErrorEventData)) + + if (checksumVerificationFailureIsFatal) { + // Due to failure, post-commit snapshot couldn't be updated + assert(log.snapshot.version === 1) + assert(log.readChecksum(version = 2).isEmpty) + } else { + assert(log.snapshot.version === 2) + assert(log.readChecksum(version = 2).get.setTransactions.isEmpty) + } + } + } + } + } + + /** Drops one [[SetTransaction]] operation from checkpoint - the one with max appId */ + private def dropOneSetTransactionFromCheckpoint(log: DeltaLog): Unit = { + import testImplicits._ + val checkpointPath = FileNames.checkpointFileSingular(log.logPath, log.snapshot.version) + withTempDir { tmpCheckpoint => + // count total rows in checkpoint + val checkpointDf = spark.read + .schema(SingleAction.encoder.schema) + .parquet(checkpointPath.toString) + val initialActionCount = checkpointDf.count().toInt + val corruptedCheckpointData = checkpointDf + .orderBy(col("txn.appId").asc_nulls_first) // force non setTransaction actions to front + .as[SingleAction].take(initialActionCount - 1) // Drop 1 action + + corruptedCheckpointData.toSeq.toDS().coalesce(1).write + .mode("overwrite").parquet(tmpCheckpoint.toString) + assert(spark.read.parquet(tmpCheckpoint.toString).count() === initialActionCount - 1) + val writtenCheckpoint = + tmpCheckpoint.listFiles().toSeq.filter(_.getName.startsWith("part")).head + val checkpointFile = new File(checkpointPath.toUri) + new File(log.logPath.toUri).listFiles().toSeq.foreach { file => + if (file.getName.startsWith(".0")) { + // we need to delete checksum files, otherwise trying to replace our incomplete + // checkpoint file fails due to the LocalFileSystem's checksum checks. 
+ assert(file.delete(), "Failed to delete checksum file") + } + } + assert(checkpointFile.delete(), "Failed to delete old checkpoint") + assert(writtenCheckpoint.renameTo(checkpointFile), + "Failed to rename corrupt checkpoint") + val newCheckpoint = spark.read.parquet(checkpointFile.toString) + assert(newCheckpoint.count() === initialActionCount - 1, + "Checkpoint file incorrect:\n" + newCheckpoint.collect().mkString("\n")) + } + } + + private def collectSetTransactionCorruptionReport(f: => Unit): Seq[UsageRecord] = { + collectUsageLogs("delta.checksum.invalid")(f).toSeq + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala index e431e080812..027c9c3eb3a 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DomainMetadataSuite.scala @@ -89,6 +89,16 @@ class DomainMetadataSuite sortByDomain(deltaTable.snapshot.domainMetadata)) } + DeltaLog.clearCache() + deltaTable = DeltaTableV2(spark, TableIdentifier(table)) + val checksumOpt = deltaTable.snapshot.checksumOpt + if (doChecksum) { + assertEquals( + sortByDomain(checksumOpt.get.domainMetadata.get), sortByDomain(domainMetadata)) + } else { + assert(checksumOpt.isEmpty) + } + assert(deltaLog.update().validateChecksum()) } } } @@ -150,6 +160,108 @@ class DomainMetadataSuite } } + test("DomainMetadata actions tracking in CRC should stop once threshold is crossed") { + def assertDomainMetadatas( + deltaLog: DeltaLog, + expectedDomainMetadatas: Seq[DomainMetadata], + expectedInCrc: Boolean): Unit = { + val snapshot = deltaLog.update() + assert(snapshot.validateChecksum()) + assertEquals(sortByDomain(expectedDomainMetadatas), sortByDomain(snapshot.domainMetadata)) + assert(snapshot.checksumOpt.nonEmpty) + if (expectedInCrc) { + assert(snapshot.checksumOpt.get.domainMetadata.nonEmpty) + assertEquals( + sortByDomain(expectedDomainMetadatas), + sortByDomain(snapshot.checksumOpt.get.domainMetadata.get)) + } else { + assert(snapshot.checksumOpt.get.domainMetadata.isEmpty) + } + } + + val table = "testTable" + withSQLConf( + DeltaSQLConf.DELTA_MAX_DOMAIN_METADATAS_IN_CRC.key -> "2") { + withTable(table) { + sql( + s""" + | CREATE TABLE $table(id int) USING delta + | tblproperties + | ('${TableFeatureProtocolUtils.propertyKey(DomainMetadataTableFeature)}' = 'enabled') + |""".stripMargin) + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table)) + assertDomainMetadatas(deltaLog, Seq.empty, true) + + deltaLog + .startTransaction() + .commit(DomainMetadata("testDomain1", "", false) :: Nil, Truncate()) + assertDomainMetadatas( + deltaLog, DomainMetadata("testDomain1", "", false) :: Nil, true) + + deltaLog + .startTransaction() + .commit(DomainMetadata("testDomain2", "", false) :: Nil, Truncate()) + assertDomainMetadatas( + deltaLog, + DomainMetadata("testDomain1", "", false) :: + DomainMetadata("testDomain2", "", false) :: Nil, + true) + + deltaLog + .startTransaction() + .commit(DomainMetadata("testDomain3", "", false) :: Nil, Truncate()) + assertDomainMetadatas( + deltaLog, + DomainMetadata("testDomain1", "", false) :: + DomainMetadata("testDomain2", "", false) :: + DomainMetadata("testDomain3", "", false) :: Nil, + false) + } + } + } + + test("Validate crc can be read when domainMetadata is missing") { + val table = "testTable" + withTable(table) { + sql( + s""" + | CREATE TABLE $table(id int) USING delta + | tblproperties + | 
('${TableFeatureProtocolUtils.propertyKey(DomainMetadataTableFeature)}' = 'enabled') + |""".stripMargin) + val deltaTable = DeltaTableV2(spark, TableIdentifier(table)) + val deltaLog = deltaTable.deltaLog + val version = + deltaTable + .startTransactionWithInitialSnapshot() + .commit(DomainMetadata("testDomain1", "", false) :: Nil, Truncate()) + val snapshot = deltaLog.update() + assert(snapshot.checksumOpt.nonEmpty) + assert(snapshot.checksumOpt.get.domainMetadata.nonEmpty) + val originalChecksum = snapshot.checksumOpt.get + + // Write out a checksum without domainMetadata. + val checksumWithoutDomainMetadata = originalChecksum.copy(domainMetadata = None) + val writer = CheckpointFileManager.create(deltaLog.logPath, deltaLog.newDeltaHadoopConf()) + val toWrite = JsonUtils.toJson(checksumWithoutDomainMetadata) + "\n" + val stream = writer.createAtomic( + FileNames.checksumFile(deltaLog.logPath, version + 1), + overwriteIfPossible = false) + stream.write(toWrite.getBytes(UTF_8)) + stream.close() + + // Make sure the read is not broken. + val content = + deltaLog + .store + .read( + FileNames.checksumFile(deltaLog.logPath, version + 1), + deltaLog.newDeltaHadoopConf()) + val checksumFromFile = JsonUtils.mapper.readValue[VersionChecksum](content.head) + assert(checksumWithoutDomainMetadata == checksumFromFile) + } + } + test("DomainMetadata action survives state reconstruction [w/o checkpoint, w/o checksum]") { validateStateReconstructionHelper(doCheckpoint = false, doChecksum = false) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala index f1e9222456f..ce7f529b422 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala @@ -936,6 +936,10 @@ class OptimisticTransactionSuite withSQLConf( DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false", DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key -> "false", + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key -> "false", + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_FORCE_VERIFICATION_MODE_FOR_NON_UTC_ENABLED.key -> + "false", DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key -> "false" ) { withTempDir { tableDir =>