Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.CheckpointInstance;
import io.delta.kernel.internal.files.LogDataUtils;
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
import io.delta.kernel.internal.files.ParsedLogData;
import io.delta.kernel.internal.fs.Path;
Expand Down Expand Up @@ -163,9 +164,7 @@ public static Commit getActiveCommitAtTimestamp(
List<ParsedCatalogCommitData> parsedCatalogCommits)
throws TableNotFoundException {
// For now, we only accept *staged* ratified commits (not inline)
checkArgument(
parsedCatalogCommits.stream().allMatch(ParsedCatalogCommitData::isFile),
"Currently getActiveCommitAtTimestamp only accepts ratified staged file commits");
LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(parsedCatalogCommits);

// Create a mapper for delta version -> file status that takes into account ratified commits
Function<Long, FileStatus> versionToFileStatusFunction =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.files;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.internal.lang.ListUtils;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class LogDataUtils {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: private constructor


private LogDataUtils() {}

public static void validateLogDataContainsOnlyRatifiedStagedCommits(
List<? extends ParsedLogData> logDatas) {
for (ParsedLogData logData : logDatas) {
checkArgument(
logData instanceof ParsedCatalogCommitData && logData.isFile(),
"Only staged ratified commits are supported, but found: " + logData);
}
}

public static void validateLogDataIsSortedContiguous(List<? extends ParsedLogData> logDatas) {
if (logDatas.size() > 1) {
for (int i = 1; i < logDatas.size(); i++) {
final ParsedLogData prev = logDatas.get(i - 1);
final ParsedLogData curr = logDatas.get(i);
checkArgument(
prev.getVersion() + 1 == curr.getVersion(),
String.format(
"Log data must be sorted and contiguous, but found: %s and %s", prev, curr));
}
}
}

/**
* Combines a list of published Deltas and ratified Deltas into a single list of Deltas such that
* there is exactly one {@link ParsedDeltaData} per version. When there is both a published Delta
* and a ratified staged Delta for the same version, prioritizes the ratified Delta.
*
* <p>The method requires but does not validate the following:
*
* <ul>
* <li>{@code publishedDeltas} are sorted and contiguous
* <li>{@code ratifiedDeltas} are sorted and contiguous
* <li>the commit versions present in {@code publishedDeltas} and {@code ratifiedDeltas}, when
* combined, reflect a contiguous version range. In other words, if the two do not overlap,
* publishedDeltas.last = ratifiedDeltas.first + 1).
* </ul>
*/
public static List<ParsedDeltaData> combinePublishedAndRatifiedDeltasWithCatalogPriority(
List<ParsedDeltaData> publishedDeltas, List<ParsedDeltaData> ratifiedDeltas) {
if (ratifiedDeltas.isEmpty()) {
return publishedDeltas;
}

if (publishedDeltas.isEmpty()) {
return ratifiedDeltas;
}

final long firstRatified = ratifiedDeltas.get(0).getVersion();
final long lastRatified = ListUtils.getLast(ratifiedDeltas).getVersion();

return Stream.of(
publishedDeltas.stream().filter(x -> x.getVersion() < firstRatified),
ratifiedDeltas.stream(),
publishedDeltas.stream().filter(x -> x.getVersion() > lastRatified))
.flatMap(Function.identity())
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.FileStatus;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -586,23 +584,8 @@ private List<ParsedDeltaData> getAllDeltasAfterCheckpointWithCatalogPriority(
.map(ParsedCatalogCommitData.class::cast)
.collect(Collectors.toList());

if (allRatifiedCommitsAfterCheckpoint.isEmpty()) {
return allPublishedDeltasAfterCheckpoint;
}

if (allPublishedDeltasAfterCheckpoint.isEmpty()) {
return allRatifiedCommitsAfterCheckpoint;
}

final long firstRatified = allRatifiedCommitsAfterCheckpoint.get(0).getVersion();
final long lastRatified = ListUtils.getLast(allRatifiedCommitsAfterCheckpoint).getVersion();

return Stream.of(
allPublishedDeltasAfterCheckpoint.stream().filter(x -> x.getVersion() < firstRatified),
allRatifiedCommitsAfterCheckpoint.stream(),
allPublishedDeltasAfterCheckpoint.stream().filter(x -> x.getVersion() > lastRatified))
.flatMap(Function.identity())
.collect(Collectors.toList());
return LogDataUtils.combinePublishedAndRatifiedDeltasWithCatalogPriority(
allPublishedDeltasAfterCheckpoint, allRatifiedCommitsAfterCheckpoint);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.files.ParsedCatalogCommitData;
import io.delta.kernel.internal.files.LogDataUtils;
import io.delta.kernel.internal.files.ParsedLogData;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.Tuple2;
Expand Down Expand Up @@ -122,8 +122,8 @@ private void validateInputOnBuild(Engine engine) {
validateProtocolAndMetadataOnlyIfVersionProvided();
validateProtocolRead();
// TODO: delta-io/delta#4765 support other types
validateLogDataContainsOnlyStagedRatifiedCommits();
validateLogDataIsSortedContiguous();
LogDataUtils.validateLogDataContainsOnlyRatifiedStagedCommits(ctx.logDatas);
LogDataUtils.validateLogDataIsSortedContiguous(ctx.logDatas);
}

private void validateVersionNonNegative() {
Expand Down Expand Up @@ -171,25 +171,4 @@ private void validateProtocolRead() {
ctx.protocolAndMetadataOpt.ifPresent(
x -> TableFeatures.validateKernelCanReadTheTable(x._1, ctx.unresolvedPath));
}

private void validateLogDataContainsOnlyStagedRatifiedCommits() {
for (ParsedLogData logData : ctx.logDatas) {
checkArgument(
logData instanceof ParsedCatalogCommitData && logData.isFile(),
"Only staged ratified commits are supported, but found: " + logData);
}
}

private void validateLogDataIsSortedContiguous() {
if (ctx.logDatas.size() > 1) {
for (int i = 1; i < ctx.logDatas.size(); i++) {
final ParsedLogData prev = ctx.logDatas.get(i - 1);
final ParsedLogData curr = ctx.logDatas.get(i);
checkArgument(
prev.getVersion() + 1 == curr.getVersion(),
String.format(
"Log data must be sorted and contiguous, but found: %s and %s", prev, curr));
}
}
}
}
Loading
Loading