Skip to content

Commit

Permalink
[Kernel] Remove CC code from SnapshotManager (#3986)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

We are re-thinking the design of the Coordinated Commits table feature
and much of this snapshot code will be refactored. Remove it for now as
it greatly complicates our snapshot construction, and hopefully we can
be more intentional in our code design/organization when re-implementing
it.


fc81d12
already removed the public interfaces and made it such that
`SnapshotImpl::getTableCommitCoordinatorClientHandlerOpt` never returned
a handler.

## How was this patch tested?

Existing tests should suffice.

## Does this PR introduce _any_ user-facing changes?

No.
  • Loading branch information
allisonport-db authored Dec 19, 2024
1 parent 34f02d8 commit da58cad
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 329 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.StructType;
import java.util.List;
Expand Down Expand Up @@ -169,16 +168,4 @@ public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
public Optional<Long> getLatestTransactionVersion(Engine engine, String applicationId) {
return logReplay.getLatestTransactionIdentifier(engine, applicationId);
}

/**
* Returns the commit coordinator client handler based on the table metadata in this snapshot.
*
* @param engine the engine to use for IO operations
* @return the commit coordinator client handler for this snapshot or empty if the metadata is not
* configured to use the commit coordinator.
*/
public Optional<TableCommitCoordinatorClientHandler> getTableCommitCoordinatorClientHandlerOpt(
Engine engine) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ public static <T> Tuple2<List<T>, List<T>> partition(
return new Tuple2<>(partitionMap.get(true), partitionMap.get(false));
}

public static <T> T last(List<T> list) {
return list.get(list.size() - 1);
}

/** Remove once supported JDK (build) version is 21 or above */
public static <T> T getFirst(List<T> list) {
if (list.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.coordinatedcommits.Commit;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
Expand All @@ -48,7 +47,6 @@
import java.io.*;
import java.nio.file.FileAlreadyExistsException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -137,30 +135,14 @@ public Snapshot buildLatestSnapshot(Engine engine) throws TableNotFoundException
public Snapshot getSnapshotAt(Engine engine, long version) throws TableNotFoundException {

Optional<LogSegment> logSegmentOpt =
getLogSegmentAtOrBeforeVersion(
getLogSegmentForVersion(
engine,
Optional.empty(), /* startCheckpointOpt */
Optional.of(version) /* versionToLoadOpt */,
Optional.empty() /* tableCommitHandlerOpt */);

// For non-coordinated commit table, the {@code getCoordinatedCommitsAwareSnapshot} will
// create the snapshot with the {@code logSegmentOpt} built here and will not trigger other
// operations. For coordinated commit table, the {@code getCoordinatedCommitsAwareSnapshot}
// will create the snapshot with the {@code logSegmentOpt} built here and will build the
// logSegment again by also fetching the unbackfilled commits from the commit coordinator.
// With the unbackfilled commits plus the backfilled commits in Delta log, a new snapshot
// will be created.
SnapshotImpl snapshot =
logSegmentOpt
.map(
logSegment ->
getCoordinatedCommitsAwareSnapshot(engine, logSegment, Optional.of(version)))
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
long snapshotVer = snapshot.getVersion(engine);
if (snapshotVer != version) {
throw DeltaErrors.versionAfterLatestCommit(tablePath.toString(), version, snapshotVer);
}
return snapshot;
Optional.of(version) /* versionToLoadOpt */);

return logSegmentOpt
.map(logSegment -> createSnapshot(logSegment, engine))
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
}

/**
Expand Down Expand Up @@ -322,55 +304,22 @@ private Optional<CloseableIterator<FileStatus>> listFromOrNone(Engine engine, lo
* @param startVersion the version to start. Inclusive.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
* Must be >= startVersion if provided.
* <p>This method lists all delta files and checkpoint files with the following steps:
* <ul>
* <li>When the table is set up with a commit coordinator, retrieve any files that haven't
* been backfilled by initiating a request to the commit coordinator service. If the
* table is not configured to use a commit coordinator, this list will be empty.
* <li>Collect commit files (aka backfilled commits) and checkpoint files by listing the
* contents of the Delta log on storage.
* <li>Filter un-backfilled files to exclude overlapping delta files collected from both
* commit-coordinator and file-system to avoid duplicates.
* <li>Merge and return the backfilled files and filtered un-backfilled files.
* </ul>
* <p>*Note*: If table is a coordinated-commits table, the commit-coordinator client MUST be
* passed to correctly list the commits.
* @param startVersion the version to start. Inclusive.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
* Must be >= startVersion if provided.
* @param tableCommitHandlerOpt the optional commit-coordinator client handler to use for fetching
* un-backfilled commits.
* @return Some array of files found (possibly empty, if no usable commit files are present), or
* None if the listing returned no files at all.
*/
protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
Engine engine,
long startVersion,
Optional<Long> versionToLoad,
Optional<TableCommitCoordinatorClientHandler> tableCommitHandlerOpt) {
Engine engine, long startVersion, Optional<Long> versionToLoad) {
versionToLoad.ifPresent(
v ->
checkArgument(
v >= startVersion,
"versionToLoad=%s provided is less than startVersion=%s",
v,
startVersion));
logger.debug(
"startVersion: {}, versionToLoad: {}, coordinated commits enabled: {}",
startVersion,
versionToLoad,
tableCommitHandlerOpt.isPresent());

// Fetching the unbackfilled commits before doing the log directory listing to avoid a gap
// in delta versions if some delta files are backfilled after the log directory listing but
// before the unbackfilled commits listing
List<Commit> unbackfilledCommits =
getUnbackfilledCommits(tableCommitHandlerOpt, startVersion, versionToLoad);

final AtomicLong maxDeltaVersionSeen = new AtomicLong(startVersion - 1);
Optional<CloseableIterator<FileStatus>> listing = listFromOrNone(engine, startVersion);
Optional<List<FileStatus>> resultFromFsListingOpt =
listing.map(
logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad);

return listFromOrNone(engine, startVersion)
.map(
fileStatusesIter -> {
final List<FileStatus> output = new ArrayList<>();

Expand Down Expand Up @@ -406,64 +355,11 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
}
break;
}

// Ideally listFromOrNone should return lexicographically sorted
// files and so maxDeltaVersionSeen should be equal to fileVersion.
// But we are being defensive here and taking max of all the
// fileVersions seen.
if (FileNames.isCommitFile(fileName)) {
maxDeltaVersionSeen.set(
Math.max(
maxDeltaVersionSeen.get(), FileNames.deltaVersion(fileStatus.getPath())));
}
output.add(fileStatus);
}

return output;
});

