Skip to content

Commit

Permalink
Add Nexus Worker interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 17, 2024
1 parent eb64ec3 commit b7e91c4
Show file tree
Hide file tree
Showing 26 changed files with 998 additions and 28 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ plugins {

allprojects {
repositories {
mavenLocal()
mavenCentral()
}
}
Expand All @@ -32,7 +33,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.2.0-alpha'
nexusVersion = '0.3.0'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.10.5' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
package io.temporal.opentracing;

import io.temporal.common.interceptors.*;
import io.temporal.opentracing.internal.ContextAccessor;
import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor;
import io.temporal.opentracing.internal.OpenTracingWorkflowInboundCallsInterceptor;
import io.temporal.opentracing.internal.SpanFactory;
import io.temporal.opentracing.internal.*;

public class OpenTracingWorkerInterceptor implements WorkerInterceptor {
private final OpenTracingOptions options;
Expand Down Expand Up @@ -52,4 +49,11 @@ public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInt
return new OpenTracingActivityInboundCallsInterceptor(
next, options, spanFactory, contextAccessor);
}

@Override
public NexusOperationInboundCallsInterceptor interceptNexusOperation(
NexusOperationInboundCallsInterceptor next) {
return new OpenTracingNexusOperationInboundCallsInterceptor(
next, options, spanFactory, contextAccessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ public enum SpanOperationType {
UPDATE_WORKFLOW("UpdateWorkflow"),
HANDLE_QUERY("HandleQuery"),
HANDLE_SIGNAL("HandleSignal"),
HANDLE_UPDATE("HandleUpdate");
HANDLE_UPDATE("HandleUpdate"),
EXECUTE_NEXUS_OPERATION("ExecuteNexusOperation"),
START_NEXUS_OPERATION("StartNexusOperation"),
CANCEL_NEXUS_OPERATION("CancelNexusOperation");

private final String defaultPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());
case EXECUTE_NEXUS_OPERATION:
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());
case START_NEXUS_OPERATION:
case CANCEL_NEXUS_OPERATION:
return ImmutableMap.of();
case HANDLE_QUERY:
return ImmutableMap.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.StdConverterBackwardsCompatAdapter;
Expand Down Expand Up @@ -72,4 +74,20 @@ public SpanContext readSpanContextFromHeader(Header header, Tracer tracer) {
payload, HashMap.class, HASH_MAP_STRING_STRING_TYPE);
return codec.decode(serializedSpanContext, tracer);
}

public Span writeSpanContextToHeader(
Supplier<Span> spanSupplier, Map<String, String> header, Tracer tracer) {
Span span = spanSupplier.get();
writeSpanContextToHeader(span.context(), header, tracer);
return span;
}

public void writeSpanContextToHeader(
SpanContext spanContext, Map<String, String> header, Tracer tracer) {
tracer.inject(spanContext, Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header));
}

public SpanContext readSpanContextFromHeader(Map<String, String> header, Tracer tracer) {
return tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.opentracing.internal;

import io.nexusrpc.OperationUnsuccessfulException;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptorBase;
import io.temporal.opentracing.OpenTracingOptions;

public class OpenTracingNexusOperationInboundCallsInterceptor
extends NexusOperationInboundCallsInterceptorBase {
private final OpenTracingOptions options;
private final SpanFactory spanFactory;
private final Tracer tracer;
private final ContextAccessor contextAccessor;

public OpenTracingNexusOperationInboundCallsInterceptor(
NexusOperationInboundCallsInterceptor next,
OpenTracingOptions options,
SpanFactory spanFactory,
ContextAccessor contextAccessor) {
super(next);
this.options = options;
this.spanFactory = spanFactory;
this.tracer = options.getTracer();
this.contextAccessor = contextAccessor;
}

@Override
public StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Span operationStartSpan =
spanFactory
.createStartNexusOperationSpan(
tracer,
input.getOperationContext().getService(),
input.getOperationContext().getOperation(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(operationStartSpan)) {
return super.startOperation(input);
} catch (Throwable t) {
spanFactory.logFail(operationStartSpan, t);
throw t;
} finally {
operationStartSpan.finish();
}
}

@Override
public CancelOperationOutput cancelOperation(CancelOperationInput input) {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Span operationCancelSpan =
spanFactory
.createCancelNexusOperationSpan(
tracer,
input.getOperationContext().getService(),
input.getOperationContext().getOperation(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(operationCancelSpan)) {
return super.cancelOperation(input);
} catch (Throwable t) {
spanFactory.logFail(operationCancelSpan, t);
throw t;
} finally {
operationCancelSpan.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInfo;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.util.HashMap;
import java.util.Map;

public class OpenTracingWorkflowOutboundCallsInterceptor
extends WorkflowOutboundCallsInterceptorBase {
Expand Down Expand Up @@ -100,6 +102,33 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
}
}

@Override
public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
ExecuteNexusOperationInput<R> input) {
if (!WorkflowUnsafe.isReplaying()) {
Map<String, String> headers = new HashMap(input.getHeaders());
Span nexusOperationExecuteSpan =
contextAccessor.writeSpanContextToHeader(
() -> createNexusOperationExecuteSpanBuilder(input).start(), headers, tracer);
try (Scope ignored = tracer.scopeManager().activate(nexusOperationExecuteSpan)) {
return super.executeNexusOperation(
new ExecuteNexusOperationInput(
input.getEndpoint(),
input.getService(),
input.getOperation(),
input.getResultClass(),
input.getResultType(),
input.getArg(),
input.getOptions(),
headers));
} finally {
nexusOperationExecuteSpan.finish();
}
} else {
return super.executeNexusOperation(input);
}
}

@Override
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down Expand Up @@ -176,6 +205,17 @@ private <R> Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflow
parentWorkflowInfo.getRunId());
}

private <R> Tracer.SpanBuilder createNexusOperationExecuteSpanBuilder(
ExecuteNexusOperationInput<R> input) {
WorkflowInfo parentWorkflowInfo = Workflow.getInfo();
return spanFactory.createNexusOperationExecuteSpan(
tracer,
input.getService(),
input.getOperation(),
parentWorkflowInfo.getWorkflowId(),
parentWorkflowInfo.getRunId());
}

private Tracer.SpanBuilder createContinueAsNewWorkflowStartSpanBuilder(ContinueAsNewInput input) {
WorkflowInfo continuedWorkflowInfo = Workflow.getInfo();
return spanFactory.createContinueAsNewWorkflowStartSpan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public Tracer.SpanBuilder createWorkflowStartSpan(
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createNexusOperationExecuteSpan(
Tracer tracer, String serviceName, String operationName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.EXECUTE_NEXUS_OPERATION)
.setActionName(serviceName + "." + operationName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.CHILD_OF);
}

public Tracer.SpanBuilder createChildWorkflowStartSpan(
Tracer tracer,
String childWorkflowType,
Expand Down Expand Up @@ -173,6 +185,26 @@ public Tracer.SpanBuilder createActivityRunSpan(
return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createStartNexusOperationSpan(
Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.START_NEXUS_OPERATION)
.setActionName(serviceName + "." + operationName)
.build();
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createCancelNexusOperationSpan(
Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.CANCEL_NEXUS_OPERATION)
.setActionName(serviceName + "." + operationName)
.build();
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowStartUpdateSpan(
Tracer tracer, String updateName, String workflowId, String runId) {
SpanCreationContext context =
Expand Down
Loading

0 comments on commit b7e91c4

Please sign in to comment.