Skip to content

Commit

Permalink
Snapshot report and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonport-db committed Nov 27, 2024
1 parent e64ae0b commit ceb5a56
Show file tree
Hide file tree
Showing 11 changed files with 805 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructType;
import java.util.Map;
import java.util.Optional;
Expand All @@ -45,13 +48,15 @@ public class SnapshotImpl implements Snapshot {
private final Metadata metadata;
private final LogSegment logSegment;
private Optional<Long> inCommitTimestampOpt;
private final SnapshotReport snapshotReport;

public SnapshotImpl(
Path dataPath,
LogSegment logSegment,
LogReplay logReplay,
Protocol protocol,
Metadata metadata) {
Metadata metadata,
SnapshotContext snapshotContext) {
this.logPath = new Path(dataPath, "_delta_log");
this.dataPath = dataPath;
this.version = logSegment.version;
Expand All @@ -60,6 +65,13 @@ public SnapshotImpl(
this.protocol = protocol;
this.metadata = metadata;
this.inCommitTimestampOpt = Optional.empty();
this.snapshotReport =
new SnapshotReportImpl(
snapshotContext.getTablePath(),
Optional.of(this.version),
snapshotContext.getProvidedTimestamp(),
snapshotContext.getSnapshotMetrics(),
Optional.empty() /* exception */);
}

@Override
Expand All @@ -74,6 +86,7 @@ public StructType getSchema(Engine engine) {

@Override
public ScanBuilder getScanBuilder(Engine engine) {
// TODO when we add ScanReport we will pass the SnapshotReport downstream here
return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(engine), logReplay, engine);
}

Expand Down Expand Up @@ -128,6 +141,10 @@ public Path getDataPath() {
return dataPath;
}

public SnapshotReport getSnapshotReport() {
return snapshotReport;
}

/**
* Returns the timestamp of the latest commit of this snapshot. For an uninitialized snapshot,
* this returns -1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
Expand Down Expand Up @@ -120,19 +123,38 @@ public Optional<TableIdentifier> getTableIdentifier() {

@Override
public Snapshot getLatestSnapshot(Engine engine) throws TableNotFoundException {
return snapshotManager.buildLatestSnapshot(engine);
SnapshotContext snapshotContext = SnapshotContext.forLatestSnapshot(tablePath);
try {
return snapshotManager.buildLatestSnapshot(engine, snapshotContext);
} catch (Exception e) {
recordSnapshotReport(engine, snapshotContext, e);
throw e;
}
}

@Override
public Snapshot getSnapshotAsOfVersion(Engine engine, long versionId)
throws TableNotFoundException {
return snapshotManager.getSnapshotAt(engine, versionId);
SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, versionId);
try {
return snapshotManager.getSnapshotAt(engine, versionId, snapshotContext);
} catch (Exception e) {
recordSnapshotReport(engine, snapshotContext, e);
throw e;
}
}

@Override
public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC)
throws TableNotFoundException {
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC);
SnapshotContext snapshotContext =
SnapshotContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
try {
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext);
} catch (Exception e) {
recordSnapshotReport(engine, snapshotContext, e);
throw e;
}
}

@Override
Expand Down Expand Up @@ -353,4 +375,16 @@ private CloseableIterator<ColumnarBatch> getRawChanges(
logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema);
return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema);
}

/** Creates a {@link SnapshotReport} and pushes it to any {@link MetricsReporter}s. */
private void recordSnapshotReport(Engine engine, SnapshotContext snapshotContext, Exception e) {
SnapshotReport snapshotReport =
new SnapshotReportImpl(
snapshotContext.getTablePath(),
snapshotContext.getVersion(),
snapshotContext.getProvidedTimestamp(),
snapshotContext.getSnapshotMetrics(),
Optional.of(e));
engine.getMetricsReporters().forEach(reporter -> reporter.report(snapshotReport));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotContext;
import io.delta.kernel.internal.metrics.SnapshotMetrics;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
Expand Down Expand Up @@ -105,8 +107,11 @@ public Transaction build(Engine engine) {
// Table doesn't exist yet. Create an initial snapshot with the new schema.
Metadata metadata = getInitialMetadata();
Protocol protocol = getInitialProtocol();
LogReplay logReplay = getEmptyLogReplay(engine, metadata, protocol);
snapshot = new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol);
SnapshotContext snapshotContext = SnapshotContext.forVersionSnapshot(tablePath, 0);
LogReplay logReplay =
getEmptyLogReplay(engine, metadata, protocol, snapshotContext.getSnapshotMetrics());
snapshot =
new InitialSnapshot(table.getDataPath(), logReplay, metadata, protocol, snapshotContext);
}

boolean isNewTable = snapshot.getVersion(engine) < 0;
Expand Down Expand Up @@ -204,8 +209,19 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
}

private class InitialSnapshot extends SnapshotImpl {
InitialSnapshot(Path dataPath, LogReplay logReplay, Metadata metadata, Protocol protocol) {
super(dataPath, LogSegment.empty(table.getLogPath()), logReplay, protocol, metadata);
InitialSnapshot(
Path dataPath,
LogReplay logReplay,
Metadata metadata,
Protocol protocol,
SnapshotContext snapshotContext) {
super(
dataPath,
LogSegment.empty(table.getLogPath()),
logReplay,
protocol,
metadata,
snapshotContext);
}

@Override
Expand All @@ -214,14 +230,16 @@ public long getTimestamp(Engine engine) {
}
}

private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) {
private LogReplay getEmptyLogReplay(
Engine engine, Metadata metadata, Protocol protocol, SnapshotMetrics snapshotMetrics) {
return new LogReplay(
table.getLogPath(),
table.getDataPath(),
-1,
engine,
LogSegment.empty(table.getLogPath()),
Optional.empty()) {
Optional.empty(),
snapshotMetrics) {

@Override
protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import io.delta.kernel.metrics.SnapshotReport;
import java.util.Optional;

/** Stores the context for a given Snapshot query. Used to generate a {@link SnapshotReport} */
public class SnapshotContext {

public static SnapshotContext forLatestSnapshot(String tablePath) {
return new SnapshotContext(tablePath, Optional.empty(), Optional.empty());
}

public static SnapshotContext forVersionSnapshot(String tablePath, long version) {
return new SnapshotContext(tablePath, Optional.of(version), Optional.empty());
}

public static SnapshotContext forTimestampSnapshot(String tablePath, long timestamp) {
return new SnapshotContext(tablePath, Optional.empty(), Optional.of(timestamp));
}

private Optional<Long> version;
private final SnapshotMetrics snapshotMetrics = new SnapshotMetrics();
private final String tablePath;
private final Optional<Long> providedTimestamp;

private SnapshotContext(
String tablePath, Optional<Long> version, Optional<Long> providedTimestamp) {
this.tablePath = tablePath;
this.version = version;
this.providedTimestamp = providedTimestamp;
}

public Optional<Long> getVersion() {
return version;
}

public SnapshotMetrics getSnapshotMetrics() {
return snapshotMetrics;
}

public String getTablePath() {
return tablePath;
}

public Optional<Long> getProvidedTimestamp() {
return providedTimestamp;
}

public void setVersion(long updatedVersion) {
version = Optional.of(updatedVersion);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

/** Stores the metrics for an ongoing snapshot creation */
public class SnapshotMetrics {

public final Timer timestampToVersionResolutionDuration = new Timer();

public final Timer loadProtocolAndMetadataDuration = new Timer();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.metrics.SnapshotMetricsResult;
import io.delta.kernel.metrics.SnapshotReport;
import java.util.Optional;
import java.util.UUID;

/** A basic POJO implementation of {@link SnapshotReport} for creating them */
public class SnapshotReportImpl implements SnapshotReport {

private final String tablePath;
private final Optional<Long> version;
private final Optional<Long> providedTimestamp;
private final UUID reportUUID;
private final SnapshotMetricsResult snapshotMetrics;
private final Optional<Exception> exception;

public SnapshotReportImpl(
String tablePath,
Optional<Long> version,
Optional<Long> providedTimestamp,
SnapshotMetrics snapshotMetrics,
Optional<Exception> exception) {
this.tablePath = requireNonNull(tablePath);
this.version = requireNonNull(version);
this.providedTimestamp = requireNonNull(providedTimestamp);
this.snapshotMetrics =
SnapshotMetricsResult.fromSnapshotMetrics(requireNonNull(snapshotMetrics));
this.exception = requireNonNull(exception);
this.reportUUID = UUID.randomUUID();
}

@Override
public String tablePath() {
return tablePath;
}

@Override
public String operationType() {
return OPERATION_TYPE;
}

@Override
public Optional<Exception> exception() {
return exception;
}

@Override
public UUID reportUUID() {
return reportUUID;
}

@Override
public Optional<Long> version() {
return version;
}

@Override
public Optional<Long> providedTimestamp() {
return providedTimestamp;
}

@Override
public SnapshotMetricsResult snapshotMetrics() {
return snapshotMetrics;
}
}
Loading

0 comments on commit ceb5a56

Please sign in to comment.