From 8f4906fed41655552277662a6b8ef93a67c3206e Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Thu, 2 Oct 2025 16:55:39 -0700 Subject: [PATCH 1/4] Impl + tests --- .../kernel/internal/DeltaHistoryManager.java | 6 +- .../kernel/internal/files/LogDataUtils.java | 90 +++++++ .../internal/snapshot/SnapshotManager.java | 27 +- .../internal/table/SnapshotBuilderImpl.java | 28 +- .../internal/files/LogDataUtilsSuite.scala | 239 ++++++++++++++++++ 5 files changed, 338 insertions(+), 52 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java create mode 100644 kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java index 951b8db6c9e..769677df5d6 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java @@ -26,6 +26,7 @@ import io.delta.kernel.internal.actions.CommitInfo; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.checkpoints.CheckpointInstance; +import io.delta.kernel.internal.files.LogDataUtils; import io.delta.kernel.internal.files.ParsedDeltaData; import io.delta.kernel.internal.files.ParsedLogData; import io.delta.kernel.internal.fs.Path; @@ -164,10 +165,7 @@ public static Commit getActiveCommitAtTimestamp( throws TableNotFoundException { // For now, we only accept ratified staged commits - checkArgument( - parsedDeltaDatas.stream() - .allMatch(deltaData -> deltaData.isFile() && deltaData.isRatifiedCommit()), - "Currently getActiveCommitAtTimestamp only accepts ratified staged file commits"); + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(parsedDeltaDatas); // Create a mapper for delta version -> file status that takes into 
account ratified commits Function versionToFileStatusFunction = diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java new file mode 100644 index 00000000000..92c0468b33f --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java @@ -0,0 +1,90 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.files; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +import io.delta.kernel.internal.lang.ListUtils; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +// TODO: update after https://github.com/delta-io/delta/pull/5279 is merged +public final class LogDataUtils { + public static void validateLogDataContainsOnlyRatifiedStagedCommits( + List logDatas) { + Function isRatifiedStagedCommit = + logData -> + logData instanceof ParsedDeltaData + && ((ParsedDeltaData) logData).isRatifiedCommit() + && logData.isFile(); + for (ParsedLogData logData : logDatas) { + checkArgument( + isRatifiedStagedCommit.apply(logData), + "Only staged ratified commits are supported, but found: " + logData); + } + } + + public static void validateLogDataIsSortedContiguous(List logDatas) { + if (logDatas.size() > 1) { + for (int i = 1; i < logDatas.size(); i++) { + final ParsedLogData prev = logDatas.get(i - 1); + final ParsedLogData curr = logDatas.get(i); + checkArgument( + prev.getVersion() + 1 == curr.getVersion(), + String.format( + "Log data must be sorted and contiguous, but found: %s and %s", prev, curr)); + } + } + } + + /** + * Combines a list of published Deltas and ratified Deltas into a single list of Deltas such that + * there is exactly one {@link ParsedDeltaData} per version. When there is both a published Delta + * and a ratified staged Delta for the same version, prioritizes the ratified Delta. + * + *

The method requires but does not validate the following: + * + *

    + *
  • {@code publishedDeltas} are sorted and contiguous + *
  • {@code ratifiedDeltas} are sorted and contiguous + *
  • the commit versions present in {@code publishedDeltas} and {@code ratifiedDeltas}, when + * combined, reflect a contiguous version range. In other words, if the two do not overlap, + * publishedDeltas.last + 1 = ratifiedDeltas.first. + *
+ */ + public static List combinePublishedAndRatifiedDeltasWithCatalogPriority( + List publishedDeltas, List ratifiedDeltas) { + if (ratifiedDeltas.isEmpty()) { + return publishedDeltas; + } + + if (publishedDeltas.isEmpty()) { + return ratifiedDeltas; + } + + final long firstRatified = ratifiedDeltas.get(0).getVersion(); + final long lastRatified = ListUtils.getLast(ratifiedDeltas).getVersion(); + + return Stream.of( + publishedDeltas.stream().filter(x -> x.getVersion() < firstRatified), + ratifiedDeltas.stream(), + publishedDeltas.stream().filter(x -> x.getVersion() > lastRatified)) + .flatMap(Function.identity()) + .collect(Collectors.toList()); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 4b530c2647d..62ce046274a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -27,11 +27,7 @@ import io.delta.kernel.internal.annotation.VisibleForTesting; import io.delta.kernel.internal.checkpoints.*; import io.delta.kernel.internal.commit.DefaultFileSystemManagedTableOnlyCommitter; -import io.delta.kernel.internal.files.ParsedCheckpointData; -import io.delta.kernel.internal.files.ParsedChecksumData; -import io.delta.kernel.internal.files.ParsedDeltaData; -import io.delta.kernel.internal.files.ParsedLogCompactionData; -import io.delta.kernel.internal.files.ParsedLogData; +import io.delta.kernel.internal.files.*; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.lang.ListUtils; @@ -43,9 +39,7 @@ import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.utils.FileStatus; import java.util.*; -import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -589,23 +583,8 @@ private List getAllDeltasAfterCheckpointWithCatalogPriority( .map(ParsedDeltaData.class::cast) .collect(Collectors.toList()); - if (allRatifiedCommitsAfterCheckpoint.isEmpty()) { - return allPublishedDeltasAfterCheckpoint; - } - - if (allPublishedDeltasAfterCheckpoint.isEmpty()) { - return allRatifiedCommitsAfterCheckpoint; - } - - final long firstRatified = allRatifiedCommitsAfterCheckpoint.get(0).getVersion(); - final long lastRatified = ListUtils.getLast(allRatifiedCommitsAfterCheckpoint).getVersion(); - - return Stream.of( - allPublishedDeltasAfterCheckpoint.stream().filter(x -> x.getVersion() < firstRatified), - allRatifiedCommitsAfterCheckpoint.stream(), - allPublishedDeltasAfterCheckpoint.stream().filter(x -> x.getVersion() > lastRatified)) - .flatMap(Function.identity()) - .collect(Collectors.toList()); + return LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + allPublishedDeltasAfterCheckpoint, allRatifiedCommitsAfterCheckpoint); } /** diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java index 5155815c8f6..7aa094a464d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotBuilderImpl.java @@ -27,7 +27,7 @@ import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; -import io.delta.kernel.internal.files.ParsedDeltaData; +import io.delta.kernel.internal.files.LogDataUtils; import io.delta.kernel.internal.files.ParsedLogData; import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.Tuple2; @@ -121,8 +121,9 @@ private void validateInputOnBuild(Engine engine) { 
validateVersionAndTimestampMutuallyExclusive(); validateProtocolAndMetadataOnlyIfVersionProvided(); validateProtocolRead(); - validateLogDataContainsOnlyRatifiedCommits(); // TODO: delta-io/delta#4765 support other types - validateLogDataIsSortedContiguous(); + // TODO: delta-io/delta#4765 support other types + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(ctx.logDatas); + LogDataUtils.validateLogDataIsSortedContiguous(ctx.logDatas); } private void validateVersionNonNegative() { @@ -170,25 +171,4 @@ private void validateProtocolRead() { ctx.protocolAndMetadataOpt.ifPresent( x -> TableFeatures.validateKernelCanReadTheTable(x._1, ctx.unresolvedPath)); } - - private void validateLogDataContainsOnlyRatifiedCommits() { - for (ParsedLogData logData : ctx.logDatas) { - checkArgument( - logData instanceof ParsedDeltaData && logData.isFile(), - "Only staged ratified commits are supported, but found: " + logData); - } - } - - private void validateLogDataIsSortedContiguous() { - if (ctx.logDatas.size() > 1) { - for (int i = 1; i < ctx.logDatas.size(); i++) { - final ParsedLogData prev = ctx.logDatas.get(i - 1); - final ParsedLogData curr = ctx.logDatas.get(i); - checkArgument( - prev.getVersion() + 1 == curr.getVersion(), - String.format( - "Log data must be sorted and contiguous, but found: %s and %s", prev, curr)); - } - } - } } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala new file mode 100644 index 00000000000..f706d3a8a9a --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala @@ -0,0 +1,239 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.internal.files + +import scala.collection.JavaConverters._ + +import io.delta.kernel.test.{MockFileSystemClientUtils, VectorTestUtils} + +import org.scalatest.funsuite.AnyFunSuite + +class LogDataUtilsSuite extends AnyFunSuite with MockFileSystemClientUtils with VectorTestUtils { + + private val emptyInlineData = emptyColumnarBatch + + ////////////////////////////////////////////////////// + // validateLogDataContainsOnlyRatifiedStagedCommits // + ////////////////////////////////////////////////////// + + test("validateLogDataContainsOnlyRatifiedStagedCommits: empty list passes") { + // Should not throw any exception + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(Seq.empty.asJava) + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: valid list passes") { + val logDatas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(1)), + ParsedDeltaData.forFileStatus(stagedCommitFile(2))) + // Should not throw any exception + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(logDatas.asJava) + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: inline delta fails") { + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( + Seq(ParsedDeltaData.forInlineData(3, emptyInlineData)).asJava) + } + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: published delta fails") { + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( + 
Seq(ParsedDeltaData.forFileStatus(deltaFileStatus(1))).asJava) + } + } + + test("validateLogDataContainsOnlyRatifiedStagedCommits: checkpoint data fails") { + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( + Seq(ParsedClassicCheckpointData.forFileStatus(classicCheckpointFileStatus(0))).asJava) + } + } + + /////////////////////////////////////// + // validateLogDataIsSortedContiguous // + /////////////////////////////////////// + + test("validateLogDataIsSortedContiguous: empty list should pass") { + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(Seq.empty.asJava) + } + + test("validateLogDataIsSortedContiguous: single element should pass") { + val singleElement = Seq(ParsedDeltaData.forFileStatus(deltaFileStatus(1))) + + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(singleElement.asJava) + } + + test("validateLogDataIsSortedContiguous: contiguous versions should pass") { + val contiguousData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3))) + + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(contiguousData.asJava) + } + + test("validateLogDataIsSortedContiguous: non-contiguous versions should fail") { + val nonContiguousData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3)) // Missing version 2 + ) + + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataIsSortedContiguous(nonContiguousData.asJava) + } + } + + test("validateLogDataIsSortedContiguous: unsorted versions should fail") { + val unsortedData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(1))) + + intercept[IllegalArgumentException] { + 
LogDataUtils.validateLogDataIsSortedContiguous(unsortedData.asJava) + } + } + + test("validateLogDataIsSortedContiguous: duplicate versions should fail") { + val duplicateData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(1))) + + intercept[IllegalArgumentException] { + LogDataUtils.validateLogDataIsSortedContiguous(duplicateData.asJava) + } + } + + test("validateLogDataIsSortedContiguous: mixed log data types should work") { + val mixedData = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedLogData.forFileStatus(checksumFileStatus(2)), + ParsedLogData.forFileStatus(classicCheckpointFileStatus(3))) + + // Should not throw any exception + LogDataUtils.validateLogDataIsSortedContiguous(mixedData.asJava) + } + + ////////////////////////////////////////////////////////// + // combinePublishedAndRatifiedDeltasWithCatalogPriority // + ////////////////////////////////////////////////////////// + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: empty published, empty ratified") { + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + Seq.empty.asJava, + Seq.empty.asJava) + assert(result.isEmpty) + } + + test( + "combinePublishedAndRatifiedDeltasWithCatalogPriority: empty published, non-empty ratified") { + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(1)), + ParsedDeltaData.forFileStatus(stagedCommitFile(2))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + Seq.empty.asJava, + ratifiedDeltas.asJava) + + assert(result.asScala === ratifiedDeltas) + } + + test( + "combinePublishedAndRatifiedDeltasWithCatalogPriority: non-empty published, empty ratified") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + 
publishedDeltas.asJava, + Seq.empty.asJava) + + assert(result.asScala === publishedDeltas) + } + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: non-overlapping ranges") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(3)), + ParsedDeltaData.forFileStatus(stagedCommitFile(4))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + assert(result.asScala === publishedDeltas ++ ratifiedDeltas) + } + + test( + "combinePublishedAndRatifiedDeltasWithCatalogPriority: " + + "overlapping ranges - ratified priority") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(2)), + ParsedDeltaData.forFileStatus(stagedCommitFile(3))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + val expected = Seq(publishedDeltas.head) ++ ratifiedDeltas + assert(result.asScala === expected) + } + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: ratified in middle of published") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3)), + ParsedDeltaData.forFileStatus(deltaFileStatus(4))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(2)), + ParsedDeltaData.forFileStatus(stagedCommitFile(3))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + val expected = Seq(publishedDeltas.head) ++ ratifiedDeltas ++ Seq(publishedDeltas(3)) + + 
assert(result.asScala === expected) + } + + test("combinePublishedAndRatifiedDeltasWithCatalogPriority: single ratified version") { + val publishedDeltas = Seq( + ParsedDeltaData.forFileStatus(deltaFileStatus(1)), + ParsedDeltaData.forFileStatus(deltaFileStatus(2)), + ParsedDeltaData.forFileStatus(deltaFileStatus(3))) + val ratifiedDeltas = Seq( + ParsedDeltaData.forFileStatus(stagedCommitFile(3))) + + val result = LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority( + publishedDeltas.asJava, + ratifiedDeltas.asJava) + + val expected = Seq(publishedDeltas(0), publishedDeltas(1)) ++ ratifiedDeltas + + assert(result.asScala === expected) + } +} From 8d2ffac05286cd486ee1668bceb9565d16ba9e6b Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Fri, 3 Oct 2025 11:04:53 -0700 Subject: [PATCH 2/4] Update for master --- .../java/io/delta/kernel/internal/files/LogDataUtils.java | 1 + .../delta/kernel/internal/files/LogDataUtilsSuite.scala | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java index 742f6bcc602..07242dd9ca3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java @@ -19,6 +19,7 @@ import io.delta.kernel.internal.lang.ListUtils; import java.util.List; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala index f706d3a8a9a..0fa5d911cb3 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala +++ 
b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/files/LogDataUtilsSuite.scala @@ -37,8 +37,8 @@ class LogDataUtilsSuite extends AnyFunSuite with MockFileSystemClientUtils with test("validateLogDataContainsOnlyRatifiedStagedCommits: valid list passes") { val logDatas = Seq( - ParsedDeltaData.forFileStatus(stagedCommitFile(1)), - ParsedDeltaData.forFileStatus(stagedCommitFile(2))) + ParsedCatalogCommitData.forFileStatus(stagedCommitFile(1)), + ParsedCatalogCommitData.forFileStatus(stagedCommitFile(2))) // Should not throw any exception LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(logDatas.asJava) } @@ -46,14 +46,14 @@ class LogDataUtilsSuite extends AnyFunSuite with MockFileSystemClientUtils with test("validateLogDataContainsOnlyRatifiedStagedCommits: inline delta fails") { intercept[IllegalArgumentException] { LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( - Seq(ParsedDeltaData.forInlineData(3, emptyInlineData)).asJava) + Seq(ParsedCatalogCommitData.forInlineData(3, emptyInlineData)).asJava) } } test("validateLogDataContainsOnlyRatifiedStagedCommits: published delta fails") { intercept[IllegalArgumentException] { LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits( - Seq(ParsedDeltaData.forFileStatus(deltaFileStatus(1))).asJava) + Seq(ParsedPublishedDeltaData.forFileStatus(deltaFileStatus(1))).asJava) } } From f1c48c7d16b446b6817780d9c389bc6ed8f6c0ba Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Fri, 3 Oct 2025 14:38:50 -0700 Subject: [PATCH 3/4] Update comments --- .../java/io/delta/kernel/internal/DeltaHistoryManager.java | 3 +-- .../main/java/io/delta/kernel/internal/files/LogDataUtils.java | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java index 14ecddf300b..2c295201e95 100644 --- 
a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaHistoryManager.java @@ -163,8 +163,7 @@ public static Commit getActiveCommitAtTimestamp( boolean canReturnEarliestCommit, List parsedCatalogCommits) throws TableNotFoundException { - - // For now, we only accept ratified staged commits + // For now, we only accept *staged* ratified commits (not inline) LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(parsedCatalogCommits); // Create a mapper for delta version -> file status that takes into account ratified commits diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java index 07242dd9ca3..1390a2ce205 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java @@ -23,7 +23,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -// TODO: update after https://github.com/delta-io/delta/pull/5279 is merged public final class LogDataUtils { public static void validateLogDataContainsOnlyRatifiedStagedCommits( List logDatas) { From e5ee731c211189b0271c85ac5cda3120d8d21f1b Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Fri, 3 Oct 2025 18:02:51 -0700 Subject: [PATCH 4/4] Private constructor --- .../main/java/io/delta/kernel/internal/files/LogDataUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java index 1390a2ce205..1bb1ea63556 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/files/LogDataUtils.java @@ -24,6 +24,9 @@ import 
java.util.stream.Stream; public final class LogDataUtils { + + private LogDataUtils() {} + public static void validateLogDataContainsOnlyRatifiedStagedCommits( List logDatas) { for (ParsedLogData logData : logDatas) {