Skip to content

Commit

Permalink
Add support for conditional Transient header propagation
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Dec 6, 2023
1 parent c204585 commit ff81b70
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public StoredContext stashContext() {
);
}

final Map<String, Object> transientHeaders = propagateTransients(context.transientHeaders);
final Map<String, Object> transientHeaders = propagateTransients(context.transientHeaders, false);
if (!transientHeaders.isEmpty()) {
threadContextStruct = threadContextStruct.putTransient(transientHeaders);
}
Expand Down Expand Up @@ -230,7 +230,11 @@ public StoredContext stashAndMergeHeaders(Map<String, String> headers) {
* @param preserveResponseHeaders if set to <code>true</code> the response headers of the restore thread will be preserved.
*/
public StoredContext newStoredContext(boolean preserveResponseHeaders) {
return newStoredContext(preserveResponseHeaders, Collections.emptyList());
return newStoredContext(preserveResponseHeaders, Collections.emptyList(), false);
}

public StoredContext newStoredContext(boolean preserveResponseHeaders, boolean forIndependentTask) {
return newStoredContext(preserveResponseHeaders, Collections.emptyList(), forIndependentTask);
}

/**
Expand All @@ -241,11 +245,28 @@ public StoredContext newStoredContext(boolean preserveResponseHeaders) {
* @param preserveResponseHeaders if set to <code>true</code> the response headers of the restore thread will be preserved.
*/
public StoredContext newStoredContext(boolean preserveResponseHeaders, Collection<String> transientHeadersToClear) {
return newStoredContext(preserveResponseHeaders, transientHeadersToClear, false);
}

/**
* Just like {@link #stashContext()} but no default context is set. Instead, the {@code transientHeadersToClear} argument can be used
* to clear specific transient headers in the new context. All headers (with the possible exception of {@code responseHeaders}) are
* restored by closing the returned {@link StoredContext}.
*
* It passes the forIndependentTask parameter to the transient header propagators to decide whether the header needs to
* be propagated or not.
* @param preserveResponseHeaders if set to <code>true</code> the response headers of the restore thread will be preserved.
*/
public StoredContext newStoredContext(
boolean preserveResponseHeaders,
Collection<String> transientHeadersToClear,
boolean forIndependentTask
) {
final ThreadContextStruct originalContext = threadLocal.get();
final Map<String, Object> newTransientHeaders = new HashMap<>(originalContext.transientHeaders);

boolean transientHeadersModified = false;
final Map<String, Object> transientHeaders = propagateTransients(originalContext.transientHeaders);
final Map<String, Object> transientHeaders = propagateTransients(originalContext.transientHeaders, forIndependentTask);
if (!transientHeaders.isEmpty()) {
newTransientHeaders.putAll(transientHeaders);
transientHeadersModified = true;
Expand Down Expand Up @@ -573,9 +594,9 @@ public static Map<String, String> buildDefaultHeaders(Settings settings) {
}
}

private Map<String, Object> propagateTransients(Map<String, Object> source) {
private Map<String, Object> propagateTransients(Map<String, Object> source, boolean forIndependentTask) {
final Map<String, Object> transients = new HashMap<>();
propagators.forEach(p -> transients.putAll(p.transients(source)));
propagators.forEach(p -> transients.putAll(p.transients(source, forIndependentTask)));
return transients;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
public interface ThreadContextStatePropagator {
/**
* Returns the list of transient headers that needs to be propagated from current context to new thread context.
* @param source current context transient headers
*
* @param source current context transient headers
* @param forIndependentTask Helps in deciding whether transient header needs to be propagated or not for the
* scenarios where the new independent/background/scheduled task is being spawned from the
* current thread's context.
* @return the list of transient headers that needs to be propagated from current context to new thread context
*/
Map<String, Object> transients(Map<String, Object> source);
Map<String, Object> transients(Map<String, Object> source, boolean forIndependentTask);

/**
* Returns the list of request headers that needs to be propagated from current context to request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
public class TaskThreadContextStatePropagator implements ThreadContextStatePropagator {
@Override
public Map<String, Object> transients(Map<String, Object> source) {
public Map<String, Object> transients(Map<String, Object> source, boolean forIndependentTask) {
final Map<String, Object> transients = new HashMap<>();

if (source.containsKey(TASK_ID)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,17 @@ public void put(String key, Span span) {
}

@Override
public Map<String, Object> transients(Map<String, Object> source) {
public Map<String, Object> transients(Map<String, Object> source, boolean forIndependentTask) {
final Map<String, Object> transients = new HashMap<>();

if (source.containsKey(CURRENT_SPAN)) {
final SpanReference current = (SpanReference) source.get(CURRENT_SPAN);
if (current != null) {
transients.put(CURRENT_SPAN, new SpanReference(current.getSpan()));
if (forIndependentTask == false) {
if (source.containsKey(CURRENT_SPAN)) {
final SpanReference current = (SpanReference) source.get(CURRENT_SPAN);
if (current != null) {
transients.put(CURRENT_SPAN, new SpanReference(current.getSpan()));
}
}
} else {
transients.put(CURRENT_SPAN, null);
}

return transients;
Expand Down
10 changes: 6 additions & 4 deletions server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,13 @@ public ExecutorService executor(String name) {
*/
@Override
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) {
command = threadContext.preserveContext(command);
if (!Names.SAME.equals(executor)) {
command = new ThreadedRunnable(command, executor(executor));
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(false, true)) {
command = threadContext.preserveContext(command);
if (!Names.SAME.equals(executor)) {
command = new ThreadedRunnable(command, executor(executor));
}
return new ScheduledCancellableAdapter(scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
}
return new ScheduledCancellableAdapter(scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
}

public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) {
Expand Down

0 comments on commit ff81b70

Please sign in to comment.