diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java index b3a4dc41e94..2c295201e95 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java @@ -26,6 +26,7 @@ import io.delta.kernel.internal.actions.CommitInfo; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.checkpoints.CheckpointInstance; +import io.delta.kernel.internal.files.LogDataUtils; import io.delta.kernel.internal.files.ParsedCatalogCommitData; import io.delta.kernel.internal.files.ParsedLogData; import io.delta.kernel.internal.fs.Path; @@ -163,9 +164,7 @@ public static Commit getActiveCommitAtTimestamp( List parsedCatalogCommits) throws TableNotFoundException { // For now, we only accept *staged* ratified commits (not inline) - checkArgument( - parsedCatalogCommits.stream().allMatch(ParsedCatalogCommitData::isFile), - "Currently getActiveCommitAtTimestamp only accepts ratified staged file commits"); + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(parsedCatalogCommits); // Create a mapper for delta version -> file status that takes into account ratified commits Function versionToFileStatusFunction = diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java new file mode 100644 index 00000000000..1bb1ea63556 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java @@ -0,0 +1,87 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.files; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +import io.delta.kernel.internal.lang.ListUtils; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public final class LogDataUtils { + + private LogDataUtils() {} + + public static void validateLogDataContainsOnlyRatifiedStagedCommits( + List logDatas) { + for (ParsedLogData logData : logDatas) { + checkArgument( + logData instanceof ParsedCatalogCommitData && logData.isFile(), + "Only staged ratified commits are supported, but found: " + logData); + } + } + + public static void validateLogDataIsSortedContiguous(List logDatas) { + if (logDatas.size() > 1) { + for (int i = 1; i < logDatas.size(); i++) { + final ParsedLogData prev = logDatas.get(i - 1); + final ParsedLogData curr = logDatas.get(i); + checkArgument( + prev.getVersion() + 1 == curr.getVersion(), + String.format( + "Log data must be sorted and contiguous, but found: %s and %s", prev, curr)); + } + } + } + + /** + * Combines a list of published Deltas and ratified Deltas into a single list of Deltas such that + * there is exactly one {@link ParsedDeltaData} per version. When there is both a published Delta + * and a ratified staged Delta for the same version, prioritizes the ratified Delta. + * + *

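<p>The method requires but does not validate the following: + * + * <ul> + * <li>{@code publishedDeltas} is sorted by ascending version, with one entry per version + * <li>{@code ratifiedDeltas} is sorted by ascending version, with one entry per version + * <li>{@code ratifiedDeltas} is contiguous: every published Delta between its first and + * last version is dropped, so a gap there would leave that version missing from the + * result + * </ul>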

+ */ + public static List combinePublishedAndRatifiedDeltasWithCatalogPriority( + List publishedDeltas, List ratifiedDeltas) { + if (ratifiedDeltas.isEmpty()) { + return publishedDeltas; + } + + if (publishedDeltas.isEmpty()) { + return ratifiedDeltas; + } + + final long firstRatified = ratifiedDeltas.get(0).getVersion(); + final long lastRatified = ListUtils.getLast(ratifiedDeltas).getVersion(); + + return Stream.of( + publishedDeltas.stream().filter(x -> x.getVersion() < firstRatified), + ratifiedDeltas.stream(), + publishedDeltas.stream().filter(x -> x.getVersion() > lastRatified)) + .flatMap(Function.identity()) + .collect(Collectors.toList()); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 5e5797cd372..64dbd6f51a8 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -39,9 +39,7 @@ import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.utils.FileStatus; import java.util.*; -import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -586,23 +584,8 @@ private List getAllDeltasAfterCheckpointWithCatalogPriority( .map(ParsedCatalogCommitData.class::cast) .collect(Collectors.toList()); - if (allRatifiedCommitsAfterCheckpoint.isEmpty()) { - return allPublishedDeltasAfterCheckpoint; - } - - if (allPublishedDeltasAfterCheckpoint.isEmpty()) { - return allRatifiedCommitsAfterCheckpoint; - } - - final long firstRatified = allRatifiedCommitsAfterCheckpoint.get(0).getVersion(); - final long lastRatified = ListUtils.getLast(allRatifiedCommitsAfterCheckpoint).getVersion(); - - return Stream.of( - allPublishedDeltasAfterCheckpoint.stream().filter(x -> x.getVersion() < 
firstRatified), - allRatifiedCommitsAfterCheckpoint.stream(), - allPublishedDeltasAfterCheckpoint.stream().filter(x -> x.getVersion() > lastRatified)) - .flatMap(Function.identity()) - .collect(Collectors.toList()); + return LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + allPublishedDeltasAfterCheckpoint, allRatifiedCommitsAfterCheckpoint); } /** diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java index 1213b104334..7aa094a464d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java @@ -27,7 +27,7 @@ import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; -import io.delta.kernel.internal.files.ParsedCatalogCommitData; +import io.delta.kernel.internal.files.LogDataUtils; import io.delta.kernel.internal.files.ParsedLogData; import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.Tuple2; @@ -122,8 +122,8 @@ private void validateInputOnBuild(Engine engine) { validateProtocolAndMetadataOnlyIfVersionProvided(); validateProtocolRead(); // TODO: delta-io/delta#4765 support other types - validateLogDataContainsOnlyStagedRatifiedCommits(); - validateLogDataIsSortedContiguous(); + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(ctx.logDatas); + LogDataUtils.validateLogDataIsSortedContiguous(ctx.logDatas); } private void validateVersionNonNegative() { @@ -171,25 +171,4 @@ private void validateProtocolRead() { ctx.protocolAndMetadataOpt.ifPresent( x -> TableFeatures.validateKernelCanReadTheTable(x._1, ctx.unresolvedPath)); } - - private void validateLogDataContainsOnlyStagedRatifiedCommits() { - for (ParsedLogData logData : ctx.logDatas) { 
- checkArgument( - logData instanceof ParsedCatalogCommitData && logData.isFile(), - "Only staged ratified commits are supported, but found: " + logData); - } - } - - private void validateLogDataIsSortedContiguous() { - if (ctx.logDatas.size() > 1) { - for (int i = 1; i < ctx.logDatas.size(); i++) { - final ParsedLogData prev = ctx.logDatas.get(i - 1); - final ParsedLogData curr = ctx.logDatas.get(i); - checkArgument( - prev.getVersion() + 1 == curr.getVersion(), - String.format( - "Log data must be sorted and contiguous, but found: %s and %s", prev, curr)); - } - } - } } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala new file mode 100644 index 00000000000..0fa5d911cb3 --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala @@ -0,0 +1,239 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.kernel.internal.files + +import scala.collection.JavaConverters._ + +import io.delta.kernel.test.{MockFileSystemClientUtils, VectorTestUtils} + +import org.scalatest.funsuite.AnyFunSuite + +class LogDataUtilsSuite extends AnyFunSuite with MockFileSystemClientUtils with VectorTestUtils { + + private val emptyInlineData = emptyColumnarBatch + + ////////////////////////////////////////////////////// + // validateLogDataContainsOnlyRatifiedStagedCommits // + ////////////////////////////////////////////////////// + + test("validateLogDataContainsOnlyRatifiedStagedCommits: empty list passes") { + // Should not throw any exception + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(Seq.empty.asJava) + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: valid list passes") { + val logDatas = Seq( + ParsedCatalogCommitData.forFileStatus(stagedCommitFile(1)), + ParsedCatalogCommitData.forFileStatus(stagedCommitFile(2))) + // Should not throw any exception + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(logDatas.asJava) + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: inline delta fails") { + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( + Seq(ParsedCatalogCommitData.forInlineData(3, emptyInlineData)).asJava) + } + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: published delta fails") { + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( + Seq(ParsedPublishedDeltaData.forFileStatus(deltaFileStatus(1))).asJava) + } + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: checkpoint data fails") { + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( + Seq(ParsedClassicCheckpointData.forFileStatus(classicCheckpointFileStatus(0))).asJava) + } + } + + /////////////////////////////////////// + // 
validateLogDataIsSortedContiguous // + /////////////////////////////////////// + + test("validateLogDataIsSortedContiguous: empty list should pass") { + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(Seq.empty.asJava) + } + + test("validateLogDataIsSortedContiguous: single element should pass") { + val singleElement = Seq(ParsedDeltaData.forFileStatus(deltaFileStatus(1))) + + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(singleElement.asJava) + } + + test("validateLogDataIsSortedContiguous: contiguous versions should pass") { + val contiguousData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3))) + + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(contiguousData.asJava) + } + + test("validateLogDataIsSortedContiguous: non-contiguous versions should fail") { + val nonContiguousData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3)) // Missing version 2 + ) + + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataIsSortedContiguous(nonContiguousData.asJava) + } + } + + test("validateLogDataIsSortedContiguous: unsorted versions should fail") { + val unsortedData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(1))) + + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataIsSortedContiguous(unsortedData.asJava) + } + } + + test("validateLogDataIsSortedContiguous: duplicate versions should fail") { + val duplicateData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(1))) + + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataIsSortedContiguous(duplicateData.asJava) + } + } + + test("validateLogDataIsSortedContiguous: mixed log 
data types should work") { + val mixedData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedLogData.forFileStatus(checksumFileStatus(2)), + ParsedLogData.forFileStatus(classicCheckpointFileStatus(3))) + + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(mixedData.asJava) + } + + ////////////////////////////////////////////////////////// + // combinePublishedAndRatifiedDeltasWithCatalogPriority // + ////////////////////////////////////////////////////////// + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: empty published, empty ratified") { + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + Seq.empty.asJava, + Seq.empty.asJava) + assert(result.isEmpty) + } + + test( + "combinePublishedAndRatifiedDeltasWithCatalogPriority: empty published, non-empty ratified") { + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(1)), + ParsedDeltaData.forFileStatus(stagedCommitFile(2))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + Seq.empty.asJava, + ratifiedDeltas.asJava) + + assert(result.asScala === ratifiedDeltas) + } + + test( + "combinePublishedAndRatifiedDeltasWithCatalogPriority: non-empty published, empty ratified") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + Seq.empty.asJava) + + assert(result.asScala === publishedDeltas) + } + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: non-overlapping ranges") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(3)), + ParsedDeltaData.forFileStatus(stagedCommitFile(4))) + + val result = 
LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + assert(result.asScala === publishedDeltas ++ ratifiedDeltas) + } + + test( + "combinePublishedAndRatifiedDeltasWithCatalogPriority: " + + "overlapping ranges - ratified priority") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(2)), + ParsedDeltaData.forFileStatus(stagedCommitFile(3))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + val expected = Seq(publishedDeltas.head) ++ ratifiedDeltas + assert(result.asScala === expected) + } + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: ratified in middle of published") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3)), + ParsedDeltaData.forFileStatus(deltaFileStatus(4))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(2)), + ParsedDeltaData.forFileStatus(stagedCommitFile(3))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + val expected = Seq(publishedDeltas.head) ++ ratifiedDeltas ++ Seq(publishedDeltas(3)) + + assert(result.asScala === expected) + } + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: single ratified version") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(3))) + + val result = 
LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + val expected = Seq(publishedDeltas(0), publishedDeltas(1)) ++ ratifiedDeltas + + assert(result.asScala === expected) + } +}