Skip to content

Commit

Permalink
Clear transient header from system context
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jan 4, 2024
1 parent 16d457d commit 6832da5
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,17 @@ boolean isDefaultContext() {
* by the system itself rather than by a user action.
*/
public void markAsSystemContext() {
threadLocal.set(threadLocal.get().setSystemContext());
ThreadContextStruct threadContextStruct = threadLocal.get();
final Map<String, Object> transients = new HashMap<>();
propagators.forEach(p -> transients.putAll(p.transientsForSystemContext(threadContextStruct.transientHeaders)));
ThreadContextStruct newThreadContextStruct = new ThreadContextStruct(
threadContextStruct.requestHeaders,
threadContextStruct.responseHeaders,
transients,
threadContextStruct.persistentHeaders,
threadContextStruct.isSystemContext
);
threadLocal.set(newThreadContextStruct.setSystemContext());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.common.annotation.PublicApi;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -27,6 +28,15 @@ public interface ThreadContextStatePropagator {
*/
Map<String, Object> transients(Map<String, Object> source);

/**
* Returns the list of transient headers that need to be propagated to the child system context.
* @param source current context transient headers
* @return the list of transient headers that needs to be propagated from current context to new thread context
*/
default Map<String, Object> transientsForSystemContext(Map<String, Object> source) {
return new HashMap<>();
}

/**
* Returns the list of request headers that needs to be propagated from current context to request.
* @param source current context headers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public Map<String, Object> transients(Map<String, Object> source) {
return transients;
}

@Override
public Map<String, Object> transientsForSystemContext(Map<String, Object> source) {
return transients(source);
}

@Override
public Map<String, String> headers(Map<String, Object> source) {
return Collections.emptyMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,19 +868,10 @@ public final <T extends TransportResponse> void sendRequest(
final TransportRequestOptions options,
final TransportResponseHandler<T> handler
) {
if (connection == localNodeConnection) {
// See please https://github.com/opensearch-project/OpenSearch/issues/10291
sendRequestAsync(connection, action, request, options, handler);
} else {
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(
handler,
span,
tracer
);
sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler);
}
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer);
sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler);
}
}

Expand Down

0 comments on commit 6832da5

Please sign in to comment.