Skip to content

Commit

Permalink
Merge branch 'master' into delta-lake-guru
Browse files Browse the repository at this point in the history
  • Loading branch information
kursataktas authored Nov 27, 2024
2 parents d7e93f2 + 4d2c5cf commit 354954f
Show file tree
Hide file tree
Showing 25 changed files with 2,633 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.lang.String.format;

import io.delta.kernel.exceptions.*;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.DataFileStatus;
Expand Down Expand Up @@ -274,6 +275,24 @@ public static KernelException invalidConfigurationValueException(
return new InvalidConfigurationValueException(key, value, helpMessage);
}

/**
 * Creates the exception reporting that DomainMetadata action(s) cannot be committed because the
 * {@code domainMetadata} feature is not supported on the table.
 *
 * @return a new {@link KernelException} carrying the explanatory message
 */
public static KernelException domainMetadataUnsupported() {
  return new KernelException(
      "Cannot commit DomainMetadata action(s) because the feature 'domainMetadata' "
          + "is not supported on this table.");
}

/**
 * Creates the exception reporting that a concurrent writer committed a domainMetadata action for
 * the same domain as this transaction's attempt.
 *
 * @param domainMetadataAttempt the domain metadata this writer tried to commit; its domain name
 *     is included in the message
 * @param winningDomainMetadata the domain metadata reported in the message as the winning one
 * @return a new {@link ConcurrentWriteException} describing both actions
 */
public static ConcurrentWriteException concurrentDomainMetadataAction(
    DomainMetadata domainMetadataAttempt, DomainMetadata winningDomainMetadata) {
  final String conflictingDomain = domainMetadataAttempt.getDomain();
  final String template =
      "A concurrent writer added a domainMetadata action for the same domain: %s. "
          + "No domain-specific conflict resolution is available for this domain. "
          + "Attempted domainMetadata: %s. Winning domainMetadata: %s";
  return new ConcurrentWriteException(
      String.format(template, conflictingDomain, domainMetadataAttempt, winningDomainMetadata));
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
Expand All @@ -31,6 +32,7 @@
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler;
import io.delta.kernel.types.StructType;
import java.util.Map;
import java.util.Optional;

/** Implementation of {@link Snapshot}. */
Expand Down Expand Up @@ -83,6 +85,17 @@ public Protocol getProtocol() {
return protocol;
}

/**
 * Returns the domain metadata of this snapshot, keyed by domain name.
 *
 * <p>The call is delegated to the log replay, which is expected to lazily load and replay the
 * history of domain metadata actions and resolve them into the current state.
 *
 * @return A map where the keys are domain names and the values are {@link DomainMetadata}
 *     objects.
 */
public Map<String, DomainMetadata> getDomainMetadataMap() {
  final Map<String, DomainMetadata> domainMetadataByName = logReplay.getDomainMetadataMap();
  return domainMetadataByName;
}

public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
long minFileRetentionTimestampMillis =
System.currentTimeMillis() - TOMBSTONE_RETENTION.fromMetadata(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class TableFeatures {
add("columnMapping");
add("typeWidening-preview");
add("typeWidening");
add(DOMAIN_METADATA_FEATURE_NAME);
}
});

Expand All @@ -57,6 +58,12 @@ public class TableFeatures {
}
});

/**
 * The feature name for domain metadata. This is the string looked up in a protocol's writer
 * features by {@code isDomainMetadataSupported}.
 */
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";

/**
 * The minimum writer version required to support table features. A protocol's minimum writer
 * version must be at least this value for {@code isDomainMetadataSupported} to return true.
 */
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;