if (!tableCommitHandlerOpt.isPresent()) {
return resultFromFsListingOpt;
}
List<FileStatus> relevantUnbackfilledCommits =
unbackfilledCommits.stream()
.filter((commit) -> commit.getVersion() > maxDeltaVersionSeen.get())
.filter(
(commit) ->
!versionToLoad.isPresent() || commit.getVersion() <= versionToLoad.get())
.map(Commit::getFileStatus)
.collect(Collectors.toList());

return resultFromFsListingOpt.map(
fsListing -> {
fsListing.addAll(relevantUnbackfilledCommits);
return fsListing;
});
}

private List<Commit> getUnbackfilledCommits(
Optional<TableCommitCoordinatorClientHandler> tableCommitHandlerOpt,
long startVersion,
Optional<Long> versionToLoad) {
try {
return tableCommitHandlerOpt
.map(
commitCoordinatorClientHandler -> {
logger.info(
"Getting un-backfilled commits from commit coordinator for " + "table: {}",
tablePath);
return commitCoordinatorClientHandler
.getCommits(startVersion, versionToLoad.orElse(null))
.getCommits();
})
.orElse(Collections.emptyList());
} catch (Exception e) {
logger.error(
"Failed to get unbackfilled commits of table {} with commit coordinator: {}",
tablePath,
e);
throw e;
}
}

/**
Expand All @@ -482,41 +378,10 @@ private SnapshotImpl getSnapshotAtInit(Engine engine) throws TableNotFoundExcept
Optional<LogSegment> logSegmentOpt = getLogSegmentFrom(engine, lastCheckpointOpt);

return logSegmentOpt
.map(logSegment -> getCoordinatedCommitsAwareSnapshot(engine, logSegment, Optional.empty()))
.map(logSegment -> createSnapshot(logSegment, engine))
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
}

/**
* This can be optimized by making snapshot hint optimization to work with coordinated commits.
*
* @see <a href="https://github.com/delta-io/delta/issues/3437">issue #3437</a>.
*/
private SnapshotImpl getCoordinatedCommitsAwareSnapshot(
Engine engine, LogSegment initialSegmentForNewSnapshot, Optional<Long> versionToLoadOpt) {
SnapshotImpl newSnapshot = createSnapshot(initialSegmentForNewSnapshot, engine);

if (versionToLoadOpt.isPresent() && newSnapshot.getVersion(engine) == versionToLoadOpt.get()) {
return newSnapshot;
}

Optional<TableCommitCoordinatorClientHandler> newTableCommitCoordinatorClientHandlerOpt =
newSnapshot.getTableCommitCoordinatorClientHandlerOpt(engine);

if (newTableCommitCoordinatorClientHandlerOpt.isPresent()) {
Optional<LogSegment> segmentOpt =
getLogSegmentAtOrBeforeVersion(
engine,
newSnapshot.getLogSegment().checkpointVersionOpt, /* startCheckpointOpt */
versionToLoadOpt /* versionToLoadOpt */,
newTableCommitCoordinatorClientHandlerOpt /* tableCommitHandlerOpt */);
newSnapshot =
segmentOpt
.map(segment -> createSnapshot(segment, engine))
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
}
return newSnapshot;
}

