Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Represents a transaction to mutate a Delta table.
Expand Down Expand Up @@ -108,6 +109,12 @@ public interface Transaction {
TransactionCommitResult commit(Engine engine, CloseableIterable<Row> dataActions)
throws ConcurrentWriteException;

/**
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love how this is on the Txn and not the TxnBuilder ...

* Sets the supplier of custom properties that will be passed through to the committer. These
* properties allow connectors to inject catalog-specific metadata without Kernel inspection.
*/
void withCommitterProperties(Supplier<Map<String, String>> committerProperties);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added here instead of on the builder so Connectors can inject any properties they'd like at any point.

e.g. maybe these properties depend on the data files written


/**
* Commit the provided domain metadata as part of this transaction. If this is called more than
* once with the same {@code domain} the latest provided {@code config} will be committed in the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Tuple2;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Contains all information (excluding the iterator of finalized actions) required to commit changes
Expand Down Expand Up @@ -57,6 +59,8 @@ public enum CommitType {
private final long version;
private final String logPath;
private final CommitInfo commitInfo;
private final Supplier<Map<String, String>> committerProperties;

private final Optional<Tuple2<Protocol, Metadata>> readPandMOpt;
private final Optional<Protocol> newProtocolOpt;
private final Optional<Metadata> newMetadataOpt;
Expand All @@ -65,13 +69,15 @@ public CommitMetadata(
long version,
String logPath,
CommitInfo commitInfo,
Supplier<Map<String, String>> committerProperties,
Optional<Tuple2<Protocol, Metadata>> readPandMOpt,
Optional<Protocol> newProtocolOpt,
Optional<Metadata> newMetadataOpt) {
checkArgument(version >= 0, "version must be non-negative: %d", version);
this.version = version;
this.logPath = requireNonNull(logPath, "logPath is null");
this.commitInfo = requireNonNull(commitInfo, "commitInfo is null");
this.committerProperties = requireNonNull(committerProperties, "committerProperties is null");
this.readPandMOpt = requireNonNull(readPandMOpt, "readPandMOpt is null");
this.newProtocolOpt = requireNonNull(newProtocolOpt, "newProtocolOpt is null");
this.newMetadataOpt = requireNonNull(newMetadataOpt, "newMetadataOpt is null");
Expand Down Expand Up @@ -102,6 +108,14 @@ public CommitInfo getCommitInfo() {
return commitInfo;
}

/**
 * Exposes the connector-provided supplier of committer properties.
 *
 * <p>Kernel does not look inside these properties; they exist so a connector can hand
 * catalog-specific key/value pairs through to its committer. The supplier itself is returned, so
 * the map is only produced when the caller invokes {@link Supplier#get()}.
 *
 * @return the supplier given to the constructor; never {@code null} because the constructor
 *     rejects a null supplier
 */
public Supplier<Map<String, String>> getCommitterProperties() {
  return this.committerProperties;
}

/**
* The {@link Protocol} that was read at the beginning of the commit. Empty if a new table is
* being created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,26 @@ public static class UniversalFormats {
"need to be a string.",
false);

/**
 * Table property {@code delta.minReaderVersion}: the minimum reader version of the Delta
 * protocol.
 *
 * <p>The raw string is parsed with {@link Integer#valueOf(String)} and accepted only when the
 * result is strictly positive. NOTE(review): the {@code "1"} argument is presumably the default
 * value and the trailing {@code false} presumably marks the property as non-editable -- confirm
 * against the {@code TableConfig} constructor.
 */
public static final TableConfig<Integer> MIN_READER_VERSION =
    new TableConfig<>(
        "delta.minReaderVersion",
        "1",
        Integer::valueOf,
        readerVersion -> readerVersion > 0,
        "needs to be a positive integer.",
        false);

/**
 * Table property {@code delta.minWriterVersion}: the minimum writer version of the Delta
 * protocol.
 *
 * <p>The raw string is parsed with {@link Integer#valueOf(String)} and accepted only when the
 * result is strictly positive. NOTE(review): the {@code "1"} argument is presumably the default
 * value and the trailing {@code false} presumably marks the property as non-editable -- confirm
 * against the {@code TableConfig} constructor.
 */
public static final TableConfig<Integer> MIN_WRITER_VERSION =
    new TableConfig<>(
        "delta.minWriterVersion",
        "1",
        Integer::valueOf,
        writerVersion -> writerVersion > 0,
        "needs to be a positive integer.",
        false);

/** All the valid properties that can be set on the table. */
private static final Map<String, TableConfig<?>> VALID_PROPERTIES =
Collections.unmodifiableMap(
Expand Down Expand Up @@ -393,6 +413,8 @@ public static class UniversalFormats {
addConfig(this, UNIVERSAL_FORMAT_ENABLED_FORMATS);
addConfig(this, MATERIALIZED_ROW_ID_COLUMN_NAME);
addConfig(this, MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME);
addConfig(this, MIN_READER_VERSION);
addConfig(this, MIN_WRITER_VERSION);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.delta.kernel.internal.util.Preconditions.checkState;
import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.*;
import io.delta.kernel.commit.CommitFailedException;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.io.UncheckedIOException;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -85,6 +87,7 @@ public class TransactionImpl implements Transaction {
///////////////////////////
///// Instance fields /////
///////////////////////////

private final UUID txnId = UUID.randomUUID();

/** If the transaction is defining a new table from scratch (i.e. create table, replace table) */
Expand Down Expand Up @@ -121,6 +124,7 @@ public class TransactionImpl implements Transaction {
private Optional<CRCInfo> currentCrcInfo;
private Optional<Long> providedRowIdHighWatermark = Optional.empty();
private boolean closed; // To avoid trying to commit the same transaction again.
private Supplier<Map<String, String>> committerProperties = Collections::emptyMap;

public TransactionImpl(
boolean isCreateOrReplace,
Expand Down Expand Up @@ -195,6 +199,11 @@ public void addDomainMetadataInternal(String domain, String config) {
domainMetadataState.addDomain(domain, config);
}

/**
 * Replaces the supplier of committer properties held by this transaction.
 *
 * <p>Only the supplier reference is stored here; it is not invoked by this method. A later call
 * overwrites the supplier set by an earlier one.
 *
 * @param committerProperties supplier of the properties to pass through to the committer
 * @throws NullPointerException if {@code committerProperties} is {@code null}
 */
@Override
public void withCommitterProperties(Supplier<Map<String, String>> committerProperties) {
  // Validate first so a rejected argument leaves the previously stored supplier untouched.
  requireNonNull(committerProperties, "committerProperties is null");
  this.committerProperties = committerProperties;
}

@Override
public void addDomainMetadata(String domain, String config) {
checkState(
Expand Down Expand Up @@ -532,6 +541,7 @@ private long doCommit(
commitAsVersion,
logPath.toString(),
attemptCommitInfo,
committerProperties,
readSnapshotOpt.map(x -> new Tuple2<>(x.getProtocol(), x.getMetadata())),
shouldUpdateProtocol ? Optional.of(protocol) : Optional.empty(),
shouldUpdateMetadata ? Optional.of(metadata) : Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,21 @@ public Set<String> getWriterFeatures() {
return writerFeatures;
}

/**
 * Builds the union of the reader feature names and the writer feature names.
 *
 * @return a new {@link HashSet} holding every name from both sets; a fresh set is created on each
 *     call, so modifying it does not affect this object's own feature sets
 */
public Set<String> getReaderAndWriterFeatures() {
  // Seed with the reader features, then fold in the writer features; duplicates collapse.
  final Set<String> combinedFeatures = new HashSet<>(readerFeatures);
  combinedFeatures.addAll(writerFeatures);
  return combinedFeatures;
}

/**
 * Returns the stored {@code supportsReaderFeatures} flag.
 *
 * <p>NOTE(review): presumably indicates whether this protocol's reader version allows explicit
 * reader table features -- confirm where the field is initialized.
 */
public boolean supportsReaderFeatures() {
  return this.supportsReaderFeatures;
}

/**
 * Returns the stored {@code supportsWriterFeatures} flag.
 *
 * <p>NOTE(review): presumably indicates whether this protocol's writer version allows explicit
 * writer table features -- confirm where the field is initialized.
 */
public boolean supportsWriterFeatures() {
  return this.supportsWriterFeatures;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Protocol{");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public CommitMetadata getCommitMetadata() {
getCommitAsVersion(),
txnState.logPath,
commitInfo,
Collections::emptyMap, /* committerProperties */
txnState.readTableOpt.map(x -> new Tuple2<>(x.getProtocol(), x.getMetadata())),
txnState.updatedProtocolOpt,
txnState.isMetadataUpdate() ? Optional.of(metadata) : Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package io.delta.kernel.internal.commit

import java.util.Optional

import io.delta.kernel.commit.CommitMetadata
import scala.collection.JavaConverters._

import io.delta.kernel.commit.CommitMetadata.CommitType
import io.delta.kernel.internal.actions.{Metadata, Protocol}
import io.delta.kernel.internal.util.{Tuple2 => KernelTuple2}
import io.delta.kernel.test.{ActionUtils, TestFixtures, VectorTestUtils}
import io.delta.kernel.test.{TestFixtures, VectorTestUtils}
import io.delta.kernel.types.{IntegerType, StructType}

import org.scalatest.funsuite.AnyFunSuite
Expand Down Expand Up @@ -57,6 +58,13 @@ class CommitMetadataSuite extends AnyFunSuite
commitInfo = null,
readPandMOpt = Optional.of(new KernelTuple2(protocol12, basicPartitionedMetadata)))
}

intercept[NullPointerException] {
createCommitMetadata(
version = updateVersionNonZero,
readPandMOpt = Optional.of(new KernelTuple2(protocol12, basicPartitionedMetadata)),
committerProperties = null)
}
}

test("constructor validates readProtocol and readMetadata consistency") {
Expand Down Expand Up @@ -100,6 +108,17 @@ class CommitMetadataSuite extends AnyFunSuite
assert(exMsg.contains("InCommitTimestamp must be present for commits to catalogManaged tables"))
}

test("getCommitterProperties returns provided supplier") {
  // The supplier handed to CommitMetadata must come back out and yield the same map.
  val expectedProps = Map("key1" -> "value1", "key2" -> "value2").asJava

  val metadataUnderTest = createCommitMetadata(
    version = updateVersionNonZero,
    readPandMOpt = Optional.of(new KernelTuple2(protocol12, basicPartitionedMetadata)),
    committerProperties = () => expectedProps)

  val suppliedProps = metadataUnderTest.getCommitterProperties.get()
  assert(suppliedProps == expectedProps)
}

test("getEffectiveProtocol returns new protocol when present") {
val newProtocol = new Protocol(2, 3)
val commitMetadata = createCommitMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

package io.delta.kernel.test

import java.util.Optional
import java.util.{Collections, Map => JMap, Optional}
import java.util.function.Supplier

import io.delta.kernel.commit.CommitMetadata
import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol}
Expand Down Expand Up @@ -57,13 +58,15 @@ trait TestFixtures extends ActionUtils {
version: Long,
logPath: String = "/fake/_delta_log",
commitInfo: CommitInfo = testCommitInfo(),
committerProperties: Supplier[JMap[String, String]] = () => Collections.emptyMap(),
readPandMOpt: Optional[Tuple2[Protocol, Metadata]] = Optional.empty(),
newProtocolOpt: Optional[Protocol] = Optional.empty(),
newMetadataOpt: Optional[Metadata] = Optional.empty()): CommitMetadata = {
new CommitMetadata(
version,
logPath,
commitInfo,
committerProperties,
readPandMOpt,
newProtocolOpt,
newMetadataOpt)
Expand Down
25 changes: 20 additions & 5 deletions unity/src/main/java/io/delta/unity/UCCatalogManagedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.delta.kernel.SnapshotBuilder;
import io.delta.kernel.TableManager;
import io.delta.kernel.annotation.Experimental;
import io.delta.kernel.commit.Committer;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.files.ParsedLogData;
Expand Down Expand Up @@ -56,14 +57,14 @@ public class UCCatalogManagedClient {
* add this table feature to Kernel's protocol. This property won't be written to the delta
* metadata.
*/
private static final String CATALOG_MANAGED_ENABLEMENT_KEY =
protected static final String CATALOG_MANAGED_ENABLEMENT_KEY =
TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX
+ TableFeatures.CATALOG_MANAGED_R_W_FEATURE_PREVIEW.featureName();

/** Key for identifying Unity Catalog table ID. */
private static final String UC_TABLE_ID_KEY = "ucTableId";
protected static final String UC_TABLE_ID_KEY = "ucTableId";

private final UCClient ucClient;
protected final UCClient ucClient;

public UCCatalogManagedClient(UCClient ucClient) {
this.ucClient = Objects.requireNonNull(ucClient, "ucClient is null");
Expand Down Expand Up @@ -112,7 +113,7 @@ public Snapshot loadSnapshot(
}

return snapshotBuilder
.withCommitter(new UCCatalogManagedCommitter(ucClient, ucTableId, tablePath))
.withCommitter(createUCCommitter(ucClient, ucTableId, tablePath))
.withLogData(logData)
.build(engine);
});
Expand Down Expand Up @@ -142,10 +143,24 @@ public CreateTableTransactionBuilder buildCreateTableTransaction(
Objects.requireNonNull(engineInfo, "engineInfo is null");

return TableManager.buildCreateTableTransaction(tablePath, schema, engineInfo)
.withCommitter(new UCCatalogManagedCommitter(ucClient, ucTableId, tablePath))
.withCommitter(createUCCommitter(ucClient, ucTableId, tablePath))
.withTableProperties(getRequiredTablePropertiesForCreate(ucTableId));
}

/////////////////////////////////////////
// Protected Methods for Extensibility //
/////////////////////////////////////////

/**
* Creates a UC committer instance for the specified table.
*
* <p>This method allows subclasses to provide custom committer implementations for specialized
* use cases.
*/
protected Committer createUCCommitter(UCClient ucClient, String ucTableId, String tablePath) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code smell here. I wonder if we should abandon the UCCatalogManagedClient::loadSnapshot API -- and instead just provide static utilities (like getting the commits, parsing the commits, etc.)

return new UCCatalogManagedCommitter(ucClient, ucTableId, tablePath);
}

////////////////////
// Helper Methods //
////////////////////
Expand Down
23 changes: 19 additions & 4 deletions unity/src/main/java/io/delta/unity/UCCatalogManagedCommitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.delta.kernel.commit.Committer;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.files.ParsedLogData;
import io.delta.kernel.utils.CloseableIterator;
Expand All @@ -49,9 +50,9 @@
public class UCCatalogManagedCommitter implements Committer {
private static final Logger logger = LoggerFactory.getLogger(UCCatalogManagedCommitter.class);

private final UCClient ucClient;
private final String ucTableId;
private final Path tablePath;
protected final UCClient ucClient;
protected final String ucTableId;
protected final Path tablePath;

/**
* Creates a new UCCatalogManagedCommitter for the specified Unity Catalog-managed Delta table.
Expand Down Expand Up @@ -126,6 +127,20 @@ private CommitResponse writeImpl(
return new CommitResponse(ParsedLogData.forFileStatus(kernelStagedCommitFileStatus));
}

/////////////////////////////////////////
// Protected Methods for Extensibility //
/////////////////////////////////////////

/**
 * Produces the optional metadata that is sent to Unity Catalog with a commit.
 *
 * <p>This base implementation passes through whatever {@link CommitMetadata#getNewMetadataOpt()}
 * returns. It is an extension hook: a subclass may override it to adjust or enrich the metadata
 * before it is sent.
 *
 * @param commitMetadata the metadata describing the commit being written
 * @return the metadata payload for this commit, or empty when the commit carries no new metadata
 */
protected Optional<Metadata> generateMetadataPayloadOpt(CommitMetadata commitMetadata) {
  final Optional<Metadata> metadataPayloadOpt = commitMetadata.getNewMetadataOpt();
  return metadataPayloadOpt;
}

////////////////////
// Helper methods //
////////////////////
Expand Down Expand Up @@ -203,7 +218,7 @@ private void commitToUC(CommitMetadata commitMetadata, FileStatus kernelStagedCo
Optional.of(getUcCommitPayload(commitMetadata, kernelStagedCommitFileStatus)),
Optional.empty() /* lastKnownBackfilledVersion */, // TODO: take this in as a hint
false /* isDisown */,
commitMetadata.getNewMetadataOpt().map(MetadataAdapter::new),
generateMetadataPayloadOpt(commitMetadata).map(MetadataAdapter::new),
commitMetadata.getNewProtocolOpt().map(ProtocolAdapter::new));
return null;
} catch (io.delta.storage.commit.CommitFailedException cfe) {
Expand Down