-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Kernel][Metrics][PR#2] Add SnapshotReport for reporting snapshot con…
…struction (#3903) <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Adds a `SnapshotReport` for reporting snapshot construction. We record a `SnapshotReport` after successfully constructing a snapshot or if an exception is thrown during construction. We use `SnapshotQueryContext` to propagate and update information about the snapshot construction. For example, we update the "version" as soon as it's resolved for time-travel-by-timestamp or load-latest-snapshot queries. This means that in the case of an exception we include as much information as is available. ## How was this patch tested? Adds a test suite `MetricsReportSuite` with unit tests. ## Does this PR introduce _any_ user-facing changes? No.
- Loading branch information
1 parent
ecc5109
commit 40ba346
Showing
13 changed files
with
926 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotMetrics.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.internal.metrics; | ||
|
||
import io.delta.kernel.metrics.SnapshotMetricsResult; | ||
import java.util.Optional; | ||
|
||
/** | ||
* Stores the metrics for an ongoing snapshot construction. These metrics are updated and recorded | ||
* throughout the snapshot query using this class. | ||
* | ||
* <p>At report time, we create an immutable {@link SnapshotMetricsResult} from an instance of | ||
* {@link SnapshotMetrics} to capture the metrics collected during the query. The {@link | ||
* SnapshotMetricsResult} interface exposes getters for any metrics collected in this class. | ||
*/ | ||
public class SnapshotMetrics { | ||
|
||
public final Timer timestampToVersionResolutionTimer = new Timer(); | ||
|
||
public final Timer loadInitialDeltaActionsTimer = new Timer(); | ||
|
||
public SnapshotMetricsResult captureSnapshotMetricsResult() { | ||
return new SnapshotMetricsResult() { | ||
|
||
final Optional<Long> timestampToVersionResolutionDurationResult = | ||
timestampToVersionResolutionTimer.totalDurationIfRecorded(); | ||
final long loadInitialDeltaActionsDurationResult = | ||
loadInitialDeltaActionsTimer.totalDurationNs(); | ||
|
||
@Override | ||
public Optional<Long> timestampToVersionResolutionDurationNs() { | ||
return timestampToVersionResolutionDurationResult; | ||
} | ||
|
||
@Override | ||
public long loadInitialDeltaActionsDurationNs() { | ||
return loadInitialDeltaActionsDurationResult; | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return String.format( | ||
"SnapshotMetrics(timestampToVersionResolutionTimer=%s, " | ||
+ "loadInitialDeltaActionsTimer=%s)", | ||
timestampToVersionResolutionTimer, loadInitialDeltaActionsTimer); | ||
} | ||
} |
97 changes: 97 additions & 0 deletions
97
kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/SnapshotQueryContext.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.internal.metrics; | ||
|
||
import io.delta.kernel.metrics.SnapshotReport; | ||
import java.util.Optional; | ||
|
||
/** | ||
* Stores the context for a given Snapshot query. This includes information about the query | ||
* parameters (i.e. table path, time travel parameters), updated state as the snapshot query | ||
* progresses (i.e. resolved version), and metrics. | ||
* | ||
* <p>This is used to generate a {@link SnapshotReport}. It exists from snapshot query initiation | ||
* until either successful snapshot construction or failure. | ||
*/ | ||
public class SnapshotQueryContext { | ||
|
||
/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a latest snapshot query */ | ||
public static SnapshotQueryContext forLatestSnapshot(String tablePath) { | ||
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.empty()); | ||
} | ||
|
||
/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a AS OF VERSION query */ | ||
public static SnapshotQueryContext forVersionSnapshot(String tablePath, long version) { | ||
return new SnapshotQueryContext(tablePath, Optional.of(version), Optional.empty()); | ||
} | ||
|
||
/** Creates a {@link SnapshotQueryContext} for a Snapshot created by a AS OF TIMESTAMP query */ | ||
public static SnapshotQueryContext forTimestampSnapshot(String tablePath, long timestamp) { | ||
return new SnapshotQueryContext(tablePath, Optional.empty(), Optional.of(timestamp)); | ||
} | ||
|
||
private final String tablePath; | ||
private Optional<Long> version; | ||
private final Optional<Long> providedTimestamp; | ||
private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics(); | ||
|
||
/** | ||
* @param tablePath the table path for the table being queried | ||
* @param providedVersion the provided version for a time-travel-by-version query, empty if this | ||
* is not a time-travel-by-version query | ||
* @param providedTimestamp the provided timestamp for a time-travel-by-timestamp query, empty if | ||
* this is not a time-travel-by-timestamp query | ||
*/ | ||
private SnapshotQueryContext( | ||
String tablePath, Optional<Long> providedVersion, Optional<Long> providedTimestamp) { | ||
this.tablePath = tablePath; | ||
this.version = providedVersion; | ||
this.providedTimestamp = providedTimestamp; | ||
} | ||
|
||
public String getTablePath() { | ||
return tablePath; | ||
} | ||
|
||
public Optional<Long> getVersion() { | ||
return version; | ||
} | ||
|
||
public Optional<Long> getProvidedTimestamp() { | ||
return providedTimestamp; | ||
} | ||
|
||
public SnapshotMetrics getSnapshotMetrics() { | ||
return snapshotMetrics; | ||
} | ||
|
||
/** | ||
* Updates the {@code version} stored in this snapshot context. This version should be updated | ||
* upon version resolution for non time-travel-by-version queries. For latest snapshot queries | ||
* this is after log segment construction. For time-travel by timestamp queries this is after | ||
* timestamp to version resolution. | ||
*/ | ||
public void setVersion(long updatedVersion) { | ||
version = Optional.of(updatedVersion); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return String.format( | ||
"SnapshotQueryContext(tablePath=%s, version=%s, providedTimestamp=%s, snapshotMetric=%s)", | ||
tablePath, version, providedTimestamp, snapshotMetrics); | ||
} | ||
} |
Oops, something went wrong.