Skip to content

Commit

Permalink
Respond to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonport-db committed Dec 5, 2024
1 parent e1d8846 commit 8e1f651
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotContext;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
Expand Down Expand Up @@ -58,7 +58,7 @@ public SnapshotImpl(
LogReplay logReplay,
Protocol protocol,
Metadata metadata,
SnapshotContext snapshotContext) {
SnapshotQueryContext snapshotContext) {
this.logPath = new Path(dataPath, "_delta_log");
this.dataPath = dataPath;
this.version = logSegment.version;
Expand All @@ -70,9 +70,9 @@ public SnapshotImpl(
this.snapshotReport =
new SnapshotReportImpl(
snapshotContext.getTablePath(),
snapshotContext.getSnapshotMetrics(),
Optional.of(this.version),
snapshotContext.getProvidedTimestamp(),
snapshotContext.getSnapshotMetrics(),
Optional.empty() /* exception */);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotContext;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Clock;
Expand Down Expand Up @@ -93,7 +93,7 @@ public String getPath(Engine engine) {

@Override
public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException {
SnapshotContext snapshotContext = SnapshotContext.forLatestSnapshot(tablePath);
SnapshotQueryContext snapshotContext = SnapshotQueryContext.forLatestSnapshot(tablePath);
try {
return snapshotManager.buildLatestSnapshot(engine, snapshotContext);
} catch (Exception e) {
Expand All @@ -105,7 +105,8 @@ public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException {
@Override
public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId)
throws TableNotFoundException {
SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, versionId);
SnapshotQueryContext snapshotContext =
SnapshotQueryContext.forVersionSnapshot(tablePath, versionId);
try {
return snapshotManager.getSnapshotAt(engine, versionId, snapshotContext);
} catch (Exception e) {
Expand All @@ -117,8 +118,8 @@ public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId)
@Override
public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
throws TableNotFoundException {
SnapshotContext snapshotContext =
SnapshotContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
SnapshotQueryContext snapshotContext =
SnapshotQueryContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
try {
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext);
} catch (Exception e) {
Expand Down Expand Up @@ -340,13 +341,14 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
}

/** Creates a {@link SnapshotReport} and pushes it to any {@link MetricsReporter}s. */
private void recordSnapshotReport(Engine engine, SnapshotContext snapshotContext, Exception e) {
private void recordSnapshotReport(
Engine engine, SnapshotQueryContext snapshotContext, Exception e) {
SnapshotReport snapshotReport =
new SnapshotReportImpl(
snapshotContext.getTablePath(),
snapshotContext.getSnapshotMetrics(),
snapshotContext.getVersion(),
snapshotContext.getProvidedTimestamp(),
snapshotContext.getSnapshotMetrics(),
Optional.of(e));
engine.getMetricsReporters().forEach(reporter -> reporter.report(snapshotReport));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotContext;
import io.delta.kernel.internal.metrics.SnapshotMetrics;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
Expand Down Expand Up @@ -107,7 +107,7 @@ public Transaction build(Engine engine) {
// Table doesn't exist yet. Create an initial snapshot with the new schema.
Metadata metadata = getInitialMetadata();
Protocol protocol = getInitialProtocol();
SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, 0);
SnapshotQueryContext snapshotContext = SnapshotQueryContext.forVersionSnapshot(tablePath, -1);
LogReplay logReplay =
getEmptyLogReplay(engine, metadata, protocol, snapshotContext.getSnapshotMetrics());
snapshot =
Expand Down Expand Up @@ -214,7 +214,7 @@ private class InitialSnapshot extends SnapshotImpl {
LogReplay logReplay,
Metadata metadata,
Protocol protocol,
SnapshotContext snapshotContext) {
SnapshotQueryContext snapshotContext) {
super(
dataPath,
LogSegment.empty(table.getLogPath()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,43 @@
*/
package io.delta.kernel.internal.metrics;

import io.delta.kernel.metrics.SnapshotMetricsResult;
import java.util.Optional;

/** Stores the metrics for an ongoing snapshot creation */
public class SnapshotMetrics {

public final Timer timestampToVersionResolutionDuration = new Timer();

public final Timer loadProtocolAndMetadataDuration = new Timer();

public SnapshotMetricsResult captureSnapshotMetricsResult() {
return new SnapshotMetricsResult() {

final Optional<Long> timestampToVersionResolutionDurationResult =
Optional.of(timestampToVersionResolutionDuration)
.filter(t -> t.count() > 0) // If the timer hasn't been called this should be None
.map(t -> t.totalDuration());
final long loadProtocolAndMetadataDurationResult =
loadProtocolAndMetadataDuration.totalDuration();

@Override
public Optional<Long> timestampToVersionResolutionDuration() {
return timestampToVersionResolutionDurationResult;
}

@Override
public long loadInitialDeltaActionsDuration() {
return loadProtocolAndMetadataDurationResult;
}
};
}

@Override
public String toString() {
return String.format(
"SnapshotMetrics(timestampToVersionResolutionDuration=%s, "
+ "loadProtocolAndMetadataDuration=%s)",
timestampToVersionResolutionDuration, loadProtocolAndMetadataDuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,56 @@
import java.util.Optional;

/** Stores the context for a given Snapshot query. Used to generate a {@link SnapshotReport} */
public class SnapshotContext {
public class SnapshotQueryContext {

public static SnapshotContext forLatestSnapshot(String tablePath) {
return new SnapshotContext(tablePath, Optional.empty(), Optional.empty());
public static SnapshotQueryContext forLatestSnapshot(String tablePath) {
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.empty());
}

public static SnapshotContext forVersionSnapshot(String tablePath, long version) {
return new SnapshotContext(tablePath, Optional.of(version), Optional.empty());
public static SnapshotQueryContext forVersionSnapshot(String tablePath, long version) {
return new SnapshotQueryContext(tablePath, Optional.of(version), Optional.empty());
}

public static SnapshotContext forTimestampSnapshot(String tablePath, long timestamp) {
return new SnapshotContext(tablePath, Optional.empty(), Optional.of(timestamp));
public static SnapshotQueryContext forTimestampSnapshot(String tablePath, long timestamp) {
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.of(timestamp));
}

private Optional<Long> version;
private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics();
private final String tablePath;
private Optional<Long> version;
private final Optional<Long> providedTimestamp;
private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics();

private SnapshotContext(
private SnapshotQueryContext(
String tablePath, Optional<Long> version, Optional<Long> providedTimestamp) {
this.tablePath = tablePath;
this.version = version;
this.providedTimestamp = providedTimestamp;
}

public Optional<Long> getVersion() {
return version;
}

public SnapshotMetrics getSnapshotMetrics() {
return snapshotMetrics;
}

public String getTablePath() {
return tablePath;
}

public Optional<Long> getVersion() {
return version;
}

public Optional<Long> getProvidedTimestamp() {
return providedTimestamp;
}

public SnapshotMetrics getSnapshotMetrics() {
return snapshotMetrics;
}

public void setVersion(long updatedVersion) {
version = Optional.of(updatedVersion);
}

@Override
public String toString() {
return String.format(
"SnapshotQueryContext(tablePath=%s, version=%s, providedTimestamp=%s, snapshotMetric=%s)",
tablePath, version, providedTimestamp, snapshotMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,24 @@
public class SnapshotReportImpl implements SnapshotReport {

private final String tablePath;
private final Optional<Long> version;
private final Optional<Long> providedTimestamp;
private final UUID reportUUID;
private final SnapshotMetricsResult snapshotMetrics;
private final Optional<Long> version;
private final Optional<Long> providedTimestamp;
private final Optional<Exception> exception;

public SnapshotReportImpl(
String tablePath,
SnapshotMetrics snapshotMetrics,
Optional<Long> version,
Optional<Long> providedTimestamp,
SnapshotMetrics snapshotMetrics,
Optional<Exception> exception) {
this.tablePath = requireNonNull(tablePath);
this.reportUUID = UUID.randomUUID();
this.snapshotMetrics = requireNonNull(snapshotMetrics).captureSnapshotMetricsResult();
this.version = requireNonNull(version);
this.providedTimestamp = requireNonNull(providedTimestamp);
this.snapshotMetrics =
SnapshotMetricsResult.fromSnapshotMetrics(requireNonNull(snapshotMetrics));
this.exception = requireNonNull(exception);
this.reportUUID = UUID.randomUUID();
}

@Override
Expand All @@ -54,17 +53,17 @@ public String tablePath() {

@Override
public String operationType() {
return OPERATION_TYPE;
return SnapshotReport.OPERATION_TYPE;
}

@Override
public Optional<Exception> exception() {
return exception;
public UUID reportUUID() {
return reportUUID;
}

@Override
public UUID reportUUID() {
return reportUUID;
public SnapshotMetricsResult snapshotMetrics() {
return snapshotMetrics;
}

@Override
Expand All @@ -78,7 +77,7 @@ public Optional<Long> providedTimestamp() {
}

@Override
public SnapshotMetricsResult snapshotMetrics() {
return snapshotMetrics;
public Optional<Exception> exception() {
return exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void time(Runnable runnable) {
}
}

@Override
public String toString() {
return String.format("Timer(duration=%s ns, count=%s)", totalDuration(), count());
}

/**
* A timing sample that carries internal state about the Timer's start position. The timing can be
* completed by calling {@link Timed#stop()}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import io.delta.kernel.internal.checkpoints.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.metrics.SnapshotContext;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.util.Clock;
Expand Down Expand Up @@ -123,7 +123,7 @@ public static void verifyDeltaVersions(
* @return
* @throws TableNotFoundException
*/
public Snapshot buildLatestSnapshot(Engine engine, SnapshotContext snapshotContext)
public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
return getSnapshotAtInit(engine, snapshotContext);
}
Expand All @@ -136,7 +136,7 @@ public Snapshot buildLatestSnapshot(Engine engine, SnapshotContext snapshotConte
* @return a {@link Snapshot} of the table at version {@code version}
* @throws TableNotFoundException
*/
public Snapshot getSnapshotAt(Engine engine, long version, SnapshotContext snapshotContext)
public Snapshot getSnapshotAt(Engine engine, long version, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {

Optional<LogSegment> logSegmentOpt =
Expand Down Expand Up @@ -177,7 +177,7 @@ public Snapshot getSnapshotAt(Engine engine, long version, SnapshotContext snaps
* @throws TableNotFoundException
*/
public Snapshot getSnapshotForTimestamp(
Engine engine, long millisSinceEpochUTC, SnapshotContext snapshotContext)
Engine engine, long millisSinceEpochUTC, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
long startTimeMillis = System.currentTimeMillis();
long versionToRead =
Expand Down Expand Up @@ -211,7 +211,9 @@ public void checkpoint(Engine engine, Clock clock, long version)
SnapshotImpl snapshot =
(SnapshotImpl)
getSnapshotAt(
engine, version, SnapshotContext.forVersionSnapshot(tablePath.toString(), version));
engine,
version,
SnapshotQueryContext.forVersionSnapshot(tablePath.toString(), version));

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
Expand Down Expand Up @@ -484,7 +486,7 @@ private List<Commit> getUnbackfilledCommits(
* Load the Snapshot for this Delta table at initialization. This method uses the `lastCheckpoint`
* file as a hint on where to start listing the transaction log directory.
*/
private SnapshotImpl getSnapshotAtInit(Engine engine, SnapshotContext snapshotContext)
private SnapshotImpl getSnapshotAtInit(Engine engine, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
Checkpointer checkpointer = new Checkpointer(logPath);
Optional<CheckpointMetaData> lastCheckpointOpt = checkpointer.readLastCheckpointFile(engine);
Expand Down Expand Up @@ -513,7 +515,7 @@ private SnapshotImpl getCoordinatedCommitsAwareSnapshot(
Engine engine,
LogSegment initialSegmentForNewSnapshot,
Optional<Long> versionToLoadOpt,
SnapshotContext snapshotContext) {
SnapshotQueryContext snapshotContext) {
SnapshotImpl newSnapshot =
createSnapshot(initialSegmentForNewSnapshot, engine, snapshotContext);

Expand All @@ -540,7 +542,7 @@ private SnapshotImpl getCoordinatedCommitsAwareSnapshot(
}

private SnapshotImpl createSnapshot(
LogSegment initSegment, Engine engine, SnapshotContext snapshotContext) {
LogSegment initSegment, Engine engine, SnapshotQueryContext snapshotContext) {
final String startingFromStr =
initSegment
.checkpointVersionOpt
Expand Down
Loading

0 comments on commit 8e1f651

Please sign in to comment.