From 74022f16bd7f20f8d9f08b417b2e3f422e5bd959 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sat, 23 Nov 2024 15:43:52 -0800 Subject: [PATCH] Add getResult to WorkflowUpdateHandle (#2324) Add getResult to UpdateHandle --- .../temporal/client/WorkflowUpdateHandle.java | 17 +++++++ .../CompletedWorkflowUpdateHandleImpl.java | 10 +++++ .../client/LazyWorkflowUpdateHandleImpl.java | 45 ++++++++++++++----- .../client/functional/UpdateTest.java | 20 ++++++--- .../client/functional/UpdateTestTimeout.java | 36 ++++++++++++++- .../workflow/updateTest/UpdateTest.java | 8 ++++ 6 files changed, 117 insertions(+), 19 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateHandle.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateHandle.java index 1996de331..525586875 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateHandle.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowUpdateHandle.java @@ -45,6 +45,23 @@ public interface WorkflowUpdateHandle { */ String getId(); + /** + * Returns the result of the workflow update. + * + * @return the result of the workflow update + */ + T getResult(); + + /** + * Returns the result of the workflow update. + * + * @param timeout maximum time to wait and perform the background long polling + * @param unit unit of timeout + * @throws WorkflowUpdateTimeoutOrCancelledException if the timeout is reached. + * @return the result of the workflow update + */ + T getResult(long timeout, TimeUnit unit); + /** * Returns a {@link CompletableFuture} with the update workflow execution request result, * potentially waiting for the update to complete. diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/CompletedWorkflowUpdateHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/CompletedWorkflowUpdateHandleImpl.java index 60480ecc5..b8dfadfb0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/CompletedWorkflowUpdateHandleImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/CompletedWorkflowUpdateHandleImpl.java @@ -49,6 +49,16 @@ public String getId() { return id; } + @Override + public T getResult() { + return result; + } + + @Override + public T getResult(long timeout, TimeUnit unit) { + return result; + } + @Override public CompletableFuture getResultAsync() { return CompletableFuture.completedFuture(result); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/LazyWorkflowUpdateHandleImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/client/LazyWorkflowUpdateHandleImpl.java index 479d553e5..103a91ef0 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/LazyWorkflowUpdateHandleImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/LazyWorkflowUpdateHandleImpl.java @@ -20,7 +20,6 @@ package io.temporal.internal.client; -import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.client.WorkflowException; @@ -30,10 +29,7 @@ import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.serviceclient.CheckedExceptionWrapper; import java.lang.reflect.Type; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; @Experimental public final class LazyWorkflowUpdateHandleImpl implements WorkflowUpdateHandle { @@ -97,19 +93,16 @@ public CompletableFuture getResultAsync(long timeout, TimeUnit unit) { failure -> { if (failure instanceof CompletionException) { // unwrap the CompletionException - failure = ((Throwable) failure).getCause(); + failure = failure.getCause(); } - failure = CheckedExceptionWrapper.unwrap((Throwable) failure); + failure = CheckedExceptionWrapper.unwrap(failure); if (failure instanceof Error) { throw (Error) failure; } if (failure instanceof StatusRuntimeException) { StatusRuntimeException sre = (StatusRuntimeException) failure; - if (Status.Code.NOT_FOUND.equals(sre.getStatus().getCode())) { - // Currently no way to tell if the NOT_FOUND was because the workflow ID - // does not exist or because the update ID does not exist. - throw sre; - } + // Currently no way to tell if the NOT_FOUND was because the workflow ID + // does not exist or because the update ID does not exist. throw sre; } else if (failure instanceof WorkflowException) { throw (WorkflowException) failure; @@ -120,6 +113,34 @@ public CompletableFuture getResultAsync(long timeout, TimeUnit unit) { }); } + @Override + public T getResult() { + try { + return getResultAsync().get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + throw (cause instanceof RuntimeException + ? (RuntimeException) cause + : new RuntimeException(cause)); + } + } + + @Override + public T getResult(long timeout, TimeUnit unit) { + try { + return getResultAsync(timeout, unit).get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + throw (cause instanceof RuntimeException + ? (RuntimeException) cause + : new RuntimeException(cause)); + } + } + @Override public CompletableFuture getResultAsync() { return this.getResultAsync(Long.MAX_VALUE, TimeUnit.MILLISECONDS); diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java index 6dc34642f..3bdc35906 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.*; +import io.grpc.StatusRuntimeException; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.client.*; @@ -54,12 +55,18 @@ public void updateNonExistentWorkflow() { } @Test - public void pollUpdateNonExistentWorkflow() throws ExecutionException, InterruptedException { + public void pollUpdateNonExistentWorkflow() { WorkflowStub workflowStub = testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("non-existing-id"); // Getting the update handle to a nonexistent workflow is fine WorkflowUpdateHandle handle = workflowStub.getUpdateHandle("update-id", String.class); - assertThrows(Exception.class, () -> handle.getResultAsync().get()); + ExecutionException e = + assertThrows(ExecutionException.class, () -> handle.getResultAsync().get()); + assertTrue(e.getCause() instanceof StatusRuntimeException); + StatusRuntimeException sre = (StatusRuntimeException) e.getCause(); + assertEquals(io.grpc.Status.Code.NOT_FOUND, sre.getStatus().getCode()); + sre = assertThrows(StatusRuntimeException.class, () -> handle.getResult()); + assertEquals(io.grpc.Status.Code.NOT_FOUND, sre.getStatus().getCode()); } @Test @@ -127,7 +134,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx // Try to get the result of an invalid update WorkflowUpdateHandle handle = workflowStub.getUpdateHandle(updateId, String.class); - assertThrows(Exception.class, () -> handle.getResultAsync().get()); + assertThrows(ExecutionException.class, () -> handle.getResultAsync().get()); assertEquals( "some-value", @@ -192,9 +199,10 @@ public void updateWorkflowReuseOptions() throws ExecutionException, InterruptedE workflowStub.startUpdate(updateOptions, 0, "some-value").getResultAsync().get()); testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId()); // Try to send another update request with the same update options - assertEquals( - "some-other-value", - workflowStub.startUpdate(updateOptions, 0, "some-other-value").getResultAsync().get()); + WorkflowUpdateHandle handle = + workflowStub.startUpdate(updateOptions, 0, "some-other-value"); + assertEquals("some-other-value", handle.getResultAsync().get()); + assertEquals("some-other-value", handle.getResult()); // Complete the workflow workflowStub.update("complete", void.class); diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java index cb53b17d9..90d790c3f 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java @@ -88,7 +88,7 @@ public void LongRunningWorkflowUpdateId() throws ExecutionException, Interrupted } @Test - public void WorkflowUpdateGetResultTimeout() throws ExecutionException, InterruptedException { + public void WorkflowUpdateGetResultAsyncTimeout() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); String workflowType = BlockingWorkflow.class.getSimpleName(); WorkflowStub workflowStub = @@ -123,6 +123,40 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup assertEquals("complete", workflowStub.getResult(String.class)); } + @Test + public void WorkflowUpdateGetResultTimeout() { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + String workflowType = BlockingWorkflow.class.getSimpleName(); + WorkflowStub workflowStub = + workflowClient.newUntypedWorkflowStub( + workflowType, + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue())); + + workflowStub.start(); + SDKTestWorkflowRule.waitForOKQuery(workflowStub); + + WorkflowUpdateHandle handle = + workflowStub.startUpdate( + "update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value"); + + // Verify get throws the correct exception in around the right amount of time + Stopwatch stopWatch = Stopwatch.createStarted(); + assertThrows( + WorkflowUpdateTimeoutOrCancelledException.class, + () -> handle.getResult(2, TimeUnit.SECONDS)); + stopWatch.stop(); + long elapsedSeconds = stopWatch.elapsed(TimeUnit.SECONDS); + assertTrue( + "We shouldn't return too early or too late by the timeout, took " + + elapsedSeconds + + " seconds", + elapsedSeconds >= 1 && elapsedSeconds <= 3); + + // Complete workflow, since the update is accepted it will not block completion + workflowStub.update("complete", void.class); + assertEquals("complete", workflowStub.getResult(String.class)); + } + @WorkflowInterface public interface BlockingWorkflow { @WorkflowMethod diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java index 9b51f9561..3b074180c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java @@ -186,6 +186,14 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException WorkflowUpdateException.class, () -> workflowStub.update("bad_update_name", String.class, 0, "Bad Update")); + // send an update request to a bad name through the async path + assertThrows( + WorkflowUpdateException.class, + () -> + workflowStub + .startUpdate("bad_update_name", WorkflowUpdateStage.COMPLETED, String.class, 0, "") + .getResult()); + // send a bad update that will be rejected through the sync path assertThrows( WorkflowUpdateException.class,