getChildren() {
+ return Collections.unmodifiableList(children);
+ }
+
@Override
public String toString() {
return SimpleSpan.class.getSimpleName()
@@ -87,6 +98,8 @@ public String toString() {
+ endTsMillis
+ ", attributes="
+ attributes
+ + ", children="
+ + children
+ "}";
}
}
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
index 699e48fd5c468..d530b5c37843b 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/Span.java
@@ -20,14 +20,12 @@
import org.apache.flink.annotation.Experimental;
+import java.util.List;
import java.util.Map;
/**
* Span represents something that happened in Flink at certain point of time, that will be reported
* to a {@link org.apache.flink.traces.reporter.TraceReporter}.
- *
- * Currently we don't support traces with multiple spans. Each span is self-contained and
- * represents things like a checkpoint or recovery.
*/
@Experimental
public interface Span {
@@ -49,4 +47,7 @@ static SpanBuilder builder(Class> classScope, String name) {
* added in the future.
*/
Map getAttributes();
+
+ /** Returns the child spans (= nested). */
+ List getChildren();
}
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
index e2c14738485d9..5f1c1c35b27e2 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/traces/SpanBuilder.java
@@ -21,14 +21,18 @@
import org.apache.flink.AttributeBuilder;
import org.apache.flink.annotation.Experimental;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/** Builder used to construct {@link Span}. See {@link Span#builder(Class, String)}. */
@Experimental
public class SpanBuilder implements AttributeBuilder {
private final HashMap attributes = new HashMap<>();
+ private final List children = new ArrayList<>();
private final Class> classScope;
private final String name;
private long startTsMillis;
@@ -68,7 +72,10 @@ public Span build(Map additionalVariables) {
name,
startTsMillisToBuild,
endTsMillisToBuild,
- attributes);
+ attributes,
+ children.stream()
+ .map(childBuilder -> childBuilder.build(additionalVariables))
+ .collect(Collectors.toList()));
}
/**
@@ -120,4 +127,16 @@ public SpanBuilder setAttribute(String key, boolean value) {
public String getName() {
return name;
}
+
+ /** Adds child spans (= nested). */
+ public SpanBuilder addChildren(List children) {
+ this.children.addAll(children);
+ return this;
+ }
+
+ /** Adds child span (= nested). */
+ public SpanBuilder addChild(SpanBuilder child) {
+ this.children.add(child);
+ return this;
+ }
}
diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
index 248e54b1aa3d2..d8798fbec4b90 100644
--- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
+++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java
@@ -124,10 +124,9 @@ private void notifyOfAddedSpanInternal(Span span, io.opentelemetry.api.trace.Spa
.startSpan();
// Recursively add child spans to this parent
- // TODO: not yet supported
- // for (Span childSpan : span.getChildren()) {
- // notifyOfAddedSpanInternal(childSpan, currentOtelSpan);
- // }
+ for (Span childSpan : span.getChildren()) {
+ notifyOfAddedSpanInternal(childSpan, currentOtelSpan);
+ }
currentOtelSpan.end(span.getEndTsMillis(), TimeUnit.MILLISECONDS);
}
diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java
index 88b9871787e68..7d452a90d5a39 100644
--- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java
+++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java
@@ -33,7 +33,11 @@
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -125,4 +129,170 @@ public void testReportSpan() throws Exception {
});
});
}
+
+ @Test
+ public void testReportNestedSpan() throws Exception {
+ String scope = this.getClass().getCanonicalName();
+
+ String attribute1KeyRoot = "foo";
+ String attribute1ValueRoot = "bar";
+ String attribute2KeyRoot = "";
+ String attribute2ValueRoot = "value";
+ String spanRoot = "root";
+
+ String spanL1N1 = "1_1";
+ String attribute1KeyL1N1 = "foo_" + spanL1N1;
+ String attribute1ValueL1N1 = "bar_" + spanL1N1;
+
+ String spanL1N2 = "1_2";
+ String attribute1KeyL1N2 = "foo_" + spanL1N2;
+ String attribute1ValueL1N2 = "bar_" + spanL1N2;
+
+ String spanL2N1 = "2_1";
+ String attribute1KeyL2N1 = "foo_" + spanL2N1;
+ String attribute1ValueL2N1 = "bar_" + spanL2N1;
+
+ reporter.open(createMetricConfig());
+ try {
+ SpanBuilder childLeveL2N1 =
+ Span.builder(this.getClass(), spanL2N1)
+ .setAttribute(attribute1KeyL2N1, attribute1ValueL2N1)
+ .setStartTsMillis(44)
+ .setEndTsMillis(46);
+
+ SpanBuilder childL1N1 =
+ Span.builder(this.getClass(), spanL1N1)
+ .setAttribute(attribute1KeyL1N1, attribute1ValueL1N1)
+ .setStartTsMillis(43)
+ .setEndTsMillis(48)
+ .addChild(childLeveL2N1);
+
+ SpanBuilder childL1N2 =
+ Span.builder(this.getClass(), spanL1N2)
+ .setAttribute(attribute1KeyL1N2, attribute1ValueL1N2)
+ .setStartTsMillis(44)
+ .setEndTsMillis(46);
+
+ SpanBuilder rootSpan =
+ Span.builder(this.getClass(), spanRoot)
+ .setAttribute(attribute1KeyRoot, attribute1ValueRoot)
+ .setAttribute(attribute2KeyRoot, attribute2ValueRoot)
+ .setStartTsMillis(42)
+ .setEndTsMillis(64)
+ .addChildren(Arrays.asList(childL1N1, childL1N2));
+
+ reporter.notifyOfAddedSpan(rootSpan.build());
+ } finally {
+ reporter.close();
+ }
+
+ eventuallyConsumeJson(
+ (json) -> {
+ JsonNode scopeSpans = json.findPath("resourceSpans").findPath("scopeSpans");
+ assertThat(scopeSpans.findPath("scope").findPath("name").asText())
+ .isEqualTo(scope);
+ JsonNode spans = scopeSpans.findPath("spans");
+
+ Map actualSpanSummaries = convertToSummaries(spans);
+
+ assertThat(actualSpanSummaries.keySet())
+ .containsExactlyInAnyOrder(spanRoot, spanL1N1, spanL1N2, spanL2N1);
+
+ ActualSpan root = actualSpanSummaries.get(spanRoot);
+ ActualSpan l1n1 = actualSpanSummaries.get(spanL1N1);
+ ActualSpan l1n2 = actualSpanSummaries.get(spanL1N2);
+ ActualSpan l2n1 = actualSpanSummaries.get(spanL2N1);
+
+ assertThat(root.parentSpanId).isEmpty();
+ assertThat(root.attributes)
+ .containsEntry(attribute1KeyRoot, attribute1ValueRoot);
+ assertThat(root.attributes)
+ .containsEntry(
+ VariableNameUtil.getVariableName(attribute2KeyRoot),
+ attribute2ValueRoot);
+ assertThat(l1n1.attributes)
+ .containsEntry(attribute1KeyL1N1, attribute1ValueL1N1);
+ assertThat(l1n2.attributes)
+ .containsEntry(attribute1KeyL1N2, attribute1ValueL1N2);
+ assertThat(l2n1.attributes)
+ .containsEntry(attribute1KeyL2N1, attribute1ValueL2N1);
+
+ assertThat(root.traceId).isEqualTo(l1n1.traceId);
+ assertThat(root.traceId).isEqualTo(l1n2.traceId);
+ assertThat(root.traceId).isEqualTo(l2n1.traceId);
+ assertThat(root.spanId).isNotEmpty();
+ assertThat(root.spanId).isEqualTo(l1n1.parentSpanId);
+ assertThat(root.spanId).isEqualTo(l1n2.parentSpanId);
+
+ assertThat(root.children).containsExactlyInAnyOrder(l1n1, l1n2);
+ assertThat(l1n1.children).containsExactlyInAnyOrder(l2n1);
+ assertThat(l1n2.children).isEmpty();
+ assertThat(l2n1.children).isEmpty();
+ });
+ }
+
+ private Map convertToSummaries(JsonNode spans) {
+ Map spanIdToSpan = new HashMap<>();
+ for (int i = 0; spans.get(i) != null; i++) {
+ ActualSpan actualSpan = convertToActualSpan(spans.get(i));
+ spanIdToSpan.put(actualSpan.spanId, actualSpan);
+ }
+
+ Map nameToSpan = new HashMap<>();
+
+ spanIdToSpan.forEach(
+ (spanId, actualSpan) -> {
+ if (!actualSpan.parentSpanId.isEmpty()) {
+ ActualSpan parentSpan = spanIdToSpan.get(actualSpan.parentSpanId);
+ parentSpan.addChild(actualSpan);
+ }
+ nameToSpan.put(actualSpan.name, actualSpan);
+ });
+
+ return nameToSpan;
+ }
+
+ private ActualSpan convertToActualSpan(JsonNode span) {
+ String name = span.findPath("name").asText();
+ String traceId = span.findPath("traceId").asText();
+ String spanId = span.findPath("spanId").asText();
+ String parentSpanId = span.findPath("parentSpanId").asText();
+
+ Map attributeMap = new HashMap<>();
+ JsonNode attributes = span.findPath("attributes");
+
+ for (int j = 0; attributes.get(j) != null; j++) {
+ JsonNode attribute = attributes.get(j);
+ String key = attribute.get("key").asText();
+ String value = attribute.at("/value/stringValue").asText();
+ attributeMap.put(key, value);
+ }
+ return new ActualSpan(traceId, spanId, name, parentSpanId, attributeMap);
+ }
+
+ private static class ActualSpan {
+ final String traceId;
+ final String spanId;
+ final String name;
+ final String parentSpanId;
+ final Map attributes;
+ final List children = new ArrayList<>();
+
+ public ActualSpan(
+ String traceId,
+ String spanId,
+ String name,
+ String parentSpanId,
+ Map attributes) {
+ this.traceId = traceId;
+ this.spanId = spanId;
+ this.name = name;
+ this.parentSpanId = parentSpanId;
+ this.attributes = attributes;
+ }
+
+ public void addChild(ActualSpan child) {
+ this.children.add(child);
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java
index 70b05c32ce869..9f891c0181cb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java
@@ -20,6 +20,7 @@
import org.apache.flink.AttributeBuilder;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.events.EventBuilder;
import org.apache.flink.events.Events;
import org.apache.flink.metrics.Gauge;
@@ -29,6 +30,7 @@
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
@@ -40,6 +42,10 @@
import javax.annotation.Nullable;
import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -53,6 +59,84 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker {
private static final Logger LOG = LoggerFactory.getLogger(DefaultCheckpointStatsTracker.class);
private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();
+ /**
+ * Function that extracts a {@link StatsSummary} from a {@link
+ * org.apache.flink.runtime.checkpoint.TaskStateStats.TaskStateStatsSummary}.
+ */
+ @FunctionalInterface
+ interface TaskStatsSummaryExtractor {
+ StatsSummary extract(TaskStateStats.TaskStateStatsSummary taskStateStatsSummary);
+ }
+
+ /** Function that extracts a (long) metric value from {@link SubtaskStateStats}. */
+ @FunctionalInterface
+ interface SubtaskMetricExtractor {
+ long extract(SubtaskStateStats subtaskStateStats);
+ }
+
+ /**
+ * Helper class that defines a checkpoint span metric and how to extract the required values.
+ */
+ static final class CheckpointSpanMetric {
+ final String metricName;
+ final TaskStatsSummaryExtractor taskStatsSummaryExtractor;
+ final SubtaskMetricExtractor subtaskMetricExtractor;
+
+ private CheckpointSpanMetric(
+ String metricName,
+ TaskStatsSummaryExtractor taskStatsSummaryExtractor,
+ SubtaskMetricExtractor subtaskMetricExtractor) {
+ this.metricName = metricName;
+ this.taskStatsSummaryExtractor = taskStatsSummaryExtractor;
+ this.subtaskMetricExtractor = subtaskMetricExtractor;
+ }
+
+ static CheckpointSpanMetric of(
+ String metricName,
+ TaskStatsSummaryExtractor taskStatsSummaryExtractor,
+ SubtaskMetricExtractor subtaskMetricExtractor) {
+ return new CheckpointSpanMetric(
+ metricName, taskStatsSummaryExtractor, subtaskMetricExtractor);
+ }
+ }
+
+ private static final List CHECKPOINT_SPAN_METRICS =
+ Arrays.asList(
+ CheckpointSpanMetric.of(
+ "StateSizeBytes",
+ TaskStateStats.TaskStateStatsSummary::getStateSizeStats,
+ SubtaskStateStats::getStateSize),
+ CheckpointSpanMetric.of(
+ "CheckpointedSizeBytes",
+ TaskStateStats.TaskStateStatsSummary::getCheckpointedSize,
+ SubtaskStateStats::getCheckpointedSize),
+ CheckpointSpanMetric.of(
+ "CheckpointStartDelayMs",
+ TaskStateStats.TaskStateStatsSummary::getCheckpointStartDelayStats,
+ SubtaskStateStats::getCheckpointStartDelay),
+ CheckpointSpanMetric.of(
+ "AlignmentDurationMs",
+ TaskStateStats.TaskStateStatsSummary::getAlignmentDurationStats,
+ SubtaskStateStats::getAlignmentDuration),
+ CheckpointSpanMetric.of(
+ "SyncCheckpointDurationMs",
+ TaskStateStats.TaskStateStatsSummary::getSyncCheckpointDurationStats,
+ SubtaskStateStats::getSyncCheckpointDuration),
+ CheckpointSpanMetric.of(
+ "AsyncCheckpointDurationMs",
+ TaskStateStats.TaskStateStatsSummary::getAsyncCheckpointDurationStats,
+ SubtaskStateStats::getAsyncCheckpointDuration),
+ CheckpointSpanMetric.of(
+ "ProcessedDataBytes",
+ TaskStateStats.TaskStateStatsSummary::getProcessedDataStats,
+ SubtaskStateStats::getProcessedData),
+ CheckpointSpanMetric.of(
+ "PersistedDataBytes",
+ TaskStateStats.TaskStateStatsSummary::getPersistedDataStats,
+ SubtaskStateStats::getPersistedData));
+
+ private final TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel;
+
/**
* Lock used to update stats and creating snapshots. Updates always happen from a single Thread
* at a time and there can be multiple concurrent read accesses to the latest stats snapshot.
@@ -99,7 +183,11 @@ public class DefaultCheckpointStatsTracker implements CheckpointStatsTracker {
*/
public DefaultCheckpointStatsTracker(
int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup) {
- this(numRememberedCheckpoints, metricGroup, null);
+ this(
+ numRememberedCheckpoints,
+ metricGroup,
+ TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT,
+ null);
}
/**
@@ -113,10 +201,12 @@ public DefaultCheckpointStatsTracker(
public DefaultCheckpointStatsTracker(
int numRememberedCheckpoints,
JobManagerJobMetricGroup metricGroup,
+ TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel,
@Nullable CheckpointStatsListener checkpointStatsListener) {
checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
this.metricGroup = metricGroup;
+ this.checkpointSpanDetailLevel = checkpointSpanDetailLevel;
this.checkpointStatsListener = checkpointStatsListener;
// Latest snapshot is empty
@@ -264,6 +354,8 @@ private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
.setStartTsMillis(checkpointStats.getTriggerTimestamp())
.setEndTsMillis(checkpointStats.getLatestAckTimestamp());
addCommonCheckpointStatsAttributes(spanBuilder, checkpointStats);
+ // Add max/sum aggregations for breakdown metrics
+ addCheckpointAggregationStats(checkpointStats, spanBuilder);
metricGroup.addSpan(spanBuilder);
if (LOG.isDebugEnabled()) {
@@ -300,6 +392,131 @@ private AttributeBuilder addCommonCheckpointStatsAttributes(
return attributeBuilder;
}
+ private void addCheckpointAggregationStats(
+ AbstractCheckpointStats checkpointStats, SpanBuilder checkpointSpanBuilder) {
+
+ final List sortedTaskStateStats =
+ new ArrayList<>(checkpointStats.getAllTaskStateStats());
+ sortedTaskStateStats.sort(
+ (x, y) ->
+ Long.signum(
+ x.getSummaryStats().getCheckpointStartDelayStats().getMinimum()
+ - y.getSummaryStats()
+ .getCheckpointStartDelayStats()
+ .getMinimum()));
+
+ CHECKPOINT_SPAN_METRICS.stream()
+ .map(metric -> TaskStatsAggregator.aggregate(sortedTaskStateStats, metric))
+ .forEach(
+ aggregator -> {
+ final String metricName = aggregator.getMetricName();
+ checkpointSpanBuilder.setAttribute(
+ "max" + metricName, aggregator.getTotalMax());
+
+ if (!shouldSkipSumMetricNameInCheckpointSpanForCompatibility(
+ metricName)) {
+ checkpointSpanBuilder.setAttribute(
+ "sum" + metricName, aggregator.getTotalSum());
+ }
+
+ if (checkpointSpanDetailLevel
+ == TraceOptions.CheckpointSpanDetailLevel
+ .SPAN_PER_CHECKPOINT_WITH_TASKS) {
+ checkpointSpanBuilder.setAttribute(
+ "perTaskMax" + metricName,
+ Arrays.toString(
+ aggregator.getValuesMax().getInternalArray()));
+ checkpointSpanBuilder.setAttribute(
+ "perTaskSum" + metricName,
+ Arrays.toString(
+ aggregator.getValuesSum().getInternalArray()));
+ }
+ });
+
+ if (checkpointSpanDetailLevel
+ == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK
+ || checkpointSpanDetailLevel
+ == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) {
+ for (TaskStateStats taskStats : sortedTaskStateStats) {
+ checkpointSpanBuilder.addChild(
+ createTaskSpan(
+ checkpointStats,
+ taskStats,
+ checkpointSpanDetailLevel
+ == TraceOptions.CheckpointSpanDetailLevel
+ .CHILDREN_SPANS_PER_SUBTASK));
+ }
+ }
+ }
+
+ private SpanBuilder createTaskSpan(
+ AbstractCheckpointStats checkpointStats,
+ TaskStateStats taskStats,
+ boolean addSubtaskSpans) {
+
+ // start = trigger ts + minimum delay.
+ long taskStartTs =
+ checkpointStats.getTriggerTimestamp()
+ + taskStats.getSummaryStats().getCheckpointStartDelayStats().getMinimum();
+ SpanBuilder taskSpanBuilder =
+ Span.builder(CheckpointStatsTracker.class, "Checkpoint_Task")
+ .setStartTsMillis(taskStartTs)
+ .setEndTsMillis(taskStats.getLatestAckTimestamp())
+ .setAttribute("checkpointId", checkpointStats.getCheckpointId())
+ .setAttribute("jobVertexId", taskStats.getJobVertexId().toString());
+
+ for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) {
+ String metricName = spanMetric.metricName;
+ StatsSummary statsSummary =
+ spanMetric.taskStatsSummaryExtractor.extract(taskStats.getSummaryStats());
+ taskSpanBuilder.setAttribute("max" + metricName, statsSummary.getMaximum());
+ taskSpanBuilder.setAttribute("sum" + metricName, statsSummary.getSum());
+ }
+
+ if (addSubtaskSpans) {
+ addSubtaskSpans(checkpointStats, taskStats, taskSpanBuilder);
+ }
+
+ return taskSpanBuilder;
+ }
+
+ private void addSubtaskSpans(
+ AbstractCheckpointStats checkpointStats,
+ TaskStateStats taskStats,
+ SpanBuilder taskSpanBuilder) {
+ for (SubtaskStateStats subtaskStat : taskStats.getSubtaskStats()) {
+ if (subtaskStat == null) {
+ continue;
+ }
+
+ // start = trigger ts + minimum delay.
+ long subTaskStartTs =
+ checkpointStats.getTriggerTimestamp() + subtaskStat.getCheckpointStartDelay();
+
+ SpanBuilder subTaskSpanBuilder =
+ Span.builder(CheckpointStatsTracker.class, "Checkpoint_Subtask")
+ .setStartTsMillis(subTaskStartTs)
+ .setEndTsMillis(subtaskStat.getAckTimestamp())
+ .setAttribute("checkpointId", checkpointStats.getCheckpointId())
+ .setAttribute("jobVertexId", taskStats.getJobVertexId().toString())
+ .setAttribute("subtaskId", subtaskStat.getSubtaskIndex());
+
+ for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) {
+ String metricName = spanMetric.metricName;
+ long metricValue = spanMetric.subtaskMetricExtractor.extract(subtaskStat);
+ subTaskSpanBuilder.setAttribute(metricName, metricValue);
+ }
+
+ taskSpanBuilder.addChild(subTaskSpanBuilder);
+ }
+ }
+
+ private boolean shouldSkipSumMetricNameInCheckpointSpanForCompatibility(String metricName) {
+ // Those two metrics already exists under different names that we want to preserve
+ // (fullSize, checkpointedSize).
+ return metricName.equals("StateSizeBytes") || metricName.equals("CheckpointedSizeBytes");
+ }
+
@Override
public void reportFailedCheckpointsWithoutInProgress() {
statsReadWriteLock.lock();
@@ -649,4 +866,55 @@ public Long getValue() {
}
}
}
+
+ static class TaskStatsAggregator {
+ final String metricName;
+ final LongArrayList valuesMax;
+ final LongArrayList valuesSum;
+
+ TaskStatsAggregator(String metric, LongArrayList valuesMax, LongArrayList valuesSum) {
+ this.metricName = metric;
+ this.valuesMax = valuesMax;
+ this.valuesSum = valuesSum;
+ }
+
+ public static TaskStatsAggregator aggregate(
+ Collection allTaskStateStats,
+ CheckpointSpanMetric metricDescriptor) {
+
+ final LongArrayList valuesMax = new LongArrayList(allTaskStateStats.size());
+ final LongArrayList valuesSum = new LongArrayList(allTaskStateStats.size());
+ for (TaskStateStats taskStats : allTaskStateStats) {
+ StatsSummary statsSummary =
+ metricDescriptor.taskStatsSummaryExtractor.extract(
+ taskStats.getSummaryStats());
+ valuesMax.add(statsSummary.getMaximum());
+ valuesSum.add(statsSummary.getSum());
+ }
+ return new TaskStatsAggregator(metricDescriptor.metricName, valuesMax, valuesSum);
+ }
+
+ public LongArrayList getValuesMax() {
+ return valuesMax;
+ }
+
+ public LongArrayList getValuesSum() {
+ return valuesSum;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public long getTotalMax() {
+ return Arrays.stream(valuesMax.getInternalArray())
+ .filter(val -> val > 0L)
+ .max()
+ .orElse(0L);
+ }
+
+ public long getTotalSum() {
+ return Arrays.stream(valuesSum.getInternalArray()).filter(val -> val >= 0L).sum();
+ }
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 1ec53f6efd9e0..b7e03f8058ce4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -135,6 +135,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -247,7 +248,9 @@ public SchedulerBase(
new DefaultCheckpointStatsTracker(
jobMasterConfiguration.get(
WebOptions.CHECKPOINTS_HISTORY_SIZE),
- jobManagerJobMetricGroup));
+ jobManagerJobMetricGroup,
+ jobMasterConfiguration.get(CHECKPOINT_SPAN_DETAIL_LEVEL),
+ null));
this.executionGraph =
createAndRestoreExecutionGraph(
completedCheckpointStore,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 158b462d69215..d8219d595428d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -158,6 +158,7 @@
import java.util.function.Supplier;
import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_MAX_DELAY;
+import static org.apache.flink.configuration.TraceOptions.CHECKPOINT_SPAN_DETAIL_LEVEL;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphUtils.isAnyOutputBlocking;
/**
@@ -455,6 +456,7 @@ public AdaptiveScheduler(
new DefaultCheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
metricGroup,
+ configuration.get(CHECKPOINT_SPAN_DETAIL_LEVEL),
checkpointStatsListener),
jobGraph,
jobResourceRequirements,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
index 2cde638096daa..9784c45a6a39f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
@@ -67,6 +67,10 @@ public long[] toArray() {
return Arrays.copyOf(array, size);
}
+ public long[] getInternalArray() {
+ return array;
+ }
+
private void grow(int length) {
if (length > array.length) {
final int newLength =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
index 4a08a733f3e61..b3bbe4b3d9b02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.events.Event;
import org.apache.flink.events.EventBuilder;
@@ -32,6 +33,7 @@
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
+import org.apache.flink.util.CollectionUtil;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
@@ -287,6 +289,7 @@ public void onFailedCheckpoint() {
new DefaultCheckpointStatsTracker(
10,
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+ TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT,
listener);
// "factory" code to enable the instantiation of test data based on a PendingCheckpointStats
@@ -434,6 +437,254 @@ private void assertCheckpointAttributes(
assertThat(attributes.get("isUnaligned")).isEqualTo(Boolean.toString(isUnaligned));
}
+ @Test
+ public void testSpanCreationBreakDownPerCheckpoint() {
+ testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT);
+ }
+
+ @Test
+ public void testSpanCreationBreakDownPerCheckpointWithTasks() {
+ testSpanCreationTemplate(
+ TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT_WITH_TASKS);
+ }
+
+ @Test
+ public void testSpanCreationBreakDownPerTask() {
+ testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK);
+ }
+
+ @Test
+ public void testSpanCreationBreakDownPerSubTask() {
+ testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK);
+ }
+
+ public void testSpanCreationTemplate(TraceOptions.CheckpointSpanDetailLevel detailLevel) {
+ JobVertexID jobVertexID0 = new JobVertexID();
+ JobVertexID jobVertexID1 = new JobVertexID();
+
+ final List reportedSpans = new ArrayList<>();
+ final List reportedEvents = new ArrayList<>();
+
+ produceTestSpans(jobVertexID0, jobVertexID1, detailLevel, reportedSpans, reportedEvents);
+ assertThat(reportedSpans).hasSize(1);
+ assertThat(reportedEvents).hasSize(1);
+
+ Map expected = new HashMap<>();
+ expected.put("checkpointId", 42L);
+ expected.put("checkpointedSize", 37L);
+ expected.put("fullSize", 40L);
+ expected.put("checkpointStatus", "COMPLETED");
+ expected.put("checkpointType", "Checkpoint");
+ expected.put("isUnaligned", "true");
+ expected.put("metadataSize", 1984L);
+
+ assertThat(reportedEvents.get(0).getAttributes())
+ .containsExactlyInAnyOrderEntriesOf(expected);
+
+ expected.put("maxCheckpointStartDelayMs", 29L);
+ expected.put("maxPersistedDataBytes", 27L);
+ expected.put("maxAsyncCheckpointDurationMs", 25L);
+ expected.put("maxSyncCheckpointDurationMs", 24L);
+ expected.put("maxAlignmentDurationMs", 28L);
+ expected.put("maxProcessedDataBytes", 26L);
+ expected.put("sumCheckpointStartDelayMs", 58L);
+ expected.put("sumAlignmentDurationMs", 55L);
+ expected.put("sumProcessedDataBytes", 49L);
+ expected.put("sumAsyncCheckpointDurationMs", 46L);
+ expected.put("sumPersistedDataBytes", 52L);
+ expected.put("sumSyncCheckpointDurationMs", 43L);
+ expected.put("maxStateSizeBytes", 23L);
+ expected.put("maxCheckpointedSizeBytes", 22L);
+
+ if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT_WITH_TASKS) {
+ expected.put("perTaskMaxAlignmentDurationMs", "[28, 8]");
+ expected.put("perTaskMaxAsyncCheckpointDurationMs", "[16, 25]");
+ expected.put("perTaskMaxCheckpointStartDelayMs", "[20, 29]");
+ expected.put("perTaskMaxPersistedDataBytes", "[18, 27]");
+ expected.put("perTaskMaxProcessedDataBytes", "[17, 26]");
+ expected.put("perTaskMaxSyncCheckpointDurationMs", "[24, 4]");
+ expected.put("perTaskMaxStateSizeBytes", "[14, 23]");
+ expected.put("perTaskMaxCheckpointedSizeBytes", "[13, 22]");
+ expected.put("perTaskSumAlignmentDurationMs", "[47, 8]");
+ expected.put("perTaskSumAsyncCheckpointDurationMs", "[21, 25]");
+ expected.put("perTaskSumCheckpointStartDelayMs", "[29, 29]");
+ expected.put("perTaskSumPersistedDataBytes", "[25, 27]");
+ expected.put("perTaskSumProcessedDataBytes", "[23, 26]");
+ expected.put("perTaskSumSyncCheckpointDurationMs", "[39, 4]");
+ expected.put("perTaskSumStateSizeBytes", "[17, 23]");
+ expected.put("perTaskSumCheckpointedSizeBytes", "[15, 22]");
+ }
+
+ Span checkpointLevelSpan = reportedSpans.get(0);
+ assertThat(checkpointLevelSpan.getAttributes())
+ .containsExactlyInAnyOrderEntriesOf(expected);
+
+ List taskLevelSpans = checkpointLevelSpan.getChildren();
+ if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK
+ || detailLevel
+ == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) {
+ assertThat(taskLevelSpans.size()).isEqualTo(2);
+ } else {
+ assertThat(taskLevelSpans).isEmpty();
+ return;
+ }
+
+ Span taskSpan0 = taskLevelSpans.get(0);
+ expected.clear();
+
+ expected.put("checkpointId", 42L);
+ expected.put("jobVertexId", jobVertexID0.toString());
+ expected.put("maxCheckpointStartDelayMs", 20L);
+ expected.put("maxPersistedDataBytes", 18L);
+ expected.put("maxAsyncCheckpointDurationMs", 16L);
+ expected.put("maxSyncCheckpointDurationMs", 24L);
+ expected.put("maxAlignmentDurationMs", 28L);
+ expected.put("maxProcessedDataBytes", 17L);
+ expected.put("sumCheckpointStartDelayMs", 29L);
+ expected.put("sumAlignmentDurationMs", 47L);
+ expected.put("sumProcessedDataBytes", 23L);
+ expected.put("sumAsyncCheckpointDurationMs", 21L);
+ expected.put("sumPersistedDataBytes", 25L);
+ expected.put("sumSyncCheckpointDurationMs", 39L);
+ expected.put("maxStateSizeBytes", 14L);
+ expected.put("sumStateSizeBytes", 17L);
+ expected.put("maxCheckpointedSizeBytes", 13L);
+ expected.put("sumCheckpointedSizeBytes", 15L);
+
+ assertThat(taskSpan0.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+
+ Span taskSpan1 = taskLevelSpans.get(1);
+ expected.clear();
+
+ expected.put("checkpointId", 42L);
+ expected.put("jobVertexId", jobVertexID1.toString());
+ expected.put("maxCheckpointStartDelayMs", 29L);
+ expected.put("maxPersistedDataBytes", 27L);
+ expected.put("maxAsyncCheckpointDurationMs", 25L);
+ expected.put("maxSyncCheckpointDurationMs", 4L);
+ expected.put("maxAlignmentDurationMs", 8L);
+ expected.put("maxProcessedDataBytes", 26L);
+ expected.put("sumCheckpointStartDelayMs", 29L);
+ expected.put("sumAlignmentDurationMs", 8L);
+ expected.put("sumProcessedDataBytes", 26L);
+ expected.put("sumAsyncCheckpointDurationMs", 25L);
+ expected.put("sumPersistedDataBytes", 27L);
+ expected.put("sumSyncCheckpointDurationMs", 4L);
+ expected.put("maxStateSizeBytes", 23L);
+ expected.put("sumStateSizeBytes", 23L);
+ expected.put("maxCheckpointedSizeBytes", 22L);
+ expected.put("sumCheckpointedSizeBytes", 22L);
+ assertThat(taskSpan1.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+
+ List subtasksSpans0 = taskSpan0.getChildren();
+ List subtasksSpans1 = taskSpan1.getChildren();
+
+ if (detailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) {
+ assertThat(subtasksSpans0.size()).isEqualTo(2);
+ assertThat(subtasksSpans1.size()).isEqualTo(1);
+ } else {
+ assertThat(subtasksSpans0).isEmpty();
+ assertThat(subtasksSpans1).isEmpty();
+ return;
+ }
+
+ Span subtaskSpan0N0 = subtasksSpans0.get(0);
+ expected.clear();
+
+ expected.put("checkpointId", 42L);
+ expected.put("jobVertexId", jobVertexID0.toString());
+ expected.put("subtaskId", 0L);
+ expected.put("CheckpointStartDelayMs", 9L);
+ expected.put("AlignmentDurationMs", 28L);
+ expected.put("ProcessedDataBytes", 6L);
+ expected.put("AsyncCheckpointDurationMs", 5L);
+ expected.put("PersistedDataBytes", 7L);
+ expected.put("SyncCheckpointDurationMs", 24L);
+ expected.put("StateSizeBytes", 3L);
+ expected.put("CheckpointedSizeBytes", 2L);
+ assertThat(subtaskSpan0N0.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+
+ Span subtaskSpan0N1 = subtasksSpans0.get(1);
+ expected.clear();
+
+ expected.put("checkpointId", 42L);
+ expected.put("jobVertexId", jobVertexID0.toString());
+ expected.put("subtaskId", 1L);
+ expected.put("CheckpointStartDelayMs", 20L);
+ expected.put("AlignmentDurationMs", 19L);
+ expected.put("ProcessedDataBytes", 17L);
+ expected.put("AsyncCheckpointDurationMs", 16L);
+ expected.put("PersistedDataBytes", 18L);
+ expected.put("SyncCheckpointDurationMs", 15L);
+ expected.put("StateSizeBytes", 14L);
+ expected.put("CheckpointedSizeBytes", 13L);
+ assertThat(subtaskSpan0N1.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+
+ Span subtaskSpan1N10 = subtasksSpans1.get(0);
+ expected.clear();
+
+ expected.put("checkpointId", 42L);
+ expected.put("jobVertexId", jobVertexID1.toString());
+ expected.put("subtaskId", 0L);
+ expected.put("CheckpointStartDelayMs", 29L);
+ expected.put("AlignmentDurationMs", 8L);
+ expected.put("ProcessedDataBytes", 26L);
+ expected.put("AsyncCheckpointDurationMs", 25L);
+ expected.put("PersistedDataBytes", 27L);
+ expected.put("SyncCheckpointDurationMs", 4L);
+ expected.put("StateSizeBytes", 23L);
+ expected.put("CheckpointedSizeBytes", 22L);
+ assertThat(subtaskSpan1N10.getAttributes()).containsExactlyInAnyOrderEntriesOf(expected);
+ }
+
+ private List produceTestSpans(
+ JobVertexID jobVertexID0,
+ JobVertexID jobVertexID1,
+ TraceOptions.CheckpointSpanDetailLevel detailLevel,
+ List reportedSpansOut,
+ List reportedEventsOut) {
+
+ JobManagerJobMetricGroup metricGroup =
+ new UnregisteredMetricGroups.UnregisteredJobManagerJobMetricGroup() {
+
+ @Override
+ public void addSpan(SpanBuilder spanBuilder) {
+ reportedSpansOut.add(spanBuilder.build());
+ }
+
+ @Override
+ public void addEvent(EventBuilder eventBuilder) {
+ reportedEventsOut.add(eventBuilder.build());
+ }
+ };
+
+ CheckpointStatsTracker tracker =
+ new DefaultCheckpointStatsTracker(10, metricGroup, detailLevel, null);
+
+ Map subtasksByVertex = CollectionUtil.newHashMapWithExpectedSize(2);
+ subtasksByVertex.put(jobVertexID0, 2);
+ subtasksByVertex.put(jobVertexID1, 1);
+ PendingCheckpointStats pending =
+ tracker.reportPendingCheckpoint(
+ 42,
+ 1,
+ CheckpointProperties.forCheckpoint(
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ subtasksByVertex);
+
+ pending.reportSubtaskStats(
+ jobVertexID0, new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true));
+ pending.reportSubtaskStats(
+ jobVertexID0,
+ new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true));
+ pending.reportSubtaskStats(
+ jobVertexID1,
+ new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true));
+ // Complete checkpoint => new snapshot
+ tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984));
+ return reportedSpansOut;
+ }
+
@Test
public void testInitializationSpanCreation() throws Exception {
final List reportedSpans = new ArrayList<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index b6626e97ace48..405ef99baac3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -21,6 +21,7 @@
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
@@ -106,7 +107,10 @@ public class AdaptiveSchedulerBuilder {
checkpointStatsTrackerFactory =
(metricGroup, checkpointStatsListener) ->
new DefaultCheckpointStatsTracker(
- 10, metricGroup, checkpointStatsListener);
+ 10,
+ metricGroup,
+ TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT,
+ checkpointStatsListener);
public AdaptiveSchedulerBuilder(
final JobGraph jobGraph,