Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opentracing Support #263

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions temporal-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@ dependencies {
api project(':temporal-serviceclient')
api group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
api group: 'io.micrometer', name: 'micrometer-core', version: '1.6.0'
api group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0'

implementation group: 'com.google.guava', name: 'guava', version: '30.0-jre'
implementation group: 'com.cronutils', name: 'cron-utils', version: '9.1.1'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.11.3'
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.3'
implementation group: 'io.opentracing', name: 'opentracing-util', version: '0.33.0'

if (!JavaVersion.current().isJava8()) {
implementation 'javax.annotation:javax.annotation-api:1.3.2'
}

testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
testImplementation group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
}

configurations.all {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import java.util.Map;

/**
* Context Propagators are used to propagate information from workflow to activity, workflow to
* child workflow, and workflow to child thread (using {@link io.temporal.workflow.Async}).
* Context Propagators are used to propagate information from workflow stub to workflow, workflow to
* activity, workflow to child workflow, and workflow to child thread (using {@link
* io.temporal.workflow.Async}).
*
* <p>It is important to note that all threads share one ContextPropagator instance, so your
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
*
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
* with a given prefix along the code path looks like this:
Expand Down Expand Up @@ -126,4 +130,27 @@ public interface ContextPropagator {

/** Sets the current context */
void setCurrentContext(Object context);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this and the implementation I'm getting more inclined to just using interceptors for this integration and getting rid of the context propagator interface completely as it has very limited use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’d be happy to look into that - are there any docs or examples of interceptors anywhere? I didn’t see a way of propagating information from worker/client to the server and back using interceptors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right that there is something missing in the interceptors to support this use case. Let me think about this.

/**
* This is a lifecycle method, called after the context has been propagated to the
* workflow/activity thread but the workflow/activity has not yet started.
*/
default void setUp() {
// No-op
}

/** This is a lifecycle method, called after the workflow/activity has completed. */
default void finish() {
// No-op
}

/**
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
* exception. {@link #finish()} is called after this method.
*
* @param t The unhandled exception that caused the workflow/activity to terminate
*/
default void onError(Throwable t) {
// No-op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.common.context;

import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.log.Fields;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.logging.LoggerTag;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/** Support for OpenTracing spans */
public class OpenTracingContextPropagator implements ContextPropagator {

private static final Logger log = LoggerFactory.getLogger(OpenTracingContextPropagator.class);

private static final ThreadLocal<SpanContext> currentOpenTracingSpanContext = new ThreadLocal<>();
private static final ThreadLocal<Span> currentOpenTracingSpan = new ThreadLocal<>();
private static final ThreadLocal<Scope> currentOpenTracingScope = new ThreadLocal<>();

public static SpanContext getCurrentOpenTracingSpanContext() {
return currentOpenTracingSpanContext.get();
}

public static void setCurrentOpenTracingSpanContext(SpanContext ctx) {
if (ctx != null) {
currentOpenTracingSpanContext.set(ctx);
}
}

@Override
public String getName() {
return "OpenTracing";
}

@Override
public Map<String, Payload> serializeContext(Object context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The header is a shared resource. So we cannot assume that all the keys in it are created using this specific propagator. So the solution is to store the whole context in a single header key. We also want this to be compatible with Go SDK. Here is the Go SDK open trace propagator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point - I will take a look at the go propagator and work on ensuring cross-platform.

Map<String, Payload> serializedContext = new HashMap<>();
Map<String, String> contextMap = (Map<String, String>) context;
if (contextMap != null) {
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
serializedContext.put(
entry.getKey(), DataConverter.getDefaultInstance().toPayload(entry.getValue()).get());
}
}
return serializedContext;
}

@Override
public Object deserializeContext(Map<String, Payload> context) {
Map<String, String> contextMap = new HashMap<>();
for (Map.Entry<String, Payload> entry : context.entrySet()) {
contextMap.put(
entry.getKey(),
DataConverter.getDefaultInstance()
.fromPayload(entry.getValue(), String.class, String.class));
}
return contextMap;
}

@Override
public Object getCurrentContext() {
Tracer currentTracer = GlobalTracer.get();
Span currentSpan = currentTracer.scopeManager().activeSpan();
if (currentSpan != null) {
HashMapTextMap contextTextMap = new HashMapTextMap();
currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap);
return contextTextMap.getBackingMap();
} else {
return null;
}
}

@Override
public void setCurrentContext(Object context) {
Tracer currentTracer = GlobalTracer.get();
Map<String, String> contextAsMap = (Map<String, String>) context;
if (contextAsMap != null) {
HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap);
setCurrentOpenTracingSpanContext(
currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap));
}
}

