Skip to content

Commit b189842

Browse files
authored
Various tracing fixes (#137908) (#138109)
- Only continue tracing in TaskManager if a parent APM trace context exists. Trace headers might be set by external transactions as well. If present, it caused APMTracer to report a transaction for every span when in fact being sampled out. - Correctly report the duration of transactions if not recording by not discarding the root span immediately. These transactions might still get reported, but without spans. - Discard transient trace start time in `newTraceContext` when a parent APM trace context already exists. If propagated, all spans would start at the same time. Relates to ES-13389
1 parent b88547c commit b189842

File tree

7 files changed

+113
-77
lines changed

7 files changed

+113
-77
lines changed

modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracer.java

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,9 @@ public void startTrace(TraceContext traceContext, Traceable traceable, String sp
177177

178178
// A span can have a parent span, which here is modelled though a parent span context.
179179
// Setting this is important for seeing a complete trace in the APM UI.
180-
final Context parentContext = getParentContext(traceContext);
180+
// Attempt to fetch a local parent context first, otherwise look for a remote parent
181+
final Context localParentContext = traceContext.getTransient(Task.PARENT_APM_TRACE_CONTEXT);
182+
final Context parentContext = localParentContext != null ? localParentContext : getRemoteParentContext(traceContext);
181183
if (parentContext != null) {
182184
spanBuilder.setParent(parentContext);
183185
}
@@ -188,21 +190,21 @@ public void startTrace(TraceContext traceContext, Traceable traceable, String sp
188190
if (startTime != null) {
189191
spanBuilder.setStartTimestamp(startTime);
190192
}
193+
191194
final Span span = spanBuilder.startSpan();
192-
// If the agent decided not to record this span (e.g., due to transaction_max_spans), isRecording() will be false.
193-
if (span.isRecording() == false) {
194-
logger.trace("Span [{}] [{}] will not be recorded, e.g. due to transaction_max_spans reached", spanId, spanName);
195-
// It's good practice to end the no-op span immediately to release any resources.
196-
span.end();
197-
// Returning null from computeIfAbsent means no value will be inserted into the map.
198-
return null;
195+
// If not a root span (meaning a local parent exists) and the agent decided not to record the span, discard it immediately.
196+
// Root spans (transactions), however, have to be kept to correctly report their duration.
197+
if (localParentContext != null && span.isRecording() == false) {
198+
logger.trace("Span [{}] [{}] will not be recorded due to transaction_max_spans reached", spanId, spanName);
199+
span.end(); // end span immediately to release any resources.
200+
return null; // return null to discard and not record in map of spans
199201
}
200202

201-
// If we are here, the span is real and being recorded.
202-
logger.trace("Successfully started tracing [{}] [{}]", spanId, spanName);
203203
final Context contextForNewSpan = Context.current().with(span);
204-
205-
updateThreadContext(traceContext, services, contextForNewSpan);
204+
if (span.isRecording()) {
205+
logger.trace("Recording trace [{}] [{}]", spanId, spanName);
206+
updateThreadContext(traceContext, services, contextForNewSpan);
207+
}
206208

207209
return contextForNewSpan;
208210
});
@@ -240,30 +242,26 @@ private static void updateThreadContext(TraceContext traceContext, APMServices s
240242
});
241243
}
242244

