From da58cad55741313852005cf2d84a7f2e0280bf2b Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Wed, 18 Dec 2024 19:35:07 -0800 Subject: [PATCH] [Kernel] Remove CC code from SnapshotManager (#3986) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description We are re-thinking the design of the Coordinated Commits table feature and much of this snapshot code will be refactored. Remove it for now as it greatly complicates our snapshot construction, and hopefully we can be more intentional in our code design/organization when re-implementing it. https://github.com/delta-io/delta/commit/fc81d1247d66cc32e454e985f0cfc81447f897b6 already removed the public interfaces and made it such that `SnapshotImpl::getTableCommitCoordinatorClientHandlerOpt` never returned a handler. ## How was this patch tested? Existing tests should suffice. ## Does this PR introduce _any_ user-facing changes? No. --- .../delta/kernel/internal/SnapshotImpl.java | 13 -- .../delta/kernel/internal/lang/ListUtils.java | 4 - .../internal/snapshot/SnapshotManager.java | 201 +++--------------- .../internal/SnapshotManagerSuite.scala | 195 ++++++----------- .../internal/DefaultEngineErrors.java | 9 - 5 files changed, 93 insertions(+), 329 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index e30a04f5e21..cb511be9e30 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -29,7 +29,6 @@ import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; -import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler; import 
io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.types.StructType; import java.util.List; @@ -169,16 +168,4 @@ public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { public Optional getLatestTransactionVersion(Engine engine, String applicationId) { return logReplay.getLatestTransactionIdentifier(engine, applicationId); } - - /** - * Returns the commit coordinator client handler based on the table metadata in this snapshot. - * - * @param engine the engine to use for IO operations - * @return the commit coordinator client handler for this snapshot or empty if the metadata is not - * configured to use the commit coordinator. - */ - public Optional getTableCommitCoordinatorClientHandlerOpt( - Engine engine) { - return Optional.empty(); - } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java index 06ba6cab887..e1320554597 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java @@ -32,10 +32,6 @@ public static Tuple2, List> partition( return new Tuple2<>(partitionMap.get(true), partitionMap.get(false)); } - public static T last(List list) { - return list.get(list.size() - 1); - } - /** Remove once supported JDK (build) version is 21 or above */ public static T getFirst(List list) { if (list.isEmpty()) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 92784102f41..dd370d42ac2 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -35,7 +35,6 @@ import io.delta.kernel.internal.*; import 
io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.checkpoints.*; -import io.delta.kernel.internal.coordinatedcommits.Commit; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.ListUtils; import io.delta.kernel.internal.replay.CreateCheckpointIterator; @@ -48,7 +47,6 @@ import java.io.*; import java.nio.file.FileAlreadyExistsException; import java.util.*; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -137,30 +135,14 @@ public Snapshot buildLatestSnapshot(Engine engine) throws TableNotFoundException public Snapshot getSnapshotAt(Engine engine, long version) throws TableNotFoundException { Optional logSegmentOpt = - getLogSegmentAtOrBeforeVersion( + getLogSegmentForVersion( engine, Optional.empty(), /* startCheckpointOpt */ - Optional.of(version) /* versionToLoadOpt */, - Optional.empty() /* tableCommitHandlerOpt */); - - // For non-coordinated commit table, the {@code getCoordinatedCommitsAwareSnapshot} will - // create the snapshot with the {@code logSegmentOpt} built here and will not trigger other - // operations. For coordinated commit table, the {@code getCoordinatedCommitsAwareSnapshot} - // will create the snapshot with the {@code logSegmentOpt} built here and will build the - // logSegment again by also fetching the unbackfilled commits from the commit coordinator. - // With the unbackfilled commits plus the backfilled commits in Delta log, a new snapshot - // will be created. 
- SnapshotImpl snapshot = - logSegmentOpt - .map( - logSegment -> - getCoordinatedCommitsAwareSnapshot(engine, logSegment, Optional.of(version))) - .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); - long snapshotVer = snapshot.getVersion(engine); - if (snapshotVer != version) { - throw DeltaErrors.versionAfterLatestCommit(tablePath.toString(), version, snapshotVer); - } - return snapshot; + Optional.of(version) /* versionToLoadOpt */); + + return logSegmentOpt + .map(logSegment -> createSnapshot(logSegment, engine)) + .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); } /** @@ -322,32 +304,11 @@ private Optional> listFromOrNone(Engine engine, lo * @param startVersion the version to start. Inclusive. * @param versionToLoad the optional parameter to set the max version we should return. Inclusive. * Must be >= startVersion if provided. - *

<p>This method lists all delta files and checkpoint files with the following steps: - *
<ol> - *
<li>When the table is set up with a commit coordinator, retrieve any files that haven't - * been backfilled by initiating a request to the commit coordinator service. If the - * table is not configured to use a commit coordinator, this list will be empty. - *
<li>Collect commit files (aka backfilled commits) and checkpoint files by listing the - * contents of the Delta log on storage. - *
<li>Filter un-backfilled files to exclude overlapping delta files collected from both - * commit-coordinator and file-system to avoid duplicates. - *
<li>Merge and return the backfilled files and filtered un-backfilled files. - *
</ol> - *
<p>
*Note*: If table is a coordinated-commits table, the commit-coordinator client MUST be - * passed to correctly list the commits. - * @param startVersion the version to start. Inclusive. - * @param versionToLoad the optional parameter to set the max version we should return. Inclusive. - * Must be >= startVersion if provided. - * @param tableCommitHandlerOpt the optional commit-coordinator client handler to use for fetching - * un-backfilled commits. * @return Some array of files found (possibly empty, if no usable commit files are present), or * None if the listing returned no files at all. */ protected final Optional> listDeltaAndCheckpointFiles( - Engine engine, - long startVersion, - Optional versionToLoad, - Optional tableCommitHandlerOpt) { + Engine engine, long startVersion, Optional versionToLoad) { versionToLoad.ifPresent( v -> checkArgument( @@ -355,22 +316,10 @@ protected final Optional> listDeltaAndCheckpointFiles( "versionToLoad=%s provided is less than startVersion=%s", v, startVersion)); - logger.debug( - "startVersion: {}, versionToLoad: {}, coordinated commits enabled: {}", - startVersion, - versionToLoad, - tableCommitHandlerOpt.isPresent()); - - // Fetching the unbackfilled commits before doing the log directory listing to avoid a gap - // in delta versions if some delta files are backfilled after the log directory listing but - // before the unbackfilled commits listing - List unbackfilledCommits = - getUnbackfilledCommits(tableCommitHandlerOpt, startVersion, versionToLoad); - - final AtomicLong maxDeltaVersionSeen = new AtomicLong(startVersion - 1); - Optional> listing = listFromOrNone(engine, startVersion); - Optional> resultFromFsListingOpt = - listing.map( + logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad); + + return listFromOrNone(engine, startVersion) + .map( fileStatusesIter -> { final List output = new ArrayList<>(); @@ -406,64 +355,11 @@ protected final Optional> listDeltaAndCheckpointFiles( } break; } - 
- // Ideally listFromOrNone should return lexicographically sorted - // files and so maxDeltaVersionSeen should be equal to fileVersion. - // But we are being defensive here and taking max of all the - // fileVersions seen. - if (FileNames.isCommitFile(fileName)) { - maxDeltaVersionSeen.set( - Math.max( - maxDeltaVersionSeen.get(), FileNames.deltaVersion(fileStatus.getPath()))); - } output.add(fileStatus); } return output; }); - - if (!tableCommitHandlerOpt.isPresent()) { - return resultFromFsListingOpt; - } - List relevantUnbackfilledCommits = - unbackfilledCommits.stream() - .filter((commit) -> commit.getVersion() > maxDeltaVersionSeen.get()) - .filter( - (commit) -> - !versionToLoad.isPresent() || commit.getVersion() <= versionToLoad.get()) - .map(Commit::getFileStatus) - .collect(Collectors.toList()); - - return resultFromFsListingOpt.map( - fsListing -> { - fsListing.addAll(relevantUnbackfilledCommits); - return fsListing; - }); - } - - private List getUnbackfilledCommits( - Optional tableCommitHandlerOpt, - long startVersion, - Optional versionToLoad) { - try { - return tableCommitHandlerOpt - .map( - commitCoordinatorClientHandler -> { - logger.info( - "Getting un-backfilled commits from commit coordinator for " + "table: {}", - tablePath); - return commitCoordinatorClientHandler - .getCommits(startVersion, versionToLoad.orElse(null)) - .getCommits(); - }) - .orElse(Collections.emptyList()); - } catch (Exception e) { - logger.error( - "Failed to get unbackfilled commits of table {} with commit coordinator: {}", - tablePath, - e); - throw e; - } } /** @@ -482,41 +378,10 @@ private SnapshotImpl getSnapshotAtInit(Engine engine) throws TableNotFoundExcept Optional logSegmentOpt = getLogSegmentFrom(engine, lastCheckpointOpt); return logSegmentOpt - .map(logSegment -> getCoordinatedCommitsAwareSnapshot(engine, logSegment, Optional.empty())) + .map(logSegment -> createSnapshot(logSegment, engine)) .orElseThrow(() -> new 
TableNotFoundException(tablePath.toString())); } - /** - * This can be optimized by making snapshot hint optimization to work with coordinated commits. - * - * @see issue #3437. - */ - private SnapshotImpl getCoordinatedCommitsAwareSnapshot( - Engine engine, LogSegment initialSegmentForNewSnapshot, Optional versionToLoadOpt) { - SnapshotImpl newSnapshot = createSnapshot(initialSegmentForNewSnapshot, engine); - - if (versionToLoadOpt.isPresent() && newSnapshot.getVersion(engine) == versionToLoadOpt.get()) { - return newSnapshot; - } - - Optional newTableCommitCoordinatorClientHandlerOpt = - newSnapshot.getTableCommitCoordinatorClientHandlerOpt(engine); - - if (newTableCommitCoordinatorClientHandlerOpt.isPresent()) { - Optional segmentOpt = - getLogSegmentAtOrBeforeVersion( - engine, - newSnapshot.getLogSegment().checkpointVersionOpt, /* startCheckpointOpt */ - versionToLoadOpt /* versionToLoadOpt */, - newTableCommitCoordinatorClientHandlerOpt /* tableCommitHandlerOpt */); - newSnapshot = - segmentOpt - .map(segment -> createSnapshot(segment, engine)) - .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); - } - return newSnapshot; - } - private SnapshotImpl createSnapshot(LogSegment initSegment, Engine engine) { final String startingFromStr = initSegment @@ -566,31 +431,26 @@ private SnapshotImpl createSnapshot(LogSegment initSegment, Engine engine) { */ private Optional getLogSegmentFrom( Engine engine, Optional startingCheckpoint) { - return getLogSegmentAtOrBeforeVersion( - engine, startingCheckpoint.map(x -> x.version), Optional.empty(), Optional.empty()); + return getLogSegmentForVersion( + engine, startingCheckpoint.map(x -> x.version), Optional.empty()); } /** - * Get a list of files that can be used to compute a Snapshot at or before version - * `versionToLoad`, If `versionToLoad` is not provided, will generate the list of files that are - * needed to load the latest version of the Delta table. 
This method also performs checks to - * ensure that the delta files are contiguous. + * Get a list of files that can be used to compute a Snapshot at version `versionToLoad`, if + * `versionToLoad` is not provided, will generate the list of files that are needed to load the + * latest version of the Delta table. This method also performs checks to ensure that the delta + * files are contiguous. * * @param startCheckpoint A potential start version to perform the listing of the DeltaLog, * typically that of a known checkpoint. If this version's not provided, we will start listing * from version 0. - * @param versionToLoad A specific version we try to load, but we may only load a version before - * this version if this version of commit is un-backfilled. Typically used with time travel - * and the Delta streaming source. If not provided, we will try to load the latest version of - * the table. + * @param versionToLoad A specific version to load. Typically used with time travel and the Delta + * streaming source. If not provided, we will try to load the latest version of the table. * @return Some LogSegment to build a Snapshot if files do exist after the given startCheckpoint. * None, if the delta log directory was missing or empty. 
*/ - protected Optional getLogSegmentAtOrBeforeVersion( - Engine engine, - Optional startCheckpoint, - Optional versionToLoad, - Optional tableCommitHandlerOpt) { + public Optional getLogSegmentForVersion( + Engine engine, Optional startCheckpoint, Optional versionToLoad) { // Only use startCheckpoint if it is <= versionToLoad Optional startCheckpointToUse = startCheckpoint.filter(v -> !versionToLoad.isPresent() || v <= versionToLoad.get()); @@ -620,7 +480,7 @@ protected Optional getLogSegmentAtOrBeforeVersion( long startTimeMillis = System.currentTimeMillis(); final Optional> newFiles = - listDeltaAndCheckpointFiles(engine, startVersion, versionToLoad, tableCommitHandlerOpt); + listDeltaAndCheckpointFiles(engine, startVersion, versionToLoad); logger.info( "{}: Took {}ms to list the files after starting checkpoint", tablePath, @@ -628,8 +488,7 @@ protected Optional getLogSegmentAtOrBeforeVersion( startTimeMillis = System.currentTimeMillis(); try { - return getLogSegmentAtOrBeforeVersion( - engine, startCheckpointToUse, versionToLoad, newFiles, tableCommitHandlerOpt); + return getLogSegmentForVersion(engine, startCheckpointToUse, versionToLoad, newFiles); } finally { logger.info( "{}: Took {}ms to construct a log segment", @@ -642,12 +501,11 @@ protected Optional getLogSegmentAtOrBeforeVersion( * Helper function for the getLogSegmentForVersion above. Called with a provided files list, and * will then try to construct a new LogSegment using that. 
*/ - protected Optional getLogSegmentAtOrBeforeVersion( + protected Optional getLogSegmentForVersion( Engine engine, Optional startCheckpointOpt, Optional versionToLoadOpt, - Optional> filesOpt, - Optional tableCommitHandlerOpt) { + Optional> filesOpt) { final List newFiles; if (filesOpt.isPresent()) { newFiles = filesOpt.get(); @@ -681,8 +539,7 @@ protected Optional getLogSegmentAtOrBeforeVersion( } else if (newFiles.isEmpty()) { // The directory may be deleted and recreated and we may have stale state in our // DeltaLog singleton, so try listing from the first version - return getLogSegmentAtOrBeforeVersion( - engine, Optional.empty(), versionToLoadOpt, tableCommitHandlerOpt); + return getLogSegmentForVersion(engine, Optional.empty(), versionToLoadOpt); } Tuple2, List> checkpointsAndDeltas = @@ -797,7 +654,7 @@ protected Optional getLogSegmentAtOrBeforeVersion( } versionToLoadOpt - .filter(v -> v < newVersion) + .filter(v -> v != newVersion) .ifPresent( v -> { throw DeltaErrors.versionAfterLatestCommit(tablePath.toString(), v, newVersion); @@ -822,7 +679,7 @@ protected Optional getLogSegmentAtOrBeforeVersion( verifyDeltaVersions( deltaVersionsAfterCheckpoint, Optional.of(newCheckpointVersion + 1), - Optional.of(newVersion), + versionToLoadOpt, tablePath); } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala index 02b6e2c4e2f..0a728b7ab02 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala @@ -13,28 +13,26 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.delta.kernel.internal.snapshot +package io.delta.kernel.internal + +import java.util.{Arrays, Collections, Optional} -import java.util.{Arrays, Collections, List, Optional} -import java.{lang => javaLang} import scala.collection.JavaConverters._ import scala.reflect.ClassTag + import io.delta.kernel.data.{ColumnarBatch, ColumnVector} import io.delta.kernel.exceptions.InvalidTableException import io.delta.kernel.expressions.Predicate -import io.delta.kernel.internal.actions.CommitInfo import io.delta.kernel.internal.checkpoints.{CheckpointInstance, SidecarFile} import io.delta.kernel.internal.coordinatedcommits.{Commit, CommitCoordinatorClientHandler, CommitResponse, GetCommitsResponse} import io.delta.kernel.internal.fs.Path -import io.delta.kernel.internal.snapshot.{LogSegment, SnapshotManager, TableCommitCoordinatorClientHandler} +import io.delta.kernel.internal.snapshot.{LogSegment, SnapshotManager} import io.delta.kernel.internal.util.{FileNames, Utils} import io.delta.kernel.test.{BaseMockJsonHandler, BaseMockParquetHandler, MockFileSystemClientUtils, VectorTestUtils} import io.delta.kernel.types.StructType import io.delta.kernel.utils.{CloseableIterator, FileStatus} import org.scalatest.funsuite.AnyFunSuite -import scala.collection.JavaConverters._ - class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { test("verifyDeltaVersions") { @@ -122,7 +120,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } ////////////////////////////////////////////////////////////////////////////////// - // getLogSegmentAtOrBeforeVersion tests + // getLogSegmentForVersion tests ////////////////////////////////////////////////////////////////////////////////// private val snapshotManager = new SnapshotManager(logPath, dataPath) @@ -159,9 +157,9 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } /** - * Test `getLogSegmentAtOrBeforeVersion` for a given set of delta versions, 
singular checkpoint - * versions, and multi-part checkpoint versions with a given _last_checkpoint starting checkpoint - * and a versionToLoad. + * Test `getLogSegmentForVersion` for a given set of delta versions, singular checkpoint versions, + * and multi-part checkpoint versions with a given _last_checkpoint starting checkpoint and + * a versionToLoad. * * @param deltaVersions versions of the delta JSON files in the delta log * @param checkpointVersions version of the singular checkpoint parquet files in the delta log @@ -180,8 +178,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { numParts: Int = -1, startCheckpoint: Optional[java.lang.Long] = Optional.empty(), versionToLoad: Optional[java.lang.Long] = Optional.empty(), - v2CheckpointSpec: Seq[(Long, Boolean, Int)] = Seq.empty, - unbackfilledDeltaVersions: Seq[Long] = Seq.empty): Unit = { + v2CheckpointSpec: Seq[(Long, Boolean, Int)] = Seq.empty): Unit = { val deltas = deltaFileStatuses(deltaVersions) val singularCheckpoints = singularCheckpointFileStatuses(checkpointVersions) val multiCheckpoints = multiCheckpointFileStatuses(multiCheckpointVersions, numParts) @@ -219,21 +216,19 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } }.getOrElse((Seq.empty, Seq.empty)) - val logSegmentOpt = snapshotManager.getLogSegmentAtOrBeforeVersion( + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(listFromProvider(deltas ++ checkpointFiles)("/"), new MockSidecarParquetHandler(expectedSidecars), new MockSidecarJsonHandler(expectedSidecars)), Optional.empty(), - versionToLoad, - Optional.of( - new MockTableCommitCoordinatorClientHandler(logPath, unbackfilledDeltaVersions)) + versionToLoad ) assert(logSegmentOpt.isPresent()) val expectedDeltas = deltaFileStatuses( deltaVersions.filter { v => v > expectedCheckpointVersion.getOrElse(-1L) && v <= versionToLoad.orElse(Long.MaxValue) - } ++ unbackfilledDeltaVersions.filter(_ > 
deltaVersions.max) + } ) val expectedCheckpoints = expectedCheckpointVersion.map { v => if (expectedV2Checkpoint.nonEmpty) { @@ -246,15 +241,13 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } }.getOrElse(Seq.empty) - val maxVersion = - if (unbackfilledDeltaVersions.isEmpty) deltaVersions.max else unbackfilledDeltaVersions.max checkLogSegment( logSegmentOpt.get(), - expectedVersion = versionToLoad.orElse(maxVersion), + expectedVersion = versionToLoad.orElse(deltaVersions.max), expectedDeltas = expectedDeltas, expectedCheckpoints = expectedCheckpoints, expectedCheckpointVersion = expectedCheckpointVersion, - expectedLastCommitTimestamp = versionToLoad.orElse(maxVersion) * 10 + expectedLastCommitTimestamp = versionToLoad.orElse(deltaVersions.max) * 10 ) } } @@ -272,8 +265,8 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } /** - * Test `getLogSegmentAtOrBeforeVersion` for a set of delta versions and checkpoint versions. - * Tests with (1) singular checkpoint (2) multi-part checkpoints with 5 parts + * Test `getLogSegmentForVersion` for a set of delta versions and checkpoint versions. 
Tests + * with (1) singular checkpoint (2) multi-part checkpoints with 5 parts * (3) multi-part checkpoints with 1 part */ def testWithSingularAndMultipartCheckpoint( @@ -319,16 +312,12 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { files: Seq[FileStatus], startCheckpoint: Optional[java.lang.Long] = Optional.empty(), versionToLoad: Optional[java.lang.Long] = Optional.empty(), - expectedErrorMessageContains: String = "", - tableCommitCoordinatorClientHandlerOpt: - Optional[TableCommitCoordinatorClientHandler] = Optional.empty() - )(implicit classTag: ClassTag[T]): Unit = { + expectedErrorMessageContains: String = "")(implicit classTag: ClassTag[T]): Unit = { val e = intercept[T] { - snapshotManager.getLogSegmentAtOrBeforeVersion( + snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(files), startCheckpoint, - versionToLoad, - tableCommitCoordinatorClientHandlerOpt + versionToLoad ) } assert(e.getMessage.contains(expectedErrorMessageContains)) @@ -336,18 +325,18 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { /* ------------------- VALID DELTA LOG FILE LISTINGS ----------------------- */ - test("getLogSegmentAtOrBeforeVersion: 000.json only") { + test("getLogSegmentForVersion: 000.json only") { testNoCheckpoint(Seq(0)) testNoCheckpoint(Seq(0), Optional.of(0)) } - test("getLogSegmentAtOrBeforeVersion: 000.json .. 009.json") { + test("getLogSegmentForVersion: 000.json .. 
009.json") { testNoCheckpoint(0L until 10L) testNoCheckpoint(0L until 10L, Optional.of(9)) testNoCheckpoint(0L until 10L, Optional.of(5)) } - test("getLogSegmentAtOrBeforeVersion: 000.json..010.json + checkpoint(10)") { + test("getLogSegmentForVersion: 000.json..010.json + checkpoint(10)") { testWithSingularAndMultipartCheckpoint( deltaVersions = (0L to 10L), checkpointVersions = Seq(10) @@ -381,7 +370,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: 000.json...20.json + checkpoint(10) + checkpoint(20)") { + test("getLogSegmentForVersion: 000.json...20.json + checkpoint(10) + checkpoint(20)") { testWithSingularAndMultipartCheckpoint( deltaVersions = (0L to 20L), checkpointVersions = Seq(10, 20) @@ -416,7 +405,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: outdated _last_checkpoint that does not exist") { + test("getLogSegmentForVersion: outdated _last_checkpoint that does not exist") { testWithSingularAndMultipartCheckpoint( deltaVersions = (20L until 25L), checkpointVersions = Seq(20), @@ -430,7 +419,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: 20.json...25.json + checkpoint(20)") { + test("getLogSegmentForVersion: 20.json...25.json + checkpoint(20)") { testWithSingularAndMultipartCheckpoint( deltaVersions = (20L to 25L), checkpointVersions = Seq(20) @@ -447,18 +436,17 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: empty delta log") { + test("getLogSegmentForVersion: empty delta log") { // listDeltaAndCheckpointFiles = Optional.empty() - val logSegmentOpt = snapshotManager.getLogSegmentAtOrBeforeVersion( + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(Seq.empty), Optional.empty(), - Optional.empty(), - 
Optional.empty() /* tableCommitCoordinatorClientHandlerOpt */ + Optional.empty() ) assert(!logSegmentOpt.isPresent()) } - test("getLogSegmentAtOrBeforeVersion: no delta files in the delta log") { + test("getLogSegmentForVersion: no delta files in the delta log") { // listDeltaAndCheckpointFiles = Optional.of(EmptyList) val files = Seq("foo", "notdelta.parquet", "foo.json", "001.checkpoint.00f.oo0.parquet") .map(FileStatus.of(_, 10, 10)) @@ -475,7 +463,22 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: start listing from _last_checkpoint when it is provided") { + test("getLogSegmentForVersion: versionToLoad higher than possible") { + testExpectedError[RuntimeException]( + files = deltaFileStatuses(Seq(0L)), + versionToLoad = Optional.of(15), + expectedErrorMessageContains = + "Cannot load table version 15 as it does not exist. The latest available version is 0" + ) + testExpectedError[RuntimeException]( + files = deltaFileStatuses((10L until 13L)) ++ singularCheckpointFileStatuses(Seq(10L)), + versionToLoad = Optional.of(15), + expectedErrorMessageContains = + "Cannot load table version 15 as it does not exist. 
The latest available version is 12" + ) + } + + test("getLogSegmentForVersion: start listing from _last_checkpoint when it is provided") { val deltas = deltaFileStatuses(0L until 25) val checkpoints = singularCheckpointFileStatuses(Seq(10L, 20L)) val files = deltas ++ checkpoints @@ -486,11 +489,10 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { listFromProvider(files)(filePath) } for (checkpointV <- Seq(10, 20)) { - val logSegmentOpt = snapshotManager.getLogSegmentAtOrBeforeVersion( + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(listFrom(checkpointV)(_)), Optional.of(checkpointV), - Optional.empty(), - Optional.empty() /* tableCommitCoordinatorClientHandlerOpt */ + Optional.empty() ) assert(logSegmentOpt.isPresent()) checkLogSegment( @@ -504,7 +506,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } } - test("getLogSegmentAtOrBeforeVersion: multi-part and single-part checkpoints in same log") { + test("getLogSegmentForVersion: multi-part and single-part checkpoints in same log") { testWithCheckpoints( (0L to 50L), Seq(10, 30, 50), @@ -520,7 +522,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: versionToLoad not constructable from history") { + test("getLogSegmentForVersion: versionToLoad not constructable from history") { val files = deltaFileStatuses(20L until 25L) ++ singularCheckpointFileStatuses(Seq(20L)) testExpectedError[RuntimeException]( files, @@ -639,7 +641,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { /* ------------------- CORRUPT DELTA LOG FILE LISTINGS ------------------ */ - test("getLogSegmentAtOrBeforeVersion: corrupt listing with only checkpoint file") { + test("getLogSegmentForVersion: corrupt listing with only checkpoint file") { for (versionToLoad <- Seq(Optional.empty(), Optional.of(10L)): 
Seq[Optional[java.lang.Long]]) { for (startCheckpoint <- @@ -654,7 +656,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } } - test("getLogSegmentAtOrBeforeVersion: corrupt listing with missing log files") { + test("getLogSegmentForVersion: corrupt listing with missing log files") { // checkpoint(10), 010.json, 011.json, 013.json val fileList = deltaFileStatuses(Seq(10L, 11L)) ++ deltaFileStatuses(Seq(13L)) ++ singularCheckpointFileStatuses(Seq(10L)) @@ -674,7 +676,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: corrupt listing 000.json...009.json + checkpoint(10)") { + test("getLogSegmentForVersion: corrupt listing 000.json...009.json + checkpoint(10)") { val fileList = deltaFileStatuses((0L until 10L)) ++ singularCheckpointFileStatuses(Seq(10L)) /* ---------- version to load is 15 (greater than latest checkpoint/delta file) ---------- */ @@ -703,7 +705,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } // it's weird that checkpoint(10) fails but 011.json...014.json + checkpoint(10) does not - test("getLogSegmentAtOrBeforeVersion: corrupt listing 011.json...014.json + checkpoint(10)") { + test("getLogSegmentForVersion: corrupt listing 011.json...014.json + checkpoint(10)") { val fileList = singularCheckpointFileStatuses(Seq(10L)) ++ deltaFileStatuses((11L until 15L)) /* ---------- versionToLoad is latest (14) ---------- */ // no error @@ -731,8 +733,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: corrupted log missing json files / no way to construct " + - "history") { + test("getLogSegmentForVersion: corrupted log missing json files / no way to construct history") { testExpectedError[InvalidTableException]( deltaFileStatuses(1L until 10L), expectedErrorMessageContains = "missing log file for version 0" @@ -765,7 +766,7 @@ class 
SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: corrupt log but reading outside corrupted range") { + test("getLogSegmentForVersion: corrupt log but reading outside corrupted range") { testNoCheckpoint( deltaVersions = (0L until 5L) ++ (6L until 9L), versionToLoad = Optional.of(4) @@ -783,7 +784,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: corrupt _last_checkpoint (is after existing versions)") { + test("getLogSegmentForVersion: corrupt _last_checkpoint (is after existing versions)") { // in the case of a corrupted _last_checkpoint we revert to listing from version 0 // (on first run newFiles.isEmpty() but since startingCheckpointOpt.isPresent() re-list from 0) testWithSingularAndMultipartCheckpoint( @@ -794,7 +795,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } // TODO recover from missing checkpoint (getLogSegmentWithMaxExclusiveCheckpointVersion) - test("getLogSegmentAtOrBeforeVersion: corrupt _last_checkpoint refers to in range version " + + test("getLogSegmentForVersion: corrupt _last_checkpoint refers to in range version " + "but no valid checkpoint") { testExpectedError[RuntimeException]( deltaFileStatuses(0L until 25L) ++ singularCheckpointFileStatuses(Seq(10L)), @@ -813,7 +814,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { ) } - test("getLogSegmentAtOrBeforeVersion: corrupted incomplete multi-part checkpoint with no" + + test("getLogSegmentForVersion: corrupted incomplete multi-part checkpoint with no" + "_last_checkpoint or a valid _last_checkpoint provided") { val cases: Seq[(Long, Seq[Long], Seq[Long], Optional[java.lang.Long])] = Seq( /* (corruptedCheckpointVersion, validCheckpointVersions, deltaVersions, startCheckpoint) */ @@ -828,11 +829,10 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { 
.take(4) val checkpoints = singularCheckpointFileStatuses(validVersions) val deltas = deltaFileStatuses(deltaVersions) - val logSegmentOpt = snapshotManager.getLogSegmentAtOrBeforeVersion( + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(deltas ++ corruptedCheckpoint ++ checkpoints), Optional.empty(), - Optional.empty(), - Optional.empty() /* tableCommitCoordinatorClientHandlerOpt */ + Optional.empty() ) val checkpointVersion = validVersions.sorted.lastOption assert(logSegmentOpt.isPresent()) @@ -848,67 +848,15 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } } - test("getLogSegmentAtOrBeforeVersion: corrupt _last_checkpoint with empty delta log") { + test("getLogSegmentForVersion: corrupt _last_checkpoint with empty delta log") { // listDeltaAndCheckpointFiles = Optional.empty() - val logSegmentOpt = snapshotManager.getLogSegmentAtOrBeforeVersion( + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(Seq.empty), Optional.of(1), - Optional.empty(), - Optional.empty() /* tableCommitCoordinatorClientHandlerOpt */ + Optional.empty() ) assert(!logSegmentOpt.isPresent()) } - - /* ------------------- COORDINATED COMMITS TESTS ------------------ */ - test("read with getCommits return empty lists") { - testWithCheckpoints( - (0L to 50L), /* deltaVersions */ - Seq(10, 30, 50), /* checkpointVersions */ - Seq(20, 40), /* multiCheckpointVersions */ - numParts = 5 /* numParts */ - ) - } - - test("read with getCommits return a list with serveral unbackfilled commits") { - val unbackfilledCommits1 = Seq(51L) - testWithCheckpoints( - (0L to 50L), /* deltaVersions */ - Seq(10, 30, 50), /* checkpointVersions */ - Seq(20, 40), /* multiCheckpointVersions */ - numParts = 5, /* numParts */ - unbackfilledDeltaVersions = unbackfilledCommits1 - ) - - val unbackfilledCommits2 = 51L to 60L - testWithCheckpoints( - (0L to 50L), /* deltaVersions */ - Seq(10, 30, 50), /* 
checkpointVersions */ - Seq(20, 40), /* multiCheckpointVersions */ - numParts = 5, /* numParts */ - unbackfilledDeltaVersions = unbackfilledCommits2 - ) - - val unbackfilledCommits3 = 25L to 60L - testWithCheckpoints( - (0L to 50L), /* deltaVersions */ - Seq(10, 30, 50), /* checkpointVersions */ - Seq(20, 40), /* multiCheckpointVersions */ - numParts = 5, /* numParts */ - unbackfilledDeltaVersions = unbackfilledCommits3 - ) - } - - test("read with getCommits throws an exception") { - val errMsg = "getCommits failed" - testExpectedError[RuntimeException]( - files = deltaFileStatuses(0L until 10L), - versionToLoad = Optional.of(5), - expectedErrorMessageContains = errMsg, - tableCommitCoordinatorClientHandlerOpt = - Optional.of(new MockTableCommitCoordinatorClientHandler( - logPath, e = new RuntimeException(errMsg))) - ) - } } trait SidecarIteratorProvider extends VectorTestUtils { @@ -948,18 +896,3 @@ class MockSidecarJsonHandler(sidecars: Seq[FileStatus]) predicate: Optional[Predicate]): CloseableIterator[ColumnarBatch] = singletonSidecarIterator(sidecars) } - -class MockTableCommitCoordinatorClientHandler( - logPath: Path, versions: Seq[Long] = Seq.empty, e: Throwable = null) - extends TableCommitCoordinatorClientHandler(null, null, null) { - override def getCommits( - startVersion: javaLang.Long, endVersion: javaLang.Long): GetCommitsResponse = { - if (e != null) { - throw e - } - new GetCommitsResponse( - versions - .map(v => new Commit(v, FileStatus.of(FileNames.deltaFile(logPath, v), v, v*10), v)).asJava, - if (versions.isEmpty) -1 else versions.last) - } -} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultEngineErrors.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultEngineErrors.java index 9e4f992b89f..4d60230fb0b 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultEngineErrors.java +++ 
b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultEngineErrors.java @@ -28,15 +28,6 @@ public static IllegalArgumentException canNotInstantiateLogStore( return new IllegalArgumentException(msg, cause); } - public static IllegalArgumentException canNotInstantiateCommitCoordinatorBuilder( - String commitCoordinatorBuilderClassName, String context, Exception cause) { - String msg = - format( - "Can not instantiate `CommitCoordinatorBuilder` class (%s): %s", - context, commitCoordinatorBuilderClassName); - return new IllegalArgumentException(msg, cause); - } - /** * Exception for when the default expression evaluator cannot evaluate an expression. *