Skip to content

Commit

Permalink
Done cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db committed Jan 9, 2025
1 parent 6ca9dd4 commit ed7064d
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.internal.annotation;

import java.lang.annotation.*;

/**
* Indicates that the visibility of a program element (such as a field, method, or class) is
* intentionally wider than necessary for testing purposes. This annotation serves as documentation
* for developers and tooling, clarifying that the element is not intended for production use but
* must be visible for test code.
*/
@Documented
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.CONSTRUCTOR})
public @interface VisibleForTesting {}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
Expand All @@ -51,7 +52,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -78,42 +78,6 @@ public SnapshotManager(Path logPath, Path tablePath) {
// Public APIs //
/////////////////

/**
* - Verify the versions are contiguous. - Verify the versions start with `expectedStartVersion`
* if it's specified. - Verify the versions end with `expectedEndVersion` if it's specified.
*/
public static void verifyDeltaVersions(
List<Long> versions,
Optional<Long> expectedStartVersion,
Optional<Long> expectedEndVersion,
Path tablePath) {
if (!versions.isEmpty()) {
List<Long> contVersions =
LongStream.rangeClosed(versions.get(0), versions.get(versions.size() - 1))
.boxed()
.collect(Collectors.toList());
if (!contVersions.equals(versions)) {
throw new InvalidTableException(
tablePath.toString(),
String.format("Missing delta files: versions are not contiguous: (%s)", versions));
}
}
expectedStartVersion.ifPresent(
v -> {
checkArgument(
!versions.isEmpty() && Objects.equals(versions.get(0), v),
"Did not get the first delta file version %s to compute Snapshot",
v);
});
expectedEndVersion.ifPresent(
v -> {
checkArgument(
!versions.isEmpty() && Objects.equals(versions.get(versions.size() - 1), v),
"Did not get the last delta file version %s to compute Snapshot",
v);
});
}

/**
* Construct the latest snapshot for given table.
*
Expand Down Expand Up @@ -251,6 +215,42 @@ public void checkpoint(Engine engine, Clock clock, long version)
// Helper Methods //
////////////////////

/**
* Given a list of delta versions, verifies that they are (1) contiguous, (2) start with
* expectedStartVersion (if provided), and (3) end with expectedEndVersionOpt (if provided).
* Throws an exception if any of these are not true.
*
* @param versions List of versions in sorted increasing order according
*/
@VisibleForTesting
public static void verifyDeltaVersions(
List<Long> versions,
Optional<Long> expectedStartVersion,
Optional<Long> expectedEndVersion,
Path tablePath) {
for (int i = 1; i < versions.size(); i++) {
if (versions.get(i) != versions.get(i - 1) + 1) {
throw new InvalidTableException(
tablePath.toString(),
String.format("Missing delta files: versions are not contiguous: (%s)", versions));
}
}
expectedStartVersion.ifPresent(
v -> {
checkArgument(
!versions.isEmpty() && Objects.equals(versions.get(0), v),
"Did not get the first delta file version %s to compute Snapshot",
v);
});
expectedEndVersion.ifPresent(
v -> {
checkArgument(
!versions.isEmpty() && Objects.equals(ListUtils.getLast(versions), v),
"Did not get the last delta file version %s to compute Snapshot",
v);
});
}

/**
* Updates the current `latestSnapshotHint` with the `newHint` if and only if the newHint is newer
* (i.e. has a later table version).
Expand Down Expand Up @@ -661,18 +661,18 @@ protected Optional<LogSegment> getLogSegmentForVersion(
.map(x -> new Path(x.getPath()).getName())
.toArray())));

final LinkedList<Long> deltaVersionsAfterCheckpoint =
final List<Long> deltaVersionsAfterCheckpoint =
deltasAfterCheckpoint.stream()
.map(fileStatus -> FileNames.deltaVersion(new Path(fileStatus.getPath())))
.collect(Collectors.toCollection(LinkedList::new));
.collect(Collectors.toList());

logDebug(
() -> format("deltaVersions: %s", Arrays.toString(deltaVersionsAfterCheckpoint.toArray())));

final long newVersion =
deltaVersionsAfterCheckpoint.isEmpty()
? newCheckpointOpt.get().version
: deltaVersionsAfterCheckpoint.getLast();
: ListUtils.getLast(deltaVersionsAfterCheckpoint);

// There should be a delta file present for the newVersion that we are loading
// (Even if `deltasAfterCheckpoint` is empty, `deltas` should not be)
Expand All @@ -694,22 +694,24 @@ protected Optional<LogSegment> getLogSegmentForVersion(
// If we have deltas after the checkpoint, the first file should be 1 greater than our
// last checkpoint version. If no checkpoint is present, this means the first delta file
// should be version 0.
if (deltaVersionsAfterCheckpoint.getFirst() != newCheckpointVersion + 1) {
if (deltaVersionsAfterCheckpoint.get(0) != newCheckpointVersion + 1) {
throw new InvalidTableException(
tablePath.toString(),
String.format(
"Unable to reconstruct table state: missing log file for version %s",
newCheckpointVersion + 1));
}
logger.info(
"Verified delta files are contiguous from version {} to {}",
newCheckpointVersion + 1,
newVersion);

verifyDeltaVersions(
deltaVersionsAfterCheckpoint,
Optional.of(newCheckpointVersion + 1),
versionToLoadOpt,
tablePath);

logger.info(
"Verified delta files are contiguous from version {} to {}",
newCheckpointVersion + 1,
newVersion);
}

final long lastCommitTimestamp = deltas.get(deltas.size() - 1).getModificationTime();
Expand Down

0 comments on commit ed7064d

Please sign in to comment.