243-
private Context getParentContext(TraceContext traceContext) {
245+
private Context getRemoteParentContext(TraceContext traceContext) {
244246
// https://github.com/open-telemetry/opentelemetry-java/discussions/2884#discussioncomment-381870
245247
// If you just want to propagate across threads within the same process, you don't need context propagators (extract/inject).
246248
// You can just pass the Context object directly to another thread (it is immutable and thus thread-safe).
247249

248-
// Attempt to fetch a local parent context first, otherwise look for a remote parent
249-
Context parentContext = traceContext.getTransient(Task.PARENT_APM_TRACE_CONTEXT);
250-
if (parentContext == null) {
251-
final String traceParentHeader = traceContext.getTransient(Task.PARENT_TRACE_PARENT_HEADER);
252-
final String traceStateHeader = traceContext.getTransient(Task.PARENT_TRACE_STATE);
253-
254-
if (traceParentHeader != null) {
255-
final Map<String, String> traceContextMap = Maps.newMapWithExpectedSize(2);
256-
// traceparent and tracestate should match the keys used by W3CTraceContextPropagator
257-
traceContextMap.put(Task.TRACE_PARENT_HTTP_HEADER, traceParentHeader);
258-
if (traceStateHeader != null) {
259-
traceContextMap.put(Task.TRACE_STATE, traceStateHeader);
260-
}
261-
parentContext = services.openTelemetry.getPropagators()
262-
.getTextMapPropagator()
263-
.extract(Context.current(), traceContextMap, new MapKeyGetter());
250+
final String traceParentHeader = traceContext.getTransient(Task.PARENT_TRACE_PARENT_HEADER);
251+
final String traceStateHeader = traceContext.getTransient(Task.PARENT_TRACE_STATE);
252+
253+
if (traceParentHeader != null) {
254+
final Map<String, String> traceContextMap = Maps.newMapWithExpectedSize(2);
255+
// traceparent and tracestate should match the keys used by W3CTraceContextPropagator
256+
traceContextMap.put(Task.TRACE_PARENT_HTTP_HEADER, traceParentHeader);
257+
if (traceStateHeader != null) {
258+
traceContextMap.put(Task.TRACE_STATE, traceStateHeader);
264259
}
260+
return services.openTelemetry.getPropagators()
261+
.getTextMapPropagator()
262+
.extract(Context.current(), traceContextMap, new MapKeyGetter());
265263
}
266-
return parentContext;
264+
return null;
267265
}
268266

269267
/**
@@ -288,7 +286,7 @@ private Context getParentContext(TraceContext traceContext) {
288286
@Override
289287
public Releasable withScope(Traceable traceable) {
290288
final Context context = spans.get(traceable.getSpanId());
291-
if (context != null) {
289+
if (context != null && Span.fromContextOrNull(context).isRecording()) {
292290
return context.makeCurrent()::close;
293291
}
294292
return () -> {};
@@ -385,9 +383,10 @@ public void setAttribute(Traceable traceable, String key, String value) {
385383

386384
@Override
387385
public void stopTrace(Traceable traceable) {
388-
final var span = Span.fromContextOrNull(spans.remove(traceable.getSpanId()));
386+
final String spanId = traceable.getSpanId();
387+
final var span = Span.fromContextOrNull(spans.remove(spanId));
389388
if (span != null) {
390-
logger.trace("Finishing trace [{}]", traceable);
389+
logger.trace("Finishing trace [{}]", spanId);
391390
span.end();
392391
}
393392
}

modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/tracing/APMTracerTests.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.util.concurrent.ThreadContext;
2424
import org.elasticsearch.tasks.Task;
2525
import org.elasticsearch.telemetry.apm.internal.APMAgentSettings;
26+
import org.elasticsearch.telemetry.tracing.TraceContext;
2627
import org.elasticsearch.telemetry.tracing.Traceable;
2728
import org.elasticsearch.test.ESTestCase;
2829
import org.elasticsearch.test.junit.annotations.TestLogging;
@@ -42,6 +43,7 @@
4243
import static org.hamcrest.Matchers.is;
4344
import static org.hamcrest.Matchers.not;
4445
import static org.hamcrest.Matchers.notNullValue;
46+
import static org.hamcrest.Matchers.nullValue;
4547
import static org.mockito.ArgumentMatchers.anyString;
4648
import static org.mockito.Mockito.doAnswer;
4749
import static org.mockito.Mockito.mock;
@@ -87,22 +89,29 @@ public void test_onTraceStarted_startsTrace() {
8789
Settings settings = Settings.builder().put(APMAgentSettings.TELEMETRY_TRACING_ENABLED_SETTING.getKey(), true).build();
8890
APMTracer apmTracer = buildTracer(settings);
8991

90-
apmTracer.startTrace(new ThreadContext(settings), TRACEABLE1, "name1", null);
92+
ThreadContext traceContext = new ThreadContext(settings);
93+
apmTracer.startTrace(traceContext, TRACEABLE1, "name1", null);
9194

95+
assertThat(traceContext.getTransient(Task.APM_TRACE_CONTEXT), notNullValue());
9296
assertThat(apmTracer.getSpans(), aMapWithSize(1));
9397
assertThat(apmTracer.getSpans(), hasKey(TRACEABLE1.getSpanId()));
9498
}
9599

96100
/**
97-
* Check that when a trace is started, but it is not recorded, e.g. due to sampling, the tracer does not record it either.
101+
* Check that when a root trace is started, but it is not recorded, e.g. due to sampling,
102+
* the tracer tracks it but doesn't start tracing.
98103
*/
99-
public void test_onTraceStarted_ifNotRecorded_doesNotStartTrace() {
104+
public void test_onTraceStarted_ifNotRecorded_doesNotStartTracing() {
100105
Settings settings = Settings.builder().put(APMAgentSettings.TELEMETRY_TRACING_ENABLED_SETTING.getKey(), true).build();
101106
APMTracer apmTracer = buildTracer(settings);
102107

103-
apmTracer.startTrace(new ThreadContext(settings), TRACEABLE1, "name1_discard", null);
108+
ThreadContext traceContext = new ThreadContext(settings);
109+
apmTracer.startTrace(traceContext, TRACEABLE1, "name1_discard", null);
104110

105-
assertThat(apmTracer.getSpans(), anEmptyMap());
111+
assertThat(traceContext.getTransient(Task.APM_TRACE_CONTEXT), nullValue());
112+
// the root span (transaction) is tracked
113+
assertThat(apmTracer.getSpans(), aMapWithSize(1));
114+
assertThat(apmTracer.getSpans(), hasKey(TRACEABLE1.getSpanId()));
106115
}
107116

108117
/**
@@ -116,8 +125,11 @@ public void test_onNestedTraceStarted_ifNotRecorded_doesNotStartTrace() {
116125
apmTracer.startTrace(traceContext, TRACEABLE1, "name1", null);
117126
try (var ignore1 = traceContext.newTraceContext()) {
118127
apmTracer.startTrace(traceContext, TRACEABLE2, "name2_discard", null);
128+
assertThat(traceContext.getTransient(Task.APM_TRACE_CONTEXT), nullValue());
129+
119130
try (var ignore2 = traceContext.newTraceContext()) {
120131
apmTracer.startTrace(traceContext, TRACEABLE3, "name3_discard", null);
132+
assertThat(traceContext.getTransient(Task.APM_TRACE_CONTEXT), nullValue());
121133
}
122134
}
123135
assertThat(apmTracer.getSpans(), aMapWithSize(1));
@@ -131,12 +143,13 @@ public void test_onTraceStartedWithStartTime_startsTrace() {
131143
Settings settings = Settings.builder().put(APMAgentSettings.TELEMETRY_TRACING_ENABLED_SETTING.getKey(), true).build();
132144
APMTracer apmTracer = buildTracer(settings);
133145

134-
ThreadContext threadContext = new ThreadContext(settings);
146+
TraceContext traceContext = new ThreadContext(settings);
135147
// 1_000_000L because of "toNanos" conversions that overflow for large long millis
136148
Instant spanStartTime = Instant.ofEpochMilli(randomLongBetween(0, Long.MAX_VALUE / 1_000_000L));
137-
threadContext.putTransient(Task.TRACE_START_TIME, spanStartTime);
138-
apmTracer.startTrace(threadContext, TRACEABLE1, "name1", null);
149+
traceContext.putTransient(Task.TRACE_START_TIME, spanStartTime);
150+
apmTracer.startTrace(traceContext, TRACEABLE1, "name1", null);
139151

152+
assertThat(traceContext.getTransient(Task.APM_TRACE_CONTEXT), notNullValue());
140153
assertThat(apmTracer.getSpans(), aMapWithSize(1));
141154
assertThat(apmTracer.getSpans(), hasKey(TRACEABLE1.getSpanId()));
142155
assertThat(((SpyAPMTracer) apmTracer).getSpanStartTime("name1"), is(spanStartTime));
@@ -151,6 +164,7 @@ public void test_onTraceStopped_stopsTrace() {
151164

152165
apmTracer.startTrace(new ThreadContext(settings), TRACEABLE1, "name1", null);
153166
apmTracer.stopTrace(TRACEABLE1);
167+
apmTracer.stopTrace(TRACEABLE2); // stopping a non-existent trace is a noop
154168

155169
assertThat(apmTracer.getSpans(), anEmptyMap());
156170
}

server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,13 @@ public StoredContext newTraceContext() {
204204

205205
// this is the context when this method returns
206206
final ThreadContextStruct newContext;
207-
if (originalContext.hasTraceContext() == false) {
207+
208+
final boolean hasTraceHeaders = originalContext.requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER)
209+
|| originalContext.requestHeaders.containsKey(Task.TRACE_STATE)
210+
|| originalContext.transientHeaders.containsKey(Task.APM_TRACE_CONTEXT);
211+
212+
if (hasTraceHeaders == false) {
213+
// no need to copy if no trace headers are present
208214
newContext = originalContext;
209215
} else {
210216
final Map<String, String> newRequestHeaders = new HashMap<>(originalContext.requestHeaders);
@@ -223,6 +229,9 @@ public StoredContext newTraceContext() {
223229
final Object previousTraceContext = newTransientHeaders.remove(Task.APM_TRACE_CONTEXT);
224230
if (previousTraceContext != null) {
225231
newTransientHeaders.put(Task.PARENT_APM_TRACE_CONTEXT, previousTraceContext);
232+
// Remove the trace start time override for a previous context if such a context already exists.
233+
// If kept, all spans would contain the same start time.
234+
newTransientHeaders.remove(Task.TRACE_START_TIME);
226235
}
227236

228237
newContext = new ThreadContextStruct(
@@ -246,12 +255,12 @@ public StoredContext newTraceContext() {
246255
};
247256
}
248257

249-
public boolean hasTraceContext() {
250-
return threadLocal.get().hasTraceContext();
258+
public boolean hasApmTraceContext() {
259+
return threadLocal.get().hasApmTraceContext();
251260
}
252261

253-
public boolean hasParentTraceContext() {
254-
return threadLocal.get().hasParentTraceContext();
262+
public boolean hasParentApmTraceContext() {
263+
return threadLocal.get().hasParentApmTraceContext();
255264
}
256265

257266
/**
@@ -644,6 +653,10 @@ public <T> T getTransient(String key) {
644653
return (T) threadLocal.get().transientHeaders.get(key);
645654
}
646655

656+
public boolean hasTransient(Collection<String> keys) {
657+
return threadLocal.get().transientHeaders.keySet().containsAll(keys);
658+
}
659+
647660
/**
648661
* Returns unmodifiable copy of all transient headers.
649662
*/
@@ -873,18 +886,6 @@ private ThreadContextStruct putResponseHeaders(Map<String, Set<String>> headers)
873886
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
874887
}
875888

876-
private boolean hasTraceContext() {
877-
return requestHeaders.containsKey(Task.TRACE_PARENT_HTTP_HEADER)
878-
|| requestHeaders.containsKey(Task.TRACE_STATE)
879-
|| transientHeaders.containsKey(Task.APM_TRACE_CONTEXT);
880-
}
881-
882-
private boolean hasParentTraceContext() {
883-
return transientHeaders.containsKey(Task.PARENT_TRACE_PARENT_HEADER)
884-
|| transientHeaders.containsKey(Task.PARENT_TRACE_STATE)
885-
|| transientHeaders.containsKey(Task.PARENT_APM_TRACE_CONTEXT);
886-
}
887-
888889
private void logWarningHeaderThresholdExceeded(long threshold, Setting<?> thresholdSetting) {
889890
// If available, log some selected headers to help identifying the source of the request.
890891
// Note: Only Task.HEADERS_TO_COPY are guaranteed to be preserved at this point.
@@ -963,7 +964,18 @@ private ThreadContextStruct putResponse(
963964
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext, newWarningHeaderSize);
964965
}
965966

967+
private boolean hasApmTraceContext() {
968+
return transientHeaders.containsKey(Task.APM_TRACE_CONTEXT);
969+
}
970+
971+
private boolean hasParentApmTraceContext() {
972+
return transientHeaders.containsKey(Task.PARENT_APM_TRACE_CONTEXT);
973+
}
974+
966975
private ThreadContextStruct putTransient(String key, Object value) {
976+
assert key != Task.TRACE_START_TIME || (hasApmTraceContext() || hasParentApmTraceContext()) == false
977+
: "trace.starttime cannot be set after a trace context is present";
978+
967979
Map<String, Object> newTransient = new HashMap<>(this.transientHeaders);
968980
putSingleHeader(key, value, newTransient);
969981
return new ThreadContextStruct(requestHeaders, responseHeaders, newTransient, isSystemContext);

server/src/main/java/org/elasticsearch/tasks/Task.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ public class Task implements Traceable {
6060
*/
6161
public static final String TRACE_STATE = "tracestate";
6262

63+
/**
64+
* Optional transient header allowing to override the start time of the root trace.
65+
* This is discarded when creating a new trace context once an APM trace context exists.
66+
*/
6367
public static final String TRACE_START_TIME = "trace.starttime";
6468

6569
/**

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public Task register(String type, String action, TaskAwareRequest request, boole
139139
long maxSize = maxHeaderSize.getBytes();
140140
ThreadContext threadContext = threadPool.getThreadContext();
141141

142-
assert threadContext.hasTraceContext() == false : "Expected threadContext to have no traceContext fields";
142+
assert threadContext.hasApmTraceContext() == false : "Expected threadContext to have no APM trace context";
143143

144144
for (String key : taskHeaders) {
145145
String httpHeader = threadContext.getHeader(key);
@@ -181,7 +181,7 @@ public Task register(String type, String action, TaskAwareRequest request, boole
181181
* For REST actions this will be the case, otherwise {@link Tracer#startTrace} can be used.
182182
*/
183183
void maybeStartTrace(ThreadContext threadContext, Task task) {
184-
if (threadContext.hasParentTraceContext() == false) {
184+
if (threadContext.hasParentApmTraceContext() == false) {
185185
return;
186186
}
187187
TaskId parentTask = task.getParentTaskId();

0 commit comments

Comments
 (0)