From cf5c9c63d848b9a97b11457943c69351b85a106c Mon Sep 17 00:00:00 2001 From: Scott Rankin Date: Thu, 2 Apr 2020 14:09:41 -0400 Subject: [PATCH 1/5] Open Tracing implementation --- .../context/OpenTracingContextPropagator.java | 120 ++++++++++++++++++ .../internal/sync/WorkflowThreadImpl.java | 37 +++++- .../internal/worker/ActivityWorker.java | 30 ++++- .../java/com/uber/cadence/worker/Worker.java | 6 +- .../internal/testing/WorkflowTestingTest.java | 52 ++++++++ 5 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java diff --git a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java new file mode 100644 index 000000000..3235e5c55 --- /dev/null +++ b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java @@ -0,0 +1,120 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.context; + +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMap; +import io.opentracing.util.GlobalTracer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** Support for OpenTracing spans */ +public class OpenTracingContextPropagator implements ContextPropagator { + + private static ThreadLocal currentOpenTracingSpanContext = new ThreadLocal<>(); + + public static void setCurrentOpenTracingSpanContext(SpanContext ctx) { + if (ctx != null) { + currentOpenTracingSpanContext.set(ctx); + } + } + + public static SpanContext getCurrentOpenTracingSpanContext() { + return currentOpenTracingSpanContext.get(); + } + + @Override + public String getName() { + return "OpenTracing"; + } + + @Override + public Map serializeContext(Object context) { + Map serializedContext = new HashMap<>(); + Map contextMap = (Map) context; + for (Map.Entry entry : contextMap.entrySet()) { + serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset())); + } + return serializedContext; + } + + @Override + public Object deserializeContext(Map context) { + Map contextMap = new HashMap<>(); + for (Map.Entry entry : context.entrySet()) { + contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset())); + } + return contextMap; + } + + @Override + public Object getCurrentContext() { + Tracer currentTracer = GlobalTracer.get(); + Span currentSpan = currentTracer.scopeManager().activeSpan(); + if (currentSpan != null) { + HashMapTextMap contextTextMap = new HashMapTextMap(); + currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap); + return contextTextMap.getBackingMap(); + } else { + return null; + } + } + + @Override + public void setCurrentContext(Object context) { + Tracer currentTracer = GlobalTracer.get(); + Map contextAsMap = (Map) context; + if (contextAsMap != null) { + HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap); + setCurrentOpenTracingSpanContext( + currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap)); + } + } + + private class HashMapTextMap implements TextMap { + + private final HashMap backingMap = new HashMap<>(); + + public HashMapTextMap() { + // Noop + } + + public HashMapTextMap(Map spanData) { + backingMap.putAll(spanData); + } + + @Override + public Iterator> iterator() { + return backingMap.entrySet().iterator(); + } + + @Override + public void put(String key, String value) { + backingMap.put(key, value); + } + + public HashMap getBackingMap() { + return backingMap; + } + } +} diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java index 9b7f5891f..37ba26ec4 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java @@ -19,12 +19,19 @@ import com.google.common.util.concurrent.RateLimiter; import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.OpenTracingContextPropagator; import com.uber.cadence.internal.context.ContextThreadLocal; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.replay.DeciderCache; import com.uber.cadence.internal.replay.DecisionContext; import com.uber.cadence.workflow.Promise; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.log.Fields; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; import java.io.PrintWriter; import java.io.StringWriter; import java.util.HashMap; @@ -92,13 +99,26 @@ public void run() { ContextThreadLocal.setContextPropagators(this.contextPropagators); ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts); - try { + // Set up an opentracing span + Tracer openTracingTracer = GlobalTracer.get(); + Tracer.SpanBuilder builder = + openTracingTracer + .buildSpan("cadence.workflow") + .withTag("resource.name", decisionContext.getWorkflowType().getName()); + + if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) { + builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext()); + } + Span span = builder.start(); + + try (Scope scope = openTracingTracer.activateSpan(span)) { // initialYield blocks thread until the first runUntilBlocked is called. // Otherwise r starts executing without control of the sync. threadContext.initialYield(); cancellationScope.run(); } catch (DestroyWorkflowThreadError e) { if (!threadContext.isDestroyRequested()) { + setSpanError(span, e); threadContext.setUnhandledException(e); } } catch (Error e) { @@ -111,9 +131,11 @@ public void run() { log.error( String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace)); } + setSpanError(span, e); threadContext.setUnhandledException(e); } catch (CancellationException e) { if (!isCancelRequested()) { + setSpanError(span, e); threadContext.setUnhandledException(e); } if (log.isDebugEnabled()) { @@ -130,6 +152,7 @@ public void run() { "Workflow thread \"%s\" run failed with unhandled exception:\n%s", name, stackTrace)); } + setSpanError(span, e); threadContext.setUnhandledException(e); } finally { DeterministicRunnerImpl.setCurrentThreadInternal(null); @@ -137,7 +160,19 @@ public void run() { thread.setName(originalName); thread = null; MDC.clear(); + span.finish(); + } + } + + private void setSpanError(Span span, Throwable ex) { + Tags.ERROR.set(span, true); + Map errorData = new HashMap<>(); + errorData.put(Fields.EVENT, "error"); + if (ex != null) { + errorData.put(Fields.ERROR_OBJECT, ex); + errorData.put(Fields.MESSAGE, ex.getMessage()); } + span.log(errorData); } public String getName() { diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index 60374d73e..1edca6cbc 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -28,6 +28,7 @@ import com.uber.cadence.WorkflowExecution; import com.uber.cadence.common.RetryOptions; import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.OpenTracingContextPropagator; import com.uber.cadence.internal.common.Retryer; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; @@ -38,6 +39,11 @@ import com.uber.m3.tally.Stopwatch; import com.uber.m3.util.Duration; import com.uber.m3.util.ImmutableMap; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.log.Fields; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -175,7 +181,18 @@ public void handle(PollForActivityTaskResponse task) throws Exception { propagateContext(task); - try { + // Set up an opentracing span + Tracer openTracingTracer = GlobalTracer.get(); + Tracer.SpanBuilder builder = + openTracingTracer + .buildSpan("cadence.activity") + .withTag("resource.name", task.getActivityType().getName()); + if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) { + builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext()); + } + Span span = builder.start(); + + try (io.opentracing.Scope scope = openTracingTracer.activateSpan(span)) { Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start(); ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false); sw.stop(); @@ -197,11 +214,22 @@ public void handle(PollForActivityTaskResponse task) throws Exception { Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start(); sendReply(task, new Result(null, null, cancelledRequest, null), metricsScope); sw.stop(); + } catch (Exception e) { + Tags.ERROR.set(span, true); + Map errorData = new HashMap<>(); + errorData.put(Fields.EVENT, "error"); + if (e != null) { + errorData.put(Fields.ERROR_OBJECT, e); + errorData.put(Fields.MESSAGE, e.getMessage()); + } + span.log(errorData); + throw e; } finally { MDC.remove(LoggerTag.ACTIVITY_ID); MDC.remove(LoggerTag.ACTIVITY_TYPE); MDC.remove(LoggerTag.WORKFLOW_ID); MDC.remove(LoggerTag.RUN_ID); + span.finish(); } } diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index a011849bd..2e69c9b4a 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -25,6 +25,7 @@ import com.uber.cadence.client.WorkflowClient; import com.uber.cadence.common.WorkflowExecutionHistory; import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.OpenTracingContextPropagator; import com.uber.cadence.converter.DataConverter; import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.internal.metrics.MetricsTag; @@ -952,10 +953,13 @@ private FactoryOptions( } if (contextPropagators != null) { - this.contextPropagators = contextPropagators; + this.contextPropagators = new ArrayList(contextPropagators); } else { this.contextPropagators = new ArrayList<>(); } + + // Add the OpenTracing propagator + this.contextPropagators.add(new OpenTracingContextPropagator()); } } } diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index 73d5fbb62..a0891a685 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -44,6 +44,7 @@ import com.uber.cadence.client.WorkflowStub; import com.uber.cadence.client.WorkflowTimedOutException; import com.uber.cadence.context.ContextPropagator; +import com.uber.cadence.context.OpenTracingContextPropagator; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.testing.SimulatedTimeoutException; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -57,8 +58,15 @@ import com.uber.cadence.workflow.SignalMethod; import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowMethod; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -983,4 +991,48 @@ public void testDefaultChildWorkflowContextPropagation() { String result = workflow.workflow("input1"); assertEquals("testing123testing123", result); } + + public static class OpenTracingContextPropagationWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + Tracer tracer = GlobalTracer.get(); + Span span = tracer.buildSpan("testContextPropagationWorkflow").start(); + try (Scope scope = tracer.scopeManager().activate(span)) { + Span activeSpan = tracer.scopeManager().activeSpan(); + return activeSpan.getBaggageItem("foo"); + } finally { + span.finish(); + } + } + } + + @Test + public void testOpenTracingContextPropagation() { + MockTracer tracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + GlobalTracer.registerIfAbsent(tracer); + Span span = tracer.buildSpan("testContextPropagation").start(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators( + Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator())) + .build(); + + try (Scope scope = tracer.scopeManager().activate(span)) { + + Span activeSpan = tracer.scopeManager().activeSpan(); + activeSpan.setBaggageItem("foo", "bar"); + + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + assertEquals("bar", workflow.workflow1("input1")); + + } finally { + span.finish(); + } + } } From b24cd306b0a21a88773644fa26da51ae33b05dfb Mon Sep 17 00:00:00 2001 From: Scott Rankin Date: Mon, 14 Sep 2020 13:46:25 -0400 Subject: [PATCH 2/5] Use lifecycle methods on ContextPropagator so as not to leak OpenTracing implementation --- build.gradle | 3 ++ .../activity/LocalActivityOptions.java | 8 ++- .../cadence/context/ContextPropagator.java | 30 +++++++++++ .../context/OpenTracingContextPropagator.java | 47 ++++++++++++++++ .../internal/context/ContextThreadLocal.java | 54 +++++++++++++++++++ .../internal/sync/SyncDecisionContext.java | 13 ++--- .../internal/sync/WorkflowThreadImpl.java | 43 +++------------ .../internal/worker/LocalActivityWorker.java | 6 ++- .../LocalActivityContextPropagationTest.java | 3 +- .../internal/testing/WorkflowTestingTest.java | 14 ++--- 10 files changed, 167 insertions(+), 54 deletions(-) diff --git a/build.gradle b/build.gradle index 221c5a160..0415313ae 100644 --- a/build.gradle +++ b/build.gradle @@ -58,10 +58,13 @@ dependencies { compile group: 'com.google.guava', name: 'guava', version: '28.1-jre' compile group: 'com.cronutils', name: 'cron-utils', version: '9.0.0' compile group: 'io.micrometer', name: 'micrometer-core', version: '1.1.2' + compile group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0' + compile group: 'io.opentracing', name: 'opentracing-util', version: '0.33.0' testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4' testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' + testCompile group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0' } license { diff --git a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java index de13cc860..9534af9a2 100644 --- a/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java +++ b/src/main/java/com/uber/cadence/activity/LocalActivityOptions.java @@ -101,7 +101,8 @@ public LocalActivityOptions validateAndBuildWithDefaults() { if (retryOptions != null) { ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults(); } - return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators); + return new LocalActivityOptions( + roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators); } } @@ -109,7 +110,10 @@ public LocalActivityOptions validateAndBuildWithDefaults() { private final RetryOptions retryOptions; private final List contextPropagators; - private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions, List contextPropagators) { + private LocalActivityOptions( + Duration scheduleToCloseTimeout, + RetryOptions retryOptions, + List contextPropagators) { this.scheduleToCloseTimeout = scheduleToCloseTimeout; this.retryOptions = retryOptions; this.contextPropagators = contextPropagators; diff --git a/src/main/java/com/uber/cadence/context/ContextPropagator.java b/src/main/java/com/uber/cadence/context/ContextPropagator.java index 3a618f96c..3bfca5b71 100644 --- a/src/main/java/com/uber/cadence/context/ContextPropagator.java +++ b/src/main/java/com/uber/cadence/context/ContextPropagator.java @@ -23,6 +23,9 @@ * Context Propagators are used to propagate information from workflow to activity, workflow to * child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}). * + *

It is important to note that all threads share one ContextPropagator instance, so your + * implementation must be thread-safe and store any state in ThreadLocal variables. + * *

A sample ContextPropagator that copies all {@link org.slf4j.MDC} entries starting * with a given prefix along the code path looks like this: * @@ -136,4 +139,31 @@ public interface ContextPropagator { /** Sets the current context */ void setCurrentContext(Object context); + + /** + * This is a lifecycle method, called after the context has been propagated to the + * workflow/activity thread but the workflow/activity has not yet started. + */ + default void setUp() { + // No-op + } + + /** + * This is a lifecycle method, called after the workflow/activity has completed. If the method + * finished without exception, {@code successful} will be true. Otherwise, it will be false and + * {@link #onError(Throwable)} will have already been called. + */ + default void finish(boolean successful) { + // No-op + } + + /** + * This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled + * exception. {@link #finish(boolean)} is called after this method. + * + * @param t The unhandled exception that caused the workflow/activity to terminate + */ + default void onError(Throwable t) { + // No-op + } } diff --git a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java index 3235e5c55..8b0501368 100644 --- a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java +++ b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java @@ -17,21 +17,28 @@ package com.uber.cadence.context; +import com.uber.cadence.internal.logging.LoggerTag; +import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.Tracer; +import io.opentracing.log.Fields; import io.opentracing.propagation.Format; import io.opentracing.propagation.TextMap; +import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.slf4j.MDC; /** Support for OpenTracing spans */ public class OpenTracingContextPropagator implements ContextPropagator { private static ThreadLocal currentOpenTracingSpanContext = new ThreadLocal<>(); + private static ThreadLocal currentOpenTracingSpan = new ThreadLocal<>(); + private static ThreadLocal currentOpenTracingScope = new ThreadLocal<>(); public static void setCurrentOpenTracingSpanContext(SpanContext ctx) { if (ctx != null) { @@ -91,6 +98,46 @@ public void setCurrentContext(Object context) { } } + @Override + public void setUp() { + Tracer openTracingTracer = GlobalTracer.get(); + Tracer.SpanBuilder builder = + openTracingTracer + .buildSpan("cadence.workflow") + .withTag("resource.name", MDC.get(LoggerTag.WORKFLOW_TYPE)); + + if (getCurrentOpenTracingSpanContext() != null) { + builder.asChildOf(getCurrentOpenTracingSpanContext()); + } + + Span span = builder.start(); + openTracingTracer.activateSpan(span); + currentOpenTracingSpan.set(span); + Scope scope = openTracingTracer.activateSpan(span); + currentOpenTracingScope.set(scope); + } + + @Override + public void onError(Throwable t) { + Span span = currentOpenTracingSpan.get(); + Tags.ERROR.set(span, true); + Map errorData = new HashMap<>(); + errorData.put(Fields.EVENT, "error"); + if (t != null) { + errorData.put(Fields.ERROR_OBJECT, t); + errorData.put(Fields.MESSAGE, t.getMessage()); + } + span.log(errorData); + } + + @Override + public void finish(boolean successful) { + Scope currentScope = currentOpenTracingScope.get(); + Span currentSpan = currentOpenTracingSpan.get(); + currentScope.close(); + currentSpan.finish(); + } + private class HashMapTextMap implements TextMap { private final HashMap backingMap = new HashMap<>(); diff --git a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java index 227124170..078da44e8 100644 --- a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java +++ b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java @@ -24,10 +24,14 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** This class holds the current set of context propagators */ public class ContextThreadLocal { + private static final Logger log = LoggerFactory.getLogger(ContextThreadLocal.class); + private static WorkflowThreadLocal> contextPropagators = WorkflowThreadLocal.withInitial( new Supplier>() { @@ -57,6 +61,11 @@ public static Map getCurrentContextForPropagation() { return contextData; } + /** + * Injects the context data into the thread for each configured context propagator + * + * @param contextData The context data received from the server + */ public static void propagateContextToCurrentThread(Map contextData) { if (contextData == null || contextData.isEmpty()) { return; @@ -67,4 +76,49 @@ public static void propagateContextToCurrentThread(Map contextDa } } } + + /** Calls {@link ContextPropagator#setUp()} for each propagator */ + public static void setUpContextPropagators() { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.setUp(); + } catch (Throwable t) { + // Don't let an error in one propagator block the others + log.error("Error calling setUp() on a contextpropagator", t); + } + } + } + + /** + * Calls {@link ContextPropagator#onError(Throwable)} for each propagator + * + * @param t The Throwable that caused the workflow/activity to finish + */ + public static void onErrorContextPropagators(Throwable t) { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.onError(t); + } catch (Throwable t1) { + // Don't let an error in one propagator block the others + log.error("Error calling onError() on a contextpropagator", t1); + } + } + } + + /** + * Calls {@link ContextPropagator#finish(boolean))} for each propagator + * + * @param successful True if the workflow/activity completed without unhandled exception, false + * otherwise + */ + public static void finishContextPropagators(boolean successful) { + for (ContextPropagator propagator : contextPropagators.get()) { + try { + propagator.finish(successful); + } catch (Throwable t) { + // Don't let an error in one propagator block the others + log.error("Error calling finish() on a contextpropagator", t); + } + } + } } diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 5cf6d7b01..23955ce69 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -323,10 +323,11 @@ private ExecuteActivityParameters constructExecuteActivityParameters( private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters( String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) { - ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters() - .withActivityType(new ActivityType().setName(name)) - .withInput(input) - .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds()); + ExecuteLocalActivityParameters parameters = + new ExecuteLocalActivityParameters() + .withActivityType(new ActivityType().setName(name)) + .withInput(input) + .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds()); RetryOptions retryOptions = options.getRetryOptions(); if (retryOptions != null) { @@ -337,8 +338,8 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters( parameters.setWorkflowDomain(this.context.getDomain()); parameters.setWorkflowExecution(this.context.getWorkflowExecution()); - List propagators = Optional.ofNullable(options.getContextPropagators()) - .orElse(contextPropagators); + List propagators = + Optional.ofNullable(options.getContextPropagators()).orElse(contextPropagators); parameters.setContext(extractContextsAndConvertToBytes(propagators)); return parameters; diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java index 37ba26ec4..295610dcf 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java @@ -19,19 +19,12 @@ import com.google.common.util.concurrent.RateLimiter; import com.uber.cadence.context.ContextPropagator; -import com.uber.cadence.context.OpenTracingContextPropagator; import com.uber.cadence.internal.context.ContextThreadLocal; import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.cadence.internal.replay.DeciderCache; import com.uber.cadence.internal.replay.DecisionContext; import com.uber.cadence.workflow.Promise; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.Tracer; -import io.opentracing.log.Fields; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import java.io.PrintWriter; import java.io.StringWriter; import java.util.HashMap; @@ -98,27 +91,16 @@ public void run() { // Repopulate the context(s) ContextThreadLocal.setContextPropagators(this.contextPropagators); ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts); + ContextThreadLocal.setUpContextPropagators(); - // Set up an opentracing span - Tracer openTracingTracer = GlobalTracer.get(); - Tracer.SpanBuilder builder = - openTracingTracer - .buildSpan("cadence.workflow") - .withTag("resource.name", decisionContext.getWorkflowType().getName()); - - if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) { - builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext()); - } - Span span = builder.start(); - - try (Scope scope = openTracingTracer.activateSpan(span)) { + try { // initialYield blocks thread until the first runUntilBlocked is called. // Otherwise r starts executing without control of the sync. threadContext.initialYield(); cancellationScope.run(); } catch (DestroyWorkflowThreadError e) { if (!threadContext.isDestroyRequested()) { - setSpanError(span, e); + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } } catch (Error e) { @@ -131,11 +113,11 @@ public void run() { log.error( String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace)); } - setSpanError(span, e); + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } catch (CancellationException e) { if (!isCancelRequested()) { - setSpanError(span, e); + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } if (log.isDebugEnabled()) { @@ -152,27 +134,16 @@ public void run() { "Workflow thread \"%s\" run failed with unhandled exception:\n%s", name, stackTrace)); } - setSpanError(span, e); + ContextThreadLocal.onErrorContextPropagators(e); threadContext.setUnhandledException(e); } finally { + ContextThreadLocal.finishContextPropagators(threadContext.getUnhandledException() == null); DeterministicRunnerImpl.setCurrentThreadInternal(null); threadContext.setStatus(Status.DONE); thread.setName(originalName); thread = null; MDC.clear(); - span.finish(); - } - } - - private void setSpanError(Span span, Throwable ex) { - Tags.ERROR.set(span, true); - Map errorData = new HashMap<>(); - errorData.put(Fields.EVENT, "error"); - if (ex != null) { - errorData.put(Fields.ERROR_OBJECT, ex); - errorData.put(Fields.MESSAGE, ex.getMessage()); } - span.log(errorData); } public String getName() { diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index 85f60ccb9..de6ce8da3 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -271,7 +271,9 @@ private void propagateContext(ExecuteLocalActivityParameters params) { } private void restoreContext(Map context) { - options.getContextPropagators() - .forEach(propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); + options + .getContextPropagators() + .forEach( + propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); } } diff --git a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java index 12c3197c6..62fa3723c 100644 --- a/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java +++ b/src/test/java/com/uber/cadence/activity/LocalActivityContextPropagationTest.java @@ -59,7 +59,8 @@ public class LocalActivityContextPropagationTest { private final WrapperContext wrapperContext = new WrapperContext(EXPECTED_CONTEXT_NAME); - //let's add safe TestWorkflowEnvironment closing and make configurable propagation enabling/disabling + // let's add safe TestWorkflowEnvironment closing and make configurable propagation + // enabling/disabling private class TestEnvAutoCloseable implements AutoCloseable { private TestWorkflowEnvironment testEnv; diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index a0891a685..cb00733b3 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -18,6 +18,7 @@ package com.uber.cadence.internal.testing; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; @@ -61,6 +62,7 @@ import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.Tracer; +import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.util.GlobalTracer; import io.opentracing.util.ThreadLocalScopeManager; @@ -996,13 +998,11 @@ public static class OpenTracingContextPropagationWorkflowImpl implements TestWor @Override public String workflow1(String input) { Tracer tracer = GlobalTracer.get(); - Span span = tracer.buildSpan("testContextPropagationWorkflow").start(); - try (Scope scope = tracer.scopeManager().activate(span)) { - Span activeSpan = tracer.scopeManager().activeSpan(); - return activeSpan.getBaggageItem("foo"); - } finally { - span.finish(); - } + Span activeSpan = tracer.scopeManager().activeSpan(); + MockSpan mockSpan = (MockSpan) activeSpan; + assertNotNull(activeSpan); + assertEquals("TestWorkflow::workflow1", mockSpan.tags().get("resource.name")); + return activeSpan.getBaggageItem("foo"); } } From ac2bff4ba96ac65bb95e27e86cb435594d9e890b Mon Sep 17 00:00:00 2001 From: Scott Rankin Date: Mon, 14 Sep 2020 17:42:28 -0400 Subject: [PATCH 3/5] Refactoring opentracing out of core code, moving context unit tests into their own test --- .../context/OpenTracingContextPropagator.java | 45 +- .../internal/context/ContextThreadLocal.java | 2 +- .../internal/replay/WorkflowContext.java | 13 +- .../internal/sync/SyncDecisionContext.java | 14 +- .../internal/sync/WorkflowStubImpl.java | 14 +- .../internal/worker/ActivityWorker.java | 14 +- .../internal/worker/LocalActivityWorker.java | 19 +- .../java/com/uber/cadence/worker/Worker.java | 7 +- .../uber/cadence/context/ContextTests.java | 440 ++++++++++++++++++ .../internal/testing/WorkflowTestingTest.java | 303 +----------- src/test/resources/logback-test.xml | 1 + 11 files changed, 557 insertions(+), 315 deletions(-) create mode 100644 src/test/java/com/uber/cadence/context/ContextTests.java diff --git a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java index 8b0501368..9f0fb4f28 100644 --- a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java +++ b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java @@ -31,11 +31,15 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.slf4j.MDC; /** Support for OpenTracing spans */ public class OpenTracingContextPropagator implements ContextPropagator { + private static final Logger log = LoggerFactory.getLogger(OpenTracingContextPropagator.class); + private static ThreadLocal currentOpenTracingSpanContext = new ThreadLocal<>(); private static ThreadLocal currentOpenTracingSpan = new ThreadLocal<>(); private static ThreadLocal currentOpenTracingScope = new ThreadLocal<>(); @@ -59,8 +63,10 @@ public String getName() { public Map serializeContext(Object context) { Map serializedContext = new HashMap<>(); Map contextMap = (Map) context; - for (Map.Entry entry : contextMap.entrySet()) { - serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset())); + if (contextMap != null) { + for (Map.Entry entry : contextMap.entrySet()) { + serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset())); + } } return serializedContext; } @@ -76,11 +82,14 @@ public Object deserializeContext(Map context) { @Override public Object getCurrentContext() { + log.debug("Getting current context"); Tracer currentTracer = GlobalTracer.get(); Span currentSpan = currentTracer.scopeManager().activeSpan(); if (currentSpan != null) { HashMapTextMap contextTextMap = new HashMapTextMap(); currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap); + log.debug( + "Retrieving current span data as current context: " + contextTextMap.getBackingMap()); return contextTextMap.getBackingMap(); } else { return null; @@ -89,9 +98,11 @@ public Object getCurrentContext() { @Override public void setCurrentContext(Object context) { + log.debug("Setting current context"); Tracer currentTracer = GlobalTracer.get(); Map contextAsMap = (Map) context; if (contextAsMap != null) { + log.debug("setting current context to " + contextAsMap); HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap); setCurrentOpenTracingSpanContext( currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap)); @@ -100,6 +111,7 @@ public void setCurrentContext(Object context) { @Override public void setUp() { + log.debug("Starting a new opentracing span"); Tracer openTracingTracer = GlobalTracer.get(); Tracer.SpanBuilder builder = openTracingTracer @@ -111,6 +123,7 @@ public void setUp() { } Span span = builder.start(); + log.debug("New span: " + span); openTracingTracer.activateSpan(span); currentOpenTracingSpan.set(span); Scope scope = openTracingTracer.activateSpan(span); @@ -134,8 +147,36 @@ public void onError(Throwable t) { public void finish(boolean successful) { Scope currentScope = currentOpenTracingScope.get(); Span currentSpan = currentOpenTracingSpan.get(); + + log.debug("Closing currently open span " + currentSpan.context().toSpanId()); currentScope.close(); currentSpan.finish(); + currentOpenTracingScope.remove(); + currentOpenTracingSpan.remove(); + currentOpenTracingSpanContext.remove(); + } + + /** Just check for other instances of the same class */ + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (this == obj) { + return true; + } + + if (this.getClass().equals(obj.getClass())) { + return true; + } + + return false; + } + + @Override + public int hashCode() { + return this.getClass().hashCode(); } private class HashMapTextMap implements TextMap { diff --git a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java index 078da44e8..8bbf7729e 100644 --- a/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java +++ b/src/main/java/com/uber/cadence/internal/context/ContextThreadLocal.java @@ -106,7 +106,7 @@ public static void onErrorContextPropagators(Throwable t) { } /** - * Calls {@link ContextPropagator#finish(boolean))} for each propagator + * Calls {@link ContextPropagator#finish(boolean)} for each propagator * * @param successful True if the workflow/activity completed without unhandled exception, false * otherwise diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java index 5f1d8f16f..67874439a 100644 --- a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; final class WorkflowContext { @@ -166,7 +167,17 @@ Map getPropagatedContexts() { Map contextData = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - contextData.put(propagator.getName(), propagator.deserializeContext(headerData)); + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + headerData + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(e.getKey().indexOf(":") + 1), Map.Entry::getValue)); + contextData.put(propagator.getName(), propagator.deserializeContext(filteredData)); } return contextData; diff --git a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java index 23955ce69..0e332ba7a 100644 --- a/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java +++ b/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java @@ -72,6 +72,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -449,7 +450,18 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - result.putAll(propagator.serializeContext(propagator.getCurrentContext())); + // Get the serialized context from the propagator + Map serializedContext = + propagator.serializeContext(propagator.getCurrentContext()); + // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar + Map namespacedSerializedContext = + serializedContext + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> propagator.getName() + ":" + e.getKey(), Map.Entry::getValue)); + result.putAll(namespacedSerializedContext); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java index c7bc92046..f81101ef3 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowStubImpl.java @@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; class WorkflowStubImpl implements WorkflowStub { @@ -188,7 +189,18 @@ private Map extractContextsAndConvertToBytes( } Map result = new HashMap<>(); for (ContextPropagator propagator : contextPropagators) { - result.putAll(propagator.serializeContext(propagator.getCurrentContext())); + // Get the serialized context from the propagator + Map serializedContext = + propagator.serializeContext(propagator.getCurrentContext()); + // Namespace each entry in case of overlaps, so foo -> bar becomes MyPropagator:foo -> bar + Map namespacedSerializedContext = + serializedContext + .entrySet() + .stream() + .collect( + Collectors.toMap( + k -> propagator.getName() + ":" + k.getKey(), Map.Entry::getValue)); + result.putAll(namespacedSerializedContext); } return result; } diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index 1edca6cbc..c9b29f7c7 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -50,6 +50,7 @@ import java.util.Objects; import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.thrift.TException; import org.slf4j.MDC; @@ -252,7 +253,18 @@ void propagateContext(PollForActivityTaskResponse response) { }); for (ContextPropagator propagator : options.getContextPropagators()) { - propagator.setCurrentContext(propagator.deserializeContext(headerData)); + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + headerData + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(e.getKey().indexOf(":") + 1), + Map.Entry::getValue)); + propagator.setCurrentContext(propagator.deserializeContext(filteredData)); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index de6ce8da3..56a370821 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -22,6 +22,7 @@ import com.uber.cadence.MarkerRecordedEventAttributes; import com.uber.cadence.PollForActivityTaskResponse; import com.uber.cadence.common.RetryOptions; +import com.uber.cadence.context.ContextPropagator; import com.uber.cadence.internal.common.LocalActivityMarkerData; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; @@ -38,6 +39,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; public final class LocalActivityWorker implements SuspendableWorker { @@ -271,9 +273,18 @@ private void propagateContext(ExecuteLocalActivityParameters params) { } private void restoreContext(Map context) { - options - .getContextPropagators() - .forEach( - propagator -> propagator.setCurrentContext(propagator.deserializeContext(context))); + for (ContextPropagator propagator : options.getContextPropagators()) { + // Only send the context propagator the fields that belong to them + // Change the map from MyPropagator:foo -> bar to foo -> bar + Map filteredData = + context + .entrySet() + .stream() + .filter(e -> e.getKey().startsWith(propagator.getName())) + .collect( + Collectors.toMap( + e -> e.getKey().substring(e.getKey().indexOf(":") + 1), Map.Entry::getValue)); + propagator.setCurrentContext(propagator.deserializeContext(filteredData)); + } } } diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 2e69c9b4a..72a77933c 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -958,8 +958,11 @@ private FactoryOptions( this.contextPropagators = new ArrayList<>(); } - // Add the OpenTracing propagator - this.contextPropagators.add(new OpenTracingContextPropagator()); + // Add the OpenTracing propagator if it's not already there + OpenTracingContextPropagator openTracing = new OpenTracingContextPropagator(); + if (!this.contextPropagators.contains(openTracing)) { + this.contextPropagators.add(openTracing); + } } } } diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java new file mode 100644 index 000000000..161a01c9c --- /dev/null +++ b/src/test/java/com/uber/cadence/context/ContextTests.java @@ -0,0 +1,440 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.context; + +import static org.junit.Assert.*; + +import com.uber.cadence.activity.ActivityMethod; +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowException; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.internal.testing.WorkflowTestingTest.ChildWorkflow; +import com.uber.cadence.testing.TestEnvironmentOptions; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.workflow.Async; +import com.uber.cadence.workflow.ChildWorkflowOptions; +import com.uber.cadence.workflow.Promise; +import com.uber.cadence.workflow.SignalMethod; +import com.uber.cadence.workflow.Workflow; +import com.uber.cadence.workflow.WorkflowMethod; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.GlobalTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.rules.Timeout; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +public class ContextTests { + private static final Logger log = LoggerFactory.getLogger(ContextTests.class); + + @Rule public Timeout globalTimeout = Timeout.seconds(5000); + + @Rule + public TestWatcher watchman = + new TestWatcher() { + @Override + protected void failed(Throwable e, Description description) { + System.err.println(testEnvironment.getDiagnostics()); + } + }; + + private static final String TASK_LIST = "test-workflow"; + + private TestWorkflowEnvironment testEnvironment; + private MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + @Before + public void setUp() { + TestEnvironmentOptions options = + new TestEnvironmentOptions.Builder() + .setFactoryOptions( + new Worker.FactoryOptions.Builder() + .setContextPropagators( + Arrays.asList( + new TestContextPropagator(), new OpenTracingContextPropagator())) + .build()) + .build(); + testEnvironment = TestWorkflowEnvironment.newInstance(options); + GlobalTracer.registerIfAbsent(mockTracer); + } + + @After + public void tearDown() { + testEnvironment.close(); + mockTracer.reset(); + } + + public interface TestWorkflow { + + @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) + String workflow1(String input); + } + + public interface ParentWorkflow { + + @WorkflowMethod(executionStartToCloseTimeoutSeconds = 3600 * 24, taskList = TASK_LIST) + String workflow(String input); + + @SignalMethod + void signal(String value); + } + + public interface TestActivity { + + @ActivityMethod(scheduleToCloseTimeoutSeconds = 3600) + String activity1(String input); + } + + public static class TestContextPropagator implements ContextPropagator { + + @Override + public String getName() { + return this.getClass().getName(); + } + + @Override + public Map serializeContext(Object context) { + String testKey = (String) context; + if (testKey != null) { + return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); + } else { + return Collections.emptyMap(); + } + } + + @Override + public Object deserializeContext(Map context) { + if (context.containsKey("test")) { + return new String(context.get("test"), StandardCharsets.UTF_8); + } else { + return null; + } + } + + @Override + public Object getCurrentContext() { + return MDC.get("test"); + } + + @Override + public void setCurrentContext(Object context) { + MDC.put("test", String.valueOf(context)); + } + } + + public static class ContextPropagationWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + // The test value should be in the MDC + return MDC.get("test"); + } + } + + @Test() + public void testWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("testing123", result); + } + + public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = + new ChildWorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { + + @Override + public String workflow(String input, String parentId) { + String mdcValue = MDC.get("test"); + return input + mdcValue; + } + } + + @Test + public void testChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } + + public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { + + @Override + public String workflow1(String input) { + Promise asyncPromise = Async.function(this::async); + return asyncPromise.get(); + } + + private String async() { + return "async" + MDC.get("test"); + } + } + + @Test + public void testThreadContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("asynctesting123", result); + } + + public static class ContextActivityImpl implements TestActivity { + @Override + public String activity1(String input) { + return "activity" + MDC.get("test"); + } + } + + public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + ActivityOptions options = + new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); + TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); + return activity.activity1("foo"); + } + } + + @Test + public void testDefaultActivityContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); + worker.registerActivitiesImplementations(new ContextActivityImpl()); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + String result = workflow.workflow1("input1"); + assertEquals("activitytesting123", result); + } + + public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { + + @Override + public String workflow(String input) { + // Get the MDC value + String mdcValue = MDC.get("test"); + + // Fire up a child workflow + ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); + ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); + + String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); + return result; + } + + @Override + public void signal(String value) {} + } + + @Test + public void testDefaultChildWorkflowContextPropagation() { + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes( + DefaultContextPropagationParentWorkflowImpl.class, + ContextPropagationChildWorkflowImpl.class); + testEnvironment.start(); + MDC.put("test", "testing123"); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators(Collections.singletonList(new TestContextPropagator())) + .build(); + ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); + String result = workflow.workflow("input1"); + assertEquals("testing123testing123", result); + } + + public static class OpenTracingContextPropagationWorkflowImpl implements TestWorkflow { + @Override + public String workflow1(String input) { + Tracer tracer = GlobalTracer.get(); + Span activeSpan = tracer.scopeManager().activeSpan(); + MockSpan mockSpan = (MockSpan) activeSpan; + assertNotNull(activeSpan); + assertEquals("TestWorkflow::workflow1", mockSpan.tags().get("resource.name")); + assertNotEquals(0, mockSpan.parentId()); + if ("fail".equals(input)) { + throw new IllegalArgumentException(); + } else { + return activeSpan.getBaggageItem("foo"); + } + } + } + + @Test + public void testOpenTracingContextPropagation() { + Tracer tracer = GlobalTracer.get(); + Span span = tracer.buildSpan("testContextPropagationSuccess").start(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators( + Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator())) + .build(); + + try (Scope scope = tracer.scopeManager().activate(span)) { + + Span activeSpan = tracer.scopeManager().activeSpan(); + activeSpan.setBaggageItem("foo", "bar"); + + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + assertEquals("bar", workflow.workflow1("input1")); + + } finally { + span.finish(); + } + } + + @Test + public void testOpenTracingContextPropagationWithFailure() { + Tracer tracer = GlobalTracer.get(); + Span span = tracer.buildSpan("testContextPropagationFailure").start(); + + Worker worker = testEnvironment.newWorker(TASK_LIST); + worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class); + testEnvironment.start(); + WorkflowClient client = testEnvironment.newWorkflowClient(); + WorkflowOptions options = + new WorkflowOptions.Builder() + .setContextPropagators( + Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator())) + .build(); + + try (Scope scope = tracer.scopeManager().activate(span)) { + + Span activeSpan = tracer.scopeManager().activeSpan(); + activeSpan.setBaggageItem("foo", "bar"); + + TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); + try { + workflow.workflow1("fail"); + fail("Unreachable"); + } catch (WorkflowException e) { + // Expected + assertEquals(IllegalArgumentException.class, e.getCause().getClass()); + } + + } finally { + span.finish(); + } + } +} diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index cb00733b3..de22c9b41 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -18,7 +18,6 @@ package com.uber.cadence.internal.testing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; @@ -44,8 +43,6 @@ import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.client.WorkflowStub; import com.uber.cadence.client.WorkflowTimedOutException; -import com.uber.cadence.context.ContextPropagator; -import com.uber.cadence.context.OpenTracingContextPropagator; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.testing.SimulatedTimeoutException; import com.uber.cadence.testing.TestEnvironmentOptions; @@ -53,25 +50,13 @@ import com.uber.cadence.worker.Worker; import com.uber.cadence.workflow.ActivityTimeoutException; import com.uber.cadence.workflow.Async; -import com.uber.cadence.workflow.ChildWorkflowOptions; import com.uber.cadence.workflow.ChildWorkflowTimedOutException; import com.uber.cadence.workflow.Promise; import com.uber.cadence.workflow.SignalMethod; import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowMethod; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.Tracer; -import io.opentracing.mock.MockSpan; -import io.opentracing.mock.MockTracer; -import io.opentracing.util.GlobalTracer; -import io.opentracing.util.ThreadLocalScopeManager; -import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CancellationException; @@ -87,7 +72,6 @@ import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; public class WorkflowTestingTest { private static final Logger log = LoggerFactory.getLogger(WorkflowTestingTest.class); @@ -111,10 +95,7 @@ protected void failed(Throwable e, Description description) { public void setUp() { TestEnvironmentOptions options = new TestEnvironmentOptions.Builder() - .setFactoryOptions( - new Worker.FactoryOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build()) + .setFactoryOptions(new Worker.FactoryOptions.Builder().build()) .build(); testEnvironment = TestWorkflowEnvironment.newInstance(options); } @@ -753,286 +734,4 @@ public void testMockedChildSimulatedTimeout() { assertTrue(e.getCause() instanceof ChildWorkflowTimedOutException); } } - - public static class TestContextPropagator implements ContextPropagator { - - @Override - public String getName() { - return this.getClass().getName(); - } - - @Override - public Map serializeContext(Object context) { - String testKey = (String) context; - if (testKey != null) { - return Collections.singletonMap("test", testKey.getBytes(StandardCharsets.UTF_8)); - } else { - return Collections.emptyMap(); - } - } - - @Override - public Object deserializeContext(Map context) { - if (context.containsKey("test")) { - return new String(context.get("test"), StandardCharsets.UTF_8); - } else { - return null; - } - } - - @Override - public Object getCurrentContext() { - return MDC.get("test"); - } - - @Override - public void setCurrentContext(Object context) { - MDC.put("test", String.valueOf(context)); - } - } - - public static class ContextPropagationWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - // The test value should be in the MDC - return MDC.get("test"); - } - } - - @Test - public void testWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("testing123", result); - } - - public static class ContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = - new ChildWorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - public static class ContextPropagationChildWorkflowImpl implements ChildWorkflow { - - @Override - public String workflow(String input, String parentId) { - String mdcValue = MDC.get("test"); - return input + mdcValue; - } - } - - @Test - public void testChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - ContextPropagationParentWorkflowImpl.class, ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } - - public static class ContextPropagationThreadWorkflowImpl implements TestWorkflow { - - @Override - public String workflow1(String input) { - Promise asyncPromise = Async.function(this::async); - return asyncPromise.get(); - } - - private String async() { - return "async" + MDC.get("test"); - } - } - - @Test - public void testThreadContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationThreadWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("asynctesting123", result); - } - - public static class ContextActivityImpl implements TestActivity { - @Override - public String activity1(String input) { - return "activity" + MDC.get("test"); - } - } - - public static class ContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder() - .setScheduleToCloseTimeout(Duration.ofSeconds(5)) - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(ContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationActivityWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - ActivityOptions options = - new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofSeconds(5)).build(); - TestActivity activity = Workflow.newActivityStub(TestActivity.class, options); - return activity.activity1("foo"); - } - } - - @Test - public void testDefaultActivityContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(DefaultContextPropagationActivityWorkflowImpl.class); - worker.registerActivitiesImplementations(new ContextActivityImpl()); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - String result = workflow.workflow1("input1"); - assertEquals("activitytesting123", result); - } - - public static class DefaultContextPropagationParentWorkflowImpl implements ParentWorkflow { - - @Override - public String workflow(String input) { - // Get the MDC value - String mdcValue = MDC.get("test"); - - // Fire up a child workflow - ChildWorkflowOptions options = new ChildWorkflowOptions.Builder().build(); - ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class, options); - - String result = child.workflow(mdcValue, Workflow.getWorkflowInfo().getWorkflowId()); - return result; - } - - @Override - public void signal(String value) {} - } - - @Test - public void testDefaultChildWorkflowContextPropagation() { - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes( - DefaultContextPropagationParentWorkflowImpl.class, - ContextPropagationChildWorkflowImpl.class); - testEnvironment.start(); - MDC.put("test", "testing123"); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators(Collections.singletonList(new TestContextPropagator())) - .build(); - ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options); - String result = workflow.workflow("input1"); - assertEquals("testing123testing123", result); - } - - public static class OpenTracingContextPropagationWorkflowImpl implements TestWorkflow { - @Override - public String workflow1(String input) { - Tracer tracer = GlobalTracer.get(); - Span activeSpan = tracer.scopeManager().activeSpan(); - MockSpan mockSpan = (MockSpan) activeSpan; - assertNotNull(activeSpan); - assertEquals("TestWorkflow::workflow1", mockSpan.tags().get("resource.name")); - return activeSpan.getBaggageItem("foo"); - } - } - - @Test - public void testOpenTracingContextPropagation() { - MockTracer tracer = - new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); - GlobalTracer.registerIfAbsent(tracer); - Span span = tracer.buildSpan("testContextPropagation").start(); - - Worker worker = testEnvironment.newWorker(TASK_LIST); - worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class); - testEnvironment.start(); - WorkflowClient client = testEnvironment.newWorkflowClient(); - WorkflowOptions options = - new WorkflowOptions.Builder() - .setContextPropagators( - Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator())) - .build(); - - try (Scope scope = tracer.scopeManager().activate(span)) { - - Span activeSpan = tracer.scopeManager().activeSpan(); - activeSpan.setBaggageItem("foo", "bar"); - - TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options); - assertEquals("bar", workflow.workflow1("input1")); - - } finally { - span.finish(); - } - } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index b87fe0374..27d076198 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -25,6 +25,7 @@ + From 500b131cb6f5b5f56ee51a952c2b7827148c76b9 Mon Sep 17 00:00:00 2001 From: Scott Rankin Date: Tue, 15 Sep 2020 10:31:55 -0400 Subject: [PATCH 4/5] Handle propagator names with colons, better error handling --- .../context/OpenTracingContextPropagator.java | 36 +++++++++---------- .../internal/replay/WorkflowContext.java | 3 +- .../internal/worker/ActivityWorker.java | 2 +- .../internal/worker/LocalActivityWorker.java | 3 +- .../uber/cadence/context/ContextTests.java | 2 +- 5 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java index 9f0fb4f28..e90ac29b9 100644 --- a/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java +++ b/src/main/java/com/uber/cadence/context/OpenTracingContextPropagator.java @@ -82,14 +82,11 @@ public Object deserializeContext(Map context) { @Override public Object getCurrentContext() { - log.debug("Getting current context"); Tracer currentTracer = GlobalTracer.get(); Span currentSpan = currentTracer.scopeManager().activeSpan(); if (currentSpan != null) { HashMapTextMap contextTextMap = new HashMapTextMap(); currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap); - log.debug( - "Retrieving current span data as current context: " + contextTextMap.getBackingMap()); return contextTextMap.getBackingMap(); } else { return null; @@ -98,11 +95,9 @@ public Object getCurrentContext() { @Override public void setCurrentContext(Object context) { - log.debug("Setting current context"); Tracer currentTracer = GlobalTracer.get(); Map contextAsMap = (Map) context; if (contextAsMap != null) { - log.debug("setting current context to " + contextAsMap); HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap); setCurrentOpenTracingSpanContext( currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap)); @@ -111,7 +106,6 @@ public void setCurrentContext(Object context) { @Override public void setUp() { - log.debug("Starting a new opentracing span"); Tracer openTracingTracer = GlobalTracer.get(); Tracer.SpanBuilder builder = openTracingTracer @@ -123,7 +117,6 @@ public void setUp() { } Span span = builder.start(); - log.debug("New span: " + span); openTracingTracer.activateSpan(span); currentOpenTracingSpan.set(span); Scope scope = openTracingTracer.activateSpan(span); @@ -133,14 +126,16 @@ public void setUp() { @Override public void onError(Throwable t) { Span span = currentOpenTracingSpan.get(); - Tags.ERROR.set(span, true); - Map errorData = new HashMap<>(); - errorData.put(Fields.EVENT, "error"); - if (t != null) { - errorData.put(Fields.ERROR_OBJECT, t); - errorData.put(Fields.MESSAGE, t.getMessage()); - } - span.log(errorData); + if (span != null) { + Tags.ERROR.set(span, true); + Map errorData = new HashMap<>(); + errorData.put(Fields.EVENT, "error"); + if (t != null) { + errorData.put(Fields.ERROR_OBJECT, t); + errorData.put(Fields.MESSAGE, t.getMessage()); + } + span.log(errorData); + } } @Override @@ -148,9 +143,14 @@ public void finish(boolean successful) { Scope currentScope = currentOpenTracingScope.get(); Span currentSpan = currentOpenTracingSpan.get(); - log.debug("Closing currently open span " + currentSpan.context().toSpanId()); - currentScope.close(); - currentSpan.finish(); + if (currentScope != null) { + currentScope.close(); + } + + if (currentSpan != null) { + currentSpan.finish(); + } + currentOpenTracingScope.remove(); currentOpenTracingSpan.remove(); currentOpenTracingSpanContext.remove(); diff --git a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java index 67874439a..729255727 100644 --- a/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java +++ b/src/main/java/com/uber/cadence/internal/replay/WorkflowContext.java @@ -176,7 +176,8 @@ Map getPropagatedContexts() { .filter(e -> e.getKey().startsWith(propagator.getName())) .collect( Collectors.toMap( - e -> e.getKey().substring(e.getKey().indexOf(":") + 1), Map.Entry::getValue)); + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); contextData.put(propagator.getName(), propagator.deserializeContext(filteredData)); } diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index c9b29f7c7..eb88bd332 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -262,7 +262,7 @@ void propagateContext(PollForActivityTaskResponse response) { .filter(e -> e.getKey().startsWith(propagator.getName())) .collect( Collectors.toMap( - e -> e.getKey().substring(e.getKey().indexOf(":") + 1), + e -> e.getKey().substring(propagator.getName().length() + 1), Map.Entry::getValue)); propagator.setCurrentContext(propagator.deserializeContext(filteredData)); } diff --git a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java index 56a370821..f0ae8f72f 100644 --- a/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java @@ -283,7 +283,8 @@ private void restoreContext(Map context) { .filter(e -> e.getKey().startsWith(propagator.getName())) .collect( Collectors.toMap( - e -> e.getKey().substring(e.getKey().indexOf(":") + 1), Map.Entry::getValue)); + e -> e.getKey().substring(propagator.getName().length() + 1), + Map.Entry::getValue)); propagator.setCurrentContext(propagator.deserializeContext(filteredData)); } } diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java index 161a01c9c..210f9b065 100644 --- a/src/test/java/com/uber/cadence/context/ContextTests.java +++ b/src/test/java/com/uber/cadence/context/ContextTests.java @@ -123,7 +123,7 @@ public static class TestContextPropagator implements ContextPropagator { @Override public String getName() { - return this.getClass().getName(); + return "TestContextPropagator::withSomeColons"; } @Override From 238cdbf418a18563d71b8a3c77b0da75ed5246cf Mon Sep 17 00:00:00 2001 From: Scott Rankin Date: Fri, 13 Nov 2020 10:20:05 -0500 Subject: [PATCH 5/5] Merging from upstream master --- .../internal/worker/ActivityWorker.java | 3 -- .../java/com/uber/cadence/worker/Worker.java | 1 - .../uber/cadence/context/ContextTests.java | 6 ++-- .../internal/testing/WorkflowTestingTest.java | 29 +++++++++---------- 4 files changed, 17 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java index 9b5ba0923..6941f1cc4 100644 --- a/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java +++ b/src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java @@ -24,11 +24,8 @@ import com.uber.cadence.RespondActivityTaskFailedRequest; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.context.ContextPropagator; - import com.uber.cadence.context.OpenTracingContextPropagator; - import com.uber.cadence.internal.common.RpcRetryer; - import com.uber.cadence.internal.logging.LoggerTag; import com.uber.cadence.internal.metrics.MetricsTag; import com.uber.cadence.internal.metrics.MetricsType; diff --git a/src/main/java/com/uber/cadence/worker/Worker.java b/src/main/java/com/uber/cadence/worker/Worker.java index 700b9fd45..1dd9eb79e 100644 --- a/src/main/java/com/uber/cadence/worker/Worker.java +++ b/src/main/java/com/uber/cadence/worker/Worker.java @@ -35,7 +35,6 @@ import com.uber.cadence.workflow.WorkflowMethod; import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; - import java.time.Duration; import java.util.List; import java.util.Objects; diff --git a/src/test/java/com/uber/cadence/context/ContextTests.java b/src/test/java/com/uber/cadence/context/ContextTests.java index 210f9b065..320ddcb05 100644 --- a/src/test/java/com/uber/cadence/context/ContextTests.java +++ b/src/test/java/com/uber/cadence/context/ContextTests.java @@ -22,6 +22,7 @@ import com.uber.cadence.activity.ActivityMethod; import com.uber.cadence.activity.ActivityOptions; import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowException; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.internal.testing.WorkflowTestingTest.ChildWorkflow; @@ -81,13 +82,14 @@ protected void failed(Throwable e, Description description) { public void setUp() { TestEnvironmentOptions options = new TestEnvironmentOptions.Builder() - .setFactoryOptions( - new Worker.FactoryOptions.Builder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() .setContextPropagators( Arrays.asList( new TestContextPropagator(), new OpenTracingContextPropagator())) .build()) .build(); + testEnvironment = TestWorkflowEnvironment.newInstance(options); GlobalTracer.registerIfAbsent(mockTracer); } diff --git a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java index 8e09831e0..41589711d 100644 --- a/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java +++ b/src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java @@ -17,6 +17,11 @@ package com.uber.cadence.internal.testing; +import static org.junit.Assert.*; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import com.uber.cadence.EventType; import com.uber.cadence.GetWorkflowExecutionHistoryRequest; import com.uber.cadence.History; @@ -49,6 +54,13 @@ import com.uber.cadence.workflow.SignalMethod; import com.uber.cadence.workflow.Workflow; import com.uber.cadence.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.thrift.TException; import org.junit.After; import org.junit.Before; @@ -60,19 +72,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static org.junit.Assert.*; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class WorkflowTestingTest { private static final Logger log = LoggerFactory.getLogger(WorkflowTestingTest.class); @@ -95,9 +94,7 @@ protected void failed(Throwable e, Description description) { public void setUp() { TestEnvironmentOptions options = new TestEnvironmentOptions.Builder() - .setWorkflowClientOptions( - WorkflowClientOptions.newBuilder() - .build()) + .setWorkflowClientOptions(WorkflowClientOptions.newBuilder().build()) .build(); testEnvironment = TestWorkflowEnvironment.newInstance(options); }