@Override
public void setUp() {
Tracer openTracingTracer = GlobalTracer.get();
Tracer.SpanBuilder builder = openTracingTracer.buildSpan("cadence.workflow");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is going to work. The problem is that during replay a new span is going to be created.
Unfortunately, there is no way to create a span passing deterministically generated ID to it.

Another option would be using SideEffect to store the span, but that implies additional MarkerEvent for every workflow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it really matter if we start multiple spans if we only finish one? I don't think spans get reported until you finish() them.


if (MDC.getCopyOfContextMap().containsKey(LoggerTag.WORKFLOW_TYPE)) {
builder.withTag("resource.name", MDC.get(LoggerTag.WORKFLOW_TYPE));
} else {
builder.withTag("resource.name", MDC.get(LoggerTag.ACTIVITY_TYPE));
}

if (getCurrentOpenTracingSpanContext() != null) {
builder.asChildOf(getCurrentOpenTracingSpanContext());
}
Span span = builder.start();
openTracingTracer.activateSpan(span);
currentOpenTracingSpan.set(span);
Scope scope = openTracingTracer.activateSpan(span);
currentOpenTracingScope.set(scope);
}

@Override
public void onError(Throwable t) {
Span span = currentOpenTracingSpan.get();
if (span != null) {
Tags.ERROR.set(span, true);
Map<String, Object> errorData = new HashMap<>();
errorData.put(Fields.EVENT, "error");
if (t != null) {
errorData.put(Fields.ERROR_OBJECT, t);
errorData.put(Fields.MESSAGE, t.getMessage());
}
span.log(errorData);
}
}

@Override
public void finish() {
Scope currentScope = currentOpenTracingScope.get();
Span currentSpan = currentOpenTracingSpan.get();
if (currentScope != null) {
currentScope.close();
}
if (currentSpan != null) {
currentSpan.finish();
}
currentOpenTracingScope.remove();
currentOpenTracingSpan.remove();
currentOpenTracingSpanContext.remove();
}

/** Just check for other instances of the same class */
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (this == obj) {
return true;
}
return this.getClass().equals(obj.getClass());
}

@Override
public int hashCode() {
return this.getClass().hashCode();
}

private class HashMapTextMap implements TextMap {
private final HashMap<String, String> backingMap = new HashMap<>();

public HashMapTextMap() {
// Noop
}

public HashMapTextMap(Map<String, String> spanData) {
backingMap.putAll(spanData);
}

@Override
public Iterator<Map.Entry<String, String>> iterator() {
return backingMap.entrySet().iterator();
}

@Override
public void put(String key, String value) {
backingMap.put(key, value);
}

public HashMap<String, String> getBackingMap() {
return backingMap;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.context;

import io.temporal.common.context.ContextPropagator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This class holds the current set of context propagators */
public abstract class AbstractContextThreadLocal {

private static final Logger log = LoggerFactory.getLogger(AbstractContextThreadLocal.class);

/**
* Returns the context propagators for the current thread
*
* @return
*/
protected abstract List<ContextPropagator> getPropagatorsForThread();

/** Sets the context propagators for this thread */
public abstract void setContextPropagators(List<ContextPropagator> contextPropagators);

public List<ContextPropagator> getContextPropagators() {
return getPropagatorsForThread();
}

public Map<String, Object> getCurrentContextForPropagation() {
Map<String, Object> contextData = new HashMap<>();
for (ContextPropagator propagator : getPropagatorsForThread()) {
contextData.put(propagator.getName(), propagator.getCurrentContext());
}
return contextData;
}

/**
* Injects the context data into the thread for each configured context propagator
*
* @param contextData The context data received from the server
*/
public void propagateContextToCurrentThread(Map<String, Object> contextData) {
if (contextData == null || contextData.isEmpty()) {
return;
}
for (ContextPropagator propagator : getPropagatorsForThread()) {
if (contextData.containsKey(propagator.getName())) {
propagator.setCurrentContext(contextData.get(propagator.getName()));
}
}
}

/** Calls {@link ContextPropagator#setUp()} for each propagator */
public void setUpContextPropagators() {
for (ContextPropagator propagator : getPropagatorsForThread()) {
try {
propagator.setUp();
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling setUp() on a contextpropagator", t);
}
}
}

/**
* Calls {@link ContextPropagator#onError(Throwable)} for each propagator
*
* @param t The Throwable that caused the workflow/activity to finish
*/
public void onErrorContextPropagators(Throwable t) {
for (ContextPropagator propagator : getPropagatorsForThread()) {
try {
propagator.onError(t);
} catch (Throwable t1) {
// Don't let an error in one propagator block the others
log.error("Error calling onError() on a contextpropagator", t1);
}
}
}

/** Calls {@link ContextPropagator#finish()} for each propagator */
public void finishContextPropagators() {
for (ContextPropagator propagator : getPropagatorsForThread()) {
try {
propagator.finish();
} catch (Throwable t) {
// Don't let an error in one propagator block the others
log.error("Error calling finish() on a contextpropagator", t);
}
}
}
}
Loading