Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions docs/content.zh/docs/ops/traces.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,30 @@ Flink exposes a tracing system that allows gathering and exposing traces to exte
## Reporting traces

You can access the tracing system from any user function that extends [RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
This method returns a `MetricGroup` object via which you can report a new single span trace.
This method returns a `MetricGroup` object via which you can report a new single trace with tree of spans.

### Reporting single Span


A `Span` represents some process that happened in Flink at certain point of time for a certain duration, that will be reported to a `TraceReporter`.
To report a `Span` you can use the `MetricGroup#addSpan(SpanBuilder)` method.

Currently, we don't support traces with multiple spans. Each `Span` is self-contained and represents things like a checkpoint or recovery.
Currently, we support traces with a single tree of spans, but all the children spans have to be reported all at once in one `MetricGroup#addSpan` call.
You can not report child or parent spans independently.
{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
{{< tab "Java" >}}
```java
public class MyClass {
void doSomething() {
// (...)
metricGroup.addSpan(
Span.builder(MyClass.class, "SomeAction")
.setStartTsMillis(startTs) // Optional
.setEndTsMillis(endTs) // Optional
.setAttribute("foo", "bar");
Span.builder(MyClass.class, "SomeAction")
.setStartTsMillis(startTs) // Optional
.setEndTsMillis(endTs) // Optional
.setAttribute("foo", "bar") // Optional
.addChild(Span.builder(MyClass.class, "ChildAction") // Optional
.addChildren(List.of(
Span.builder(MyClass.class, "AnotherChildAction")); // Optional
}
}
```
Expand Down
16 changes: 10 additions & 6 deletions docs/content/docs/ops/traces.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,30 @@ Flink exposes a tracing system that allows gathering and exposing traces to exte
## Reporting traces

You can access the tracing system from any user function that extends [RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions) by calling `getRuntimeContext().getMetricGroup()`.
This method returns a `MetricGroup` object via which you can report a new single span trace.
This method returns a `MetricGroup` object via which you can report a new single trace with tree of spans.

### Reporting single Span


A `Span` represents some process that happened in Flink at certain point of time for a certain duration, that will be reported to a `TraceReporter`.
To report a `Span` you can use the `MetricGroup#addSpan(SpanBuilder)` method.

Currently, we don't support traces with multiple spans. Each `Span` is self-contained and represents things like a checkpoint or recovery.
Currently, we support traces with a single tree of spans, but all the children spans have to be reported all at once in one `MetricGroup#addSpan` call.
You can not report child or parent spans independently.
{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}}
{{< tab "Java" >}}
```java
public class MyClass {
void doSomething() {
// (...)
metricGroup.addSpan(
Span.builder(MyClass.class, "SomeAction")
.setStartTsMillis(startTs) // Optional
.setEndTsMillis(endTs) // Optional
.setAttribute("foo", "bar");
Span.builder(MyClass.class, "SomeAction")
.setStartTsMillis(startTs) // Optional
.setEndTsMillis(endTs) // Optional
.setAttribute("foo", "bar") // Optional
.addChild(Span.builder(MyClass.class, "ChildAction") // Optional
.addChildren(List.of(
Span.builder(MyClass.class, "AnotherChildAction")); // Optional
}
}
```
Expand Down
7 changes: 7 additions & 0 deletions docs/layouts/shortcodes/generated/trace_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>traces.checkpoint.span-detail-level</h5></td>
<td style="word-wrap: break-word;">SPAN_PER_CHECKPOINT</td>
<td><p>Enum</p></td>
<td>Detail level for reporting checkpoint spans. Possible values:
<ul><li>'<code class="highlighter-rouge">SPAN_PER_CHECKPOINT</code>' (default): Single span per checkpoint. Aggregated sum/max for sub-metrics from all tasks and subtasks per checkpoint.</li><li>'<code class="highlighter-rouge">SPAN_PER_CHECKPOINT_WITH_TASKS</code>': Single span per checkpoint. Same as '<code class="highlighter-rouge">SPAN_PER_CHECKPOINT</code>', plus arrays of aggregated values per task.</li><li>'<code class="highlighter-rouge">CHILDREN_SPANS_PER_TASK</code>': Same as '<code class="highlighter-rouge">SPAN_PER_CHECKPOINT</code>' plus children spans per each task. Each task span with aggregated sum/max sub-metrics from subtasks.</li><li>'<code class="highlighter-rouge">CHILDREN_SPANS_PER_SUBTASK</code>': Same as '<code class="highlighter-rouge">CHILDREN_SPANS_PER_TASK</code>' plus children spans per each subtask. Child spans for tasks and grand-child spans for subtasks.</li></ul><br /><br />Possible values:<ul><li>"SPAN_PER_CHECKPOINT"</li><li>"SPAN_PER_CHECKPOINT_WITH_TASKS"</li><li>"CHILDREN_SPANS_PER_TASK"</li><li>"CHILDREN_SPANS_PER_SUBTASK"</li></ul></td>
</tr>
<tr>
<td><h5>traces.reporter.&lt;name&gt;.&lt;parameter&gt;</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@
@Experimental
public class TraceOptions {

/** Enum for the detail level of checkpointing spans. */
public enum CheckpointSpanDetailLevel {
/** Sum/Max for sub-metrics per checkpoint. */
SPAN_PER_CHECKPOINT,
/** Sum/Max for sub-metrics per checkpoint and arrays of task aggregates. */
SPAN_PER_CHECKPOINT_WITH_TASKS,
/** Sub/Max for sub-metrics of checkpoint and tasks (tasks as child spans). */
CHILDREN_SPANS_PER_TASK,
/**
* Sub/Max for sub-metrics of checkpoint, tasks, and subtasks (tasks as child spans,
* subtasks as grand-child spans).
*/
CHILDREN_SPANS_PER_SUBTASK;
}

private static final String NAMED_REPORTER_CONFIG_PREFIX =
ConfigConstants.TRACES_REPORTER_PREFIX + "<name>";

Expand Down Expand Up @@ -67,6 +82,59 @@ public static Configuration forReporter(Configuration configuration, String repo
+ " any of the names in the list will be started. Otherwise, all reporters that could be found in"
+ " the configuration will be started.");

/** The detail level for reporting checkpoint spans. */
public static final ConfigOption<TraceOptions.CheckpointSpanDetailLevel>
CHECKPOINT_SPAN_DETAIL_LEVEL =
key("traces.checkpoint.span-detail-level")
.enumType(CheckpointSpanDetailLevel.class)
.defaultValue(CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT)
.withDescription(
Description.builder()
.text(
"Detail level for reporting checkpoint spans. Possible values:\n")
.list(
text(
"'%s' (default): Single span per checkpoint. "
+ "Aggregated sum/max for sub-metrics from all tasks and subtasks per checkpoint.",
code(
CheckpointSpanDetailLevel
.SPAN_PER_CHECKPOINT
.name())),
text(
"'%s': Single span per checkpoint. "
+ "Same as '%s', plus arrays of aggregated values per task.",
code(
CheckpointSpanDetailLevel
.SPAN_PER_CHECKPOINT_WITH_TASKS
.name()),
code(
CheckpointSpanDetailLevel
.SPAN_PER_CHECKPOINT
.name())),
text(
"'%s': Same as '%s' plus children spans per each task. "
+ "Each task span with aggregated sum/max sub-metrics from subtasks.",
code(
CheckpointSpanDetailLevel
.CHILDREN_SPANS_PER_TASK
.name()),
code(
CheckpointSpanDetailLevel
.SPAN_PER_CHECKPOINT
.name())),
text(
"'%s': Same as '%s' plus children spans per each subtask. "
+ "Child spans for tasks and grand-child spans for subtasks.",
code(
CheckpointSpanDetailLevel
.CHILDREN_SPANS_PER_SUBTASK
.name()),
code(
CheckpointSpanDetailLevel
.CHILDREN_SPANS_PER_TASK
.name())))
.build());

/**
* Returns a view over the given configuration via which options can be set/retrieved for the
* given reporter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import org.apache.flink.annotation.Internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Basic implementation of {@link Span}. */
Expand All @@ -31,6 +34,7 @@ public class SimpleSpan implements Span {
private final String name;

private final HashMap<String, Object> attributes = new HashMap<>();
private final List<Span> children = new ArrayList<>();
private final long startTsMillis;
private final long endTsMillis;

Expand All @@ -39,13 +43,15 @@ public SimpleSpan(
String name,
long startTsMillis,
long endTsMillis,
HashMap<String, Object> attributes) {
HashMap<String, Object> attributes,
List<Span> children) {

this.scope = scope;
this.name = name;
this.startTsMillis = startTsMillis;
this.endTsMillis = endTsMillis;
this.attributes.putAll(attributes);
this.children.addAll(children);
}

@Override
Expand Down Expand Up @@ -73,6 +79,11 @@ public Map<String, Object> getAttributes() {
return attributes;
}

@Override
public List<Span> getChildren() {
return Collections.unmodifiableList(children);
}

@Override
public String toString() {
return SimpleSpan.class.getSimpleName()
Expand All @@ -87,6 +98,8 @@ public String toString() {
+ endTsMillis
+ ", attributes="
+ attributes
+ ", children="
+ children
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@

import org.apache.flink.annotation.Experimental;

import java.util.List;
import java.util.Map;

/**
* Span represents something that happened in Flink at certain point of time, that will be reported
* to a {@link org.apache.flink.traces.reporter.TraceReporter}.
*
* <p>Currently we don't support traces with multiple spans. Each span is self-contained and
* represents things like a checkpoint or recovery.
*/
@Experimental
public interface Span {
Expand All @@ -49,4 +47,7 @@ static SpanBuilder builder(Class<?> classScope, String name) {
* added in the future.
*/
Map<String, Object> getAttributes();

/** Returns the child spans (= nested). */
List<Span> getChildren();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import org.apache.flink.AttributeBuilder;
import org.apache.flink.annotation.Experimental;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Builder used to construct {@link Span}. See {@link Span#builder(Class, String)}. */
@Experimental
public class SpanBuilder implements AttributeBuilder {
private final HashMap<String, Object> attributes = new HashMap<>();
private final List<SpanBuilder> children = new ArrayList<>();
private final Class<?> classScope;
private final String name;
private long startTsMillis;
Expand Down Expand Up @@ -68,7 +72,10 @@ public Span build(Map<String, String> additionalVariables) {
name,
startTsMillisToBuild,
endTsMillisToBuild,
attributes);
attributes,
children.stream()
.map(childBuilder -> childBuilder.build(additionalVariables))
.collect(Collectors.toList()));
}

/**
Expand Down Expand Up @@ -120,4 +127,16 @@ public SpanBuilder setAttribute(String key, boolean value) {
public String getName() {
return name;
}

/** Adds child spans (= nested). */
public SpanBuilder addChildren(List<SpanBuilder> children) {
this.children.addAll(children);
return this;
}

/** Adds child span (= nested). */
public SpanBuilder addChild(SpanBuilder child) {
this.children.add(child);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,9 @@ private void notifyOfAddedSpanInternal(Span span, io.opentelemetry.api.trace.Spa
.startSpan();

// Recursively add child spans to this parent
// TODO: not yet supported
// for (Span childSpan : span.getChildren()) {
// notifyOfAddedSpanInternal(childSpan, currentOtelSpan);
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove comment? (code is identical to the loop below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been removed?

for (Span childSpan : span.getChildren()) {
notifyOfAddedSpanInternal(childSpan, currentOtelSpan);
}

currentOtelSpan.end(span.getEndTsMillis(), TimeUnit.MILLISECONDS);
}
Expand Down
Loading