Commit

[Kernel] Add an end2end prototype of Coordinated Commit read support in kernel
EstherBear committed Jul 6, 2024

1 parent ad8d1cb commit 323f08d
Showing 33 changed files with 2,279 additions and 27 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -453,8 +453,8 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
libraryDependencies ++= Seq(
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "1.7.36",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
57 changes: 57 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/Commit.java
@@ -0,0 +1,57 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import io.delta.kernel.utils.FileStatus;

/**
* Representation of a commit file
*/
public class Commit {

private long version;

private FileStatus fileStatus;

private long commitTimestamp;

public Commit(long version, FileStatus fileStatus, long commitTimestamp) {
this.version = version;
this.fileStatus = fileStatus;
this.commitTimestamp = commitTimestamp;
}

public long getVersion() {
return version;
}

public FileStatus getFileStatus() {
return fileStatus;
}

public long getCommitTimestamp() {
return commitTimestamp;
}

public Commit withFileStatus(FileStatus fileStatus) {
return new Commit(version, fileStatus, commitTimestamp);
}

public Commit withCommitTimestamp(long commitTimestamp) {
return new Commit(version, fileStatus, commitTimestamp);
}
}
60 changes: 60 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitFailedException.java
@@ -0,0 +1,60 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.Iterator;
import java.util.Map;

/**
* Exception raised by
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit(
* String, Map, long, Iterator, UpdatedActions)}
*
* <pre>
* | retryable | conflict | meaning |
* | no | no | something bad happened (e.g. auth failure) |
* | no | yes | permanent transaction conflict (e.g. multi-table commit failed) |
* | yes | no | transient error (e.g. network hiccup) |
* | yes | yes | physical conflict (allowed to rebase and retry) |
* </pre>
*/
public class CommitFailedException extends Exception {

private boolean retryable;

private boolean conflict;

private String message;

public CommitFailedException(boolean retryable, boolean conflict, String message) {
this.retryable = retryable;
this.conflict = conflict;
this.message = message;
}

public boolean getRetryable() {
return retryable;
}

public boolean getConflict() {
return conflict;
}

public String getMessage() {
return message;
}
}
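To make the four outcomes in the table above concrete, here is a minimal, hypothetical retry loop an engine could wrap around CommitCoordinatorClientHandler#commit. The helper class, the actions supplier, and the version-bump "rebase" are illustrative assumptions, not part of this commit:

import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;

import io.delta.kernel.commit.CommitFailedException;
import io.delta.kernel.commit.CommitResponse;
import io.delta.kernel.commit.UpdatedActions;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;

final class CommitRetryExample {
    // Attempts a commit, retrying on transient errors and rebasing on physical conflicts.
    static CommitResponse commitWithRetries(
            CommitCoordinatorClientHandler handler,
            String logPath,
            Map<String, String> tableConf,
            long commitVersion,
            Supplier<Iterator<String>> actionsSupplier, // fresh iterator per attempt
            UpdatedActions updatedActions,
            int maxAttempts) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return handler.commit(
                    logPath, tableConf, commitVersion, actionsSupplier.get(), updatedActions);
            } catch (CommitFailedException e) {
                if (!e.getRetryable()) {
                    throw e; // rows 1 and 2 of the table: permanent failure or conflict
                }
                if (e.getConflict()) {
                    // Row 4: physical conflict. A real engine would re-read the table,
                    // resolve conflicts, and regenerate actions; here we only bump the version.
                    commitVersion++;
                }
                // Row 3: transient error, retry as-is.
            }
        }
        throw new CommitFailedException(false, false, "Exhausted " + maxAttempts + " attempts");
    }
}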
38 changes: 38 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/CommitResponse.java
@@ -0,0 +1,38 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.Iterator;
import java.util.Map;

/**
* Response container for
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit(
* String, Map, long, Iterator, UpdatedActions)}.
*/
public class CommitResponse {

private Commit commit;

public CommitResponse(Commit commit) {
this.commit = commit;
}

public Commit getCommit() {
return commit;
}
}
44 changes: 44 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/GetCommitsResponse.java
@@ -0,0 +1,44 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.List;
import java.util.Map;

/**
* Response container for
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits(
* String, Map, Long, Long)}.
*/
public class GetCommitsResponse {
private List<Commit> commits;

private long latestTableVersion;

public GetCommitsResponse(List<Commit> commits, long latestTableVersion) {
this.commits = commits;
this.latestTableVersion = latestTableVersion;
}

public List<Commit> getCommits() {
return commits;
}

public long getLatestTableVersion() {
return latestTableVersion;
}
}
70 changes: 70 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/UpdatedActions.java
@@ -0,0 +1,70 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import io.delta.kernel.commit.actions.AbstractCommitInfo;
import io.delta.kernel.commit.actions.AbstractMetadata;
import io.delta.kernel.commit.actions.AbstractProtocol;

/**
* A container class to inform the CommitCoordinatorClientHandler about any changes in the
* Protocol or Metadata.
*/
public class UpdatedActions {
private AbstractCommitInfo commitInfo;

private AbstractMetadata newMetadata;

private AbstractProtocol newProtocol;

private AbstractMetadata oldMetadata;

private AbstractProtocol oldProtocol;

public UpdatedActions(
AbstractCommitInfo commitInfo,
AbstractMetadata newMetadata,
AbstractProtocol newProtocol,
AbstractMetadata oldMetadata,
AbstractProtocol oldProtocol) {
this.commitInfo = commitInfo;
this.newMetadata = newMetadata;
this.newProtocol = newProtocol;
this.oldMetadata = oldMetadata;
this.oldProtocol = oldProtocol;
}

public AbstractCommitInfo getCommitInfo() {
return commitInfo;
}

public AbstractMetadata getNewMetadata() {
return newMetadata;
}

public AbstractProtocol getNewProtocol() {
return newProtocol;
}

public AbstractMetadata getOldMetadata() {
return oldMetadata;
}

public AbstractProtocol getOldProtocol() {
return oldProtocol;
}
}
31 changes: 31 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractCommitInfo.java
@@ -0,0 +1,31 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit.actions;

/**
* Interface for objects that represent the base information for a commit.
* Commits need to provide an in-commit timestamp. This timestamp is used
* to specify the exact time the commit happened and determines the target
* version for time-based time travel queries.
*/
public interface AbstractCommitInfo {

/**
* Get the timestamp of the commit as millis after the epoch.
*/
long getCommitTimestamp();
}
68 changes: 68 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractMetadata.java
@@ -0,0 +1,68 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit.actions;

import java.util.List;
import java.util.Map;

/**
* Interface for metadata actions in Delta. The metadata action describes the table's
* schema, format, partitioning, and configuration.
*/
public interface AbstractMetadata {

/**
* A unique table identifier.
*/
String getId();

/**
* User-specified table identifier.
*/
String getName();

/**
* User-specified table description.
*/
String getDescription();

/** The table provider format. */
String getProvider();

/** The format options */
Map<String, String> getFormatOptions();

/**
* The table schema in string representation.
*/
String getSchemaString();

/**
* List of partition columns.
*/
List<String> getPartitionColumns();

/**
* The table properties defined on the table.
*/
Map<String, String> getConfiguration();

/**
* Timestamp for the creation of this metadata.
*/
Long getCreatedTime();
}
46 changes: 46 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/actions/AbstractProtocol.java
@@ -0,0 +1,46 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit.actions;

import java.util.Set;

/**
* Interface for protocol actions in Delta. The protocol defines the requirements
* that readers and writers of the table need to meet.
*/
public interface AbstractProtocol {

/**
* The minimum reader version required to read the table.
*/
int getMinReaderVersion();

/**
* The minimum writer version required to write to the table.
*/
int getMinWriterVersion();

/**
* The reader features that need to be supported to read the table.
*/
Set<String> getReaderFeatures();

/**
* The writer features that need to be supported to write the table.
*/
Set<String> getWriterFeatures();
}
153 changes: 153 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/engine/CommitCoordinatorClientHandler.java
@@ -0,0 +1,153 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.engine;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.commit.Commit;
import io.delta.kernel.commit.CommitFailedException;
import io.delta.kernel.commit.CommitResponse;
import io.delta.kernel.commit.GetCommitsResponse;
import io.delta.kernel.commit.UpdatedActions;
import io.delta.kernel.commit.actions.AbstractMetadata;
import io.delta.kernel.commit.actions.AbstractProtocol;

/**
* Provides coordinated commits related functionalities to Delta Kernel.
*
* @since 3.0.0
*/
@Evolving
public interface CommitCoordinatorClientHandler {

/**
* API to register the table represented by the given `logPath` at the provided
* currentTableVersion with the commit coordinator this commit coordinator client represents.
* <p>
* This API is called when the table is being converted from a file system table to a
* coordinated-commit table.
* <p>
* When a new coordinated-commit table is being created, the currentTableVersion will be -1 and
* the upgrade commit needs to be a file system commit which will write the backfilled file
* directly.
*
* @param logPath The path to the delta log of the table that should be converted
* @param currentVersion The currentTableVersion is the version of the table just before
* conversion. currentTableVersion + 1 represents the commit that
* will do the conversion. This must be backfilled atomically.
* currentTableVersion + 2 represents the first commit after conversion.
* This will go through the CommitCoordinatorClient and the client is
* free to choose when it wants to backfill this commit.
* @param currentMetadata The metadata of the table at currentTableVersion
* @param currentProtocol The protocol of the table at currentTableVersion
* @return A map of key-value pairs which is issued by the commit coordinator to identify the
* table. This should be stored in the table's metadata. This information needs to be
* passed to the {@link #commit}, {@link #getCommits}, and {@link #backfillToVersion}
* APIs to identify the table.
*/
Map<String, String> registerTable(
String logPath,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol);

/**
* API to commit the given set of actions to the table represented by logPath at the
* given commitVersion.
*
* @param logPath The path to the delta log of the table that should be committed to.
* @param tableConf The table configuration that was returned by the commit coordinator
* client during registration.
* @param commitVersion The version of the commit that is being committed.
* @param actions The actions that need to be committed.
* @param updatedActions The commit info and any metadata or protocol changes that are made
* as part of this commit.
* @return CommitResponse which contains the file status of the committed commit file. If the
* commit is already backfilled, then the file status could be omitted from the response
* and the client could retrieve the information by itself.
*/
CommitResponse commit(
String logPath,
Map<String, String> tableConf,
long commitVersion,
Iterator<String> actions,
UpdatedActions updatedActions) throws IOException, CommitFailedException;

/**
* API to get the unbackfilled commits for the table represented by the given logPath.
* Commits older than startVersion or newer than endVersion (if given) are ignored. The
* returned commits are contiguous and in ascending version order.
*
* Note that the first version returned by this API may not be equal to startVersion. This
* happens when some versions starting from startVersion have already been backfilled and so
* the commit coordinator may have stopped tracking them.
*
* The returned latestTableVersion is the maximum commit version ratified by the commit
* coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit
* coordinator never ratified any version, i.e. it never accepted any unbackfilled commit.
*
* @param tablePath The path to the delta log of the table for which the unbackfilled
* commits should be retrieved.
* @param tableConf The table configuration that was returned by the commit coordinator
* during registration.
* @param startVersion The minimum version of the commit that should be returned. Can be null.
* @param endVersion The maximum version of the commit that should be returned. Can be null.
* @return GetCommitsResponse which has a list of {@link Commit}s and the latestTableVersion
* which is tracked by {@link CommitCoordinatorClientHandler}.
*/
GetCommitsResponse getCommits(
String tablePath,
Map<String, String> tableConf,
Long startVersion,
Long endVersion);

/**
* API to ask the commit coordinator client to backfill all commits up to {@code version}
* and notify the commit coordinator.
*
* If this API returns successfully, that means the backfill must have been completed, although
* the commit coordinator may not be aware of it yet.
*
* @param logPath The path to the delta log of the table that should be backfilled.
* @param tableConf The table configuration that was returned by the commit coordinator
* during registration.
* @param version The version up to which the commit coordinator client should backfill.
* @param lastKnownBackfilledVersion The last known version that was backfilled before this API
* was called. If it is null or invalid, then the commit
* coordinator client should backfill from the beginning of
* the table. Can be null.
*/
void backfillToVersion(
String logPath,
Map<String, String> tableConf,
long version,
Long lastKnownBackfilledVersion) throws IOException;

/**
* Determines whether this CommitCoordinatorClient is semantically equal to another
* CommitCoordinatorClient.
*
* Semantic equality is determined by each CommitCoordinatorClient implementation based on
* whether the two instances can be used interchangeably when invoking any of the
* CommitCoordinatorClient APIs, such as {@link #commit}, {@link #getCommits}, etc. For example,
* both instances might be pointing to the same underlying endpoint.
*/
Boolean semanticEquals(CommitCoordinatorClientHandler other);
}
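As a read-path illustration, a connector could use getCommits to discover unbackfilled commit files that a plain directory listing of the _delta_log would miss. A minimal sketch, assuming a handler for the table is already in hand and the table was registered with an empty coordinator conf:

import java.util.Collections;

import io.delta.kernel.commit.Commit;
import io.delta.kernel.commit.GetCommitsResponse;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;

final class GetCommitsExample {
    // Prints the unbackfilled commits at or above `startVersion` and their backing files.
    static void printUnbackfilledCommits(
            CommitCoordinatorClientHandler handler, String logPath, long startVersion) {
        GetCommitsResponse response = handler.getCommits(
            logPath, Collections.emptyMap() /* tableConf */, startVersion, null /* endVersion */);
        for (Commit commit : response.getCommits()) {
            System.out.printf("version %d -> %s%n",
                commit.getVersion(), commit.getFileStatus().getPath());
        }
        System.out.println("Latest ratified version: " + response.getLatestTableVersion());
    }
}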
11 changes: 11 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java
@@ -16,6 +16,8 @@

package io.delta.kernel.engine;

import java.util.Map;

import io.delta.kernel.annotation.Evolving;

/**
@@ -55,4 +57,13 @@ public interface Engine {
* @return An implementation of {@link ParquetHandler}.
*/
ParquetHandler getParquetHandler();

/**
* Get the connector provided {@link CommitCoordinatorClientHandler}.
*
* @param name Name of the underlying commit coordinator client.
* @return An implementation of {@link CommitCoordinatorClientHandler}.
*/
CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
String name, Map<String, String> conf);
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
@@ -20,6 +20,7 @@

import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;

@@ -30,7 +31,7 @@
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import static io.delta.kernel.internal.TableConfig.TOMBSTONE_RETENTION;
import static io.delta.kernel.internal.TableConfig.*;

/**
* Implementation of {@link Snapshot}.
@@ -159,4 +160,12 @@ public long getTimestamp(Engine engine) {
return logSegment.lastCommitTimestamp;
}
}

public Optional<CommitCoordinatorClientHandler> getCommitCoordinatorClientHandlerOpt(
Engine engine) {
return COORDINATED_COMMITS_COORDINATOR_NAME.fromMetadata(metadata).map(
commitCoordinatorStr -> engine.getCommitCoordinatorClientHandler(
commitCoordinatorStr,
COORDINATED_COMMITS_COORDINATOR_CONF.fromMetadata(metadata)));
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java
@@ -19,6 +19,10 @@
import java.util.function.Function;
import java.util.function.Predicate;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.delta.kernel.exceptions.InvalidConfigurationValueException;
import io.delta.kernel.exceptions.UnknownConfigurationException;
import io.delta.kernel.internal.actions.Metadata;
@@ -29,6 +33,8 @@
* from the table metadata.
*/
public class TableConfig<T> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
* The shortest duration we have to keep logically deleted data files around before deleting
* them physically.
@@ -105,6 +111,45 @@ public class TableConfig<T> {
"needs to be a long."
);


/*
* This table property is used to track the commit-coordinator name for this table. If this
* property is not set, the table will be considered a file system table and commits will be
* done by atomically publishing the commit file.
*/
public static final TableConfig<Optional<String>> COORDINATED_COMMITS_COORDINATOR_NAME =
new TableConfig<>(
"delta.coordinatedCommits.commitCoordinator-preview",
null, /* default values */
Optional::ofNullable,
value -> true,
"The commit-coordinator name for this table. This is used to determine\n" +
"which implementation of commit-coordinator to use when committing\n" +
"to this table. If this property is not set, the table will be\n" +
"considered as file system table and commits will be done via\n" +
"atomically publishing the commit file.\n"
);

public static final TableConfig<Map<String, String>> COORDINATED_COMMITS_COORDINATOR_CONF =
new TableConfig<>(
"delta.coordinatedCommits.commitCoordinatorConf-preview",
null, /* default values */
v -> {
if (v == null) {
return Collections.emptyMap();
}
try {
return OBJECT_MAPPER.readValue(
v, new TypeReference<Map<String, String>>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
},
value -> true,
"A string-to-string map of configuration properties for the" +
" coordinated commits-coordinator."
);
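For reference, a coordinated-commits table might carry these two properties in its metadata. A small sketch: the coordinator name "in-memory" and the "endpoint" conf key are illustrative placeholders, not values defined by this commit:

import java.util.HashMap;
import java.util.Map;

final class CoordinatedCommitsPropertiesExample {
    static Map<String, String> exampleTableProperties() {
        Map<String, String> props = new HashMap<>();
        // Which registered commit coordinator owns this table.
        props.put("delta.coordinatedCommits.commitCoordinator-preview", "in-memory");
        // Coordinator-specific configuration, stored as a JSON string and parsed
        // with Jackson by COORDINATED_COMMITS_COORDINATOR_CONF above.
        props.put(
            "delta.coordinatedCommits.commitCoordinatorConf-preview",
            "{\"endpoint\": \"https://commit-coordinator.example.com\"}");
        return props;
    }
}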

/**
* All the valid properties that can be set on the table.
*/
@@ -115,6 +160,8 @@ public class TableConfig<T> {
addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED);
addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION);
addConfig(this, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP);
addConfig(this, COORDINATED_COMMITS_COORDINATOR_NAME);
addConfig(this, COORDINATED_COMMITS_COORDINATOR_CONF);
}}
);

@@ -158,6 +205,24 @@ public String getKey() {
return key;
}

/**
* Returns the default value of the table property.
*
* @return the default value of the table property
*/
public String getDefaultValue() {
return defaultValue;
}

/**
* Returns the fromString function of the table property.
*
* @return the fromString function of the table property
*/
public Function<String, T> getFromString() {
return fromString;
}

/**
* Validates that the given properties have the delta prefix in the key name, and they are in
* the set of valid properties. The caller should get the validated configurations and store the
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java
@@ -105,6 +105,17 @@ public static CommitInfo fromColumnVector(ColumnVector vector, int rowId) {
private final String txnId;
private Optional<Long> inCommitTimestamp;

public static CommitInfo empty() {
return new CommitInfo(
Optional.empty(),
-1,
null,
null,
Collections.emptyMap(),
true,
null);
}

public CommitInfo(
Optional<Long> inCommitTimestamp,
long timestamp,
@@ -122,6 +133,18 @@ public CommitInfo(
this.txnId = txnId;
}

public CommitInfo withTimestamp(long newTimestamp) {
return new CommitInfo(
Optional.of(newTimestamp),
newTimestamp,
this.engineInfo,
this.operation,
this.operationParameters,
this.isBlindAppend,
this.txnId
);
}

public Optional<Long> getInCommitTimestamp() {
return inCommitTimestamp;
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java
@@ -19,15 +19,19 @@
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.*;
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.*;

import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.util.VectorUtils;
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;


public class Metadata {

@@ -86,6 +90,20 @@ public static Metadata fromColumnVector(
// Logical data schema excluding partition columns
private final Lazy<StructType> dataSchema;

public static Metadata empty() {
return new Metadata(
java.util.UUID.randomUUID().toString(),
Optional.empty(),
Optional.empty(),
new Format(),
"",
null,
stringArrayValue(Collections.emptyList()),
Optional.empty(),
VectorUtils.stringStringMapValue(Collections.emptyMap())
);
}

public Metadata(
String id,
Optional<String> name,
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java
@@ -25,6 +25,7 @@

import io.delta.kernel.internal.TableFeatures;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.internal.util.VectorUtils;
import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;
@@ -56,6 +57,29 @@ public static Protocol fromColumnVector(ColumnVector vector, int rowId) {
private final int minWriterVersion;
private final List<String> readerFeatures;
private final List<String> writerFeatures;
private final Lazy<Set<String>> readerAndWriterFeatureNames;

public static Protocol empty() {
return new Protocol(3, 7);
}

private static boolean supportsReaderFeatures(int minReaderVersion) {
return minReaderVersion >= 3;
}

private static boolean supportsWriterFeatures(int minWriterVersion) {
return minWriterVersion >= 7;
}

public Protocol(int minReaderVersion, int minWriterVersion) {
this.minReaderVersion = minReaderVersion;
this.minWriterVersion = minWriterVersion;
this.readerFeatures =
supportsReaderFeatures(minReaderVersion) ? Collections.emptyList() : null;
this.writerFeatures =
supportsWriterFeatures(minWriterVersion) ? Collections.emptyList() : null;
this.readerAndWriterFeatureNames = getLazyReaderAndWriterFeatureNames();
}

public Protocol(
int minReaderVersion,
@@ -66,6 +90,7 @@ public Protocol(
this.minWriterVersion = minWriterVersion;
this.readerFeatures = readerFeatures;
this.writerFeatures = writerFeatures;
this.readerAndWriterFeatureNames = getLazyReaderAndWriterFeatureNames();
}

public int getMinReaderVersion() {
@@ -125,4 +150,25 @@ public Protocol withNewWriterFeatures(
this.readerFeatures == null ? null : new ArrayList<>(this.readerFeatures),
newWriterFeatures);
}

/**
* Check if a `feature` is supported by this protocol. This means the protocol supports
* table features and references the feature.
*/
public boolean isFeatureSupported(String feature) {
return readerAndWriterFeatureNames.get().contains(feature);
}

private Lazy<Set<String>> getLazyReaderAndWriterFeatureNames() {
return new Lazy<>(() -> {
Set<String> names = new HashSet<>();
if (readerFeatures != null) {
names.addAll(readerFeatures);
}
if (writerFeatures != null) {
names.addAll(writerFeatures);
}
return names;
});
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java
@@ -19,6 +19,7 @@
import java.io.*;
import java.nio.file.FileAlreadyExistsException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -29,6 +30,8 @@
import org.slf4j.LoggerFactory;

import io.delta.kernel.*;
import io.delta.kernel.commit.Commit;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
import io.delta.kernel.exceptions.InvalidTableException;
@@ -130,9 +133,13 @@ public Snapshot getSnapshotAt(
Engine engine,
long version) throws TableNotFoundException {

SnapshotImpl upperBoundSnapshot = getSnapshotAtInit(engine);

Optional<LogSegment> logSegmentOpt = getLogSegmentForVersion(engine,
Optional.empty(), /* startCheckpointOpt */
Optional.of(version) /* versionToLoadOpt */);
Optional.of(version) /* versionToLoadOpt */,
upperBoundSnapshot.getCommitCoordinatorClientHandlerOpt(engine)
/* commitCoordinatorClientHandlerOpt */);

return logSegmentOpt
.map(logSegment -> createSnapshot(logSegment, engine))
@@ -285,7 +292,8 @@ private Optional<CloseableIterator<FileStatus>> listFromOrNone(
protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
Engine engine,
long startVersion,
Optional<Long> versionToLoad) {
Optional<Long> versionToLoad,
Optional<CommitCoordinatorClientHandler> commitCoordinatorClientHandlerOpt) {
versionToLoad.ifPresent(v ->
checkArgument(
v >= startVersion,
@@ -296,22 +304,34 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
));
logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad);

return listFromOrNone(engine,
List<Commit> unbackfilledCommits = commitCoordinatorClientHandlerOpt
.map(commitCoordinatorClientHandler -> commitCoordinatorClientHandler
.getCommits(
logPath.toString(),
new HashMap<>(),
startVersion,
versionToLoad.orElse(null))
.getCommits())
.orElse(Collections.emptyList());


AtomicLong maxDeltaVersionSeen = new AtomicLong(startVersion - 1);
Optional<List<FileStatus>> resultFromFsListingOpt = listFromOrNone(engine,
startVersion).map(fileStatusesIter -> {
final List<FileStatus> output = new ArrayList<>();

while (fileStatusesIter.hasNext()) {
final FileStatus fileStatus = fileStatusesIter.next();
final String fileName = getName(fileStatus.getPath());

// Pick up all checkpoint and delta files
if (!isDeltaCommitOrCheckpointFile(getName(fileStatus.getPath()))) {
if (!isDeltaCommitOrCheckpointFile(fileName)) {
continue;
}

// Checkpoint files of 0 size are invalid but may be ignored silently when read,
// hence we drop them so that we never pick up such checkpoints.
if (FileNames.isCheckpointFile(getName(fileStatus.getPath())) &&
fileStatus.getSize() == 0) {
if (FileNames.isCheckpointFile(fileName) && fileStatus.getSize() == 0) {
continue;
}

@@ -333,11 +353,43 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
break;
}

// Ideally listFromOrNone should return lexicographically sorted files and so
// maxDeltaVersionSeen should be equal to fileVersion.
// But we are being defensive here and taking max of all the fileVersions seen.
if (FileNames.isCommitFile(fileName)) {
maxDeltaVersionSeen.set(Math.max(
maxDeltaVersionSeen.get(),
FileNames.deltaVersion(fileStatus.getPath())));
}

output.add(fileStatus);
}

return output;
});

if (!commitCoordinatorClientHandlerOpt.isPresent()) {
return resultFromFsListingOpt;
}

List<FileStatus> unbackfilledCommitsFiltered = new ArrayList<>();
boolean dropConditionMet = false;
for (Commit commit : unbackfilledCommits) {
if (!dropConditionMet && commit.getVersion() <= maxDeltaVersionSeen.get()) {
continue;
} else {
dropConditionMet = true;
}
if (versionToLoad.isPresent() && commit.getVersion() > versionToLoad.get()) {
break;
}
unbackfilledCommitsFiltered.add(commit.getFileStatus());
}

return resultFromFsListingOpt.map(fsListing -> {
fsListing.addAll(unbackfilledCommitsFiltered);
return fsListing;
});
}

/**
@@ -353,15 +405,50 @@ private SnapshotImpl getSnapshotAtInit(Engine engine)
logger.warn("{}: Last checkpoint file is missing or corrupted. " +
"Will search for the checkpoint files directly.", tablePath);
}
Optional<LogSegment> logSegmentOpt =
getLogSegmentFrom(engine, lastCheckpointOpt);
Optional<LogSegment> logSegmentOpt = getLogSegmentFrom(engine, lastCheckpointOpt);

return logSegmentOpt
.map(logSegment -> createSnapshot(
logSegment, engine))
.map(logSegment -> getUpdatedSnapshot(
engine,
Optional.empty(), /* oldSnapshotOpt */
logSegment,
Optional.empty() /* initialCommitCoordinatorClientHandler */))
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
}

private SnapshotImpl getUpdatedSnapshot(
Engine engine,
Optional<SnapshotImpl> oldSnapshotOpt,
LogSegment initialSegmentForNewSnapshot,
Optional<CommitCoordinatorClientHandler> initialCommitCoordinatorClientHandler) {
SnapshotImpl newSnapshot = createSnapshot(initialSegmentForNewSnapshot, engine);

Optional<CommitCoordinatorClientHandler> newCommitCoordinatorClientHandlerOpt =
newSnapshot.getCommitCoordinatorClientHandlerOpt(engine);
boolean usedStaleCommitCoordinator = newCommitCoordinatorClientHandlerOpt
.map(newStore -> !initialCommitCoordinatorClientHandler.isPresent() ||
!initialCommitCoordinatorClientHandler.get().semanticEquals(newStore))
.orElse(false);

if (usedStaleCommitCoordinator) {
Optional<LogSegment> segmentOpt = getLogSegmentForVersion(engine,
Optional.empty(), /* startCheckpointOpt */
newSnapshot.getLogSegment().checkpointVersionOpt /* versionToLoadOpt */,
newCommitCoordinatorClientHandlerOpt /* commitCoordinatorClientHandlerOpt */);
newSnapshot = segmentOpt
.map(segment -> {
if (oldSnapshotOpt.isPresent()
&& oldSnapshotOpt.get().getLogSegment().equals(segment)) {
return oldSnapshotOpt.get();
} else {
return createSnapshot(segment, engine);
}
})
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
}
return newSnapshot;
}

private SnapshotImpl createSnapshot(
LogSegment initSegment,
Engine engine) {
@@ -419,7 +506,8 @@ private Optional<LogSegment> getLogSegmentFrom(
Optional<CheckpointMetaData> startingCheckpoint) {
return getLogSegmentForVersion(engine,
startingCheckpoint.map(x -> x.version),
Optional.empty());
Optional.empty(),
Optional.empty());
}

/**
@@ -441,7 +529,8 @@ private Optional<LogSegment> getLogSegmentFrom(
public Optional<LogSegment> getLogSegmentForVersion(
Engine engine,
Optional<Long> startCheckpoint,
Optional<Long> versionToLoad) {
Optional<Long> versionToLoad,
Optional<CommitCoordinatorClientHandler> commitCoordinatorClientHandlerOpt) {
// Only use startCheckpoint if it is <= versionToLoad
Optional<Long> startCheckpointToUse = startCheckpoint
.filter(v -> !versionToLoad.isPresent() || v <= versionToLoad.get());
@@ -468,7 +557,8 @@ public Optional<LogSegment> getLogSegmentForVersion(

long startTimeMillis = System.currentTimeMillis();
final Optional<List<FileStatus>> newFiles =
listDeltaAndCheckpointFiles(engine, startVersion, versionToLoad);
listDeltaAndCheckpointFiles(
engine, startVersion, versionToLoad, commitCoordinatorClientHandlerOpt);
logger.info("{}: Took {}ms to list the files after starting checkpoint",
tablePath,
System.currentTimeMillis() - startTimeMillis);
@@ -478,7 +568,8 @@ public Optional<LogSegment> getLogSegmentForVersion(
return getLogSegmentForVersion(engine,
startCheckpointToUse,
versionToLoad,
newFiles);
newFiles,
commitCoordinatorClientHandlerOpt);
} finally {
logger.info("{}: Took {}ms to construct a log segment",
tablePath,
@@ -494,7 +585,8 @@ protected Optional<LogSegment> getLogSegmentForVersion(
Engine engine,
Optional<Long> startCheckpointOpt,
Optional<Long> versionToLoadOpt,
Optional<List<FileStatus>> filesOpt) {
Optional<List<FileStatus>> filesOpt,
Optional<CommitCoordinatorClientHandler> commitCoordinatorClientHandlerOpt) {
final List<FileStatus> newFiles;
if (filesOpt.isPresent()) {
newFiles = filesOpt.get();
@@ -532,7 +624,8 @@ protected Optional<LogSegment> getLogSegmentForVersion(
// DeltaLog singleton, so try listing from the first version
return getLogSegmentForVersion(engine,
Optional.empty(),
versionToLoadOpt);
versionToLoadOpt,
commitCoordinatorClientHandlerOpt);
}

Tuple2<List<FileStatus>, List<FileStatus>> checkpointsAndDeltas = ListUtils
kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
@@ -18,6 +18,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Pattern;

import io.delta.kernel.internal.fs.Path;
@@ -29,6 +31,9 @@ private FileNames() {}
private static final Pattern DELTA_FILE_PATTERN =
Pattern.compile("\\d+\\.json");

public static final Pattern UUID_DELTA_FILE_REGEX =
Pattern.compile("(\\d+)\\.([^\\.]+)\\.json");

private static final Pattern CHECKPOINT_FILE_PATTERN =
Pattern.compile(
"(\\d+)\\.checkpoint((\\.\\d+\\.\\d+)?\\.parquet|\\.[^.]+\\.(json|parquet))");
@@ -43,6 +48,7 @@ private FileNames() {}
Pattern.compile("(\\d+)\\.checkpoint\\.\\d+\\.\\d+\\.parquet");

public static final String SIDECAR_DIRECTORY = "_sidecars";
public static final String COMMIT_SUBDIR = "_commits";

/**
* Returns the delta (json format) path for a given delta file.
@@ -51,6 +57,20 @@ public static String deltaFile(Path path, long version) {
return String.format("%s/%020d.json", path, version);
}

/**
* Returns the un-backfilled uuid formatted delta (json format) path for a given version.
*
* @param logPath The root path of the delta log.
* @param version The version of the delta file.
* @param uuidString Optional UUID to embed in the file name; a random one is generated if empty.
* @return The path to the un-backfilled delta file: logPath/_commits/version.uuid.json
*/
public static Path unbackfilledDeltaFile(
Path logPath, long version, Optional<String> uuidString) {
Path basePath = commitDirPath(logPath);
String uuid = uuidString.orElse(UUID.randomUUID().toString());
return new Path(basePath, String.format("%020d.%s.json", version, uuid));
}

/**
* Returns the version for the given delta path.
*/
@@ -161,7 +181,9 @@ public static boolean isV2CheckpointFile(String fileName) {


public static boolean isCommitFile(String fileName) {
return DELTA_FILE_PATTERN.matcher(new Path(fileName).getName()).matches();
String filename = new Path(fileName).getName();
return DELTA_FILE_PATTERN.matcher(filename).matches() ||
UUID_DELTA_FILE_REGEX.matcher(filename).matches();
}

/**
@@ -183,4 +205,9 @@ public static long getFileVersion(Path path) {
);
}
}

/** Returns the path to the commit directory */
public static Path commitDirPath(Path logPath) {
return new Path(logPath, COMMIT_SUBDIR);
}
}
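A small sketch of the two path shapes these helpers produce (the table location is assumed for illustration):

import java.util.Optional;

import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;

final class FileNamesExample {
    public static void main(String[] args) {
        Path logPath = new Path("/table/_delta_log");
        // Backfilled commit file: /table/_delta_log/00000000000000000012.json
        System.out.println(FileNames.deltaFile(logPath, 12));
        // Unbackfilled commit file: /table/_delta_log/_commits/00000000000000000012.<uuid>.json
        System.out.println(FileNames.unbackfilledDeltaFile(logPath, 12, Optional.empty()));
        // Both name shapes are recognized as commit files after this change.
        assert FileNames.isCommitFile("00000000000000000012.json");
    }
}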
kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/SnapshotManagerSuite.scala
@@ -220,7 +220,8 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
new MockSidecarParquetHandler(expectedSidecars),
new MockSidecarJsonHandler(expectedSidecars)),
Optional.empty(),
versionToLoad
versionToLoad,
Optional.empty()
)
assert(logSegmentOpt.isPresent())

@@ -316,7 +317,8 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
snapshotManager.getLogSegmentForVersion(
createMockFSListFromEngine(files),
startCheckpoint,
versionToLoad
versionToLoad,
Optional.empty()
)
}
assert(e.getMessage.contains(expectedErrorMessageContains))
@@ -440,6 +442,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockFSListFromEngine(Seq.empty),
Optional.empty(),
Optional.empty(),
Optional.empty()
)
assert(!logSegmentOpt.isPresent())
@@ -491,6 +494,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockFSListFromEngine(listFrom(checkpointV)(_)),
Optional.of(checkpointV),
Optional.empty(),
Optional.empty()
)
assert(logSegmentOpt.isPresent())
@@ -831,6 +835,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockFSListFromEngine(deltas ++ corruptedCheckpoint ++ checkpoints),
Optional.empty(),
Optional.empty(),
Optional.empty()
)
val checkpointVersion = validVersions.sorted.lastOption
@@ -852,6 +857,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockFSListFromEngine(Seq.empty),
Optional.of(1),
Optional.empty(),
Optional.empty()
)
assert(!logSegmentOpt.isPresent())
kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala
@@ -52,7 +52,8 @@ trait MockEngineUtils {
fileSystemClient: FileSystemClient = null,
jsonHandler: JsonHandler = null,
parquetHandler: ParquetHandler = null,
expressionHandler: ExpressionHandler = null): Engine = {
expressionHandler: ExpressionHandler = null,
commitCoordinatorClientHandler: CommitCoordinatorClientHandler = null): Engine = {
new Engine() {
override def getExpressionHandler: ExpressionHandler =
Option(expressionHandler).getOrElse(
@@ -69,6 +70,11 @@ trait MockEngineUtils {
override def getParquetHandler: ParquetHandler =
Option(parquetHandler).getOrElse(
throw new UnsupportedOperationException("not supported in this test suite"))

override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
CommitCoordinatorClientHandler =
Option(commitCoordinatorClientHandler).getOrElse(
throw new UnsupportedOperationException("not supported in this test suite"))
}
}
}
194 changes: 194 additions & 0 deletions kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultCommitCoordinatorClientHandler.java
@@ -0,0 +1,194 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.engine;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import io.delta.storage.LogStore;
import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.CommitFailedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import io.delta.kernel.commit.Commit;
import io.delta.kernel.commit.CommitResponse;
import io.delta.kernel.commit.GetCommitsResponse;
import io.delta.kernel.commit.UpdatedActions;
import io.delta.kernel.commit.actions.AbstractMetadata;
import io.delta.kernel.commit.actions.AbstractProtocol;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;
import io.delta.kernel.utils.FileStatus;
import io.delta.kernel.defaults.internal.coordinatedcommits.CommitCoordinatorProvider;
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;

/**
* Default implementation of {@link CommitCoordinatorClientHandler} based on Hadoop APIs.
* It takes a Hadoop {@link Configuration} object to interact with the commit coordinator client.
* The following optional configurations can be set to customize the behavior of the client:
* <ul>
* <li>{@code io.delta.kernel.logStore.<scheme>.impl} - The class name of the custom
* {@link LogStore} implementation to use for operations on storage systems with the
* specified {@code scheme}. For example, to use a custom {@link LogStore} for S3 storage
* objects:
* <pre>{@code
* <property>
* <name>io.delta.kernel.logStore.s3.impl</name>
* <value>com.example.S3LogStore</value>
* </property>
* }</pre>
* If not set, the default LogStore implementation for the scheme will be used.
* </li>
* <li>{@code delta.enableFastS3AListFrom} - Set to {@code true} to enable fast listing
* functionality when using a {@link LogStore} created for S3 storage objects.
* </li>
* </ul>
*/
public class DefaultCommitCoordinatorClientHandler implements CommitCoordinatorClientHandler {
private final Configuration hadoopConf;
private final CommitCoordinatorClient commitCoordinatorClient;

/**
* Create an instance of the default {@link DefaultCommitCoordinatorClientHandler}
* implementation.
*
* @param hadoopConf Configuration to use. List of options to customize the behavior of
* the client can be found in the class documentation.
*/
public DefaultCommitCoordinatorClientHandler(
Configuration hadoopConf, String name, Map<String, String> conf) {
this.hadoopConf = hadoopConf;
this.commitCoordinatorClient = CommitCoordinatorProvider
.getCommitCoordinatorClient(name, conf);
}

@Override
public Map<String, String> registerTable(
String logPath,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol) {
return commitCoordinatorClient.registerTable(
new Path(logPath),
currentVersion,
(io.delta.storage.commit.actions.AbstractMetadata) currentMetadata,
(io.delta.storage.commit.actions.AbstractProtocol) currentProtocol);
}

@Override
public CommitResponse commit(
String logPath,
Map<String, String> tableConf,
long commitVersion,
Iterator<String> actions,
UpdatedActions updatedActions)
throws IOException, io.delta.kernel.commit.CommitFailedException {
Path path = new Path(logPath);
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
try {
return convertCommitResponse(commitCoordinatorClient.commit(
logStore,
hadoopConf,
path,
tableConf,
commitVersion,
actions,
convertUpdatedActions(updatedActions)));
} catch (CommitFailedException e) {
throw new io.delta.kernel.commit.CommitFailedException(
e.getRetryable(), e.getConflict(), e.getMessage());
}
}

@Override
public GetCommitsResponse getCommits(
String tablePath,
Map<String, String> tableConf,
Long startVersion,
Long endVersion) {
return convertGetCommitsResponse(commitCoordinatorClient.getCommits(
new Path(tablePath),
tableConf,
startVersion,
endVersion));
}

@Override
public void backfillToVersion(
String logPath,
Map<String, String> tableConf,
long version,
Long lastKnownBackfilledVersion) throws IOException {
Path path = new Path(logPath);
LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
commitCoordinatorClient.backfillToVersion(
logStore,
hadoopConf,
path,
tableConf,
version,
lastKnownBackfilledVersion);
}

@Override
public Boolean semanticEquals(CommitCoordinatorClientHandler other) {
return commitCoordinatorClient.semanticEquals(
((DefaultCommitCoordinatorClientHandler) other).getCommitCoordinatorClient());
}

public CommitCoordinatorClient getCommitCoordinatorClient() {
return commitCoordinatorClient;
}

private io.delta.storage.commit.UpdatedActions convertUpdatedActions(
UpdatedActions updatedActions) {
return new io.delta.storage.commit.UpdatedActions(
(io.delta.storage.commit.actions.AbstractCommitInfo) updatedActions.getCommitInfo(),
(io.delta.storage.commit.actions.AbstractMetadata) updatedActions.getNewMetadata(),
(io.delta.storage.commit.actions.AbstractProtocol) updatedActions.getNewProtocol(),
(io.delta.storage.commit.actions.AbstractMetadata) updatedActions.getOldMetadata(),
(io.delta.storage.commit.actions.AbstractProtocol) updatedActions.getOldProtocol());
}

private CommitResponse convertCommitResponse(io.delta.storage.commit.CommitResponse response) {
return new CommitResponse(convertCommit(response.getCommit()));
}

private Commit convertCommit(io.delta.storage.commit.Commit commit) {
return new Commit(
commit.getVersion(),
convertFileStatus(commit.getFileStatus()),
commit.getCommitTimestamp());
}

private FileStatus convertFileStatus(org.apache.hadoop.fs.FileStatus hadoopFileStatus) {
return FileStatus.of(
hadoopFileStatus.getPath().toString(),
hadoopFileStatus.getLen(),
hadoopFileStatus.getModificationTime());
}

private GetCommitsResponse convertGetCommitsResponse(
io.delta.storage.commit.GetCommitsResponse response) {
List<Commit> commits = response.getCommits().stream()
.map(this::convertCommit)
.collect(Collectors.toList());
return new GetCommitsResponse(commits, response.getLatestTableVersion());
}
}
kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultEngine.java
@@ -15,6 +15,8 @@
*/
package io.delta.kernel.defaults.engine;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.engine.*;
@@ -50,6 +52,12 @@ public ParquetHandler getParquetHandler() {
return new DefaultParquetHandler(hadoopConf);
}

@Override
public CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
String name, Map<String, String> conf) {
return new DefaultCommitCoordinatorClientHandler(hadoopConf, name, conf);
}

/**
* Create an instance of {@link DefaultEngine}.
*
@@ -59,4 +67,5 @@ public ParquetHandler getParquetHandler() {
public static DefaultEngine create(Configuration hadoopConf) {
return new DefaultEngine(hadoopConf);
}

}
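Putting the engine wiring together, a connector might obtain the handler as sketched below. The coordinator name "in-memory" is a placeholder that must match a builder registered with CommitCoordinatorProvider (see further down):

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.CommitCoordinatorClientHandler;

final class DefaultEngineWiringExample {
    public static void main(String[] args) {
        DefaultEngine engine = DefaultEngine.create(new Configuration());
        // Resolves to a DefaultCommitCoordinatorClientHandler, which looks the name up
        // in CommitCoordinatorProvider; this fails if no builder was registered for it.
        CommitCoordinatorClientHandler handler =
            engine.getCommitCoordinatorClientHandler("in-memory", Collections.emptyMap());
    }
}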
216 changes: 216 additions & 0 deletions kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/coordinatedcommits/AbstractBatchBackfillingCommitCoordinatorClient.java
@@ -0,0 +1,216 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits;

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

import io.delta.storage.CloseableIterator;
import io.delta.storage.LogStore;
import io.delta.storage.commit.Commit;
import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.CommitFailedException;
import io.delta.storage.commit.CommitResponse;
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.UpdatedActions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An abstract {@link CommitCoordinatorClient} which triggers backfills every n commits.
* - every commit version which satisfies `commitVersion % batchSize == 0` will trigger a backfill.
*/
public abstract class AbstractBatchBackfillingCommitCoordinatorClient
implements CommitCoordinatorClient {

protected static final Logger logger =
LoggerFactory.getLogger(AbstractBatchBackfillingCommitCoordinatorClient.class);

/**
* Size of batch that should be backfilled. So every commit version which satisfies
* `commitVersion % batchSize == 0` will trigger a backfill.
*/
protected long batchSize;

/**
* Commit a given `commitFile` to the table represented by the given `logPath` at the
* given `commitVersion`.
*/
protected abstract CommitResponse commitImpl(
LogStore logStore,
Configuration hadoopConf,
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
long commitVersion,
FileStatus commitFile,
long commitTimestamp) throws CommitFailedException;

@Override
public CommitResponse commit(
LogStore logStore,
Configuration hadoopConf,
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
long commitVersion,
Iterator<String> actions,
UpdatedActions updatedActions) throws CommitFailedException, IOException {
Path tablePath = CoordinatedCommitsUtils.getTablePath(logPath);
if (commitVersion == 0) {
throw new CommitFailedException(
false, false, "Commit version 0 must go via filesystem.");
}
logger.info("Attempting to commit version {} on table {}", commitVersion, tablePath);
FileSystem fs = logPath.getFileSystem(hadoopConf);
if (batchSize <= 1) {
// Backfill until `commitVersion - 1`
logger.info("Making sure commits are backfilled until {}" +
" version for table {}", commitVersion - 1, tablePath);
backfillToVersion(
logStore,
hadoopConf,
logPath,
coordinatedCommitsTableConf,
commitVersion - 1,
null);
}

// Write new commit file in _commits directory
FileStatus fileStatus = CoordinatedCommitsUtils.writeCommitFile(
logStore, hadoopConf, logPath.toString(), commitVersion, actions, generateUUID());

// Do the actual commit
long commitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp();
CommitResponse commitResponse =
commitImpl(
logStore,
hadoopConf,
logPath,
coordinatedCommitsTableConf,
commitVersion,
fileStatus,
commitTimestamp);

boolean mcToFsConversion = isCoordinatedCommitsToFSConversion(
commitVersion, updatedActions);
// Backfill if needed
if (batchSize <= 1) {
// Always backfill when batch size is configured as 1
backfill(logStore, hadoopConf, logPath, commitVersion, fileStatus);
Path targetFile = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, commitVersion);
FileStatus targetFileStatus = fs.getFileStatus(targetFile);
Commit newCommit = commitResponse.getCommit().withFileStatus(targetFileStatus);
return new CommitResponse(newCommit);
} else if (commitVersion % batchSize == 0 || mcToFsConversion) {
logger.info("Making sure commits are backfilled till {} version for table {}",
commitVersion,
tablePath);
backfillToVersion(
logStore,
hadoopConf,
logPath,
coordinatedCommitsTableConf,
commitVersion,
null);
}
logger.info("Commit {} done successfully on table {}", commitVersion, tablePath);
return commitResponse;
}

@Override
public void backfillToVersion(
LogStore logStore,
Configuration hadoopConf,
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
long version,
Long lastKnownBackfilledVersion) throws IOException {
// Confirm the last backfilled version by checking the backfilled delta file's existence.
if (lastKnownBackfilledVersion != null) {
try {
FileSystem fs = logPath.getFileSystem(hadoopConf);
if (!fs.exists(
        CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, lastKnownBackfilledVersion))) {
lastKnownBackfilledVersion = null;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
Long startVersion = null;
if (lastKnownBackfilledVersion != null) {
startVersion = lastKnownBackfilledVersion + 1;
}
GetCommitsResponse commitsResponse =
getCommits(logPath, coordinatedCommitsTableConf, startVersion, version);
for (Commit commit : commitsResponse.getCommits()) {
backfill(logStore, hadoopConf, logPath, commit.getVersion(), commit.getFileStatus());
}
}

protected String generateUUID() {
return UUID.randomUUID().toString();
}

/** Backfills a given `fileStatus` to `version`.json */
protected void backfill(
LogStore logStore,
Configuration hadoopConf,
Path logPath,
long version,
FileStatus fileStatus) throws IOException {
Path targetFile = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version);
logger.info("Backfilling commit " + fileStatus.getPath() + " to " + targetFile);
CloseableIterator<String> commitContentIterator = logStore
.read(fileStatus.getPath(), hadoopConf);
try {
logStore.write(
targetFile,
commitContentIterator,
false,
hadoopConf);
registerBackfill(logPath, version);
} catch (FileAlreadyExistsException e) {
logger.info("The backfilled file " + targetFile + " already exists.");
} finally {
commitContentIterator.close();
}
}

/**
* Callback to tell the CommitCoordinator that all commits <= `backfilledVersion` are
* backfilled.
*/
protected abstract void registerBackfill(Path logPath, long backfilledVersion);

private boolean isCoordinatedCommitsToFSConversion(
long commitVersion, UpdatedActions updatedActions) {
boolean oldMetadataHasCoordinatedCommits =
CoordinatedCommitsUtils
.getCommitCoordinatorName(updatedActions.getOldMetadata()).isPresent();
boolean newMetadataHasCoordinatedCommits =
CoordinatedCommitsUtils
.getCommitCoordinatorName(updatedActions.getNewMetadata()).isPresent();
return oldMetadataHasCoordinatedCommits
&& !newMetadataHasCoordinatedCommits
&& commitVersion > 0;
}
}
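The backfill policy above reduces to a single predicate; the following standalone restatement is a sketch, with a helper name of our choosing rather than anything in this commit, that makes the three branches explicit:

// A sketch restating the backfill policy in commit() above; the helper name
// `shouldBackfillImmediately` is illustrative and not part of this commit.
static boolean shouldBackfillImmediately(
        long batchSize, long commitVersion, boolean coordinatedToFsConversion) {
    // batchSize <= 1: backfill every commit immediately;
    // commitVersion % batchSize == 0: backfill once per completed batch;
    // converting from coordinated commits back to filesystem commits always
    // forces a backfill so the table remains readable without the coordinator.
    return batchSize <= 1
            || commitVersion % batchSize == 0
            || coordinatedToFsConversion;
}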
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits;

import java.util.Map;

import io.delta.storage.commit.CommitCoordinatorClient;

/** A builder interface for {@link CommitCoordinatorClient} */
public interface CommitCoordinatorBuilder {
/** Name of the commit-coordinator */
String getName();

/** Returns a commit-coordinator client based on the given conf */
CommitCoordinatorClient build(Map<String, String> conf);
}
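A builder is intended to be a thin, named factory. A minimal sketch, assuming it reuses the InMemoryCommitCoordinator added later in this commit (the class and coordinator name below are illustrative):

import java.util.Map;

import io.delta.storage.commit.CommitCoordinatorClient;

public class DemoCommitCoordinatorBuilder implements CommitCoordinatorBuilder {
    @Override
    public String getName() {
        return "demo-in-memory"; // illustrative coordinator name
    }

    @Override
    public CommitCoordinatorClient build(Map<String, String> conf) {
        // A real builder would typically derive its settings from `conf`.
        return new InMemoryCommitCoordinator(5 /* batchSize */);
    }
}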
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits;

import java.util.HashMap;
import java.util.Map;

import io.delta.storage.commit.CommitCoordinatorClient;

/** Factory to get the correct {@link CommitCoordinatorClient} for a table */
public class CommitCoordinatorProvider {
    // Mapping from commit-coordinator name to the corresponding registered
    // CommitCoordinatorBuilder.
private static final Map<String, CommitCoordinatorBuilder> nameToBuilderMapping =
new HashMap<>();

/**
* Registers a new {@link CommitCoordinatorBuilder} with the {@link CommitCoordinatorProvider}.
*/
public static synchronized void registerBuilder(
CommitCoordinatorBuilder commitCoordinatorBuilder) {
String name = commitCoordinatorBuilder.getName();
if (nameToBuilderMapping.containsKey(name)) {
throw new IllegalArgumentException(
"commit-coordinator: " +
name +
" already registered with builder " +
commitCoordinatorBuilder.getClass().getName());
} else {
nameToBuilderMapping.put(name, commitCoordinatorBuilder);
}
}

/** Returns a {@link CommitCoordinatorClient} for the given `name` and `conf` */
public static synchronized CommitCoordinatorClient getCommitCoordinatorClient(
String name, Map<String, String> conf) {
CommitCoordinatorBuilder builder = nameToBuilderMapping.get(name);
if (builder == null) {
throw new IllegalArgumentException("Unknown commit-coordinator: " + name);
} else {
return builder.build(conf);
}
}
}
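Registration and lookup are expected to pair up as in this sketch, which uses the in-memory builder added later in this commit (the wrapper class is illustrative):

import java.util.Collections;

import io.delta.storage.commit.CommitCoordinatorClient;

public class ProviderUsageSketch {
    public static void main(String[] args) {
        // Register once per JVM; lookups then resolve by the builder's name
        // ("in-memory" for InMemoryCommitCoordinatorBuilder).
        CommitCoordinatorProvider.registerBuilder(new InMemoryCommitCoordinatorBuilder(5));
        CommitCoordinatorClient client =
                CommitCoordinatorProvider.getCommitCoordinatorClient(
                        "in-memory", Collections.emptyMap());
        // Registering another builder under the same name, or looking up an
        // unknown name, throws IllegalArgumentException.
    }
}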
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits;

import java.io.IOException;
import java.util.*;
import java.util.function.Function;

import io.delta.storage.LogStore;
import io.delta.storage.commit.actions.AbstractCommitInfo;
import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.VectorUtils;
import static io.delta.kernel.internal.TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME;

public class CoordinatedCommitsUtils {

/**
* Write a UUID-based commit file for the specified version to the table at logPath.
*/
public static FileStatus writeCommitFile(
LogStore logStore,
Configuration hadoopConf,
String logPath,
long commitVersion,
Iterator<String> actions,
String uuid) throws IOException {
Path commitPath = new Path(
FileNames.unbackfilledDeltaFile(
new io.delta.kernel.internal.fs.Path(logPath),
commitVersion,
Optional.of(uuid)).toString());
FileSystem fs = commitPath.getFileSystem(hadoopConf);
if (!fs.exists(commitPath.getParent())) {
fs.mkdirs(commitPath.getParent());
}
logStore.write(commitPath, actions, false, hadoopConf);
        return fs.getFileStatus(commitPath);
}

/**
* Get the table path from the provided log path.
*/
public static Path getTablePath(Path logPath) {
return logPath.getParent();
}

/**
* Helper method to recover the saved value of `tableConfig` from `abstractMetadata`.
* Return defaultValue if the key is not in the configuration.
*/
public static <T> T fromAbstractMetadataAndTableConfig(
AbstractMetadata abstractMetadata, TableConfig<T> tableConfig) {
Map<String, String> conf = abstractMetadata.getConfiguration();
String value = conf.getOrDefault(tableConfig.getKey(), tableConfig.getDefaultValue());
Function<String, T> fromString = tableConfig.getFromString();
return fromString.apply(value);
}

/**
* Get the commit coordinator name from the provided abstract metadata.
*/
public static Optional<String> getCommitCoordinatorName(AbstractMetadata abstractMetadata) {
return fromAbstractMetadataAndTableConfig(
abstractMetadata, COORDINATED_COMMITS_COORDINATOR_NAME);
}

/**
* Get the hadoop file path for the delta file for the specified version.
*
* @param logPath The root path of the delta log.
* @param version The version of the delta file.
* @return The hadoop file path for the delta file.
*/
public static Path getHadoopDeltaFile(Path logPath, long version) {
return new Path(FileNames
.deltaFile(new io.delta.kernel.internal.fs.Path(logPath.toString()), version));
}

public static AbstractMetadata convertMetadataToAbstractMetadata(Metadata metadata) {
return new AbstractMetadata() {
@Override
public String getId() {
return metadata.getId();
}

@Override
public String getName() {
return metadata.getName().orElse(null);
}

@Override
public String getDescription() {
return metadata.getDescription().orElse(null);
}

@Override
public String getProvider() {
return metadata.getFormat().getProvider();
}

@Override
public Map<String, String> getFormatOptions() {
return metadata.getFormat().getOptions();
}

@Override
public String getSchemaString() {
return metadata.getSchemaString();
}

@Override
public List<String> getPartitionColumns() {
return VectorUtils.toJavaList(metadata.getPartitionColumns());
}

@Override
public Map<String, String> getConfiguration() {
return metadata.getConfiguration();
}

@Override
public Long getCreatedTime() {
return metadata.getCreatedTime().orElse(null);
}
};
}

public static AbstractProtocol convertProtocolToAbstractProtocol(Protocol protocol) {
return new AbstractProtocol() {
@Override
public int getMinReaderVersion() {
return protocol.getMinReaderVersion();
}

@Override
public int getMinWriterVersion() {
return protocol.getMinWriterVersion();
}

@Override
public Set<String> getReaderFeatures() {
return new HashSet<>(protocol.getReaderFeatures());
}

@Override
public Set<String> getWriterFeatures() {
return new HashSet<>(protocol.getWriterFeatures());
}
};
}

public static AbstractCommitInfo convertCommitInfoToAbstractCommitInfo(CommitInfo commitInfo) {
return () -> commitInfo.getInCommitTimestamp().orElse(commitInfo.getTimestamp());
}
}
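The adapters above bridge kernel actions into the io.delta.storage commit interfaces. Below is a sketch of recovering the coordinator name from table configuration, mirroring the unit test later in this diff (the wrapper class is illustrative and assumes the same package as CoordinatedCommitsUtils):

import java.util.Collections;
import java.util.Optional;

import io.delta.kernel.internal.actions.Metadata;
import static io.delta.kernel.internal.TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME;

public class CoordinatorNameSketch {
    public static void main(String[] args) {
        // Build metadata whose configuration names a commit coordinator.
        Metadata metadata = Metadata.empty().withNewConfiguration(
                Collections.singletonMap(
                        COORDINATED_COMMITS_COORDINATOR_NAME.getKey(), "in-memory"));
        Optional<String> name = CoordinatedCommitsUtils.getCommitCoordinatorName(
                CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(metadata));
        System.out.println(name); // prints Optional[in-memory]
    }
}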
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import io.delta.storage.LogStore;
import io.delta.storage.commit.Commit;
import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.CommitFailedException;
import io.delta.storage.commit.CommitResponse;
import io.delta.storage.commit.GetCommitsResponse;
import io.delta.storage.commit.actions.AbstractMetadata;
import io.delta.storage.commit.actions.AbstractProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import io.delta.kernel.internal.util.Tuple2;

public class InMemoryCommitCoordinator extends AbstractBatchBackfillingCommitCoordinatorClient {

    /**
     * Maps each table's log path to its in-memory commit state.
     *
     * For each table, {@code maxCommitVersion} is the max commit version known for
     * the table; it is initialized at pre-registration time and updated whenever a
     * commit is successfully ratified by the commit-coordinator. {@code active}
     * records whether this commit-coordinator has ratified any commit yet.
     * |----------------------------|------------------|---------------------------|
     * | State                      | maxCommitVersion | active                    |
     * |----------------------------|------------------|---------------------------|
     * | Table is pre-registered    | currentVersion+1 | false                     |
     * |----------------------------|------------------|---------------------------|
     * | Table is pre-registered    | X                | true                      |
     * | and more commits are done  |                  |                           |
     * |----------------------------|------------------|---------------------------|
     */
private ConcurrentHashMap<String, PerTableData> perTableMap;

public InMemoryCommitCoordinator(long batchSize) {
this.batchSize = batchSize;
this.perTableMap = new ConcurrentHashMap<>();
}

private class PerTableData {
private long maxCommitVersion;
private boolean active;
private TreeMap<Long, Commit> commitsMap;
private ReentrantReadWriteLock lock;

PerTableData(long maxCommitVersion) {
this(maxCommitVersion, false);
}

PerTableData(long maxCommitVersion, boolean active) {
this.maxCommitVersion = maxCommitVersion;
this.active = active;
this.commitsMap = new TreeMap<>();
this.lock = new ReentrantReadWriteLock();
}

public void updateLastRatifiedCommit(long commitVersion) {
this.active = true;
this.maxCommitVersion = commitVersion;
}

public long lastRatifiedCommitVersion() {
return this.active ? this.maxCommitVersion : -1;
}

public long getMaxCommitVersion() {
return maxCommitVersion;
}

public TreeMap<Long, Commit> getCommitsMap() {
return commitsMap;
}
}

    /**
     * Acquires the table's write lock, validates that the commit version is the
     * next one expected, records the commit in the in-memory commit map, and
     * releases the lock.
     */
@Override
protected CommitResponse commitImpl(
LogStore logStore,
Configuration hadoopConf,
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
long commitVersion,
FileStatus commitFile,
long commitTimestamp) throws CommitFailedException {
Tuple2<CommitResponse, CommitFailedException> ret =
addToMap(logPath, commitVersion, commitFile, commitTimestamp);
if (ret._2 != null) {
throw ret._2;
} else {
return ret._1;
}
}

private Tuple2<CommitResponse, CommitFailedException> addToMap(
Path logPath,
long commitVersion,
FileStatus commitFile,
long commitTimestamp) {

return withWriteLock(logPath, () -> {
PerTableData tableData = perTableMap.get(logPath.toString());
long expectedVersion = tableData.maxCommitVersion + 1;
if (commitVersion != expectedVersion) {
return new Tuple2<>(null, new CommitFailedException(
commitVersion < expectedVersion,
commitVersion < expectedVersion,
"Commit version " +
commitVersion +
" is not valid. Expected version: " +
expectedVersion));
}

Commit commit = new Commit(commitVersion, commitFile, commitTimestamp);
tableData.commitsMap.put(commitVersion, commit);
tableData.updateLastRatifiedCommit(commitVersion);

logger.info("Added commit file " + commitFile.getPath() + " to commit-coordinator.");
return new Tuple2<>(new CommitResponse(commit), null);
});
}

@Override
public GetCommitsResponse getCommits(
Path logPath,
Map<String, String> coordinatedCommitsTableConf,
Long startVersion,
Long endVersion) {
return withReadLock(logPath, () -> {
PerTableData tableData = perTableMap.get(logPath.toString());
Optional<Long> startVersionOpt = Optional.ofNullable(startVersion);
Optional<Long> endVersionOpt = Optional.ofNullable(endVersion);
long effectiveStartVersion = startVersionOpt.orElse(0L);
// Calculate the end version for the range, or use the last key if endVersion is not
// provided
long effectiveEndVersion = endVersionOpt.orElseGet(() ->
tableData.commitsMap.isEmpty()
? effectiveStartVersion : tableData.commitsMap.lastKey());
SortedMap<Long, Commit> commitsInRange = tableData.commitsMap.subMap(
effectiveStartVersion, effectiveEndVersion + 1);
return new GetCommitsResponse(
new ArrayList<>(commitsInRange.values()),
tableData.lastRatifiedCommitVersion());
});
}

@Override
protected void registerBackfill(Path logPath, long backfilledVersion) {
withWriteLock(logPath, () -> {
PerTableData tableData = perTableMap.get(logPath.toString());
if (backfilledVersion > tableData.lastRatifiedCommitVersion()) {
                throw new IllegalArgumentException(
                        "Unexpected backfill version: " + backfilledVersion + ". " +
                        "Last ratified commit version: " + tableData.lastRatifiedCommitVersion());
}
            // Remove keys with versions less than or equal to 'backfilledVersion'
Iterator<Long> iterator = tableData.getCommitsMap().keySet().iterator();
while (iterator.hasNext()) {
Long version = iterator.next();
if (version <= backfilledVersion) {
iterator.remove();
} else {
break;
}
}
return null;
});
}

@Override
public Map<String, String> registerTable(
Path logPath,
long currentVersion,
AbstractMetadata currentMetadata,
AbstractProtocol currentProtocol) {
PerTableData newPerTableData = new PerTableData(currentVersion + 1);
perTableMap.compute(logPath.toString(), (key, existingData) -> {
if (existingData != null) {
if (existingData.lastRatifiedCommitVersion() != -1) {
throw new IllegalStateException(
"Table " + logPath + " already exists in the commit-coordinator.");
}
            // If lastRatifiedCommitVersion is -1, the commit-coordinator has never
            // ratified a commit for this table, i.e. the table was only
            // pre-registered. A pre-registration request for an older version is
            // rejected, because the table cannot go backward.
if (currentVersion < existingData.getMaxCommitVersion()) {
throw new IllegalStateException(
"Table " + logPath + " already registered with commit-coordinator");
}
}
return newPerTableData;
});
return Collections.emptyMap();
}

@Override
public Boolean semanticEquals(CommitCoordinatorClient other) {
return this.equals(other);
}

private <T> T withReadLock(Path logPath, Supplier<T> operation) {
PerTableData tableData = perTableMap.get(logPath.toString());
if (tableData == null) {
throw new IllegalArgumentException("Unknown table " + logPath + ".");
}
ReentrantReadWriteLock.ReadLock lock = tableData.lock.readLock();
lock.lock();
try {
return operation.get();
} finally {
lock.unlock();
}
}

private <T> T withWriteLock(Path logPath, Supplier<T> operation) {
PerTableData tableData = perTableMap.get(logPath.toString());
if (tableData == null) {
throw new IllegalArgumentException("Unknown table " + logPath + ".");
}
ReentrantReadWriteLock.WriteLock lock = tableData.lock.writeLock();
lock.lock();
try {
return operation.get();
} finally {
lock.unlock();
}
}
}
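As a walkthrough of the pre-registration state documented on perTableMap, here is a hedged sketch (the path and versions are illustrative, and it assumes the same package as the classes in this diff):

import java.util.Map;

import org.apache.hadoop.fs.Path;

public class InMemoryLifecycleSketch {
    public static void main(String[] args) {
        InMemoryCommitCoordinator cc = new InMemoryCommitCoordinator(5 /* batchSize */);
        Path logPath = new Path("file:/tmp/demo-table/_delta_log"); // illustrative
        // Pre-register an empty table: currentVersion == -1, so maxCommitVersion
        // becomes 0 and the coordinator stays inactive until a commit is ratified.
        Map<String, String> tableConf = cc.registerTable(
                logPath,
                -1L,
                CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(
                        io.delta.kernel.internal.actions.Metadata.empty()),
                CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(
                        new io.delta.kernel.internal.actions.Protocol(1, 1)));
        // No commit has been ratified yet, so the latest table version is -1.
        System.out.println(
                cc.getCommits(logPath, tableConf, null, null).getLatestTableVersion());
    }
}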
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits;

import java.util.Map;

import io.delta.storage.commit.CommitCoordinatorClient;

import io.delta.kernel.internal.lang.Lazy;

public class InMemoryCommitCoordinatorBuilder implements CommitCoordinatorBuilder {
private final long batchSize;
private Lazy<InMemoryCommitCoordinator> inMemoryStore;

public InMemoryCommitCoordinatorBuilder(long batchSize) {
this.batchSize = batchSize;
this.inMemoryStore = new Lazy<>(() -> new InMemoryCommitCoordinator(batchSize));
}

/** Name of the commit-coordinator */
public String getName() {
return "in-memory";
}

/** Returns a commit-coordinator based on the given conf */
public CommitCoordinatorClient build(Map<String, String> conf) {
return inMemoryStore.get();
}
}
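One design note: because the builder memoizes the coordinator in a Lazy, repeated build() calls return the same instance, so tables registered through one lookup stay visible to later lookups. A quick sketch (the class name is illustrative):

import java.util.Collections;

public class BuilderSingletonSketch {
    public static void main(String[] args) {
        InMemoryCommitCoordinatorBuilder builder = new InMemoryCommitCoordinatorBuilder(5);
        // Both calls return the same memoized InMemoryCommitCoordinator.
        boolean sameInstance =
                builder.build(Collections.emptyMap()) == builder.build(Collections.emptyMap());
        System.out.println(sameInstance); // prints true
    }
}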
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ package io.delta.kernel.defaults

import java.io.File
import io.delta.kernel.Table
import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient}
import io.delta.kernel.engine.{CommitCoordinatorClientHandler, Engine, ExpressionHandler, FileSystemClient}
import io.delta.kernel.data.ColumnarBatch
import io.delta.kernel.defaults.engine.{DefaultEngine, DefaultJsonHandler, DefaultParquetHandler}
import io.delta.kernel.expressions.Predicate
@@ -37,6 +37,7 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSparkSession

import java.nio.file.Files
import java.util
import java.util.Optional
import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ArrayBuffer
@@ -365,6 +366,9 @@ class MetricsEngine(config: Configuration) extends Engine {
override def getFileSystemClient: FileSystemClient = impl.getFileSystemClient

override def getParquetHandler: MetricsParquetHandler = parquetHandler

override def getCommitCoordinatorClientHandler(name: String, conf: util.Map[String, String]):
CommitCoordinatorClientHandler = impl.getCommitCoordinatorClientHandler(name, conf)
}

/**
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.defaults.internal.coordinatedcommits

import io.delta.kernel.defaults.DeltaTableWriteSuiteBase
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
import io.delta.kernel.defaults.utils.TestRow
import io.delta.kernel.internal.TableConfig._
import io.delta.kernel.Table
import io.delta.kernel.internal.SnapshotImpl
import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol}
import org.apache.hadoop.fs.Path

import java.util
import java.util.{Collections, Optional}
import scala.collection.convert.ImplicitConversions.`iterator asScala`
import scala.collection.JavaConverters._

class CoordinatedCommitsSuite extends DeltaTableWriteSuiteBase
with CoordinatedCommitsTestUtils {

test("helper method that recovers config from abstract metadata works properly") {
val m1 = Metadata.empty.withNewConfiguration(
Map(COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "string_value").asJava
)
assert(CoordinatedCommitsUtils.fromAbstractMetadataAndTableConfig(
CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(m1),
COORDINATED_COMMITS_COORDINATOR_NAME) === Optional.of("string_value"))

val m2 = Metadata.empty.withNewConfiguration(
Map(COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "").asJava
)
assert(CoordinatedCommitsUtils.fromAbstractMetadataAndTableConfig(
CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(m2),
COORDINATED_COMMITS_COORDINATOR_NAME) === Optional.of(""))

val m3 = Metadata.empty.withNewConfiguration(
Map(COORDINATED_COMMITS_COORDINATOR_CONF.getKey ->
"""{"key1": "string_value", "key2Int": 2, "key3ComplexStr": "\"hello\""}""").asJava
)
assert(CoordinatedCommitsUtils.fromAbstractMetadataAndTableConfig(
CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(m3),
COORDINATED_COMMITS_COORDINATOR_CONF) ===
Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\"").asJava)
}

test("cold snapshot initialization") {
val builder = new InMemoryCommitCoordinatorBuilder(10)
val commitCoordinatorClient = builder.build(Collections.emptyMap())
CommitCoordinatorProvider.registerBuilder(builder)
withTempDirAndEngine { (tablePath, engine) =>
val logPath = new Path("file:" + tablePath, "_delta_log")
val table = Table.forPath(engine, tablePath)

spark.range(0, 10).write.format("delta").mode("overwrite").save(tablePath) // version 0
checkAnswer(
spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
(0L to 9L).map(TestRow(_)))
spark.range(10, 20).write.format("delta").mode("overwrite").save(tablePath) // version 1
spark.range(20, 30).write.format("delta").mode("append").save(tablePath) // version 2
checkAnswer(
spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)),
(10L to 29L).map(TestRow(_)))

var tableConf: util.Map[String, String] = null
val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)

      (0 to 2).foreach { version =>
val delta = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version)

val rows = addCoordinatedCommitToMetadataRow(logStore.read(delta, hadoopConf).toList)

if (version == 0) {
tableConf = commitCoordinatorClient.registerTable(
logPath,
-1L,
CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty()),
CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1)))
writeCommitZero(engine, logPath, rows.asJava)
} else {
commit(logPath, tableConf, version, version, rows.asJava, commitCoordinatorClient)
logPath.getFileSystem(hadoopConf).delete(
CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version))
}
}
val snapshot0 = table.getSnapshotAsOfVersion(engine, 0)
val result0 = readSnapshot(snapshot0, snapshot0.getSchema(engine), null, null, engine)
checkAnswer(result0, (0L to 9L).map(TestRow(_)))

val snapshot1 = table.getSnapshotAsOfVersion(engine, 1)
val result1 = readSnapshot(snapshot1, snapshot1.getSchema(engine), null, null, engine)
checkAnswer(result1, (10L to 19L).map(TestRow(_)))

val snapshot2 = table.getLatestSnapshot(engine)
val result2 = readSnapshot(snapshot2, snapshot2.getSchema(engine), null, null, engine)
checkAnswer(result2, (10L to 29L).map(TestRow(_)))
}
}

def addCoordinatedCommitToMetadataRow(rows: List[String]): List[String] = rows.map(row => {
if (row.contains("metaData")) row.replace(
"\"configuration\":{}",
"\"configuration\":{\"coordinatedCommits.commitCoordinatorConf-preview\":\"{}\"," +
"\"delta.coordinatedCommits.commitCoordinator-preview\":\"in-memory\"}") else row
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits

import java.util
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
import io.delta.kernel.engine.Engine
import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol}
import io.delta.kernel.internal.TableConfig
import io.delta.storage.commit.{Commit, CommitCoordinatorClient, UpdatedActions}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import scala.collection.JavaConverters._

trait CoordinatedCommitsTestUtils {

  val hadoopConf = new Configuration()

  def commit(
logPath: Path,
tableConf: util.Map[String, String],
version: Long,
timestamp: Long,
commit: util.List[String],
commitCoordinatorClient: CommitCoordinatorClient): Commit = {
val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
val updatedCommitInfo = CommitInfo.empty().withTimestamp(timestamp)
val updatedActions = if (version == 0) {
getUpdatedActionsForZerothCommit(updatedCommitInfo)
} else {
getUpdatedActionsForNonZerothCommit(updatedCommitInfo)
}
commitCoordinatorClient.commit(
logStore,
hadoopConf,
logPath,
tableConf,
version,
commit.iterator(),
updatedActions).getCommit
}

def writeCommitZero(engine: Engine, logPath: Path, commit: util.List[String]): Unit = {
createLogPath(engine, logPath)
val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
logStore.write(
CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, 0),
commit.iterator(),
true,
hadoopConf)
}

def createLogPath(engine: Engine, logPath: Path): Unit = {
// New table, create a delta log directory
if (!engine.getFileSystemClient.mkdirs(logPath.toString)) {
throw new RuntimeException("Failed to create delta log directory: " + logPath)
}
}

def getUpdatedActionsForZerothCommit(
commitInfo: CommitInfo,
oldMetadata: Metadata = Metadata.empty()): UpdatedActions = {
val newMetadataConfiguration =
Map(TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "in-memory",
TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{}")
val newMetadata = oldMetadata.withNewConfiguration(newMetadataConfiguration.asJava)
new UpdatedActions(
CoordinatedCommitsUtils.convertCommitInfoToAbstractCommitInfo(commitInfo),
CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(newMetadata),
CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(Protocol.empty()),
CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(oldMetadata),
CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(Protocol.empty()))
}

def getUpdatedActionsForNonZerothCommit(commitInfo: CommitInfo): UpdatedActions = {
val updatedActions = getUpdatedActionsForZerothCommit(commitInfo)
new UpdatedActions(
updatedActions.getCommitInfo,
updatedActions.getNewMetadata,
updatedActions.getNewProtocol,
updatedActions.getNewMetadata, // oldMetadata is replaced with newMetadata
updatedActions.getOldProtocol)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.internal.coordinatedcommits

import io.delta.kernel.Table
import io.delta.kernel.defaults.engine.DefaultJsonHandler
import io.delta.kernel.defaults.internal.logstore.LogStoreProvider
import io.delta.kernel.defaults.utils.DefaultVectorTestUtils
import io.delta.kernel.defaults.DeltaTableWriteSuiteBase
import io.delta.kernel.engine.Engine
import io.delta.kernel.internal.actions.{CommitInfo, Metadata, Protocol}
import io.delta.kernel.types.{StringType, StructType}
import io.delta.storage.commit.{Commit, CommitCoordinatorClient, CommitFailedException, GetCommitsResponse}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import java.util
import java.util.{Collections, Optional}
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.{`iterator asScala`, `list asScalaBuffer`}

abstract class InMemoryCommitCoordinatorSuite(batchSize: Int) extends DeltaTableWriteSuiteBase
with CoordinatedCommitsTestUtils
with DefaultVectorTestUtils {

val jsonHandler = new DefaultJsonHandler(hadoopConf)

private def assertGetCommitResponseEqual(x: GetCommitsResponse, y: GetCommitsResponse): Unit = {
assert(x.getLatestTableVersion == y.getLatestTableVersion)
assert(x.getCommits.size() == y.getCommits.size())
for (i <- 0 until x.getCommits.size()) {
assert(x.getCommits.get(i).getVersion == y.getCommits.get(i).getVersion)
assert(x.getCommits.get(i).getFileStatus.getPath == y.getCommits.get(i).getFileStatus.getPath)
assert(x.getCommits.get(i).getFileStatus.getLen == y.getCommits.get(i).getFileStatus.getLen)
assert(x
.getCommits
.get(i)
.getFileStatus
.getModificationTime == y.getCommits.get(i).getFileStatus.getModificationTime)
assert(x.getCommits.get(i).getCommitTimestamp == y.getCommits.get(i).getCommitTimestamp)
}
}

protected def assertBackfilled(
version: Long,
logPath: Path,
timestampOpt: Option[Long] = None): Unit = {
val logStore = LogStoreProvider.getLogStore(hadoopConf, logPath.toUri.getScheme)
val delta = CoordinatedCommitsUtils.getHadoopDeltaFile(logPath, version)
if (timestampOpt.isDefined) {
assert(logStore.read(delta, hadoopConf).toSeq == Seq(s"$version", s"${timestampOpt.get}"))
} else {
assert(logStore.read(delta, hadoopConf).take(1).toSeq == Seq(s"$version"))
}
}

protected def registerBackfillOp(
commitCoordinatorClient: CommitCoordinatorClient,
logPath: Path,
version: Long): Unit = {
val inMemoryCS = commitCoordinatorClient.asInstanceOf[InMemoryCommitCoordinator]
inMemoryCS.registerBackfill(logPath, version)
}

protected def validateBackfillStrategy(
engine: Engine,
commitCoordinatorClient: CommitCoordinatorClient,
logPath: Path,
tableConf: util.Map[String, String],
version: Long): Unit = {
val lastExpectedBackfilledVersion = (version - (version % batchSize)).toInt
val unbackfilledCommitVersionsAll = commitCoordinatorClient
.getCommits(logPath, tableConf, null, null)
.getCommits.map(_.getVersion)
val expectedVersions = lastExpectedBackfilledVersion + 1 to version.toInt

assert(unbackfilledCommitVersionsAll == expectedVersions)
(0 to lastExpectedBackfilledVersion).foreach { v =>
assertBackfilled(v, logPath, Some(v))
}
}

/**
* Checks that the commit coordinator state is correct in terms of
* - The latest table version in the commit coordinator is correct
* - All supposedly backfilled commits are indeed backfilled
* - The contents of the backfilled commits are correct (verified
* if commitTimestampOpt is provided)
*
* This can be overridden by implementing classes to implement
* more specific invariants.
*/
protected def assertInvariants(
logPath: Path,
tableConf: util.Map[String, String],
commitCoordinatorClient: CommitCoordinatorClient,
commitTimestampsOpt: Option[Array[Long]] = None): Unit = {
val maxUntrackedVersion: Int = {
val commitResponse = commitCoordinatorClient.getCommits(logPath, tableConf, null, null)
if (commitResponse.getCommits.isEmpty) {
commitResponse.getLatestTableVersion.toInt
} else {
assert(
commitResponse.getCommits.last.getVersion == commitResponse.getLatestTableVersion,
s"Max commit tracked by the commit coordinator ${commitResponse.getCommits.last} must " +
s"match latestTableVersion tracked by the commit coordinator " +
s"${commitResponse.getLatestTableVersion}."
)
val minVersion = commitResponse.getCommits.head.getVersion
assert(
commitResponse.getLatestTableVersion - minVersion + 1 == commitResponse.getCommits.size,
"Commit map should have a contiguous range of unbackfilled commits."
)
minVersion.toInt - 1
}
}
(0 to maxUntrackedVersion).foreach { version =>
assertBackfilled(version, logPath, commitTimestampsOpt.map(_(version)))
}
}

test("test basic commit and backfill functionality") {
withTempDirAndEngine { (tablePath, engine) =>
val cc = new InMemoryCommitCoordinatorBuilder(batchSize).build(Collections.emptyMap())
val logPath = new Path(tablePath, "_delta_log")

val tableConf = cc.registerTable(
logPath,
-1L,
CoordinatedCommitsUtils.convertMetadataToAbstractMetadata(Metadata.empty()),
CoordinatedCommitsUtils.convertProtocolToAbstractProtocol(new Protocol(1, 1)))

val e = intercept[CommitFailedException] {
commit(
logPath,
tableConf, version = 0, timestamp = 0, util.Arrays.asList("0", "0"), cc)
}
assert(e.getMessage === "Commit version 0 must go via filesystem.")
writeCommitZero(engine, logPath, util.Arrays.asList("0", "0"))
assertGetCommitResponseEqual(
cc.getCommits(logPath, tableConf, null, null),
new GetCommitsResponse(Collections.emptyList(), -1))
assertBackfilled(version = 0, logPath, Some(0L))

// Test backfilling functionality for commits 1 - 8
(1 to 8).foreach { version =>
commit(
logPath,
tableConf,
version, version, util.Arrays.asList(s"$version", s"$version"), cc)
validateBackfillStrategy(engine, cc, logPath, tableConf, version)
assert(cc.getCommits(logPath, tableConf, null, null).getLatestTableVersion == version)
}

// Test that out-of-order backfill is rejected
intercept[IllegalArgumentException] {
registerBackfillOp(cc, logPath, 10)
}
assertInvariants(logPath, tableConf, cc)
}
}
}

class InMemoryCommitCoordinator1Suite extends InMemoryCommitCoordinatorSuite(1)
class InMemoryCommitCoordinator5Suite extends InMemoryCommitCoordinatorSuite(5)
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

package io.delta.storage.commit;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

@@ -94,7 +95,7 @@ CommitResponse commit(
Map<String, String> tableConf,
long commitVersion,
Iterator<String> actions,
UpdatedActions updatedActions);
UpdatedActions updatedActions) throws CommitFailedException, IOException;

/**
* API to get the unbackfilled commits for the table represented by the given logPath.
@@ -148,7 +149,7 @@ void backfillToVersion(
Path logPath,
Map<String, String> tableConf,
long version,
Long lastKnownBackfilledVersion);
Long lastKnownBackfilledVersion) throws IOException;

/**
* Determines whether this CommitCoordinatorClient is semantically equal to another

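Because commit() and backfillToVersion() now declare checked exceptions, call sites must handle them explicitly. A hedged sketch of a caller follows (the wrapper class, method, and rethrow strategy are illustrative, not part of this commit):

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import io.delta.storage.LogStore;
import io.delta.storage.commit.CommitCoordinatorClient;
import io.delta.storage.commit.CommitFailedException;
import io.delta.storage.commit.CommitResponse;
import io.delta.storage.commit.UpdatedActions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class CommitCallSketch {
    static CommitResponse tryCommit(
            CommitCoordinatorClient client,
            LogStore logStore,
            Configuration hadoopConf,
            Path logPath,
            Map<String, String> tableConf,
            long version,
            Iterator<String> actions,
            UpdatedActions updatedActions) throws IOException {
        try {
            return client.commit(
                    logStore, hadoopConf, logPath, tableConf, version, actions, updatedActions);
        } catch (CommitFailedException e) {
            // The exception carries retryability/conflict flags; a real caller
            // might rebase and retry instead of rethrowing as shown here.
            throw new IOException("Commit " + version + " failed", e);
        }
    }
}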