From c395a63caef724bfe2e8e0633754038f8f5987d4 Mon Sep 17 00:00:00 2001 From: Qianru Lao <55441375+EstherBear@users.noreply.github.com> Date: Thu, 27 Jun 2024 11:35:41 -0400 Subject: [PATCH] [Kernel] Add non-monotonic inCommitTimestamp with related table properties and table features (#3276) ## Description Add non-monotonic `inCommitTimestamp` with related table properties and table features to prepare for adding monotonic `inCommitTimestamp` in later PRs. ## How was this patch tested? Add unit tests to verify the `inCommitTimestamp` and related table properties and table features when enabling the `inCommitTimestamp` enablement property. ## Does this PR introduce _any_ user-facing changes? Yes, the user can enable non-monotonic `inCommitTimestamp` by enabling its property. --- .../io/delta/kernel/internal/TableConfig.java | 70 +++- .../delta/kernel/internal/TableFeatures.java | 98 ++++++ .../internal/TransactionBuilderImpl.java | 29 +- .../kernel/internal/TransactionImpl.java | 63 +++- .../kernel/internal/actions/CommitInfo.java | 41 +++ .../delta/kernel/internal/actions/Format.java | 8 + .../kernel/internal/actions/Metadata.java | 24 ++ .../kernel/internal/actions/Protocol.java | 18 + .../internal/util/InCommitTimestampUtils.java | 81 +++++ .../defaults/DeltaTableWriteSuiteBase.scala | 314 +++++++++++++++++- .../defaults/DeltaTableWritesSuite.scala | 216 +----------- .../defaults/InCommitTimestampSuite.scala | 288 ++++++++++++++++ 12 files changed, 1009 insertions(+), 241 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/InCommitTimestampUtils.java create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index 1c0b6c36bf5..b7c1586fc7a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -15,9 +15,7 @@ */ package io.delta.kernel.internal; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; +import java.util.*; import java.util.function.Function; import java.util.function.Predicate; @@ -65,23 +63,67 @@ public class TableConfig { "needs to be a positive integer." ); + /** + * This table property is used to track the enablement of the {@code inCommitTimestamps}. + *

+ * When enabled, commit metadata includes a monotonically increasing timestamp that allows for + * reliable TIMESTAMP AS OF time travel even if filesystem operations change a commit file's + * modification timestamp. + */ + public static final TableConfig IN_COMMIT_TIMESTAMPS_ENABLED = new TableConfig<>( + "delta.enableInCommitTimestamps-preview", + "false", /* default values */ + Boolean::valueOf, + value -> true, + "needs to be a boolean." + ); + + /** + * This table property is used to track the version of the table at which + * {@code inCommitTimestamps} were enabled. + */ + public static final TableConfig> IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION = + new TableConfig<>( + "delta.inCommitTimestampEnablementVersion-preview", + null, /* default values */ + v -> Optional.ofNullable(v).map(Long::valueOf), + value -> true, + "needs to be a long." + ); + + /** + * This table property is used to track the timestamp at which {@code inCommitTimestamps} were + * enabled. More specifically, it is the {@code inCommitTimestamps} of the commit with the + * version specified in {@link #IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION}. + */ + public static final TableConfig> IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP = + new TableConfig<>( + "delta.inCommitTimestampEnablementTimestamp-preview", + null, /* default values */ + v -> Optional.ofNullable(v).map(Long::valueOf), + value -> true, + "needs to be a long." + ); + /** * All the valid properties that can be set on the table. */ - private static final HashMap validProperties = new HashMap<>(); + private static final Map> VALID_PROPERTIES = Collections.unmodifiableMap( + new HashMap>() {{ + addConfig(this, TOMBSTONE_RETENTION); + addConfig(this, CHECKPOINT_INTERVAL); + addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED); + addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION); + addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP); + }} + ); + private final String key; private final String defaultValue; private final Function fromString; private final Predicate validator; private final String helpMessage; - static { - validProperties.put( - TOMBSTONE_RETENTION.getKey().toLowerCase(Locale.ROOT), TOMBSTONE_RETENTION); - validProperties.put( - CHECKPOINT_INTERVAL.getKey().toLowerCase(Locale.ROOT), CHECKPOINT_INTERVAL); - } - private TableConfig( String key, String defaultValue, @@ -131,7 +173,7 @@ public static Map validateProperties(Map configu String key = kv.getKey().toLowerCase(Locale.ROOT); String value = kv.getValue(); if (key.startsWith("delta.")) { - TableConfig tableConfig = validProperties.get(key); + TableConfig tableConfig = VALID_PROPERTIES.get(key); if (tableConfig != null) { tableConfig.validate(value); validatedConfigurations.put(tableConfig.getKey(), value); @@ -151,4 +193,8 @@ private void validate(String value) { throw DeltaErrors.invalidConfigurationValueException(key, value, helpMessage); } } + + private static void addConfig(HashMap> configs, TableConfig config) { + configs.put(config.getKey().toLowerCase(Locale.ROOT), config); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java index 5a90c07e452..0026afedcd1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java @@ -16,13 +16,18 @@ package io.delta.kernel.internal; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import io.delta.kernel.types.StructType; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.util.ColumnMapping; +import io.delta.kernel.internal.util.Tuple2; import static io.delta.kernel.internal.DeltaErrors.*; /** @@ -30,6 +35,12 @@ */ public class TableFeatures { + private static final Set SUPPORTED_WRITER_FEATURES = + Collections.unmodifiableSet(new HashSet() {{ + add("appendOnly"); + add("inCommitTimestamp-preview"); + }}); + //////////////////// // Helper Methods // //////////////////// @@ -123,6 +134,93 @@ public static void validateWriteSupportedTable( } } + /** + * Given the automatically enabled features from Delta table metadata, returns the minimum + * required reader and writer version that satisfies all enabled table features in the metadata. + * + * @param enabledFeatures the automatically enabled features from the Delta table metadata + * @return the minimum required reader and writer version that satisfies all enabled table + */ + public static Tuple2 minProtocolVersionFromAutomaticallyEnabledFeatures( + Set enabledFeatures) { + + int readerVersion = 0; + int writerVersion = 0; + + for (String feature : enabledFeatures) { + readerVersion = Math.max(readerVersion, getMinReaderVersion(feature)); + writerVersion = Math.max(writerVersion, getMinWriterVersion(feature)); + } + + return new Tuple2<>(readerVersion, writerVersion); + } + + /** + * Extract the writer features that should be enabled automatically based on the metadata which + * are not already enabled. For example, the {@code inCommitTimestamp-preview} feature should be + * enabled when the delta property name (delta.enableInCommitTimestamps-preview) is set to true + * in the metadata if it is not already enabled. + * + * @param metadata the metadata of the table + * @return the writer features that should be enabled automatically + */ + public static Set extractAutomaticallyEnabledWriterFeatures( + Metadata metadata, Protocol protocol) { + return TableFeatures.SUPPORTED_WRITER_FEATURES.stream() + .filter(f -> metadataRequiresWriterFeatureToBeEnabled(metadata, f)) + .filter(f -> protocol.getWriterFeatures() == null || + !protocol.getWriterFeatures().contains(f)) + .collect(Collectors.toSet()); + } + + /** + * Get the minimum reader version required for a feature. + * + * @param feature the feature + * @return the minimum reader version required for the feature + */ + private static int getMinReaderVersion(String feature) { + switch (feature) { + case "inCommitTimestamp-preview": + return 3; + default: + return 1; + } + } + + /** + * Get the minimum writer version required for a feature. + * + * @param feature the feature + * @return the minimum writer version required for the feature + */ + private static int getMinWriterVersion(String feature) { + switch (feature) { + case "inCommitTimestamp-preview": + return 7; + default: + return 2; + } + } + + /** + * Determine whether a writer feature must be supported and enabled to satisfy the metadata + * requirements. + * + * @param metadata the table metadata + * @param feature the writer feature to check + * @return whether the writer feature must be enabled + */ + private static boolean metadataRequiresWriterFeatureToBeEnabled( + Metadata metadata, String feature) { + switch (feature) { + case "inCommitTimestamp-preview": + return TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); + default: + return false; + } + } + private static void validateNoInvariants(StructType tableSchema) { boolean hasInvariants = tableSchema.fields().stream().anyMatch( field -> field.getMetadata().contains("delta.invariants")); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index 69b4ecf7f71..4a11a19b67b 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -37,6 +37,7 @@ import static io.delta.kernel.internal.DeltaErrors.tableAlreadyExists; import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION; import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames; import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue; import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue; @@ -111,8 +112,10 @@ public Transaction build(Engine engine) { boolean isNewTable = snapshot.getVersion(engine) < 0; validate(engine, snapshot, isNewTable); - Metadata metadata = snapshot.getMetadata(); boolean shouldUpdateMetadata = false; + boolean shouldUpdateProtocol = false; + Metadata metadata = snapshot.getMetadata(); + Protocol protocol = snapshot.getProtocol(); if (tableProperties.isPresent()) { Map validatedProperties = TableConfig.validateProperties(tableProperties.get()); @@ -122,6 +125,23 @@ public Transaction build(Engine engine) { shouldUpdateMetadata = true; metadata = metadata.withNewConfiguration(newProperties); } + + Set newWriterFeatures = + TableFeatures.extractAutomaticallyEnabledWriterFeatures(metadata, protocol); + if (!newWriterFeatures.isEmpty()) { + logger.info("Automatically enabling writer features: {}", newWriterFeatures); + shouldUpdateProtocol = true; + List oldWriterFeatures = protocol.getWriterFeatures(); + protocol = protocol.withNewWriterFeatures(newWriterFeatures); + List curWriterFeatures = protocol.getWriterFeatures(); + checkArgument( + !Objects.equals(oldWriterFeatures, curWriterFeatures)); + TableFeatures.validateWriteSupportedTable( + protocol, + metadata, + metadata.getSchema(), + table.getPath(engine)); + } } return new TransactionImpl( @@ -131,10 +151,11 @@ public Transaction build(Engine engine) { snapshot, engineInfo, operation, - snapshot.getProtocol(), + protocol, metadata, setTxnOpt, - shouldUpdateMetadata); + shouldUpdateMetadata, + shouldUpdateProtocol); } /** @@ -224,7 +245,7 @@ private Metadata getInitialMetadata() { schema.get(), /* schema */ stringArrayValue(partitionColumnsCasePreserving), /* partitionColumns */ Optional.of(currentTimeMillis), /* createdTime */ - stringStringMapValue(Collections.emptyMap()) + stringStringMapValue(Collections.emptyMap()) /* configuration */ ); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 5de41f71de5..7375b14b3fb 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -38,8 +38,10 @@ import io.delta.kernel.internal.replay.ConflictChecker; import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.internal.util.InCommitTimestampUtils; import io.delta.kernel.internal.util.VectorUtils; import static io.delta.kernel.internal.TableConfig.CHECKPOINT_INTERVAL; +import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; import static io.delta.kernel.internal.actions.SingleAction.*; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static io.delta.kernel.internal.util.Preconditions.checkState; @@ -67,10 +69,11 @@ public class TransactionImpl private final Path dataPath; private final Path logPath; private final Protocol protocol; - private final Metadata metadata; - private final boolean shouldUpdateMetadata; private final SnapshotImpl readSnapshot; private final Optional setTxnOpt; + private final boolean shouldUpdateProtocol; + private Metadata metadata; + private boolean shouldUpdateMetadata; private boolean closed; // To avoid trying to commit the same transaction again. @@ -84,7 +87,8 @@ public TransactionImpl( Protocol protocol, Metadata metadata, Optional setTxnOpt, - boolean shouldUpdateMetadata) { + boolean shouldUpdateMetadata, + boolean shouldUpdateProtocol) { this.isNewTable = isNewTable; this.dataPath = dataPath; this.logPath = logPath; @@ -95,6 +99,7 @@ public TransactionImpl( this.metadata = metadata; this.setTxnOpt = setTxnOpt; this.shouldUpdateMetadata = shouldUpdateMetadata; + this.shouldUpdateProtocol = shouldUpdateProtocol; } @Override @@ -120,11 +125,16 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable data "Transaction is already attempted to commit. Create a new transaction."); long commitAsVersion = readSnapshot.getVersion(engine) + 1; + // Generate the commit action with the inCommitTimestamp if ICT is enabled. + CommitInfo attemptCommitInfo = generateCommitAction(engine); + updateMetadataWithICTIfRequired(engine, attemptCommitInfo); int numRetries = 0; do { logger.info("Committing transaction as version = {}.", commitAsVersion); try { - return doCommit(engine, commitAsVersion, dataActions); + // TODO Update the attemptCommitInfo and metadata based on the conflict + // resolution. + return doCommit(engine, commitAsVersion, attemptCommitInfo, dataActions); } catch (FileAlreadyExistsException fnfe) { logger.info("Concurrent write detected when committing as version = {}. " + "Trying to resolve conflicts and retry commit.", commitAsVersion); @@ -147,17 +157,43 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable data throw new ConcurrentWriteException(); } + private void updateMetadata(Metadata metadata) { + logger.info( + "Updated metadata from {} to {}", + shouldUpdateMetadata ? this.metadata : "-", metadata); + this.metadata = metadata; + this.shouldUpdateMetadata = true; + } + + private void updateMetadataWithICTIfRequired(Engine engine, CommitInfo attemptCommitInfo) { + // If ICT is enabled for the current transaction, update the metadata with the ICT + // enablement info. + attemptCommitInfo.getInCommitTimestamp().ifPresent( + inCommitTimestamp -> { + Optional metadataWithICTInfo = + InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo( + engine, + inCommitTimestamp, + readSnapshot, + metadata, + readSnapshot.getVersion(engine) + 1L); + metadataWithICTInfo.ifPresent(this::updateMetadata); + } + ); + } + private TransactionCommitResult doCommit( Engine engine, long commitAsVersion, + CommitInfo attemptCommitInfo, CloseableIterable dataActions) throws FileAlreadyExistsException { List metadataActions = new ArrayList<>(); - metadataActions.add(createCommitInfoSingleAction(generateCommitAction())); + metadataActions.add(createCommitInfoSingleAction(attemptCommitInfo.toRow())); if (shouldUpdateMetadata || isNewTable) { metadataActions.add(createMetadataSingleAction(metadata.toRow())); } - if (isNewTable) { + if (shouldUpdateProtocol || isNewTable) { // In the future, we need to add metadata and action when there are any changes to them. metadataActions.add(createProtocolSingleAction(protocol.toRow())); } @@ -203,15 +239,24 @@ public Optional getSetTxnOpt() { return setTxnOpt; } - private Row generateCommitAction() { + private Optional generateInCommitTimestampForFirstCommitAttempt( + Engine engine, long currentTimestamp) { + boolean ictEnabled = IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); + return ictEnabled ? Optional.of(currentTimestamp) : Optional.empty(); + } + + private CommitInfo generateCommitAction(Engine engine) { + long commitAttemptStartTime = System.currentTimeMillis(); return new CommitInfo( - System.currentTimeMillis(), /* timestamp */ + generateInCommitTimestampForFirstCommitAttempt( + engine, commitAttemptStartTime), + commitAttemptStartTime, /* timestamp */ "Kernel-" + Meta.KERNEL_VERSION + "/" + engineInfo, /* engineInfo */ operation.getDescription(), /* description */ getOperationParameters(), /* operationParameters */ isBlindAppend(), /* isBlindAppend */ txnId.toString() /* txnId */ - ).toRow(); + ); } private boolean isReadyForCheckpoint(long newVersion) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java index aea3461bb41..f45737bac69 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java @@ -19,16 +19,20 @@ import java.util.stream.IntStream; import static java.util.stream.Collectors.toMap; +import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.Row; import io.delta.kernel.types.*; import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.internal.util.VectorUtils; import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue; /** * Delta log action representing a commit information action. According to the Delta protocol there * isn't any specific schema for this action, but we use the following schema: *