////////////////////
// Helper Methods //
////////////////////
Expand Down Expand Up @@ -93,7 +100,7 @@ public static void validateReadSupportedTable(
* <li>protocol writer version 1.
* <li>protocol writer version 2 only with appendOnly feature enabled.
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
* columnMapping}, {@code typeWidening} feature enabled.
* columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled.
* </ul>
*
* @param protocol Table protocol
Expand Down Expand Up @@ -125,20 +132,8 @@ public static void validateWriteSupportedTable(
throw unsupportedWriterProtocol(tablePath, minWriterVersion);
case 7:
for (String writerFeature : protocol.getWriterFeatures()) {
switch (writerFeature) {
// Only supported writer features as of today in Kernel
case "appendOnly":
break;
case "inCommitTimestamp":
break;
case "columnMapping":
break;
case "typeWidening-preview":
break;
case "typeWidening":
break;
default:
throw unsupportedWriterFeature(tablePath, writerFeature);
if (!SUPPORTED_WRITER_FEATURES.contains(writerFeature)) {
throw unsupportedWriterFeature(tablePath, writerFeature);
}
}
break;
Expand Down Expand Up @@ -187,6 +182,21 @@ public static Set<String> extractAutomaticallyEnabledWriterFeatures(
.collect(Collectors.toSet());
}

/**
 * Tells whether the given protocol supports the "domainMetadata" writer feature.
 *
 * <p>Support requires both that the protocol's writer features list the feature and that the
 * protocol's minimum writer version is at least {@code TABLE_FEATURES_MIN_WRITER_VERSION}. A
 * protocol without a writer features list (null) never supports it.
 *
 * @param protocol the protocol to check
 * @return true if the "domainMetadata" feature is supported, false otherwise
 */
public static boolean isDomainMetadataSupported(Protocol protocol) {
  final List<String> features = protocol.getWriterFeatures();
  if (features != null && features.contains(DOMAIN_METADATA_FEATURE_NAME)) {
    // The feature is listed; it only counts when the writer version allows table features.
    return protocol.getMinWriterVersion() >= TABLE_FEATURES_MIN_WRITER_VERSION;
  }
  return false;
}

/**
* Get the minimum reader version required for a feature.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;
Expand Down Expand Up @@ -73,6 +69,7 @@ public class TransactionImpl implements Transaction {
private final Optional<SetTransaction> setTxnOpt;
private final boolean shouldUpdateProtocol;
private final Clock clock;
private final List<DomainMetadata> domainMetadatas = new ArrayList<>();
private Metadata metadata;
private boolean shouldUpdateMetadata;

Expand Down Expand Up @@ -120,6 +117,23 @@ public StructType getSchema(Engine engine) {
return readSnapshot.getSchema(engine);
}

/**
 * Returns the optional {@link SetTransaction} held by this transaction.
 *
 * @return the {@code setTxnOpt} field as stored on this transaction
 */
public Optional<SetTransaction> getSetTxnOpt() {
  return this.setTxnOpt;
}

/**
 * Internal API, visible for testing: appends the given domain metadata actions to the ones
 * already recorded on this transaction.
 *
 * @param domainMetadatas List of domain metadata to be added to the transaction.
 */
public void addDomainMetadatas(List<DomainMetadata> domainMetadatas) {
  final List<DomainMetadata> recorded = this.domainMetadatas;
  recorded.addAll(domainMetadatas);
}

/**
 * Returns the domain metadata actions recorded on this transaction.
 *
 * <p>The returned list is the transaction's own backing list, not a copy, so changes made
 * through it are visible to the transaction.
 *
 * @return the list of {@link DomainMetadata} actions added so far
 */
public List<DomainMetadata> getDomainMetadatas() {
  return this.domainMetadatas;
}

@Override
public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
throws ConcurrentWriteException {
Expand Down Expand Up @@ -221,6 +235,12 @@ private TransactionCommitResult doCommit(
}
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow())));

// Check for duplicate domain metadata and whether the protocol supports domain metadata
DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol);

domainMetadatas.forEach(
dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow())));

