Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel][Metrics][PR#2] Add SnapshotReport for reporting snapshot construction #3903

Merged
merged 12 commits into from
Jan 8, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructType;
import java.util.List;
import java.util.Map;
Expand All @@ -45,13 +48,15 @@ public class SnapshotImpl implements Snapshot {
private final Metadata metadata;
private final LogSegment logSegment;
private Optional<Long> inCommitTimestampOpt;
private final SnapshotReport snapshotReport;

public SnapshotImpl(
Path dataPath,
LogSegment logSegment,
LogReplay logReplay,
Protocol protocol,
Metadata metadata) {
Metadata metadata,
SnapshotQueryContext snapshotContext) {
this.logPath = new Path(dataPath, "_delta_log");
this.dataPath = dataPath;
this.version = logSegment.version;
Expand All @@ -60,6 +65,7 @@ public SnapshotImpl(
this.protocol = protocol;
this.metadata = metadata;
this.inCommitTimestampOpt = Optional.empty();
this.snapshotReport = SnapshotReportImpl.forSuccess(snapshotContext);
}

/////////////////
Expand Down Expand Up @@ -107,6 +113,7 @@ public StructType getSchema(Engine engine) {

/**
 * {@inheritDoc}
 *
 * <p>The returned builder is a {@link ScanBuilderImpl} constructed from this snapshot's data path,
 * protocol, metadata, schema and log replay, together with the given engine.
 */
@Override
public ScanBuilder getScanBuilder(Engine engine) {
  // TODO when we add ScanReport we will pass the SnapshotReport downstream here
  final StructType snapshotSchema = getSchema(engine);
  return new ScanBuilderImpl(dataPath, protocol, metadata, snapshotSchema, logReplay, engine);
}

Expand All @@ -130,6 +137,10 @@ public List<String> getPartitionColumnNames(Engine engine) {
return VectorUtils.toJavaList(getMetadata().getPartitionColumns());
}

/**
 * Returns the {@link SnapshotReport} held by this snapshot. The report is created in the
 * constructor via {@code SnapshotReportImpl.forSuccess(snapshotContext)}.
 */
public SnapshotReport getSnapshotReport() {
  return this.snapshotReport;
}

/**
* Get the domain metadata map from the log replay, which lazily loads and replays a history of
* domain metadata actions, resolving them to produce the current state of the domain metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
Expand Down Expand Up @@ -90,19 +93,39 @@ public String getPath(Engine engine) {

// Builds the latest snapshot of this table. A SnapshotQueryContext for a "latest snapshot" query is
// created first and handed to the snapshot manager. If snapshot construction throws, an error
// report is recorded through recordSnapshotErrorReport and the same exception is rethrown.
// NOTE(review): this method is shown as rendered by a diff view. The bare
// `return snapshotManager.buildLatestSnapshot(engine);` line looks like the removed (pre-change)
// side of the hunk, and the two "conversation"/"Show resolved" lines look like review-UI residue;
// confirm against the merged file.
@Override
public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException {
return snapshotManager.buildLatestSnapshot(engine);
SnapshotQueryContext snapshotContext = SnapshotQueryContext.forLatestSnapshot(tablePath);
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
try {
return snapshotManager.buildLatestSnapshot(engine, snapshotContext);
} catch (Exception e) {
// Report the failure to the engine's metrics reporters before propagating it unchanged.
recordSnapshotErrorReport(engine, snapshotContext, e);
throw e;
}
}

// Builds the snapshot at the given table version. A SnapshotQueryContext for a time-travel-by-
// version query is created with the requested versionId and handed to the snapshot manager. If
// snapshot construction throws, an error report is recorded through recordSnapshotErrorReport and
// the same exception is rethrown.
// NOTE(review): this method is shown as rendered by a diff view. The bare
// `return snapshotManager.getSnapshotAt(engine, versionId);` line looks like the removed
// (pre-change) side of the hunk; confirm against the merged file.
@Override
public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId)
throws TableNotFoundException {
return snapshotManager.getSnapshotAt(engine, versionId);
SnapshotQueryContext snapshotContext =
SnapshotQueryContext.forVersionSnapshot(tablePath, versionId);
try {
return snapshotManager.getSnapshotAt(engine, versionId, snapshotContext);
} catch (Exception e) {
// Report the failure to the engine's metrics reporters before propagating it unchanged.
recordSnapshotErrorReport(engine, snapshotContext, e);
throw e;
}
}

// Builds the snapshot for the given timestamp. A SnapshotQueryContext for a time-travel-by-
// timestamp query is created with millisSinceEpochUTC and handed to the snapshot manager. If
// snapshot construction throws, an error report is recorded through recordSnapshotErrorReport and
// the same exception is rethrown.
// NOTE(review): this method is shown as rendered by a diff view. The bare
// `return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC);` line looks like
// the removed (pre-change) side of the hunk; confirm against the merged file.
@Override
public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
throws TableNotFoundException {
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC);
SnapshotQueryContext snapshotContext =
SnapshotQueryContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
try {
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext);
} catch (Exception e) {
// Report the failure to the engine's metrics reporters before propagating it unchanged.
recordSnapshotErrorReport(engine, snapshotContext, e);
throw e;
}
}

@Override
Expand Down Expand Up @@ -316,4 +339,11 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema);
return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema);
}

/**
 * Builds an error {@link SnapshotReport} from the query context and the failure, then hands that
 * single report to every {@link MetricsReporter} exposed by the engine.
 *
 * @param engine engine whose metrics reporters receive the report
 * @param snapshotContext context of the snapshot query that failed
 * @param e the exception thrown while building the snapshot
 */
private void recordSnapshotErrorReport(
    Engine engine, SnapshotQueryContext snapshotContext, Exception e) {
  final SnapshotReport errorReport = SnapshotReportImpl.forError(snapshotContext, e);
  engine.getMetricsReporters().forEach(metricsReporter -> metricsReporter.report(errorReport));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotMetrics;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
Expand Down Expand Up @@ -105,8 +107,11 @@ public Transaction build(Engine engine) {
// Table doesn't exist yet. Create an initial snapshot with the new schema.
Metadata metadata = getInitialMetadata();
Protocol protocol = getInitialProtocol();
LogReplay logReplay = getEmptyLogReplay(engine, metadata, protocol);
snapshot = new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol);
SnapshotQueryContext snapshotContext = SnapshotQueryContext.forVersionSnapshot(tablePath, -1);
LogReplay logReplay =
getEmptyLogReplay(engine, metadata, protocol, snapshotContext.getSnapshotMetrics());
snapshot =
new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol, snapshotContext);
}

boolean isNewTable = snapshot.getVersion(engine) < 0;
Expand Down Expand Up @@ -204,8 +209,19 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
}

private class InitialSnapshot extends SnapshotImpl {
InitialSnapshot(Path dataPath, LogReplay logReplay, Metadata metadata, Protocol protocol) {
super(dataPath, LogSegment.empty(table.getLogPath()), logReplay, protocol, metadata);
InitialSnapshot(
Path dataPath,
LogReplay logReplay,
Metadata metadata,
Protocol protocol,
SnapshotQueryContext snapshotContext) {
super(
dataPath,
LogSegment.empty(table.getLogPath()),
logReplay,
protocol,
metadata,
snapshotContext);
}

@Override
Expand All @@ -214,14 +230,16 @@ public long getTimestamp(Engine engine) {
}
}

private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) {
private LogReplay getEmptyLogReplay(
Engine engine, Metadata metadata, Protocol protocol, SnapshotMetrics snapshotMetrics) {
return new LogReplay(
table.getLogPath(),
table.getDataPath(),
-1,
engine,
LogSegment.empty(table.getLogPath()),
Optional.empty()) {
Optional.empty(),
snapshotMetrics) {

@Override
protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import io.delta.kernel.metrics.SnapshotMetricsResult;
import java.util.Optional;

/**
* Stores the metrics for an ongoing snapshot construction. These metrics are updated and recorded
* throughout the snapshot query using this class.
*
* <p>At report time, we create an immutable {@link SnapshotMetricsResult} from an instance of
* {@link SnapshotMetrics} to capture the metrics collected during the query. The {@link
* SnapshotMetricsResult} interface exposes getters for any metrics collected in this class.
*/
public class SnapshotMetrics {

public final Timer timestampToVersionResolutionTimer = new Timer();

public final Timer loadInitialDeltaActionsTimer = new Timer();

public SnapshotMetricsResult captureSnapshotMetricsResult() {
return new SnapshotMetricsResult() {

final Optional<Long> timestampToVersionResolutionDurationResult =
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
timestampToVersionResolutionTimer.totalDurationIfRecorded();
final long loadInitialDeltaActionsDurationResult =
loadInitialDeltaActionsTimer.totalDurationNs();

@Override
public Optional<Long> timestampToVersionResolutionDurationNs() {
return timestampToVersionResolutionDurationResult;
}

@Override
public long loadInitialDeltaActionsDurationNs() {
return loadInitialDeltaActionsDurationResult;
}
};
}

@Override
public String toString() {
return String.format(
"SnapshotMetrics(timestampToVersionResolutionTimer=%s, "
+ "loadInitialDeltaActionsTimer=%s)",
timestampToVersionResolutionTimer, loadInitialDeltaActionsTimer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import io.delta.kernel.metrics.SnapshotReport;
import java.util.Optional;

/**
* Stores the context for a given Snapshot query. This includes information about the query
* parameters (i.e. table path, time travel parameters), updated state as the snapshot query
* progresses (i.e. resolved version), and metrics.
*
* <p>This is used to generate a {@link SnapshotReport}. It exists from snapshot query initiation
* until either successful snapshot construction or failure.
*/
public class SnapshotQueryContext {

/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a latest snapshot query */
public static SnapshotQueryContext forLatestSnapshot(String tablePath) {
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.empty());
}

/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a AS OF VERSION query */
public static SnapshotQueryContext forVersionSnapshot(String tablePath, long version) {
allisonport-db marked this conversation as resolved.
Show resolved Hide resolved
return new SnapshotQueryContext(tablePath, Optional.of(version), Optional.empty());
}

/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a AS OF TIMESTAMP query */
public static SnapshotQueryContext forTimestampSnapshot(String tablePath, long timestamp) {
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.of(timestamp));
}

private final String tablePath;
private Optional<Long> version;
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
private final Optional<Long> providedTimestamp;
private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics();

/**
* @param tablePath the table path for the table being queried
* @param providedVersion the provided version for a time-travel-by-version query, empty if this
* is not a time-travel-by-version query
* @param providedTimestamp the provided timestamp for a time-travel-by-timestamp query, empty if
* this is not a time-travel-by-timestamp query
*/
private SnapshotQueryContext(
String tablePath, Optional<Long> providedVersion, Optional<Long> providedTimestamp) {
this.tablePath = tablePath;
this.version = providedVersion;
this.providedTimestamp = providedTimestamp;
}

public String getTablePath() {
return tablePath;
}

public Optional<Long> getVersion() {
return version;
}

public Optional<Long> getProvidedTimestamp() {
return providedTimestamp;
}

public SnapshotMetrics getSnapshotMetrics() {
return snapshotMetrics;
}

/**
* Updates the {@code version} stored in this snapshot context. This version should be updated
* upon version resolution for non time-travel-by-version queries. For latest snapshot queries
* this is after log segment construction. For time-travel by timestamp queries this is after
* timestamp to version resolution.
*/
public void setVersion(long updatedVersion) {
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
version = Optional.of(updatedVersion);
}

@Override
public String toString() {
return String.format(
"SnapshotQueryContext(tablePath=%s, version=%s, providedTimestamp=%s, snapshotMetric=%s)",
tablePath, version, providedTimestamp, snapshotMetrics);
}
}
Loading
Loading