Skip to content

Commit

Permalink
[Kernel] Add an end2end prototype of Coordinated Commit read support …
Browse files Browse the repository at this point in the history
…in kernel
  • Loading branch information
EstherBear committed Jul 3, 2024
1 parent ad8d1cb commit 09b537a
Show file tree
Hide file tree
Showing 32 changed files with 2,299 additions and 25 deletions.
57 changes: 57 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/commit/Commit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import io.delta.kernel.utils.FileStatus;

/**
* Representation of a commit file
*/
public class Commit {

private long version;

private FileStatus fileStatus;

private long commitTimestamp;

public Commit(long version, FileStatus fileStatus, long commitTimestamp) {
this.version = version;
this.fileStatus = fileStatus;
this.commitTimestamp = commitTimestamp;
}

public long getVersion() {
return version;
}

public FileStatus getFileStatus() {
return fileStatus;
}

public long getCommitTimestamp() {
return commitTimestamp;
}

public Commit withFileStatus(FileStatus fileStatus) {
return new Commit(version, fileStatus, commitTimestamp);
}

public Commit withCommitTimestamp(long commitTimestamp) {
return new Commit(version, fileStatus, commitTimestamp);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.Iterator;
import java.util.Map;

/**
* Exception raised by
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit(
* String, Map, long, Iterator, UpdatedActions)}
*
* <pre>
* | retryable | conflict | meaning |
* | no | no | something bad happened (e.g. auth failure) |
* | no | yes | permanent transaction conflict (e.g. multi-table commit failed) |
* | yes | no | transient error (e.g. network hiccup) |
* | yes | yes | physical conflict (allowed to rebase and retry) |
* </pre>
*/
public class CommitFailedException extends Exception {

private boolean retryable;

private boolean conflict;

private String message;

public CommitFailedException(boolean retryable, boolean conflict, String message) {
this.retryable = retryable;
this.conflict = conflict;
this.message = message;
}

public boolean getRetryable() {
return retryable;
}

public boolean getConflict() {
return conflict;
}

public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.Iterator;
import java.util.Map;

/**
* Response container for
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#commit(
* String, Map, long, Iterator, UpdatedActions)}.
*/
public class CommitResponse {

private Commit commit;

public CommitResponse(Commit commit) {
this.commit = commit;
}

public Commit getCommit() {
return commit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import java.util.List;
import java.util.Map;

/**
* Response container for
* {@link io.delta.kernel.engine.CommitCoordinatorClientHandler#getCommits(
* String, Map, Long, Long)}.
*/
public class GetCommitsResponse {
private List<Commit> commits;

private long latestTableVersion;

public GetCommitsResponse(List<Commit> commits, long latestTableVersion) {
this.commits = commits;
this.latestTableVersion = latestTableVersion;
}

public List<Commit> getCommits() {
return commits;
}

public long getLatestTableVersion() {
return latestTableVersion;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import io.delta.kernel.commit.actions.AbstractCommitInfo;
import io.delta.kernel.commit.actions.AbstractMetadata;
import io.delta.kernel.commit.actions.AbstractProtocol;

/**
* A container class to inform the CommitCoordinatorClientHandler about any changes in
* Protocol/Metadata
*/
public class UpdatedActions {
private AbstractCommitInfo commitInfo;

private AbstractMetadata newMetadata;

private AbstractProtocol newProtocol;

private AbstractMetadata oldMetadata;

private AbstractProtocol oldProtocol;

public UpdatedActions(
AbstractCommitInfo commitInfo,
AbstractMetadata newMetadata,
AbstractProtocol newProtocol,
AbstractMetadata oldMetadata,
AbstractProtocol oldProtocol) {
this.commitInfo = commitInfo;
this.newMetadata = newMetadata;
this.newProtocol = newProtocol;
this.oldMetadata = oldMetadata;
this.oldProtocol = oldProtocol;
}

public AbstractCommitInfo getCommitInfo() {
return commitInfo;
}

public AbstractMetadata getNewMetadata() {
return newMetadata;
}

public AbstractProtocol getNewProtocol() {
return newProtocol;
}

public AbstractMetadata getOldMetadata() {
return oldMetadata;
}

public AbstractProtocol getOldProtocol() {
return oldProtocol;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit.actions;

/**
* Interface for objects that represents the base information for a commit.
* Commits need to provide an in-commit timestamp. This timestamp is used
* to specify the exact time the commit happened and determines the target
* version for time-based time travel queries.
*/
public interface AbstractCommitInfo {

/**
* Get the timestamp of the commit as millis after the epoch.
*/
long getCommitTimestamp();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit.actions;

import java.util.List;
import java.util.Map;

/**
* Interface for metadata actions in Delta. The metadata defines the metadata
* of the table.
*/
public interface AbstractMetadata {

/**
* A unique table identifier.
*/
String getId();

/**
* User-specified table identifier.
*/
String getName();

/**
* User-specified table description.
*/
String getDescription();

/** The table provider format. */
String getProvider();

/** The format options */
Map<String, String> getFormatOptions();

/**
* The table schema in string representation.
*/
String getSchemaString();

/**
* List of partition columns.
*/
List<String> getPartitionColumns();

/**
* The table properties defined on the table.
*/
Map<String, String> getConfiguration();

/**
* Timestamp for the creation of this metadata.
*/
Long getCreatedTime();
}
Loading

0 comments on commit 09b537a

Please sign in to comment.