From 531d3cb2d4fae3a16c7ac5ae7a0ea0f762fc4a1e Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 6 Aug 2024 14:52:23 -0700 Subject: [PATCH] Wrap GRPC::CANCELED and DEADLINE_EXCEEDED in new exception type (#2172) * Wrap GRPC::CANCELED and DEADLINE_EXCEEDED --- ...flowUpdateTimeoutOrCancelledException.java | 36 ++++++ .../internal/client/LazyUpdateHandleImpl.java | 5 +- .../client/RootWorkflowClientInvoker.java | 32 +++-- .../client/functional/UpdateTestTimeout.java | 10 +- .../updateTest/UpdateExceptionWrapped.java | 116 ++++++++++++++++++ 5 files changed, 180 insertions(+), 19 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateTimeoutOrCancelledException.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateExceptionWrapped.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateTimeoutOrCancelledException.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateTimeoutOrCancelledException.java new file mode 100644 index 000000000..fb7d24c82 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateTimeoutOrCancelledException.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.client; + +import io.temporal.api.common.v1.WorkflowExecution; + +/** + * Error that occurs when an update call times out or is cancelled. + * + *

Note, this is not related to any general concept of timing out or cancelling a running update, + * this is only related to the client call itself. + */ +public class WorkflowUpdateTimeoutOrCancelledException extends WorkflowServiceException { + public WorkflowUpdateTimeoutOrCancelledException( + WorkflowExecution execution, String updateId, String updateName, Throwable cause) { + super(execution, "", cause); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java index e48c49808..8ca5a611a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/LazyUpdateHandleImpl.java @@ -110,12 +110,13 @@ public CompletableFuture getResultAsync(long timeout, TimeUnit unit) { // does not exist or because the update ID does not exist. throw sre; } + throw sre; } else if (failure instanceof WorkflowException) { throw (WorkflowException) failure; } else if (failure instanceof TimeoutException) { - throw new CompletionException((TimeoutException) failure); + throw new CompletionException(failure); } - throw new WorkflowServiceException(execution, workflowType, (Throwable) failure); + throw new WorkflowServiceException(execution, workflowType, failure); }); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 439304e3f..500cacf0a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -339,8 +339,18 @@ public UpdateHandle startUpdate(StartUpdateInput input) { UpdateWorkflowExecutionLifecycleStage waitForStage = input.getWaitPolicy().getLifecycleStage(); do { Deadline pollTimeoutDeadline = Deadline.after(POLL_UPDATE_TIMEOUT_S, TimeUnit.SECONDS); - result = genericClient.update(updateRequest, pollTimeoutDeadline); - } while (result.getStage().getNumber() < waitForStage.getNumber() + try { + result = genericClient.update(updateRequest, pollTimeoutDeadline); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED + || e.getStatus().getCode() == Status.Code.CANCELLED) { + throw new WorkflowUpdateTimeoutOrCancelledException( + input.getWorkflowExecution(), input.getUpdateName(), input.getUpdateId(), e); + } + throw e; + } + + } while (result.getStage().getNumber() < input.getWaitPolicy().getLifecycleStage().getNumber() && result.getStage().getNumber() < UpdateWorkflowExecutionLifecycleStage .UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED @@ -466,17 +476,17 @@ private void pollWorkflowUpdateHelper( return; } if ((e instanceof StatusRuntimeException - && ((StatusRuntimeException) e).getStatus().getCode() - == Status.Code.DEADLINE_EXCEEDED) + && (((StatusRuntimeException) e).getStatus().getCode() + == Status.Code.DEADLINE_EXCEEDED + || ((StatusRuntimeException) e).getStatus().getCode() + == Status.Code.CANCELLED)) || deadline.isExpired()) { resultCF.completeExceptionally( - new TimeoutException( - "WorkflowId=" - + request.getUpdateRef().getWorkflowExecution().getWorkflowId() - + ", runId=" - + request.getUpdateRef().getWorkflowExecution().getRunId() - + ", updateId=" - + request.getUpdateRef().getUpdateId())); + new WorkflowUpdateTimeoutOrCancelledException( + request.getUpdateRef().getWorkflowExecution(), + request.getUpdateRef().getUpdateId(), + "", + e)); } else if (e != null) { resultCF.completeExceptionally(e); } else { diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java index f5219d6f8..717999167 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java @@ -27,17 +27,13 @@ import static org.junit.Assert.assertEquals; import com.google.common.base.Stopwatch; -import io.temporal.client.UpdateHandle; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowStub; -import io.temporal.client.WorkflowUpdateStage; +import io.temporal.client.*; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.junit.Rule; import org.junit.Test; @@ -111,7 +107,9 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup // Verify get throws the correct exception in around the right amount of time Stopwatch stopWatch = Stopwatch.createStarted(); ExecutionException executionException = assertThrows(ExecutionException.class, result::get); - assertThat(executionException.getCause(), is(instanceOf(TimeoutException.class))); + assertThat( + executionException.getCause(), + is(instanceOf(WorkflowUpdateTimeoutOrCancelledException.class))); stopWatch.stop(); long elapsedSeconds = stopWatch.elapsed(TimeUnit.SECONDS); assertTrue( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateExceptionWrapped.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateExceptionWrapped.java new file mode 100644 index 000000000..8a51e4bbb --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateExceptionWrapped.java @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.updateTest; + +import static org.junit.Assert.assertThrows; + +import io.grpc.Context; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.*; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.CompletablePromise; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class UpdateExceptionWrapped { + + private static ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkerOptions(WorkerOptions.newBuilder().build()) + .setWorkflowTypes(TestUpdateWorkflowImpl.class) + .build(); + + @Test + public void testUpdateStart() { + String workflowId = UUID.randomUUID().toString(); + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + WorkflowOptions options = + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder() + .setWorkflowId(workflowId) + .build(); + WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options); + // To execute workflow client.execute() would do. But we want to start workflow and immediately + // return. + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + testWorkflowRule.getTestEnvironment().shutdownNow(); + testWorkflowRule.getTestEnvironment().awaitTermination(1000, TimeUnit.MILLISECONDS); + + final AtomicReference exception = + new AtomicReference<>(); + + Context.current() + .withDeadlineAfter(500, TimeUnit.MILLISECONDS, scheduledExecutor) + .run( + () -> + exception.set( + assertThrows( + WorkflowUpdateTimeoutOrCancelledException.class, + () -> workflow.update(0, "")))); + Assert.assertEquals(execution.getWorkflowId(), exception.get().getExecution().getWorkflowId()); + } + + public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate { + String state = "initial"; + List updates = new ArrayList<>(); + CompletablePromise promise = Workflow.newPromise(); + + @Override + public String execute() { + promise.get(); + return ""; + } + + @Override + public String getState() { + return state; + } + + @Override + public String update(Integer index, String value) { + Workflow.await(() -> false); + return ""; + } + + @Override + public void updateValidator(Integer index, String value) {} + + @Override + public void complete() { + promise.complete(null); + } + + @Override + public void completeValidator() {} + } +}