Skip to content

Commit

Permalink
Don't use a custom serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonport-db committed Dec 4, 2024
1 parent 632985b commit d8c08dd
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 139 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
"com.fasterxml.jackson.core" % "jackson-core" % "2.13.5",
"com.fasterxml.jackson.core" % "jackson-annotations" % "2.13.5",
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.13.5",

"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13.2" % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
*/
package io.delta.kernel.internal.metrics;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotMetricsResult;
import io.delta.kernel.metrics.SnapshotReport;
import java.io.IOException;
import java.util.Optional;

/** Defines JSON serializers for {@link MetricsReport} types */
public final class MetricsReportSerializers {
Expand All @@ -50,89 +46,9 @@ public static String serializeSnapshotReport(SnapshotReport snapshotReport)

private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper()
.registerModule(
new SimpleModule()
.addSerializer(SnapshotReport.class, new SnapshotReportSerializer()));
.registerModule(new Jdk8Module()) // To support Optional
.registerModule( // Serialize Exception using toString()
new SimpleModule().addSerializer(Exception.class, new ToStringSerializer()));

private MetricsReportSerializers() {}

/////////////////
// Serializers //
////////////////

static class SnapshotReportSerializer extends StdSerializer<SnapshotReport> {

SnapshotReportSerializer() {
super(SnapshotReport.class);
}

@Override
public void serialize(
SnapshotReport snapshotReport, JsonGenerator gen, SerializerProvider provider)
throws IOException {
gen.writeStartObject();
gen.writeStringField("tablePath", snapshotReport.tablePath());
gen.writeStringField("operationType", snapshotReport.operationType());
gen.writeStringField("reportUUID", snapshotReport.reportUUID().toString());
writeOptionalField(
"version", snapshotReport.version(), item -> gen.writeNumberField("version", item), gen);
writeOptionalField(
"providedTimestamp",
snapshotReport.providedTimestamp(),
item -> gen.writeNumberField("providedTimestamp", item),
gen);
gen.writeFieldName("snapshotMetrics");
writeSnapshotMetrics(snapshotReport.snapshotMetrics(), gen);
writeOptionalField(
"exception",
snapshotReport.exception(),
item -> gen.writeStringField("exception", item.toString()),
gen);
gen.writeEndObject();
}

private void writeSnapshotMetrics(SnapshotMetricsResult snapshotMetrics, JsonGenerator gen)
throws IOException {
gen.writeStartObject();
writeOptionalField(
"timestampToVersionResolutionDuration",
snapshotMetrics.timestampToVersionResolutionDuration(),
item -> gen.writeNumberField("timestampToVersionResolutionDuration", item),
gen);
gen.writeNumberField(
"loadProtocolAndMetadataDuration", snapshotMetrics.loadInitialDeltaActionsDuration());
gen.writeEndObject();
}
}

//////////////////////////////////
// Helper fx for serialization //
/////////////////////////////////

/**
* For an optional item - If it is empty, writes out a null value - If it is non-empty, writes the
* items value using the provided nonNullConsumer
*
* @param fieldName name of the field to write
* @param item optional item
* @param nonNullConsumer consumes an items non-null value
* @throws IOException
*/
private static <T> void writeOptionalField(
String fieldName,
Optional<T> item,
ConsumerThrowsIOException<T> nonNullConsumer,
JsonGenerator gen)
throws IOException {
if (item.isPresent()) {
nonNullConsumer.accept(item.get());
} else {
gen.writeNullField(fieldName);
}
}

// Need to create custom consumer so we can propagate the IOException without wrapping it
private interface ConsumerThrowsIOException<T> {
void accept(T t) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,37 @@ public SnapshotReportImpl(
}

@Override
public String tablePath() {
public String getTablePath() {
return tablePath;
}

@Override
public String operationType() {
public String getOperationType() {
return OPERATION_TYPE;
}

@Override
public Optional<Exception> exception() {
public Optional<Exception> getException() {
return exception;
}

@Override
public UUID reportUUID() {
public UUID getReportUUID() {
return reportUUID;
}

@Override
public Optional<Long> version() {
public Optional<Long> getVersion() {
return version;
}

@Override
public Optional<Long> providedTimestamp() {
public Optional<Long> getProvidedTimestamp() {
return providedTimestamp;
}

@Override
public SnapshotMetricsResult snapshotMetrics() {
public SnapshotMetricsResult getSnapshotMetrics() {
return snapshotMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
public interface DeltaOperationReport extends MetricsReport {

/** @return the path of the table */
String tablePath();
String getTablePath();

/** @return a string representation of the operation this report is for */
String operationType();

/** @return the exception thrown if this report is for a failed operation, otherwise empty */
Optional<Exception> exception();
String getOperationType();

/** @return a unique ID for this report */
UUID reportUUID();
UUID getReportUUID();

/** @return the exception thrown if this report is for a failed operation, otherwise empty */
Optional<Exception> getException();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public interface SnapshotMetricsResult {
* @return the duration (ns) to resolve the provided timestamp to a table version for timestamp
* time-travel queries. Empty for time-travel by version or non-time-travel queries.
*/
Optional<Long> timestampToVersionResolutionDuration();
Optional<Long> getTimestampToVersionResolutionDuration();

/**
* @return the duration (ns) to load the initial delta actions for the snapshot (such as the table
* protocol and metadata). 0 if snapshot construction fails before log replay.
*/
long loadInitialDeltaActionsDuration();
long getLoadInitialDeltaActionsDuration();

static SnapshotMetricsResult fromSnapshotMetrics(SnapshotMetrics snapshotMetrics) {
checkArgument(snapshotMetrics != null, "snapshotMetrics cannot be null");
Expand All @@ -48,12 +48,12 @@ static SnapshotMetricsResult fromSnapshotMetrics(SnapshotMetrics snapshotMetrics
snapshotMetrics.loadProtocolAndMetadataDuration.totalDuration();

@Override
public Optional<Long> timestampToVersionResolutionDuration() {
public Optional<Long> getTimestampToVersionResolutionDuration() {
return timestampToVersionResolutionDuration;
}

@Override
public long loadInitialDeltaActionsDuration() {
public long getLoadInitialDeltaActionsDuration() {
return loadProtocolAndMetadataDuration;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,21 @@
*/
package io.delta.kernel.metrics;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.Optional;

/** Defines the metadata and metrics for a snapshot construction {@link MetricsReport} */
@JsonSerialize(as = SnapshotReport.class)
@JsonPropertyOrder({
"tablePath",
"operationType",
"reportUUID",
"exception",
"version",
"providedTimestamp",
"snapshotMetrics"
})
public interface SnapshotReport extends DeltaOperationReport {

String OPERATION_TYPE = "Snapshot";
Expand All @@ -32,14 +44,14 @@ public interface SnapshotReport extends DeltaOperationReport {
*
* @return the version of the snapshot
*/
Optional<Long> version();
Optional<Long> getVersion();

/**
* @return the timestamp provided for time-travel, empty if this is not a timestamp-based
* time-travel query
*/
Optional<Long> providedTimestamp();
Optional<Long> getProvidedTimestamp();

/** @return the metrics for this snapshot construction */
SnapshotMetricsResult snapshotMetrics();
SnapshotMetricsResult getSnapshotMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,22 @@ class MetricsReportSerializerSuite extends AnyFunSuite {

def testSnapshotReport(snapshotReport: SnapshotReport): Unit = {
val timestampToVersionResolutionDuration = optionToString(
snapshotReport.snapshotMetrics().timestampToVersionResolutionDuration())
snapshotReport.getSnapshotMetrics().getTimestampToVersionResolutionDuration())
val loadProtocolAndMetadataDuration =
snapshotReport.snapshotMetrics().loadInitialDeltaActionsDuration()
val exception: Optional[String] = snapshotReport.exception().map(_.toString)
snapshotReport.getSnapshotMetrics().getLoadInitialDeltaActionsDuration()
val exception: Optional[String] = snapshotReport.getException().map(_.toString)
val expectedJson =
s"""
|{"tablePath":"${snapshotReport.tablePath()}",
|{"tablePath":"${snapshotReport.getTablePath()}",
|"operationType":"Snapshot",
|"reportUUID":"${snapshotReport.reportUUID()}",
|"version":${optionToString(snapshotReport.version())},
|"providedTimestamp":${optionToString(snapshotReport.providedTimestamp())},
|"reportUUID":"${snapshotReport.getReportUUID()}",
|"exception":${optionToString(exception)},
|"version":${optionToString(snapshotReport.getVersion())},
|"providedTimestamp":${optionToString(snapshotReport.getProvidedTimestamp())},
|"snapshotMetrics":{
|"timestampToVersionResolutionDuration":${timestampToVersionResolutionDuration},
|"loadProtocolAndMetadataDuration":${loadProtocolAndMetadataDuration}
|},
|"exception":${optionToString(exception)}
|"loadInitialDeltaActionsDuration":${loadProtocolAndMetadataDuration}
|}
|}
|""".stripMargin.replaceAll("\n", "")
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport))
Expand All @@ -76,14 +76,14 @@ class MetricsReportSerializerSuite extends AnyFunSuite {
s"""
|{"tablePath":"/table/path",
|"operationType":"Snapshot",
|"reportUUID":"${snapshotReport1.reportUUID()}",
|"reportUUID":"${snapshotReport1.getReportUUID()}",
|"exception":"$exception",
|"version":1,
|"providedTimestamp":0,
|"snapshotMetrics":{
|"timestampToVersionResolutionDuration":10,
|"loadProtocolAndMetadataDuration":1000
|},
|"exception":"$exception"
|"loadInitialDeltaActionsDuration":1000
|}
|}
|""".stripMargin.replaceAll("\n", "")
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,34 +110,34 @@ class MetricsReportSuite extends AnyFunSuite with TestUtils {
val (snapshotReport, duration, exception) = getSnapshotReport(f, path, expectException)

// Verify contents
assert(snapshotReport.tablePath == resolvePath(path))
assert(snapshotReport.operationType == "Snapshot")
assert(snapshotReport.getTablePath == resolvePath(path))
assert(snapshotReport.getOperationType == "Snapshot")
exception match {
case Some(e) =>
assert(snapshotReport.exception().isPresent &&
Objects.equals(snapshotReport.exception().get(), e))
case None => assert(!snapshotReport.exception().isPresent)
assert(snapshotReport.getException().isPresent &&
Objects.equals(snapshotReport.getException().get(), e))
case None => assert(!snapshotReport.getException().isPresent)
}
assert(snapshotReport.reportUUID != null)
assert(Objects.equals(snapshotReport.version, expectedVersion),
s"Expected version $expectedVersion found ${snapshotReport.version}")
assert(Objects.equals(snapshotReport.providedTimestamp, expectedProvidedTimestamp))
assert(snapshotReport.getReportUUID != null)
assert(Objects.equals(snapshotReport.getVersion, expectedVersion),
s"Expected version $expectedVersion found ${snapshotReport.getVersion}")
assert(Objects.equals(snapshotReport.getProvidedTimestamp, expectedProvidedTimestamp))

// Since we cannot know the actual durations of these we sanity check that they are > 0 and
// less than the total operation duration whenever they are expected to be non-zero/non-empty
if (expectNonEmptyTimestampToVersionResolutionDuration) {
assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.isPresent)
assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.get > 0)
assert(snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.get <
assert(snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDuration.isPresent)
assert(snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDuration.get > 0)
assert(snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDuration.get <
duration)
} else {
assert(!snapshotReport.snapshotMetrics.timestampToVersionResolutionDuration.isPresent)
assert(!snapshotReport.getSnapshotMetrics.getTimestampToVersionResolutionDuration.isPresent)
}
if (expectNonZeroLoadProtocolAndMetadataDuration) {
assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDuration > 0)
assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDuration < duration)
assert(snapshotReport.getSnapshotMetrics.getLoadInitialDeltaActionsDuration > 0)
assert(snapshotReport.getSnapshotMetrics.getLoadInitialDeltaActionsDuration < duration)
} else {
assert(snapshotReport.snapshotMetrics.loadInitialDeltaActionsDuration == 0)
assert(snapshotReport.getSnapshotMetrics.getLoadInitialDeltaActionsDuration == 0)
}
}

Expand Down

0 comments on commit d8c08dd

Please sign in to comment.