Skip to content

Commit

Permalink
[Kernel] Add non-monotonic inCommitTimestamp with related table prope…
Browse files Browse the repository at this point in the history
…rties and table features (#3276)

## Description
Add non-monotonic `inCommitTimestamp` with related table properties and
table features to prepare for adding monotonic `inCommitTimestamp` in
later PRs.

## How was this patch tested?
Add unit tests to verify the `inCommitTimestamp` and related table
properties and table features when enabling the `inCommitTimestamp`
enablement property.

## Does this PR introduce _any_ user-facing changes?
Yes, the user can enable non-monotonic `inCommitTimestamp` by enabling its
property.
  • Loading branch information
EstherBear authored Jun 27, 2024
1 parent eb26989 commit c395a63
Show file tree
Hide file tree
Showing 12 changed files with 1,009 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package io.delta.kernel.internal;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;

Expand Down Expand Up @@ -65,23 +63,67 @@ public class TableConfig<T> {
"needs to be a positive integer."
);

/**
* This table property is used to track the enablement of the {@code inCommitTimestamps}.
* <p>
* When enabled, commit metadata includes a monotonically increasing timestamp that allows for
* reliable TIMESTAMP AS OF time travel even if filesystem operations change a commit file's
* modification timestamp.
*/
public static final TableConfig<Boolean> IN_COMMIT_TIMESTAMPS_ENABLED = new TableConfig<>(
"delta.enableInCommitTimestamps-preview",
"false", /* default values */
Boolean::valueOf,
value -> true,
"needs to be a boolean."
);

/**
* This table property is used to track the version of the table at which
* {@code inCommitTimestamps} were enabled.
*/
public static final TableConfig<Optional<Long>> IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION =
new TableConfig<>(
"delta.inCommitTimestampEnablementVersion-preview",
null, /* default values */
v -> Optional.ofNullable(v).map(Long::valueOf),
value -> true,
"needs to be a long."
);

/**
* This table property is used to track the timestamp at which {@code inCommitTimestamps} were
* enabled. More specifically, it is the {@code inCommitTimestamps} of the commit with the
* version specified in {@link #IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION}.
*/
public static final TableConfig<Optional<Long>> IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP =
new TableConfig<>(
"delta.inCommitTimestampEnablementTimestamp-preview",
null, /* default values */
v -> Optional.ofNullable(v).map(Long::valueOf),
value -> true,
"needs to be a long."
);

/**
* All the valid properties that can be set on the table.
*/
private static final HashMap<String, TableConfig> validProperties = new HashMap<>();
private static final Map<String, TableConfig<?>> VALID_PROPERTIES = Collections.unmodifiableMap(
new HashMap<String, TableConfig<?>>() {{
addConfig(this, TOMBSTONE_RETENTION);
addConfig(this, CHECKPOINT_INTERVAL);
addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED);
addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION);
addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP);
}}
);

private final String key;
private final String defaultValue;
private final Function<String, T> fromString;
private final Predicate<T> validator;
private final String helpMessage;

static {
validProperties.put(
TOMBSTONE_RETENTION.getKey().toLowerCase(Locale.ROOT), TOMBSTONE_RETENTION);
validProperties.put(
CHECKPOINT_INTERVAL.getKey().toLowerCase(Locale.ROOT), CHECKPOINT_INTERVAL);
}

