Skip to content

Commit

Permalink
Metrics serializer + LoggingMetricsReporter
Browse files Browse the repository at this point in the history
  • Loading branch information
allisonport-db committed Nov 27, 2024
1 parent ceb5a56 commit 632985b
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotMetricsResult;
import io.delta.kernel.metrics.SnapshotReport;
import java.io.IOException;
import java.util.Optional;

/** Defines JSON serializers for {@link MetricsReport} types */
public final class MetricsReportSerializers {

/////////////////
// Public APIs //
/////////////////

/**
* Serializes a {@link SnapshotReport} to a JSON string
*
* @throws JsonProcessingException
*/
public static String serializeSnapshotReport(SnapshotReport snapshotReport)
throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsString(snapshotReport);
}

/////////////////////////////////
// Private fields and methods //
////////////////////////////////

private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper()
.registerModule(
new SimpleModule()
.addSerializer(SnapshotReport.class, new SnapshotReportSerializer()));

private MetricsReportSerializers() {}

/////////////////
// Serializers //
////////////////

static class SnapshotReportSerializer extends StdSerializer<SnapshotReport> {

SnapshotReportSerializer() {
super(SnapshotReport.class);
}

@Override
public void serialize(
SnapshotReport snapshotReport, JsonGenerator gen, SerializerProvider provider)
throws IOException {
gen.writeStartObject();
gen.writeStringField("tablePath", snapshotReport.tablePath());
gen.writeStringField("operationType", snapshotReport.operationType());
gen.writeStringField("reportUUID", snapshotReport.reportUUID().toString());
writeOptionalField(
"version", snapshotReport.version(), item -> gen.writeNumberField("version", item), gen);
writeOptionalField(
"providedTimestamp",
snapshotReport.providedTimestamp(),
item -> gen.writeNumberField("providedTimestamp", item),
gen);
gen.writeFieldName("snapshotMetrics");
writeSnapshotMetrics(snapshotReport.snapshotMetrics(), gen);
writeOptionalField(
"exception",
snapshotReport.exception(),
item -> gen.writeStringField("exception", item.toString()),
gen);
gen.writeEndObject();
}

private void writeSnapshotMetrics(SnapshotMetricsResult snapshotMetrics, JsonGenerator gen)
throws IOException {
gen.writeStartObject();
writeOptionalField(
"timestampToVersionResolutionDuration",
snapshotMetrics.timestampToVersionResolutionDuration(),
item -> gen.writeNumberField("timestampToVersionResolutionDuration", item),
gen);
gen.writeNumberField(
"loadProtocolAndMetadataDuration", snapshotMetrics.loadInitialDeltaActionsDuration());
gen.writeEndObject();
}
}

//////////////////////////////////
// Helper fx for serialization //
/////////////////////////////////

/**
* For an optional item - If it is empty, writes out a null value - If it is non-empty, writes the
* items value using the provided nonNullConsumer
*
* @param fieldName name of the field to write
* @param item optional item
* @param nonNullConsumer consumes an items non-null value
* @throws IOException
*/
private static <T> void writeOptionalField(
String fieldName,
Optional<T> item,
ConsumerThrowsIOException<T> nonNullConsumer,
JsonGenerator gen)
throws IOException {
if (item.isPresent()) {
nonNullConsumer.accept(item.get());
} else {
gen.writeNullField(fieldName);
}
}

