From 0e58687f783efeab29e2b41e95baff6cf04146f2 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 23 Sep 2024 09:59:33 -0700 Subject: [PATCH] Add getMetricsScope interceptor (#2224) Add getMetricsScope interceptor --- .../WorkflowOutboundCallsInterceptor.java | 4 ++ .../WorkflowOutboundCallsInterceptorBase.java | 6 ++ .../internal/sync/SyncWorkflowContext.java | 1 + .../internal/sync/WorkflowInternal.java | 2 +- .../io/temporal/workflow/MetricsTest.java | 69 +++++++++++++++++++ .../TestActivityEnvironmentInternal.java | 5 ++ .../internal/TracingWorkerInterceptor.java | 9 +++ 7 files changed, 95 insertions(+), 1 deletion(-) diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index d3f0b0ac7..7cb961644 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -20,6 +20,7 @@ package io.temporal.common.interceptors; +import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityOptions; import io.temporal.activity.LocalActivityOptions; import io.temporal.api.common.v1.WorkflowExecution; @@ -621,6 +622,9 @@ R mutableSideEffect( void upsertMemo(Map memo); + /** Intercepts call to get the metric scope in a workflow. */ + Scope getMetricsScope(); + /** * Intercepts creation of a workflow child thread. * diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index 71dc167da..e2d422355 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -20,6 +20,7 @@ package io.temporal.common.interceptors; +import com.uber.m3.tally.Scope; import io.temporal.common.SearchAttributeUpdate; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.Promise; @@ -167,6 +168,11 @@ public void upsertMemo(Map memo) { next.upsertMemo(memo); } + @Override + public Scope getMetricsScope() { + return next.getMetricsScope(); + } + @Override public Object newChildThread(Runnable runnable, boolean detached, String name) { return next.newChildThread(runnable, detached, name); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index a827e34d2..3ed169b5b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1210,6 +1210,7 @@ public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) { return new CancelWorkflowOutput(result); } + @Override public Scope getMetricsScope() { return replayContext.getMetricsScope(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 2b7d2d403..6c4d55d29 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -661,7 +661,7 @@ public static Optional getCurrentUpdateInfo() { } public static Scope getMetricsScope() { - return getRootWorkflowContext().getMetricsScope(); + return getWorkflowOutboundInterceptor().getMetricsScope(); } private static boolean isLoggingEnabledInReplay() { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java index 7c54fa63b..d3731129b 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java @@ -61,6 +61,7 @@ import io.temporal.workflow.shared.TestWorkflows.ReceiveSignalObjectWorkflow; import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -77,6 +78,7 @@ public class MetricsTest { private static final long REPORTING_FLUSH_TIME = 600; private static final String TASK_QUEUE = "metrics_test"; + private static final String TEST_TAG = "test_tag"; private TestWorkflowEnvironment testEnvironment; private TestStatsReporter reporter; @@ -243,6 +245,44 @@ public void testWorkflowMetrics() throws InterruptedException { reporter.assertCounter(TEMPORAL_REQUEST, workflowTaskCompletionTags, 4); } + @Test + public void testWorkflowMetricsInterceptor() throws InterruptedException { + setUp( + WorkerFactoryOptions.getDefaultInstance().toBuilder() + .setWorkerInterceptors(new WorkerInterceptor()) + .build()); + + Worker worker = testEnvironment.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes( + TestCustomMetricsInWorkflow.class, TestMetricsInChildWorkflow.class); + worker.registerActivitiesImplementations(new TestActivityImpl()); + testEnvironment.start(); + + WorkflowClient workflowClient = testEnvironment.getWorkflowClient(); + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowRunTimeout(Duration.ofSeconds(1000)) + .setTaskQueue(TASK_QUEUE) + .build(); + NoArgsWorkflow workflow = workflowClient.newWorkflowStub(NoArgsWorkflow.class, options); + workflow.execute(); + + Thread.sleep(REPORTING_FLUSH_TIME); + + Map workflowTags = new LinkedHashMap<>(TAGS_TASK_QUEUE); + // Assert the interceptor added the extra tag + workflowTags.put(TEST_TAG, NAMESPACE); + + workflowTags.put(MetricsTag.WORKFLOW_TYPE, "NoArgsWorkflow"); + reporter.assertCounter("test_started", workflowTags, 1); + reporter.assertCounter("test_done", workflowTags, 1); + + workflowTags.put(MetricsTag.WORKFLOW_TYPE, "TestChildWorkflow"); + reporter.assertCounter("test_child_started", workflowTags, 1); + reporter.assertCounter("test_child_done", workflowTags, 1); + reporter.assertTimerMinDuration("test_timer", workflowTags, Duration.ofSeconds(3)); + } + @Test public void testCorruptedSignalMetrics() throws InterruptedException { setUp( @@ -600,4 +640,33 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { overrideArgs.apply(args))); } } + + private static class WorkerInterceptor extends WorkerInterceptorBase { + @Override + public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) { + return new WorkflowInboundCallsInterceptorBase(next) { + @Override + public void init(WorkflowOutboundCallsInterceptor outboundCalls) { + next.init(new OutboundCallsInterceptor(outboundCalls)); + } + }; + } + } + + private static class OutboundCallsInterceptor extends WorkflowOutboundCallsInterceptorBase { + WorkflowOutboundCallsInterceptor next; + + public OutboundCallsInterceptor(WorkflowOutboundCallsInterceptor next) { + super(next); + this.next = next; + } + + @Override + public Scope getMetricsScope() { + return next.getMetricsScope() + .tagged( + Collections.singletonMap( + TEST_TAG, String.valueOf(Workflow.getInfo().getNamespace()))); + } + } } diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 6fbe9ea67..a8adb759d 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -481,6 +481,11 @@ public void upsertMemo(Map memo) { throw new UnsupportedOperationException("not implemented"); } + @Override + public Scope getMetricsScope() { + throw new UnsupportedOperationException("not implemented"); + } + @Override public Object newChildThread(Runnable runnable, boolean detached, String name) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 411a8f6fc..6282b7d2a 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.ActivityCompletionException; import io.temporal.common.SearchAttributeUpdate; @@ -398,6 +399,14 @@ public void upsertMemo(Map memo) { next.upsertMemo(memo); } + @Override + public Scope getMetricsScope() { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("getMetricsScope"); + } + return next.getMetricsScope(); + } + @Override public Object newChildThread(Runnable runnable, boolean detached, String name) { if (!WorkflowUnsafe.isReplaying()) {