private TableConfig(
String key,
String defaultValue,
Expand Down Expand Up @@ -131,7 +173,7 @@ public static Map<String, String> validateProperties(Map<String, String> configu
String key = kv.getKey().toLowerCase(Locale.ROOT);
String value = kv.getValue();
if (key.startsWith("delta.")) {
TableConfig tableConfig = validProperties.get(key);
TableConfig<?> tableConfig = VALID_PROPERTIES.get(key);
if (tableConfig != null) {
tableConfig.validate(value);
validatedConfigurations.put(tableConfig.getKey(), value);
Expand All @@ -151,4 +193,8 @@ private void validate(String value) {
throw DeltaErrors.invalidConfigurationValueException(key, value, helpMessage);
}
}

private static void addConfig(HashMap<String, TableConfig<?>> configs, TableConfig<?> config) {
configs.put(config.getKey().toLowerCase(Locale.ROOT), config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,31 @@

package io.delta.kernel.internal;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import io.delta.kernel.types.StructType;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.Tuple2;
import static io.delta.kernel.internal.DeltaErrors.*;

/**
* Contains utility methods related to the Delta table feature support in protocol.
*/
public class TableFeatures {

private static final Set<String> SUPPORTED_WRITER_FEATURES =
Collections.unmodifiableSet(new HashSet<String>() {{
add("appendOnly");
add("inCommitTimestamp-preview");
}});

////////////////////
// Helper Methods //
////////////////////
Expand Down Expand Up @@ -123,6 +134,93 @@ public static void validateWriteSupportedTable(
}
}

/**
* Given the automatically enabled features from Delta table metadata, returns the minimum
* required reader and writer version that satisfies all enabled table features in the metadata.
*
* @param enabledFeatures the automatically enabled features from the Delta table metadata
* @return the minimum required reader and writer version that satisfies all enabled table
*/
public static Tuple2<Integer, Integer> minProtocolVersionFromAutomaticallyEnabledFeatures(
Set<String> enabledFeatures) {

int readerVersion = 0;
int writerVersion = 0;

for (String feature : enabledFeatures) {
readerVersion = Math.max(readerVersion, getMinReaderVersion(feature));
writerVersion = Math.max(writerVersion, getMinWriterVersion(feature));
}

return new Tuple2<>(readerVersion, writerVersion);
}

/**
* Extract the writer features that should be enabled automatically based on the metadata which
* are not already enabled. For example, the {@code inCommitTimestamp-preview} feature should be
* enabled when the delta property name (delta.enableInCommitTimestamps-preview) is set to true
* in the metadata if it is not already enabled.
*
* @param metadata the metadata of the table
* @return the writer features that should be enabled automatically
*/
public static Set<String> extractAutomaticallyEnabledWriterFeatures(
Metadata metadata, Protocol protocol) {
return TableFeatures.SUPPORTED_WRITER_FEATURES.stream()
.filter(f -> metadataRequiresWriterFeatureToBeEnabled(metadata, f))
.filter(f -> protocol.getWriterFeatures() == null ||
!protocol.getWriterFeatures().contains(f))
.collect(Collectors.toSet());
}

/**
* Get the minimum reader version required for a feature.
*
* @param feature the feature
* @return the minimum reader version required for the feature
*/
private static int getMinReaderVersion(String feature) {
switch (feature) {
case "inCommitTimestamp-preview":
return 3;
default:
return 1;
}
}

/**
* Get the minimum writer version required for a feature.
*
* @param feature the feature
* @return the minimum writer version required for the feature
*/
private static int getMinWriterVersion(String feature) {
switch (feature) {
case "inCommitTimestamp-preview":
return 7;
default:
return 2;
}
}

/**
* Determine whether a writer feature must be supported and enabled to satisfy the metadata
* requirements.
*
* @param metadata the table metadata
* @param feature the writer feature to check
* @return whether the writer feature must be enabled
*/
private static boolean metadataRequiresWriterFeatureToBeEnabled(
Metadata metadata, String feature) {
switch (feature) {
case "inCommitTimestamp-preview":
return TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata);
default:
return false;
}
}

private static void validateNoInvariants(StructType tableSchema) {
boolean hasInvariants = tableSchema.fields().stream().anyMatch(
field -> field.getMetadata().contains("delta.invariants"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static io.delta.kernel.internal.DeltaErrors.tableAlreadyExists;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames;
import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;
import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue;
Expand Down Expand Up @@ -111,8 +112,10 @@ public Transaction build(Engine engine) {
boolean isNewTable = snapshot.getVersion(engine) < 0;
validate(engine, snapshot, isNewTable);

Metadata metadata = snapshot.getMetadata();
boolean shouldUpdateMetadata = false;
boolean shouldUpdateProtocol = false;
Metadata metadata = snapshot.getMetadata();
Protocol protocol = snapshot.getProtocol();
if (tableProperties.isPresent()) {
Map<String, String> validatedProperties =
TableConfig.validateProperties(tableProperties.get());
Expand All @@ -122,6 +125,23 @@ public Transaction build(Engine engine) {
shouldUpdateMetadata = true;
metadata = metadata.withNewConfiguration(newProperties);
}

Set<String> newWriterFeatures =
TableFeatures.extractAutomaticallyEnabledWriterFeatures(metadata, protocol);
if (!newWriterFeatures.isEmpty()) {
logger.info("Automatically enabling writer features: {}", newWriterFeatures);
shouldUpdateProtocol = true;
List<String> oldWriterFeatures = protocol.getWriterFeatures();
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
List<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(
!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(
protocol,
metadata,
metadata.getSchema(),
table.getPath(engine));
}
}

return new TransactionImpl(
Expand All @@ -131,10 +151,11 @@ public Transaction build(Engine engine) {
snapshot,
engineInfo,
operation,
snapshot.getProtocol(),
protocol,
metadata,
setTxnOpt,
shouldUpdateMetadata);
shouldUpdateMetadata,
shouldUpdateProtocol);
}

/**
Expand Down Expand Up @@ -224,7 +245,7 @@ private Metadata getInitialMetadata() {
schema.get(), /* schema */
stringArrayValue(partitionColumnsCasePreserving), /* partitionColumns */
Optional.of(currentTimeMillis), /* createdTime */
stringStringMapValue(Collections.emptyMap())
stringStringMapValue(Collections.emptyMap()) /* configuration */
);
}

Expand Down
Loading

0 comments on commit c395a63

Please sign in to comment.