From ceb5a564c1b6e5fd46e6a91306ce4ba1284b1b1e Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Tue, 26 Nov 2024 16:36:25 -0800 Subject: [PATCH] Snapshot report and tests --- .../delta/kernel/internal/SnapshotImpl.java | 19 +- .../io/delta/kernel/internal/TableImpl.java | 40 +- .../internal/TransactionBuilderImpl.java | 30 +- .../internal/metrics/SnapshotContext.java | 67 ++++ .../internal/metrics/SnapshotMetrics.java | 24 ++ .../internal/metrics/SnapshotReportImpl.java | 84 ++++ .../kernel/internal/replay/LogReplay.java | 12 +- .../internal/snapshot/SnapshotManager.java | 79 ++-- .../kernel/metrics/SnapshotMetricsResult.java | 61 +++ .../delta/kernel/metrics/SnapshotReport.java | 45 +++ .../kernel/defaults/MetricsReportSuite.scala | 379 ++++++++++++++++++ 11 files changed, 805 insertions(+), 35 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotContext.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 5dd6cd2d2d4..ff2e1055cb9 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -27,10 +27,13 @@ import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; +import 
io.delta.kernel.internal.metrics.SnapshotContext; +import io.delta.kernel.internal.metrics.SnapshotReportImpl; import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler; +import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.types.StructType; import java.util.Map; import java.util.Optional; @@ -45,13 +48,15 @@ public class SnapshotImpl implements Snapshot { private final Metadata metadata; private final LogSegment logSegment; private Optional inCommitTimestampOpt; + private final SnapshotReport snapshotReport; public SnapshotImpl( Path dataPath, LogSegment logSegment, LogReplay logReplay, Protocol protocol, - Metadata metadata) { + Metadata metadata, + SnapshotContext snapshotContext) { this.logPath = new Path(dataPath, "_delta_log"); this.dataPath = dataPath; this.version = logSegment.version; @@ -60,6 +65,13 @@ public SnapshotImpl( this.protocol = protocol; this.metadata = metadata; this.inCommitTimestampOpt = Optional.empty(); + this.snapshotReport = + new SnapshotReportImpl( + snapshotContext.getTablePath(), + Optional.of(this.version), + snapshotContext.getProvidedTimestamp(), + snapshotContext.getSnapshotMetrics(), + Optional.empty() /* exception */); } @Override @@ -74,6 +86,7 @@ public StructType getSchema(Engine engine) { @Override public ScanBuilder getScanBuilder(Engine engine) { + // TODO when we add ScanReport we will pass the SnapshotReport downstream here return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(engine), logReplay, engine); } @@ -128,6 +141,10 @@ public Path getDataPath() { return dataPath; } + public SnapshotReport getSnapshotReport() { + return snapshotReport; + } + /** * Returns the timestamp of the latest commit of this snapshot. For an uninitialized snapshot, * this returns -1. 
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 8e71435957d..972a1807dd7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -26,8 +26,11 @@ import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.metrics.SnapshotContext; +import io.delta.kernel.internal.metrics.SnapshotReportImpl; import io.delta.kernel.internal.snapshot.SnapshotManager; import io.delta.kernel.internal.util.Clock; +import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; @@ -120,19 +123,38 @@ public Optional getTableIdentifier() { @Override public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException { - return snapshotManager.buildLatestSnapshot(engine); + SnapshotContext snapshotContext = SnapshotContext.forLatestSnapshot(tablePath); + try { + return snapshotManager.buildLatestSnapshot(engine, snapshotContext); + } catch (Exception e) { + recordSnapshotReport(engine, snapshotContext, e); + throw e; + } } @Override public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId) throws TableNotFoundException { - return snapshotManager.getSnapshotAt(engine, versionId); + SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, versionId); + try { + return snapshotManager.getSnapshotAt(engine, versionId, snapshotContext); + } catch (Exception e) { + recordSnapshotReport(engine, snapshotContext, e); + throw e; + } } @Override public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC) throws TableNotFoundException { - return 
snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC); + SnapshotContext snapshotContext = + SnapshotContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC); + try { + return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext); + } catch (Exception e) { + recordSnapshotReport(engine, snapshotContext, e); + throw e; + } } @Override @@ -353,4 +375,16 @@ private CloseableIterator getRawChanges( logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema); return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema); } + + /** Creates a {@link SnapshotReport} and pushes it to any {@link MetricsReporter}s. */ + private void recordSnapshotReport(Engine engine, SnapshotContext snapshotContext, Exception e) { + SnapshotReport snapshotReport = + new SnapshotReportImpl( + snapshotContext.getTablePath(), + snapshotContext.getVersion(), + snapshotContext.getProvidedTimestamp(), + snapshotContext.getSnapshotMetrics(), + Optional.of(e)); + engine.getMetricsReporters().forEach(reporter -> reporter.report(snapshotReport)); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index b11e4e8d0a8..8cf3b12edd6 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -31,6 +31,8 @@ import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.metrics.SnapshotContext; +import io.delta.kernel.internal.metrics.SnapshotMetrics; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; @@ -105,8 +107,11 @@ public 
Transaction build(Engine engine) { // Table doesn't exist yet. Create an initial snapshot with the new schema. Metadata metadata = getInitialMetadata(); Protocol protocol = getInitialProtocol(); - LogReplay logReplay = getEmptyLogReplay(engine, metadata, protocol); - snapshot = new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol); + SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, 0); + LogReplay logReplay = + getEmptyLogReplay(engine, metadata, protocol, snapshotContext.getSnapshotMetrics()); + snapshot = + new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol, snapshotContext); } boolean isNewTable = snapshot.getVersion(engine) < 0; @@ -204,8 +209,19 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable) } private class InitialSnapshot extends SnapshotImpl { - InitialSnapshot(Path dataPath, LogReplay logReplay, Metadata metadata, Protocol protocol) { - super(dataPath, LogSegment.empty(table.getLogPath()), logReplay, protocol, metadata); + InitialSnapshot( + Path dataPath, + LogReplay logReplay, + Metadata metadata, + Protocol protocol, + SnapshotContext snapshotContext) { + super( + dataPath, + LogSegment.empty(table.getLogPath()), + logReplay, + protocol, + metadata, + snapshotContext); } @Override @@ -214,14 +230,16 @@ public long getTimestamp(Engine engine) { } } - private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) { + private LogReplay getEmptyLogReplay( + Engine engine, Metadata metadata, Protocol protocol, SnapshotMetrics snapshotMetrics) { return new LogReplay( table.getLogPath(), table.getDataPath(), -1, engine, LogSegment.empty(table.getLogPath()), - Optional.empty()) { + Optional.empty(), + snapshotMetrics) { @Override protected Tuple2 loadTableProtocolAndMetadata( diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotContext.java 
b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotContext.java new file mode 100644 index 00000000000..ebcb618778c --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotContext.java @@ -0,0 +1,67 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.metrics; + +import io.delta.kernel.metrics.SnapshotReport; +import java.util.Optional; + +/** Stores the context for a given Snapshot query. 
Used to generate a {@link SnapshotReport} */ +public class SnapshotContext { + + public static SnapshotContext forLatestSnapshot(String tablePath) { + return new SnapshotContext(tablePath, Optional.empty(), Optional.empty()); + } + + public static SnapshotContext forVersionSnapshot(String tablePath, long version) { + return new SnapshotContext(tablePath, Optional.of(version), Optional.empty()); + } + + public static SnapshotContext forTimestampSnapshot(String tablePath, long timestamp) { + return new SnapshotContext(tablePath, Optional.empty(), Optional.of(timestamp)); + } + + private Optional version; + private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics(); + private final String tablePath; + private final Optional providedTimestamp; + + private SnapshotContext( + String tablePath, Optional version, Optional providedTimestamp) { + this.tablePath = tablePath; + this.version = version; + this.providedTimestamp = providedTimestamp; + } + + public Optional getVersion() { + return version; + } + + public SnapshotMetrics getSnapshotMetrics() { + return snapshotMetrics; + } + + public String getTablePath() { + return tablePath; + } + + public Optional getProvidedTimestamp() { + return providedTimestamp; + } + + public void setVersion(long updatedVersion) { + version = Optional.of(updatedVersion); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java new file mode 100644 index 00000000000..1944b6497dd --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java @@ -0,0 +1,24 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.metrics; + +/** Stores the metrics for an ongoing snapshot creation */ +public class SnapshotMetrics { + + public final Timer timestampToVersionResolutionDuration = new Timer(); + + public final Timer loadProtocolAndMetadataDuration = new Timer(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java new file mode 100644 index 00000000000..53e6d90735e --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotReportImpl.java @@ -0,0 +1,84 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.metrics; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.metrics.SnapshotMetricsResult; +import io.delta.kernel.metrics.SnapshotReport; +import java.util.Optional; +import java.util.UUID; + +/** A basic POJO implementation of {@link SnapshotReport} for creating them */ +public class SnapshotReportImpl implements SnapshotReport { + + private final String tablePath; + private final Optional version; + private final Optional providedTimestamp; + private final UUID reportUUID; + private final SnapshotMetricsResult snapshotMetrics; + private final Optional exception; + + public SnapshotReportImpl( + String tablePath, + Optional version, + Optional providedTimestamp, + SnapshotMetrics snapshotMetrics, + Optional exception) { + this.tablePath = requireNonNull(tablePath); + this.version = requireNonNull(version); + this.providedTimestamp = requireNonNull(providedTimestamp); + this.snapshotMetrics = + SnapshotMetricsResult.fromSnapshotMetrics(requireNonNull(snapshotMetrics)); + this.exception = requireNonNull(exception); + this.reportUUID = UUID.randomUUID(); + } + + @Override + public String tablePath() { + return tablePath; + } + + @Override + public String operationType() { + return OPERATION_TYPE; + } + + @Override + public Optional exception() { + return exception; + } + + @Override + public UUID reportUUID() { + return reportUUID; + } + + @Override + public Optional version() { + return version; + } + + @Override + public Optional providedTimestamp() { + return providedTimestamp; + } + + @Override + public SnapshotMetricsResult snapshotMetrics() { + return snapshotMetrics; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index f81ce16c99b..be05205597a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ 
b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -28,6 +28,7 @@ import io.delta.kernel.internal.checkpoints.SidecarFile; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.Lazy; +import io.delta.kernel.internal.metrics.SnapshotMetrics; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; import io.delta.kernel.internal.util.DomainMetadataUtils; @@ -126,12 +127,15 @@ public LogReplay( long snapshotVersion, Engine engine, LogSegment logSegment, - Optional snapshotHint) { + Optional snapshotHint, + SnapshotMetrics snapshotMetrics) { assertLogFilesBelongToTable(logPath, logSegment.allLogFilesUnsorted()); this.dataPath = dataPath; this.logSegment = logSegment; - this.protocolAndMetadata = loadTableProtocolAndMetadata(engine, snapshotHint, snapshotVersion); + this.protocolAndMetadata = + snapshotMetrics.loadProtocolAndMetadataDuration.time( + () -> loadTableProtocolAndMetadata(engine, snapshotHint, snapshotVersion)); // Lazy loading of domain metadata only when needed this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine)); } @@ -156,6 +160,10 @@ public Map getDomainMetadataMap() { return domainMetadataMap.get(); } + public long getVersion() { + return logSegment.version; + } + /** * Returns an iterator of {@link FilteredColumnarBatch} representing all the active AddFiles in * the table. 
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 24aa391e5b2..3235dca8681 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -38,6 +38,7 @@ import io.delta.kernel.internal.checkpoints.*; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.ListUtils; +import io.delta.kernel.internal.metrics.SnapshotContext; import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.util.Clock; @@ -122,8 +123,9 @@ public static void verifyDeltaVersions( * @return * @throws TableNotFoundException */ - public Snapshot buildLatestSnapshot(Engine engine) throws TableNotFoundException { - return getSnapshotAtInit(engine); + public Snapshot buildLatestSnapshot(Engine engine, SnapshotContext snapshotContext) + throws TableNotFoundException { + return getSnapshotAtInit(engine, snapshotContext); } /** @@ -134,7 +136,8 @@ public Snapshot buildLatestSnapshot(Engine engine) throws TableNotFoundException * @return a {@link Snapshot} of the table at version {@code version} * @throws TableNotFoundException */ - public Snapshot getSnapshotAt(Engine engine, long version) throws TableNotFoundException { + public Snapshot getSnapshotAt(Engine engine, long version, SnapshotContext snapshotContext) + throws TableNotFoundException { Optional logSegmentOpt = getLogSegmentAtOrBeforeVersion( @@ -154,7 +157,8 @@ public Snapshot getSnapshotAt(Engine engine, long version) throws TableNotFoundE logSegmentOpt .map( logSegment -> - getCoordinatedCommitsAwareSnapshot(engine, logSegment, Optional.of(version))) + getCoordinatedCommitsAwareSnapshot( + engine, logSegment, Optional.of(version), snapshotContext)) 
.orElseThrow(() -> new TableNotFoundException(tablePath.toString())); long snapshotVer = snapshot.getVersion(engine); if (snapshotVer != version) { @@ -172,32 +176,42 @@ public Snapshot getSnapshotAt(Engine engine, long version) throws TableNotFoundE * @return a {@link Snapshot} of the table at the provided timestamp * @throws TableNotFoundException */ - public Snapshot getSnapshotForTimestamp(Engine engine, long millisSinceEpochUTC) + public Snapshot getSnapshotForTimestamp( + Engine engine, long millisSinceEpochUTC, SnapshotContext snapshotContext) throws TableNotFoundException { long startTimeMillis = System.currentTimeMillis(); long versionToRead = - DeltaHistoryManager.getActiveCommitAtTimestamp( - engine, - logPath, - millisSinceEpochUTC, - true /* mustBeRecreatable */, - false /* canReturnLastCommit */, - false /* canReturnEarliestCommit */) - .getVersion(); + snapshotContext + .getSnapshotMetrics() + .timestampToVersionResolutionDuration + .time( + () -> + DeltaHistoryManager.getActiveCommitAtTimestamp( + engine, + logPath, + millisSinceEpochUTC, + true /* mustBeRecreatable */, + false /* canReturnLastCommit */, + false /* canReturnEarliestCommit */) + .getVersion()); logger.info( "{}: Took {}ms to fetch version at timestamp {}", tablePath, System.currentTimeMillis() - startTimeMillis, millisSinceEpochUTC); + snapshotContext.setVersion(versionToRead); - return getSnapshotAt(engine, versionToRead); + return getSnapshotAt(engine, versionToRead, snapshotContext); } public void checkpoint(Engine engine, Clock clock, long version) throws TableNotFoundException, IOException { logger.info("{}: Starting checkpoint for version: {}", tablePath, version); // Get the snapshot corresponding the version - SnapshotImpl snapshot = (SnapshotImpl) getSnapshotAt(engine, version); + SnapshotImpl snapshot = + (SnapshotImpl) + getSnapshotAt( + engine, version, SnapshotContext.forVersionSnapshot(tablePath.toString(), version)); // Check if writing to the given table protocol 
version/features is supported in Kernel validateWriteSupportedTable( @@ -470,7 +484,8 @@ private List getUnbackfilledCommits( * Load the Snapshot for this Delta table at initialization. This method uses the `lastCheckpoint` * file as a hint on where to start listing the transaction log directory. */ - private SnapshotImpl getSnapshotAtInit(Engine engine) throws TableNotFoundException { + private SnapshotImpl getSnapshotAtInit(Engine engine, SnapshotContext snapshotContext) + throws TableNotFoundException { Checkpointer checkpointer = new Checkpointer(logPath); Optional lastCheckpointOpt = checkpointer.readLastCheckpointFile(engine); if (!lastCheckpointOpt.isPresent()) { @@ -482,7 +497,10 @@ private SnapshotImpl getSnapshotAtInit(Engine engine) throws TableNotFoundExcept Optional logSegmentOpt = getLogSegmentFrom(engine, lastCheckpointOpt); return logSegmentOpt - .map(logSegment -> getCoordinatedCommitsAwareSnapshot(engine, logSegment, Optional.empty())) + .map( + logSegment -> + getCoordinatedCommitsAwareSnapshot( + engine, logSegment, Optional.empty(), snapshotContext)) .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); } @@ -492,8 +510,12 @@ private SnapshotImpl getSnapshotAtInit(Engine engine) throws TableNotFoundExcept * @see issue #3437. 
*/ private SnapshotImpl getCoordinatedCommitsAwareSnapshot( - Engine engine, LogSegment initialSegmentForNewSnapshot, Optional versionToLoadOpt) { - SnapshotImpl newSnapshot = createSnapshot(initialSegmentForNewSnapshot, engine); + Engine engine, + LogSegment initialSegmentForNewSnapshot, + Optional versionToLoadOpt, + SnapshotContext snapshotContext) { + SnapshotImpl newSnapshot = + createSnapshot(initialSegmentForNewSnapshot, engine, snapshotContext); if (versionToLoadOpt.isPresent() && newSnapshot.getVersion(engine) == versionToLoadOpt.get()) { return newSnapshot; @@ -511,19 +533,21 @@ private SnapshotImpl getCoordinatedCommitsAwareSnapshot( newTableCommitCoordinatorClientHandlerOpt /* tableCommitHandlerOpt */); newSnapshot = segmentOpt - .map(segment -> createSnapshot(segment, engine)) + .map(segment -> createSnapshot(segment, engine, snapshotContext)) .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); } return newSnapshot; } - private SnapshotImpl createSnapshot(LogSegment initSegment, Engine engine) { + private SnapshotImpl createSnapshot( + LogSegment initSegment, Engine engine, SnapshotContext snapshotContext) { final String startingFromStr = initSegment .checkpointVersionOpt .map(v -> format("starting from checkpoint version %s.", v)) .orElse("."); logger.info("{}: Loading version {} {}", tablePath, initSegment.version, startingFromStr); + snapshotContext.setVersion(initSegment.version); LogReplay logReplay = new LogReplay( @@ -532,7 +556,8 @@ private SnapshotImpl createSnapshot(LogSegment initSegment, Engine engine) { initSegment.version, engine, initSegment, - Optional.ofNullable(latestSnapshotHint.get())); + Optional.ofNullable(latestSnapshotHint.get()), + snapshotContext.getSnapshotMetrics()); long startTimeMillis = System.currentTimeMillis(); @@ -540,7 +565,15 @@ private SnapshotImpl createSnapshot(LogSegment initSegment, Engine engine) { final SnapshotImpl snapshot = new SnapshotImpl( - tablePath, initSegment, logReplay, 
logReplay.getProtocol(), logReplay.getMetadata()); + tablePath, + initSegment, + logReplay, + logReplay.getProtocol(), + logReplay.getMetadata(), + snapshotContext); + + // Push snapshot report to engine + engine.getMetricsReporters().forEach(reporter -> reporter.report(snapshot.getSnapshotReport())); logger.info( "{}: Took {}ms to construct the snapshot (loading protocol and metadata) for {} {}", diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java new file mode 100644 index 00000000000..cccc407217a --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotMetricsResult.java @@ -0,0 +1,61 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.metrics; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +import io.delta.kernel.internal.metrics.SnapshotMetrics; +import java.util.Optional; + +/** Stores the metrics results for a {@link SnapshotReport} */ +public interface SnapshotMetricsResult { + + /** + * @return the duration (ns) to resolve the provided timestamp to a table version for timestamp + * time-travel queries. Empty for time-travel by version or non-time-travel queries. 
+ */ + Optional timestampToVersionResolutionDuration(); + + /** + * @return the duration (ns) to load the initial delta actions for the snapshot (such as the table + * protocol and metadata). 0 if snapshot construction fails before log replay. + */ + long loadInitialDeltaActionsDuration(); + + static SnapshotMetricsResult fromSnapshotMetrics(SnapshotMetrics snapshotMetrics) { + checkArgument(snapshotMetrics != null, "snapshotMetrics cannot be null"); + + return new SnapshotMetricsResult() { + + final Optional timestampToVersionResolutionDuration = + Optional.of(snapshotMetrics.timestampToVersionResolutionDuration) + .filter(t -> t.count() > 0) // If the timer hasn't been called this should be None + .map(t -> t.totalDuration()); + final long loadProtocolAndMetadataDuration = + snapshotMetrics.loadProtocolAndMetadataDuration.totalDuration(); + + @Override + public Optional timestampToVersionResolutionDuration() { + return timestampToVersionResolutionDuration; + } + + @Override + public long loadInitialDeltaActionsDuration() { + return loadProtocolAndMetadataDuration; + } + }; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java new file mode 100644 index 00000000000..946547a7290 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/SnapshotReport.java @@ -0,0 +1,45 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.metrics; + +import java.util.Optional; + +/** Defines the metadata and metrics for a snapshot construction {@link MetricsReport} */ +public interface SnapshotReport extends DeltaOperationReport { + + String OPERATION_TYPE = "Snapshot"; + + /** + * For a time-travel by version query, this is the version provided. For a time-travel by + * timestamp query, this is the version resolved from the provided timestamp. For a latest + * snapshot, this is the version read from the delta log. + * + *
<p>
This is empty when this report is for a failed snapshot construction, and the error occurs + * before a version can be resolved. + * + * @return the version of the snapshot + */ + Optional version(); + + /** + * @return the timestamp provided for time-travel, empty if this is not a timestamp-based + * time-travel query + */ + Optional providedTimestamp(); + + /** @return the metrics for this snapshot construction */ + SnapshotMetricsResult snapshotMetrics(); +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala new file mode 100644 index 00000000000..1610f973260 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/MetricsReportSuite.scala @@ -0,0 +1,379 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults + +import java.io.File +import java.util +import java.util.{Objects, Optional} + +import scala.collection.mutable.ArrayBuffer + +import io.delta.golden.GoldenTableUtils.goldenTablePath +import io.delta.kernel.defaults.utils.TestUtils +import io.delta.kernel.engine.{CommitCoordinatorClientHandler, Engine, ExpressionHandler, FileSystemClient, JsonHandler, MetricsReporter, ParquetHandler} +import io.delta.kernel.metrics.{MetricsReport, SnapshotReport} +import io.delta.kernel.{Snapshot, Table} +import io.delta.kernel.internal.fs.Path +import io.delta.kernel.internal.metrics.Timer +import io.delta.kernel.internal.util.FileNames +import org.scalatest.funsuite.AnyFunSuite + +/** + * Test suite for the Kernel-API created [[MetricsReport]]s. This suite is in the defaults + * package to be able to use real tables and avoid having to mock both file listings AND file + * contents. + */ +class MetricsReportSuite extends AnyFunSuite with TestUtils { + + ////////////////////////// + // SnapshotReport tests // + ////////////////////////// + + /** + * Given a function [[f]] that generates a snapshot from a [[Table]], runs [[f]] and looks for + * a generated [[SnapshotReport]]. Exactly 1 [[SnapshotReport]] is expected. Times and returns + * the duration it takes to run [[f]]. Uses a custom engine to collect emitted metrics reports.
+ * + * @param f function to generate a snapshot from a [[Table]] and engine + * @param path path of the table to query + * @param expectException whether we expect [[f]] to throw an exception; if so, it is caught + * and returned with the other results + * @return (SnapshotReport, durationToRunF, ExceptionIfThrown) + */ + def getSnapshotReport( + f: (Table, Engine) => Snapshot, + path: String, + expectException: Boolean + ): (SnapshotReport, Long, Option[Exception]) = { + val timer = new Timer() + + val (metricsReports: Seq[MetricsReport], exception: Option[Exception]) = if (expectException) { + collectMetricsReportsAndException { engine => + val table = Table.forPath(engine, path) + timer.time(() => f(table, engine)) // Time only f, not Table.forPath + } + } else { + (collectMetricsReports { engine => + val table = Table.forPath(engine, path) + timer.time(() => f(table, engine)) // Time only f, not Table.forPath + }, Option.empty) + } + + val snapshotReports = metricsReports.filter(_.isInstanceOf[SnapshotReport]) + assert(snapshotReports.length == 1, "Expected exactly 1 SnapshotReport") + (snapshotReports.head.asInstanceOf[SnapshotReport], timer.totalDuration(), exception) + } + + /** + * Given a table path and a function [[f]] to generate a snapshot, runs [[f]] and collects the + * generated [[SnapshotReport]]. Checks that the report is as expected.
+ * + * @param f function to generate a snapshot from a [[Table]] and engine + * @param path table path to query from + * @param expectException whether we expect [[f]] to throw an exception; if so we will check that the + * report contains the thrown exception + * @param expectedVersion the expected version for the SnapshotReport + * @param expectedProvidedTimestamp the expected providedTimestamp for the SnapshotReport + * @param expectNonEmptyTimestampToVersionResolutionDuration whether we expect + * timestampToVersionResolutionDuration + * to be non-empty (should be true + * for any time-travel by timestamp + * queries) + * @param expectNonZeroLoadProtocolAndMetadataDuration whether we expect + * loadInitialDeltaActionsDuration to be + * non-zero (should be true except when an + * exception is thrown before log replay) + */ + def checkSnapshotReport( + f: (Table, Engine) => Snapshot, + path: String, + expectException: Boolean, + expectedVersion: Optional[Long], + expectedProvidedTimestamp: Optional[Long], + expectNonEmptyTimestampToVersionResolutionDuration: Boolean, + expectNonZeroLoadProtocolAndMetadataDuration: Boolean + ): Unit = { + + val (snapshotReport, duration, exception) = getSnapshotReport(f, path, expectException) + + // Verify contents + assert(snapshotReport.tablePath == resolvePath(path)) + assert(snapshotReport.operationType == "Snapshot") + exception match { + case Some(e) => + assert(snapshotReport.exception().isPresent && + Objects.equals(snapshotReport.exception().get(), e)) + case None => assert(!snapshotReport.exception().isPresent) + } + assert(snapshotReport.reportUUID != null) + assert(Objects.equals(snapshotReport.version, expectedVersion), + s"Expected version $expectedVersion found ${snapshotReport.version}") + assert(Objects.equals(snapshotReport.providedTimestamp, expectedProvidedTimestamp)) + + // Since we cannot know the actual durations of these we sanity check that they are > 0 and + // less than the total operation duration
whenever they are expected to be non-zero/non-empty + if (expectNonEmptyTimestampToVersionResolutionDuration) { + assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.isPresent) + assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.get > 0) + assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.get < + duration) + } else { + assert(!snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.isPresent) + } + if (expectNonZeroLoadProtocolAndMetadataDuration) { + assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDuration > 0) + assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDuration < duration) + } else { + assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDuration == 0) + } + } + + test("SnapshotReport valid queries") { + withTempDir { tempDir => + val path = tempDir.getCanonicalPath + + // Set up delta table with versions 0 and 1 + spark.range(10).write.format("delta").mode("append").save(path) + val version0timestamp = System.currentTimeMillis + // Since filesystem modification time might be truncated to the second, we sleep to make sure + // the next commit is after this timestamp + Thread.sleep(1000) + spark.range(10).write.format("delta").mode("append").save(path) + + // Test getLatestSnapshot + checkSnapshotReport( + (table, engine) => table.getLatestSnapshot(engine), + path, + expectException = false, + expectedVersion = Optional.of(1), + expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, // No time travel + expectNonZeroLoadProtocolAndMetadataDuration = true + ) + + // Test getSnapshotAsOfVersion + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfVersion(engine, 0), + path, + expectException = false, + expectedVersion = Optional.of(0), + expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, //
No time travel + expectNonZeroLoadProtocolAndMetadataDuration = true + ) + + // Test getSnapshotAsOfTimestamp + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfTimestamp(engine, version0timestamp), + path, + expectException = false, + expectedVersion = Optional.of(0), + expectedProvidedTimestamp = Optional.of(version0timestamp), + expectNonEmptyTimestampToVersionResolutionDuration = true, + expectNonZeroLoadProtocolAndMetadataDuration = true + ) + } + } + + test("Snapshot report - table does not exist") { + withTempDir { tempDir => + // This fails during either log segment building or timestamp -> version resolution + val path = tempDir.getCanonicalPath + + // Test getLatestSnapshot + checkSnapshotReport( + (table, engine) => table.getLatestSnapshot(engine), + path, + expectException = true, + expectedVersion = Optional.empty(), + expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, // No time travel + expectNonZeroLoadProtocolAndMetadataDuration = false + ) + + // Test getSnapshotAsOfVersion + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfVersion(engine, 0), + path, + expectException = true, + expectedVersion = Optional.of(0), + expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, // No time travel + expectNonZeroLoadProtocolAndMetadataDuration = false + ) + + // Test getSnapshotAsOfTimestamp + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfTimestamp(engine, 1000), + path, + expectException = true, + expectedVersion = Optional.empty(), + expectedProvidedTimestamp = Optional.of(1000), + expectNonEmptyTimestampToVersionResolutionDuration = true, + expectNonZeroLoadProtocolAndMetadataDuration = false + ) + } + } + + test("Snapshot report - log is corrupted") { + withTempDir { tempDir => + val path = tempDir.getCanonicalPath + // Set up table with non-contiguous versions (0, 2)
which will fail during log segment building + // for all the following queries + (0 until 3).foreach( _ => + spark.range(3).write.format("delta").mode("append").save(path)) + assert( + new File(FileNames.deltaFile(new Path(tempDir.getCanonicalPath, "_delta_log"), 1)).delete()) + + // Test getLatestSnapshot + checkSnapshotReport( + (table, engine) => table.getLatestSnapshot(engine), + path, + expectException = true, + expectedVersion = Optional.empty(), + expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, // No time travel + expectNonZeroLoadProtocolAndMetadataDuration = false + ) + + // Test getSnapshotAsOfVersion + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfVersion(engine, 2), + path, + expectException = true, + expectedVersion = Optional.of(2), + expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, // No time travel + expectNonZeroLoadProtocolAndMetadataDuration = false + ) + + // Test getSnapshotAsOfTimestamp + val version2Timestamp = new File( + FileNames.deltaFile(new Path(tempDir.getCanonicalPath, "_delta_log"), 2)).lastModified() + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfTimestamp(engine, version2Timestamp), + tempDir.getCanonicalPath, + expectException = true, + expectedVersion = Optional.of(2), + expectedProvidedTimestamp = Optional.of(version2Timestamp), + expectNonEmptyTimestampToVersionResolutionDuration = true, + expectNonZeroLoadProtocolAndMetadataDuration = false + ) + } + } + + test("Snapshot report - missing metadata") { + // This fails during P&M loading for all of the following queries + val path = goldenTablePath("deltalog-state-reconstruction-without-metadata") + + // Test getLatestSnapshot + checkSnapshotReport( + (table, engine) => table.getLatestSnapshot(engine), + path, + expectException = true, + expectedVersion = Optional.of(0), +
expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, // No time travel + expectNonZeroLoadProtocolAndMetadataDuration = true + ) + + // Test getSnapshotAsOfVersion + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfVersion(engine, 0), + path, + expectException = true, + expectedVersion = Optional.of(0), + expectedProvidedTimestamp = Optional.empty(), // No time travel + expectNonEmptyTimestampToVersionResolutionDuration = false, // No time travel + expectNonZeroLoadProtocolAndMetadataDuration = true + ) + + // Test getSnapshotAsOfTimestamp + // We use the timestamp of version 0 + val version0Timestamp = new File(FileNames.deltaFile(new Path(path, "_delta_log"), 0)) + .lastModified() + checkSnapshotReport( + (table, engine) => table.getSnapshotAsOfTimestamp(engine, version0Timestamp), + path, + expectException = true, + expectedVersion = Optional.of(0), + expectedProvidedTimestamp = Optional.of(version0Timestamp), + expectNonEmptyTimestampToVersionResolutionDuration = true, + expectNonZeroLoadProtocolAndMetadataDuration = true + ) + } + + ///////////////////////// + // Test Helper Methods // + ///////////////////////// + + // For now this just uses the default engine since we have no need to override it; if we would + // like to use a specific engine in the future for other tests we can simply add another arg here + /** Executes [[f]] using a special engine implementation to collect and return metrics reports */ + def collectMetricsReports(f: Engine => Unit): Seq[MetricsReport] = { + // Initialize a buffer for any metric reports and wrap the engine so that they are recorded + val reports = ArrayBuffer.empty[MetricsReport] + f(new EngineWithInMemoryMetricsReporter(reports, defaultEngine)) + reports + } + + /** + * Executes [[f]] using a special engine implementation to collect and return metrics reports when + * it is expected [[f]] will throw an exception.
Collects said exception and returns with + * the reports. + */ + def collectMetricsReportsAndException( + f: Engine => Unit): (Seq[MetricsReport], Option[Exception]) = { + // Initialize a buffer for any metric reports and wrap the engine so that they are recorded + val reports = ArrayBuffer.empty[MetricsReport] + val e = intercept[Exception]( + f(new EngineWithInMemoryMetricsReporter(reports, defaultEngine)) + ) + (reports, Some(e)) + } + + def resolvePath(path: String): String = { + defaultEngine.getFileSystemClient.resolvePath(path) + } + + /** + * Wraps an [[Engine]] to implement the metrics reporter such that it appends any reports + * to the provided in-memory buffer. + */ + class EngineWithInMemoryMetricsReporter(buf: ArrayBuffer[MetricsReport], baseEngine: Engine) + extends Engine { + + private val metricsReporter = new MetricsReporter { + override def report(report: MetricsReport): Unit = buf.append(report) + } + + override def getExpressionHandler: ExpressionHandler = baseEngine.getExpressionHandler + + override def getJsonHandler: JsonHandler = baseEngine.getJsonHandler + + override def getFileSystemClient: FileSystemClient = baseEngine.getFileSystemClient + + override def getParquetHandler: ParquetHandler = baseEngine.getParquetHandler + + override def getCommitCoordinatorClientHandler( + name: String, conf: util.Map[String, String]): CommitCoordinatorClientHandler = + baseEngine.getCommitCoordinatorClientHandler(name, conf) + + override def getMetricsReporters(): java.util.List[MetricsReporter] = { + java.util.Collections.singletonList(metricsReporter) + } + } +}