From 67638c7d16dbaa0a80a41e719c543e440158c98c Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 3 Dec 2024 13:35:05 -0800 Subject: [PATCH] Revert "[Kernel] [CC Refactor #2] Add `TableDescriptor` and `CommitCoordinatorClient` API (#3797)" This reverts commit 6ae4b62845ed579bb5a19f4646831c4ee2931c02. --- .../java/io/delta/kernel/TableIdentifier.java | 2 +- .../CommitCoordinatorClient.java | 170 ------------------ .../coordinatedcommits/TableDescriptor.java | 99 ---------- .../java/io/delta/kernel/engine/Engine.java | 18 -- .../TableDescriptorSuite.scala | 86 --------- 5 files changed, 1 insertion(+), 374 deletions(-) delete mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java delete mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java delete mode 100644 kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TableIdentifier.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TableIdentifier.java index 45c51f79944..ce9af3b87a6 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TableIdentifier.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TableIdentifier.java @@ -61,7 +61,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - final TableIdentifier that = (TableIdentifier) o; + TableIdentifier that = (TableIdentifier) o; return Arrays.equals(getNamespace(), that.getNamespace()) && getName().equals(that.getName()); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java deleted file mode 100644 index 5870f570f8b..00000000000 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (2024) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.delta.kernel.coordinatedcommits; - -import io.delta.kernel.TableIdentifier; -import io.delta.kernel.annotation.Evolving; -import io.delta.kernel.data.Row; -import io.delta.kernel.engine.Engine; -import io.delta.kernel.engine.coordinatedcommits.CommitFailedException; -import io.delta.kernel.engine.coordinatedcommits.CommitResponse; -import io.delta.kernel.engine.coordinatedcommits.GetCommitsResponse; -import io.delta.kernel.engine.coordinatedcommits.UpdatedActions; -import io.delta.kernel.engine.coordinatedcommits.actions.AbstractMetadata; -import io.delta.kernel.engine.coordinatedcommits.actions.AbstractProtocol; -import io.delta.kernel.utils.CloseableIterator; -import java.io.IOException; -import java.util.Map; -import java.util.Optional; - -/** - * The CommitCoordinatorClient is responsible for communicating with the commit coordinator and - * backfilling commits. It has four main APIs that need to be implemented: - * - * - * - * @since 3.3.0 - */ -@Evolving -public interface CommitCoordinatorClient { - - /** - * Register the table represented by the given {@code logPath} at the provided {@code - * currentVersion} with the commit coordinator this commit coordinator client represents. - * - *

This API is called when the table is being converted from an existing file system table to a - * coordinated-commit table. - * - *

When a new coordinated-commit table is being created, the {@code currentVersion} will be -1 - * and the upgrade commit needs to be a file system commit which will write the backfilled file - * directly. - * - * @param engine The {@link Engine} instance to use, if needed. - * @param logPath The path to the delta log of the table that should be converted. - * @param tableIdentifier The table identifier for the table, or {@link Optional#empty()} if the - * table doesn't use any identifier (i.e. it is path-based). - * @param currentVersion The version of the table just before conversion. currentVersion + 1 - * represents the commit that will do the conversion. This must be backfilled atomically. - * currentVersion + 2 represents the first commit after conversion. This will go through the - * CommitCoordinatorClient and the client is free to choose when it wants to backfill this - * commit. - * @param currentMetadata The metadata of the table at currentVersion - * @param currentProtocol The protocol of the table at currentVersion - * @return A map of key-value pairs which is issued by the commit coordinator to uniquely identify - * the table. This should be stored in the table's metadata for table property {@link - * io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF}. This information - * needs to be passed to the {@link #commit}, {@link #getCommits}, and {@link - * #backfillToVersion} APIs to identify the table. - */ - Map registerTable( - Engine engine, - String logPath, - Optional tableIdentifier, - long currentVersion, - AbstractMetadata currentMetadata, - AbstractProtocol currentProtocol); - - /** - * Commit the given set of actions to the table represented by {@code tableDescriptor}. - * - * @param engine The {@link Engine} instance to use. This gives client implementations access to - * {@link io.delta.kernel.engine.JsonHandler#writeJsonFileAtomically} in order to write the - * given set of actions to an unbackfilled Delta file. - * @param tableDescriptor The descriptor for the table. - * @param commitVersion The version of the commit that is being committed. - * @param actions The set of actions to be committed - * @param updatedActions Additional information for the commit, including: - *

- * - * @return {@link CommitResponse} containing the file status of the committed file. Note: If the - * commit is already backfilled, the file status may be omitted, and the client can retrieve - * this information independently. - * @throws CommitFailedException if the commit operation fails - */ - CommitResponse commit( - Engine engine, - TableDescriptor tableDescriptor, - long commitVersion, - CloseableIterator actions, - UpdatedActions updatedActions) - throws CommitFailedException; - - /** - * Get the unbackfilled commits for the table represented by the given tableDescriptor. Commits - * older than startVersion (if given) or newer than endVersion (if given) are ignored. The - * returned commits are contiguous and in ascending version order. - * - *

Note that the first version returned by this API may not be equal to startVersion. This - * happens when some versions starting from startVersion have already been backfilled and so the - * commit coordinator may have stopped tracking them. - * - *

The returned latestTableVersion is the maximum commit version ratified by the commit - * coordinator. Note that returning latestTableVersion as -1 is acceptable only if the commit - * coordinator never ratified any version, i.e. it never accepted any unbackfilled commit. - * - * @param engine The {@link Engine} instance to use, if needed. - * @param tableDescriptor The descriptor for the table. - * @param startVersion The minimum version of the commit that should be returned, or {@link - * Optional#empty()} if there is no minimum. - * @param endVersion The maximum version of the commit that should be returned, or {@link - * Optional#empty()} if there is no maximum. - * @return {@link GetCommitsResponse} which has a list of {@link - * io.delta.kernel.engine.coordinatedcommits.Commit}s and the latestTableVersion which is - * tracked by the {@link CommitCoordinatorClient}. - */ - GetCommitsResponse getCommits( - Engine engine, - TableDescriptor tableDescriptor, - Optional startVersion, - Optional endVersion); - - /** - * Backfill all commits up to {@code version} and notify the commit coordinator. - * - *

If this API returns successfully, that means the backfill must have been completed, although - * the commit coordinator may not be aware of it yet. - * - * @param engine The {@link Engine} instance to use, if needed. - * @param tableDescriptor The descriptor for the table. - * @param version The version until which the commit coordinator client should backfill. - * @param lastKnownBackfilledVersion The last known version that was backfilled before this API - * was called. If it is {@link Optional#empty()}, then the commit coordinator client should - * backfill from the beginning of the table. - * @throws IOException if there is an IO error while backfilling the commits. - */ - void backfillToVersion( - Engine engine, - TableDescriptor tableDescriptor, - long version, - Optional lastKnownBackfilledVersion) - throws IOException; - - /** - * Checks if this CommitCoordinatorClient is semantically equal to another - * CommitCoordinatorClient. - */ - boolean semanticEquals(CommitCoordinatorClient other); -} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java deleted file mode 100644 index 827b31db91c..00000000000 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (2024) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.delta.kernel.coordinatedcommits; - -import static java.util.Objects.requireNonNull; - -import io.delta.kernel.TableIdentifier; -import io.delta.kernel.annotation.Evolving; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; - -/** - * The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table - * identifier, and table CC configuration. - * - * @since 3.3.0 - */ -@Evolving -public class TableDescriptor { - - private final String logPath; - private final Optional tableIdOpt; - private final Map tableConf; - - public TableDescriptor( - String logPath, Optional tableIdOpt, Map tableConf) { - this.logPath = requireNonNull(logPath, "logPath is null"); - this.tableIdOpt = requireNonNull(tableIdOpt, "tableIdOpt is null"); - this.tableConf = requireNonNull(tableConf, "tableConf is null"); - } - - /** Returns the Delta log path of the table. */ - public String getLogPath() { - return logPath; - } - - /** Returns the optional table identifier of the table, e.g. $catalog / $schema / $tableName */ - public Optional getTableIdentifierOpt() { - return tableIdOpt; - } - - /** - * Returns the Coordinated Commits table configuration. - * - *

This is the parsed value of the Delta table property {@link - * io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the - * configuration properties for describing the Delta table to commit-coordinator. - */ - public Map getTableConf() { - return tableConf; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final TableDescriptor that = (TableDescriptor) o; - return getLogPath().equals(that.getLogPath()) - && tableIdOpt.equals(that.tableIdOpt) - && getTableConf().equals(that.getTableConf()); - } - - @Override - public int hashCode() { - return Objects.hash(getLogPath(), tableIdOpt, getTableConf()); - } - - @Override - public String toString() { - return "TableDescriptor{" - + "logPath='" - + logPath - + '\'' - + ", tableIdOpt=" - + tableIdOpt - + ", tableConf=" - + tableConf - + '}'; - } -} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java index cd073d353ed..982a0f591cf 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/Engine.java @@ -17,7 +17,6 @@ package io.delta.kernel.engine; import io.delta.kernel.annotation.Evolving; -import io.delta.kernel.coordinatedcommits.CommitCoordinatorClient; import java.util.Map; /** @@ -57,23 +56,6 @@ public interface Engine { */ ParquetHandler getParquetHandler(); - /** - * Retrieves a {@link CommitCoordinatorClient} for the specified commit coordinator name. - * - * @param commitCoordinatorName The name (identifier) of the underlying commit coordinator client - * to instantiate - * @param commitCoordinatorConf The configuration settings for the underlying commit coordinator - * client, taken directly from the Delta table property {@link - * io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} - * @return A {@link CommitCoordinatorClient} implementation corresponding to the specified commit - * coordinator name - * @since 3.3.0 - */ - default CommitCoordinatorClient getCommitCoordinatorClient( - String commitCoordinatorName, Map commitCoordinatorConf) { - throw new UnsupportedOperationException("Not implemented"); - } - /** * Retrieves a {@link CommitCoordinatorClientHandler} for the specified commit coordinator client. * diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala deleted file mode 100644 index 0ab239f9f0d..00000000000 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/coordinatedcommits/TableDescriptorSuite.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (2024) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.delta.kernel.coordinatedcommits - -import org.scalatest.funsuite.AnyFunSuite -import io.delta.kernel.TableIdentifier -import java.util.Optional - -import scala.collection.JavaConverters._ - -class TableDescriptorSuite extends AnyFunSuite { - - test("TableDescriptor should throw NullPointerException for null constructor arguments") { - assertThrows[NullPointerException] { - new TableDescriptor(null, Optional.empty(), Map.empty[String, String].asJava) - } - assertThrows[NullPointerException] { - new TableDescriptor("/delta/logPath", null, Map.empty[String, String].asJava) - } - assertThrows[NullPointerException] { - new TableDescriptor("/delta/logPath", Optional.empty(), null) - } - } - - test("TableDescriptor should return the correct logPath, tableIdOpt, and tableConf") { - val logPath = "/delta/logPath" - val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table")) - val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava - - val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf) - - assert(tableDescriptor.getLogPath == logPath) - assert(tableDescriptor.getTableIdentifierOpt == tableIdOpt) - assert(tableDescriptor.getTableConf == tableConf) - } - - test("TableDescriptors with the same values should be equal") { - val logPath = "/delta/logPath" - val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table")) - val tableConf = Map("key1" -> "value1", "key2" -> "value2").asJava - - val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf) - val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf) - - assert(tableDescriptor1 == tableDescriptor2) - assert(tableDescriptor1.hashCode == tableDescriptor2.hashCode) - } - - test("TableDescriptor with different values should not be equal") { - val logPath = "/delta/logPath" - val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table")) - val tableConf1 = Map("key1" -> "value1").asJava - val tableConf2 = Map("key1" -> "value2").asJava - - val tableDescriptor1 = new TableDescriptor(logPath, tableIdOpt, tableConf1) - val tableDescriptor2 = new TableDescriptor(logPath, tableIdOpt, tableConf2) - - assert(tableDescriptor1 != tableDescriptor2) - } - - test("TableDescriptor toString format") { - val logPath = "/delta/logPath" - val tableIdOpt = Optional.of(new TableIdentifier(Array("catalog", "schema"), "table")) - val tableConf = Map("key1" -> "value1").asJava - - val tableDescriptor = new TableDescriptor(logPath, tableIdOpt, tableConf) - val expectedString = "TableDescriptor{logPath='/delta/logPath', " + - "tableIdOpt=Optional[TableIdentifier{catalog.schema.table}], " + - "tableConf={key1=value1}}" - assert(tableDescriptor.toString == expectedString) - } -}