private SnapshotImpl createSnapshot(LogSegment initSegment, Engine engine) {
final String startingFromStr =
initSegment
Expand Down Expand Up @@ -566,31 +431,26 @@ private SnapshotImpl createSnapshot(LogSegment initSegment, Engine engine) {
*/
private Optional<LogSegment> getLogSegmentFrom(
Engine engine, Optional<CheckpointMetaData> startingCheckpoint) {
return getLogSegmentAtOrBeforeVersion(
engine, startingCheckpoint.map(x -> x.version), Optional.empty(), Optional.empty());
return getLogSegmentForVersion(
engine, startingCheckpoint.map(x -> x.version), Optional.empty());
}

/**
* Get a list of files that can be used to compute a Snapshot at or before version
* `versionToLoad`, If `versionToLoad` is not provided, will generate the list of files that are
* needed to load the latest version of the Delta table. This method also performs checks to
* ensure that the delta files are contiguous.
* Get a list of files that can be used to compute a Snapshot at version `versionToLoad`, if
* `versionToLoad` is not provided, will generate the list of files that are needed to load the
* latest version of the Delta table. This method also performs checks to ensure that the delta
* files are contiguous.
*
* @param startCheckpoint A potential start version to perform the listing of the DeltaLog,
* typically that of a known checkpoint. If this version's not provided, we will start listing
* from version 0.
* @param versionToLoad A specific version we try to load, but we may only load a version before
* this version if this version of commit is un-backfilled. Typically used with time travel
* and the Delta streaming source. If not provided, we will try to load the latest version of
* the table.
* @param versionToLoad A specific version to load. Typically used with time travel and the Delta
* streaming source. If not provided, we will try to load the latest version of the table.
* @return Some LogSegment to build a Snapshot if files do exist after the given startCheckpoint.
* None, if the delta log directory was missing or empty.
*/
protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
Engine engine,
Optional<Long> startCheckpoint,
Optional<Long> versionToLoad,
Optional<TableCommitCoordinatorClientHandler> tableCommitHandlerOpt) {
public Optional<LogSegment> getLogSegmentForVersion(
Engine engine, Optional<Long> startCheckpoint, Optional<Long> versionToLoad) {
// Only use startCheckpoint if it is <= versionToLoad
Optional<Long> startCheckpointToUse =
startCheckpoint.filter(v -> !versionToLoad.isPresent() || v <= versionToLoad.get());
Expand Down Expand Up @@ -620,16 +480,15 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(

long startTimeMillis = System.currentTimeMillis();
final Optional<List<FileStatus>> newFiles =
listDeltaAndCheckpointFiles(engine, startVersion, versionToLoad, tableCommitHandlerOpt);
listDeltaAndCheckpointFiles(engine, startVersion, versionToLoad);
logger.info(
"{}: Took {}ms to list the files after starting checkpoint",
tablePath,
System.currentTimeMillis() - startTimeMillis);

startTimeMillis = System.currentTimeMillis();
try {
return getLogSegmentAtOrBeforeVersion(
engine, startCheckpointToUse, versionToLoad, newFiles, tableCommitHandlerOpt);
return getLogSegmentForVersion(engine, startCheckpointToUse, versionToLoad, newFiles);
} finally {
logger.info(
"{}: Took {}ms to construct a log segment",
Expand All @@ -642,12 +501,11 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
* Helper function for the getLogSegmentForVersion above. Called with a provided files list, and
* will then try to construct a new LogSegment using that.
*/
protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
protected Optional<LogSegment> getLogSegmentForVersion(
Engine engine,
Optional<Long> startCheckpointOpt,
Optional<Long> versionToLoadOpt,
Optional<List<FileStatus>> filesOpt,
Optional<TableCommitCoordinatorClientHandler> tableCommitHandlerOpt) {
Optional<List<FileStatus>> filesOpt) {
final List<FileStatus> newFiles;
if (filesOpt.isPresent()) {
newFiles = filesOpt.get();
Expand Down Expand Up @@ -681,8 +539,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
} else if (newFiles.isEmpty()) {
// The directory may be deleted and recreated and we may have stale state in our
// DeltaLog singleton, so try listing from the first version
return getLogSegmentAtOrBeforeVersion(
engine, Optional.empty(), versionToLoadOpt, tableCommitHandlerOpt);
return getLogSegmentForVersion(engine, Optional.empty(), versionToLoadOpt);
}

Tuple2<List<FileStatus>, List<FileStatus>> checkpointsAndDeltas =
Expand Down Expand Up @@ -797,7 +654,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
}

versionToLoadOpt
.filter(v -> v < newVersion)
.filter(v -> v != newVersion)
.ifPresent(
v -> {
throw DeltaErrors.versionAfterLatestCommit(tablePath.toString(), v, newVersion);
Expand All @@ -822,7 +679,7 @@ protected Optional<LogSegment> getLogSegmentAtOrBeforeVersion(
verifyDeltaVersions(
deltaVersionsAfterCheckpoint,
Optional.of(newCheckpointVersion + 1),
Optional.of(newVersion),
versionToLoadOpt,
tablePath);
}

Expand Down
Loading

0 comments on commit da58cad

Please sign in to comment.