try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) {
// Create a new CloseableIterator that will return the metadata actions followed by the
// data actions.
Expand Down Expand Up @@ -265,10 +285,6 @@ public boolean isBlindAppend() {
return true;
}

/**
 * Returns the optional {@link SetTransaction} held by this transaction.
 *
 * @return the {@code setTxnOpt} field as stored on this transaction
 */
public Optional<SetTransaction> getSetTxnOpt() {
  return this.setTxnOpt;
}

/**
* Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can
* result in an additional file read and that this will only happen if ICT is enabled.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.Row;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.types.BooleanType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import java.util.HashMap;
import java.util.Map;

/**
 * Delta log action representing a {@code DomainMetadata} action: a configuration string attached
 * to a named metadata domain, together with a flag marking the entry as removed.
 */
public class DomainMetadata {
  /** Ordinal of the {@code domain} field in {@link #FULL_SCHEMA}. */
  private static final int DOMAIN_ORDINAL = 0;

  /** Ordinal of the {@code configuration} field in {@link #FULL_SCHEMA}. */
  private static final int CONFIGURATION_ORDINAL = 1;

  /** Ordinal of the {@code removed} field in {@link #FULL_SCHEMA}. */
  private static final int REMOVED_ORDINAL = 2;

  /** Full schema of the {@link DomainMetadata} action in the Delta Log. */
  public static final StructType FULL_SCHEMA =
      new StructType()
          .add("domain", StringType.STRING, false /* nullable */)
          .add("configuration", StringType.STRING, false /* nullable */)
          .add("removed", BooleanType.BOOLEAN, false /* nullable */);

  private final String domain;
  private final String configuration;
  private final boolean removed;

  /**
   * The domain metadata action contains a configuration string for a named metadata domain. Two
   * overlapping transactions conflict if they both contain a domain metadata action for the same
   * metadata domain. Per-domain conflict resolution logic can be implemented.
   *
   * @param domain A string used to identify a specific domain; must not be null.
   * @param configuration A string containing configuration for the metadata domain; must not be
   *     null.
   * @param removed If it is true it serves as a tombstone to logically delete a {@link
   *     DomainMetadata} action.
   */
  public DomainMetadata(String domain, String configuration, boolean removed) {
    this.domain = requireNonNull(domain, "domain is null");
    this.configuration = requireNonNull(configuration, "configuration is null");
    this.removed = removed;
  }

  /**
   * Reads a {@link DomainMetadata} out of a struct column vector at the given row. The children
   * of the vector are read by ordinal: domain, configuration, removed.
   *
   * @param vector the column vector holding DomainMetadata structs
   * @param rowId the row to read
   * @return the decoded instance, or null when the vector is null at {@code rowId}
   */
  public static DomainMetadata fromColumnVector(ColumnVector vector, int rowId) {
    if (vector.isNullAt(rowId)) {
      return null;
    }
    // Fields are read in schema order, each child being null-checked before it is accessed.
    final String domainValue =
        requireNonNull(vector.getChild(DOMAIN_ORDINAL), rowId, "domain").getString(rowId);
    final String configurationValue =
        requireNonNull(vector.getChild(CONFIGURATION_ORDINAL), rowId, "configuration")
            .getString(rowId);
    final boolean removedValue =
        requireNonNull(vector.getChild(REMOVED_ORDINAL), rowId, "removed").getBoolean(rowId);
    return new DomainMetadata(domainValue, configurationValue, removedValue);
  }

  /**
   * Creates a {@link DomainMetadata} instance from a Row with the schema being {@link
   * DomainMetadata#FULL_SCHEMA}.
   *
   * @param row the Row object containing the DomainMetadata action
   * @return a DomainMetadata instance or null if the row is null
   * @throws IllegalArgumentException if the schema of the row does not match {@link
   *     DomainMetadata#FULL_SCHEMA}
   */
  public static DomainMetadata fromRow(Row row) {
    if (row == null) {
      return null;
    }
    checkArgument(
        row.getSchema().equals(FULL_SCHEMA),
        "Expected schema: %s, found: %s",
        FULL_SCHEMA,
        row.getSchema());
    final String domainValue =
        requireNonNull(row, DOMAIN_ORDINAL, "domain").getString(DOMAIN_ORDINAL);
    final String configurationValue =
        requireNonNull(row, CONFIGURATION_ORDINAL, "configuration")
            .getString(CONFIGURATION_ORDINAL);
    final boolean removedValue =
        requireNonNull(row, REMOVED_ORDINAL, "removed").getBoolean(REMOVED_ORDINAL);
    return new DomainMetadata(domainValue, configurationValue, removedValue);
  }

  /** @return the name identifying the metadata domain */
  public String getDomain() {
    return this.domain;
  }

  /** @return the configuration string of the metadata domain */
  public String getConfiguration() {
    return this.configuration;
  }

  /** @return true when this action is a tombstone for the domain */
  public boolean isRemoved() {
    return this.removed;
  }

  /**
   * Encode as a {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}.
   *
   * @return {@link Row} object with the schema {@link DomainMetadata#FULL_SCHEMA}
   */
  public Row toRow() {
    // Values are keyed by the field ordinal within FULL_SCHEMA.
    final Map<Integer, Object> valuesByOrdinal = new HashMap<>();
    valuesByOrdinal.put(DOMAIN_ORDINAL, domain);
    valuesByOrdinal.put(CONFIGURATION_ORDINAL, configuration);
    valuesByOrdinal.put(REMOVED_ORDINAL, removed);

    return new GenericRow(DomainMetadata.FULL_SCHEMA, valuesByOrdinal);
  }

  @Override
  public String toString() {
    return String.format(
        "DomainMetadata{domain='%s', configuration='%s', removed='%s'}",
        domain, configuration, removed);
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    final DomainMetadata other = (DomainMetadata) obj;
    if (removed != other.removed) {
      return false;
    }
    if (!domain.equals(other.domain)) {
      return false;
    }
    return configuration.equals(other.configuration);
  }

  @Override
  public int hashCode() {
    return java.util.Objects.hash(domain, configuration, removed);
  }
}
Loading

0 comments on commit 354954f

Please sign in to comment.