// Need to create custom consumer so we can propagate the IOException without wrapping it
private interface ConsumerThrowsIOException<T> {
void accept(T t) throws IOException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.metrics

import java.util.Optional

import io.delta.kernel.metrics.SnapshotReport
import org.scalatest.funsuite.AnyFunSuite

class MetricsReportSerializerSuite extends AnyFunSuite {

private def optionToString[T](option: Optional[T]): String = {
if (option.isPresent) {
if (option.get().isInstanceOf[String]) {
s""""${option.get()}"""" // For string objects wrap with quotes
} else {
option.get().toString
}
} else {
"null"
}
}

def testSnapshotReport(snapshotReport: SnapshotReport): Unit = {
val timestampToVersionResolutionDuration = optionToString(
snapshotReport.snapshotMetrics().timestampToVersionResolutionDuration())
val loadProtocolAndMetadataDuration =
snapshotReport.snapshotMetrics().loadInitialDeltaActionsDuration()
val exception: Optional[String] = snapshotReport.exception().map(_.toString)
val expectedJson =
s"""
|{"tablePath":"${snapshotReport.tablePath()}",
|"operationType":"Snapshot",
|"reportUUID":"${snapshotReport.reportUUID()}",
|"version":${optionToString(snapshotReport.version())},
|"providedTimestamp":${optionToString(snapshotReport.providedTimestamp())},
|"snapshotMetrics":{
|"timestampToVersionResolutionDuration":${timestampToVersionResolutionDuration},
|"loadProtocolAndMetadataDuration":${loadProtocolAndMetadataDuration}
|},
|"exception":${optionToString(exception)}
|}
|""".stripMargin.replaceAll("\n", "")
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport))
}

test("SnapshotReport serializer") {
val snapshotMetrics1 = new SnapshotMetrics()
snapshotMetrics1.timestampToVersionResolutionDuration.record(10)
snapshotMetrics1.loadProtocolAndMetadataDuration.record(1000)
val exception = new RuntimeException("something something failed")

val snapshotReport1 = new SnapshotReportImpl(
"/table/path",
Optional.of(1),
Optional.of(0),
snapshotMetrics1,
Optional.of(exception)
)

// Manually check expected JSON
val expectedJson =
s"""
|{"tablePath":"/table/path",
|"operationType":"Snapshot",
|"reportUUID":"${snapshotReport1.reportUUID()}",
|"version":1,
|"providedTimestamp":0,
|"snapshotMetrics":{
|"timestampToVersionResolutionDuration":10,
|"loadProtocolAndMetadataDuration":1000
|},
|"exception":"$exception"
|}
|""".stripMargin.replaceAll("\n", "")
assert(expectedJson == MetricsReportSerializers.serializeSnapshotReport(snapshotReport1))

// Check with test function
testSnapshotReport(snapshotReport1)

// Empty options for all possible fields
val snapshotMetrics2 = new SnapshotMetrics()
val snapshotReport2 = new SnapshotReportImpl(
"/table/path",
Optional.empty(),
Optional.empty(),
snapshotMetrics2,
Optional.empty()
)
testSnapshotReport(snapshotReport2)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.delta.kernel.defaults.engine;

import io.delta.kernel.engine.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

Expand Down Expand Up @@ -53,6 +55,11 @@ public CommitCoordinatorClientHandler getCommitCoordinatorClientHandler(
return new DefaultCommitCoordinatorClientHandler(hadoopConf, name, conf);
}

@Override
public List<MetricsReporter> getMetricsReporters() {
return Collections.singletonList(new LoggingMetricsReporter());
};

/**
* Create an instance of {@link DefaultEngine}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.defaults.engine;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.delta.kernel.engine.MetricsReporter;
import io.delta.kernel.internal.metrics.MetricsReportSerializers;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.SnapshotReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An implementation of {@link MetricsReporter} that logs the reports (as JSON) to Log4J at the info
* level.
*/
public class LoggingMetricsReporter implements MetricsReporter {

private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);

@Override
public void report(MetricsReport report) {
try {
if (report instanceof SnapshotReport) {
logger.info(
"SnapshotReport = %s",
MetricsReportSerializers.serializeSnapshotReport((SnapshotReport) report));
} else {
logger.info(
"%s = [%s does not support serializing this type of MetricReport]",
report.getClass(), this.getClass());
}
} catch (JsonProcessingException e) {
logger.info("Encountered exception while serializing report %s: %s", report, e);
}
}
}

0 comments on commit 632985b

Please sign in to comment.