diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/annotation/VisibleForTesting.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/annotation/VisibleForTesting.java new file mode 100644 index 00000000000..8724a91c353 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/annotation/VisibleForTesting.java @@ -0,0 +1,30 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.internal.annotation; + +import java.lang.annotation.*; + +/** + * Indicates that the visibility of a program element (such as a field, method, or class) is + * intentionally wider than necessary for testing purposes. This annotation serves as documentation + * for developers and tooling, clarifying that the element is not intended for production use but + * must be visible for test code. 
+ */ +@Documented +@Retention(RetentionPolicy.CLASS) +@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.CONSTRUCTOR}) +public @interface VisibleForTesting {} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index e7ed07dbf35..427f2a1e989 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -34,6 +34,7 @@ import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.*; import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.annotation.VisibleForTesting; import io.delta.kernel.internal.checkpoints.*; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.ListUtils; @@ -51,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.LongStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,42 +78,6 @@ public SnapshotManager(Path logPath, Path tablePath) { // Public APIs // ///////////////// - /** - * - Verify the versions are contiguous. - Verify the versions start with `expectedStartVersion` - * if it's specified. - Verify the versions end with `expectedEndVersion` if it's specified. 
- */ - public static void verifyDeltaVersions( - List versions, - Optional expectedStartVersion, - Optional expectedEndVersion, - Path tablePath) { - if (!versions.isEmpty()) { - List contVersions = - LongStream.rangeClosed(versions.get(0), versions.get(versions.size() - 1)) - .boxed() - .collect(Collectors.toList()); - if (!contVersions.equals(versions)) { - throw new InvalidTableException( - tablePath.toString(), - String.format("Missing delta files: versions are not contiguous: (%s)", versions)); - } - } - expectedStartVersion.ifPresent( - v -> { - checkArgument( - !versions.isEmpty() && Objects.equals(versions.get(0), v), - "Did not get the first delta file version %s to compute Snapshot", - v); - }); - expectedEndVersion.ifPresent( - v -> { - checkArgument( - !versions.isEmpty() && Objects.equals(versions.get(versions.size() - 1), v), - "Did not get the last delta file version %s to compute Snapshot", - v); - }); - } - /** * Construct the latest snapshot for given table. * @@ -251,6 +215,42 @@ public void checkpoint(Engine engine, Clock clock, long version) // Helper Methods // //////////////////// + /** + * Given a list of delta versions, verifies that they are (1) contiguous, (2) start with + * expectedStartVersion (if provided), and (3) end with expectedEndVersion (if provided). + * Throws an exception if any of these are not true. 
+ * + * @param versions List of delta versions, sorted in increasing order + */ + @VisibleForTesting + public static void verifyDeltaVersions( + List versions, + Optional expectedStartVersion, + Optional expectedEndVersion, + Path tablePath) { + for (int i = 1; i < versions.size(); i++) { + if (versions.get(i) != versions.get(i - 1) + 1) { + throw new InvalidTableException( + tablePath.toString(), + String.format("Missing delta files: versions are not contiguous: (%s)", versions)); + } + } + expectedStartVersion.ifPresent( + v -> { + checkArgument( + !versions.isEmpty() && Objects.equals(versions.get(0), v), + "Did not get the first delta file version %s to compute Snapshot", + v); + }); + expectedEndVersion.ifPresent( + v -> { + checkArgument( + !versions.isEmpty() && Objects.equals(ListUtils.getLast(versions), v), + "Did not get the last delta file version %s to compute Snapshot", + v); + }); + } + /** * Updates the current `latestSnapshotHint` with the `newHint` if and only if the newHint is newer * (i.e. has a later table version). @@ -661,10 +661,10 @@ protected Optional getLogSegmentForVersion( .map(x -> new Path(x.getPath()).getName()) .toArray()))); - final LinkedList deltaVersionsAfterCheckpoint = + final List deltaVersionsAfterCheckpoint = deltasAfterCheckpoint.stream() .map(fileStatus -> FileNames.deltaVersion(new Path(fileStatus.getPath()))) - .collect(Collectors.toCollection(LinkedList::new)); + .collect(Collectors.toList()); logDebug( () -> format("deltaVersions: %s", Arrays.toString(deltaVersionsAfterCheckpoint.toArray()))); @@ -672,7 +672,7 @@ protected Optional getLogSegmentForVersion( final long newVersion = deltaVersionsAfterCheckpoint.isEmpty() ? 
newCheckpointOpt.get().version - : deltaVersionsAfterCheckpoint.getLast(); + : ListUtils.getLast(deltaVersionsAfterCheckpoint); // There should be a delta file present for the newVersion that we are loading // (Even if `deltasAfterCheckpoint` is empty, `deltas` should not be) @@ -694,22 +694,24 @@ protected Optional getLogSegmentForVersion( // If we have deltas after the checkpoint, the first file should be 1 greater than our // last checkpoint version. If no checkpoint is present, this means the first delta file // should be version 0. - if (deltaVersionsAfterCheckpoint.getFirst() != newCheckpointVersion + 1) { + if (deltaVersionsAfterCheckpoint.get(0) != newCheckpointVersion + 1) { throw new InvalidTableException( tablePath.toString(), String.format( "Unable to reconstruct table state: missing log file for version %s", newCheckpointVersion + 1)); } - logger.info( - "Verified delta files are contiguous from version {} to {}", - newCheckpointVersion + 1, - newVersion); + verifyDeltaVersions( deltaVersionsAfterCheckpoint, Optional.of(newCheckpointVersion + 1), versionToLoadOpt, tablePath); + + logger.info( + "Verified delta files are contiguous from version {} to {}", + newCheckpointVersion + 1, + newVersion); } final long lastCommitTimestamp = deltas.get(deltas.size() - 1).getModificationTime();