Skip to content

Commit

Permalink
Avoid incrementing WFT failure metric on respond failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 10, 2023
1 parent 2475838 commit 86045ef
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ public void handle(WorkflowTask task) throws Exception {
}
} catch (Exception e) {
logExceptionDuringResultReporting(e, currentTask, result);
workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
// if we failed to report the workflow task completion back to the server,
// our cached version of the workflow may be more advanced than the server is aware of.
// We should discard this execution and perform a clean replay based on what server
Expand All @@ -370,8 +369,6 @@ public void handle(WorkflowTask task) throws Exception {
throw e;
}

// this should be after sendReply, otherwise we may log
// WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER twice if sendReply throws
if (result.getTaskFailed() != null) {
// we don't trigger the counter in case of the legacy query
// (which never has taskFailed set)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,102 @@ public void concurrentPollRequestLockTest() throws Exception {
// Verify we only handled two tasks
verify(taskHandler, times(2)).handleWorkflowTask(any());
}

@Test
public void respondWorkflowTaskFailureMetricTest() throws Exception {
// Test that if the server sends multiple concurrent workflow tasks for the same workflow the
// SDK holds the lock during all processing.
WorkflowServiceStubs client = mock(WorkflowServiceStubs.class);
when(client.getServerCapabilities())
.thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build());

WorkflowRunLockManager runLockManager = new WorkflowRunLockManager();

Scope metricsScope =
new RootScopeBuilder()
.reporter(reporter)
.reportEvery(com.uber.m3.util.Duration.ofMillis(1));
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, runLockManager, metricsScope);

WorkflowTaskHandler taskHandler = mock(WorkflowTaskHandler.class);
when(taskHandler.isAnyTypeSupported()).thenReturn(true);

EagerActivityDispatcher eagerActivityDispatcher = mock(EagerActivityDispatcher.class);
WorkflowWorker worker =
new WorkflowWorker(
client,
"default",
"task_queue",
"sticky_task_queue",
SingleWorkerOptions.newBuilder()
.setIdentity("test_identity")
.setBuildId(UUID.randomUUID().toString())
.setPollerOptions(PollerOptions.newBuilder().setPollThreadCount(1).build())
.setMetricsScope(metricsScope)
.build(),
runLockManager,
cache,
taskHandler,
eagerActivityDispatcher);

WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
when(client.blockingStub()).thenReturn(blockingStub);
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);

PollWorkflowTaskQueueResponse pollResponse =
PollWorkflowTaskQueueResponse.newBuilder()
.setTaskToken(ByteString.copyFrom("token", UTF_8))
.setWorkflowExecution(
WorkflowExecution.newBuilder().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID).build())
.setWorkflowType(WorkflowType.newBuilder().setName(WORKFLOW_TYPE).build())
.build();

CountDownLatch pollTaskQueueLatch = new CountDownLatch(1);
CountDownLatch blockPollTaskQueueLatch = new CountDownLatch(1);

when(blockingStub.pollWorkflowTaskQueue(any(PollWorkflowTaskQueueRequest.class)))
.thenReturn(pollResponse)
.thenAnswer(
(Answer<PollWorkflowTaskQueueResponse>)
invocation -> {
pollTaskQueueLatch.countDown();
blockPollTaskQueueLatch.await();
return null;
});
;

CountDownLatch handleTaskLatch = new CountDownLatch(1);

when(taskHandler.handleWorkflowTask(any(PollWorkflowTaskQueueResponse.class)))
.thenAnswer(
(Answer<WorkflowTaskHandler.Result>)
invocation -> {
handleTaskLatch.countDown();

return new WorkflowTaskHandler.Result(
WORKFLOW_TYPE,
RespondWorkflowTaskCompletedRequest.newBuilder().build(),
null,
null,
null,
false,
null);
});

when(blockingStub.respondWorkflowTaskCompleted(any(RespondWorkflowTaskCompletedRequest.class)))
.thenThrow(new RuntimeException());

assertTrue(worker.start());
// Wait until we have got all the polls
pollTaskQueueLatch.await();
// Wait until the worker handles at least one WFT
handleTaskLatch.await();
// Cleanup
worker.shutdown(new ShutdownManager(), false).get();
// Make sure we don't report workflow task failure
reporter.assertNoMetric(
MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER,
ImmutableMap.of("worker_type", "WorkflowWorker", "workflow_type", "test-workflow-type"));
}
}

0 comments on commit 86045ef

Please sign in to comment.