From 1ad1c04ed60481fffe1685b09c9c6900361f9c0f Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 3 Jun 2024 09:52:21 -0700 Subject: [PATCH] Require WaitForStage in StartUpdate (#2088) --- .../io/temporal/client/UpdateOptions.java | 43 +++++++++++-------- .../java/io/temporal/client/WorkflowStub.java | 7 ++- .../io/temporal/client/WorkflowStubImpl.java | 11 ++--- .../client/functional/UpdateTest.java | 10 ++++- .../client/functional/UpdateTestTimeout.java | 10 +++-- .../workflow/updateTest/UpdateTest.java | 12 ++++-- .../updateTest/UpdateTestContinueAsNew.java | 2 +- .../testing/TimeLockingInterceptor.java | 9 ++-- 8 files changed, 64 insertions(+), 40 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java b/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java index 56ebf3ede..7ffdefd5e 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java @@ -33,8 +33,8 @@ public static UpdateOptions.Builder newBuilder(Class resultClass) { return new UpdateOptions.Builder().setResultClass(resultClass); } - public static UpdateOptions.Builder newBuilder(UpdateOptions options) { - return new UpdateOptions.Builder(options); + public static UpdateOptions.Builder newBuilder(UpdateOptions options) { + return new UpdateOptions.Builder(options); } public static UpdateOptions getDefaultInstance() { @@ -50,7 +50,7 @@ public static UpdateOptions getDefaultInstance() { private final String updateName; private final String updateId; private final String firstExecutionRunId; - private final WorkflowUpdateStage waitPolicy; + private final WorkflowUpdateStage waitForStage; private final Class resultClass; private final Type resultType; @@ -58,13 +58,13 @@ private UpdateOptions( String updateName, String updateId, String firstExecutionRunId, - WorkflowUpdateStage waitPolicy, + WorkflowUpdateStage waitForStage, Class resultClass, Type resultType) { this.updateName = updateName; this.updateId = updateId; this.firstExecutionRunId = firstExecutionRunId; - this.waitPolicy = waitPolicy; + this.waitForStage = waitForStage; this.resultClass = resultClass; this.resultType = resultType; } @@ -81,8 +81,8 @@ public String getFirstExecutionRunId() { return firstExecutionRunId; } - public WorkflowUpdateStage getWaitPolicy() { - return waitPolicy; + public WorkflowUpdateStage getWaitForStage() { + return waitForStage; } public Class getResultClass() { @@ -105,7 +105,7 @@ public boolean equals(Object o) { return Objects.equal(updateName, that.updateName) && updateId == that.updateId && firstExecutionRunId == that.firstExecutionRunId - && waitPolicy.equals(that.waitPolicy) + && waitForStage.equals(that.waitForStage) && resultClass.equals(that.resultClass) && resultType.equals(that.resultType); } @@ -113,7 +113,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hashCode( - updateName, updateId, firstExecutionRunId, waitPolicy, resultClass, resultType); + updateName, updateId, firstExecutionRunId, waitForStage, resultClass, resultType); } @Override @@ -125,8 +125,8 @@ public String toString() { + updateId + ", firstExecutionRunId=" + firstExecutionRunId - + ", waitPolicy=" - + waitPolicy + + ", waitForStage=" + + waitForStage + ", resultClass=" + resultClass + ", resultType='" @@ -146,13 +146,19 @@ public void validate() { if (resultClass == null) { throw new IllegalStateException("resultClass must not be null"); } + if (waitForStage == null) { + throw new IllegalStateException("waitForStage must not be null"); + } + if (waitForStage.equals(WorkflowUpdateStage.ADMITTED)) { + throw new IllegalStateException("waitForStage cannot be ADMITTED"); + } } public static final class Builder { private String updateName; private String updateId; private String firstExecutionRunId; - private WorkflowUpdateStage waitPolicy; + private WorkflowUpdateStage waitForStage; private Class resultClass; private Type resultType; @@ -165,7 +171,7 @@ private Builder(UpdateOptions options) { this.updateName = options.updateName; this.updateId = options.updateId; this.firstExecutionRunId = options.firstExecutionRunId; - this.waitPolicy = options.waitPolicy; + this.waitForStage = options.waitForStage; this.resultClass = options.resultClass; this.resultType = options.resultType; } @@ -200,16 +206,17 @@ public Builder setFirstExecutionRunId(String firstExecutionRunId) { /** * Specifies at what point in the update request life cycles this request should return. - * - *

Default value if not set: Accepted + * Required to be set to one of the following values: * *

    *
  • Accepted Wait for the update to be accepted by the workflow. *
  • Completed Wait for the update to be completed by the workflow. *
+ * + * Admitted is not allowed as a value. */ - public Builder setWaitPolicy(WorkflowUpdateStage waitPolicy) { - this.waitPolicy = waitPolicy; + public Builder setWaitForStage(WorkflowUpdateStage waitForStage) { + this.waitForStage = waitForStage; return this; } @@ -239,7 +246,7 @@ public UpdateOptions build() { updateName, updateId, firstExecutionRunId == null ? "" : firstExecutionRunId, - waitPolicy == null ? WorkflowUpdateStage.ACCEPTED : waitPolicy, + waitForStage, resultClass, resultType == null ? resultClass : resultType); } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java index a929826bb..094db521c 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java @@ -93,17 +93,20 @@ static WorkflowStub fromTyped(T typed) { /** * Asynchronously update a workflow execution by invoking its update handler and returning a - * handle to the update request. Usually a update handler is a method annotated with {@link + * handle to the update request. Usually an update handler is a method annotated with {@link * io.temporal.workflow.UpdateMethod}. * * @param updateName name of the update handler. Usually it is a method name. + * @param waitForStage stage to wait for before returning the update handle. Admitted is not + * allowed as a value. * @param resultClass class of the update return value * @param type of the update return value * @param args update method arguments * @return update handle that can be used to get the result of the update. */ @Experimental - UpdateHandle startUpdate(String updateName, Class resultClass, Object... args); + UpdateHandle startUpdate( + String updateName, WorkflowUpdateStage waitForStage, Class resultClass, Object... args); /** * Asynchronously update a workflow execution by invoking its update handler and returning a diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 40724f2a4..023283477 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -297,7 +297,7 @@ public R update(String updateName, Class resultClass, Object... args) { UpdateOptions options = UpdateOptions.newBuilder() .setUpdateName(updateName) - .setWaitPolicy(WorkflowUpdateStage.COMPLETED) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) .setResultClass(resultClass) .build(); return startUpdate(options, args).getResultAsync().get(); @@ -312,11 +312,12 @@ public R update(String updateName, Class resultClass, Object... args) { } @Override - public UpdateHandle startUpdate(String updateName, Class resultClass, Object... args) { + public UpdateHandle startUpdate( + String updateName, WorkflowUpdateStage waitForStage, Class resultClass, Object... args) { UpdateOptions options = UpdateOptions.newBuilder() .setUpdateName(updateName) - .setWaitPolicy(WorkflowUpdateStage.ACCEPTED) + .setWaitForStage(waitForStage) .setResultClass(resultClass) .setResultType(resultClass) .build(); @@ -342,7 +343,7 @@ public UpdateHandle startUpdate(UpdateOptions options, Object... args) options.getResultType(), options.getFirstExecutionRunId(), WaitPolicy.newBuilder() - .setLifecycleStage(options.getWaitPolicy().getProto()) + .setLifecycleStage(options.getWaitForStage().getProto()) .build())); if (result.hasResult()) { @@ -360,7 +361,7 @@ public UpdateHandle startUpdate(UpdateOptions options, Object... args) result.getReference().getWorkflowExecution(), options.getResultClass(), options.getResultType()); - if (options.getWaitPolicy() == WorkflowUpdateStage.COMPLETED) { + if (options.getWaitForStage() == WorkflowUpdateStage.COMPLETED) { // Don't return the handle until completed, since that's what's been asked for handle.waitCompleted(); } diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java index d3a97b7e5..0dbcb584b 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java @@ -70,7 +70,9 @@ public void updateNonExistentWorkflowUntyped() { assertThrows( WorkflowNotFoundException.class, - () -> workflowStub.startUpdate("update", Void.class, "some-value")); + () -> + workflowStub.startUpdate( + "update", WorkflowUpdateStage.ACCEPTED, Void.class, "some-value")); } @Test @@ -103,7 +105,9 @@ public void updateCompletedWorkflowUntyped() { assertThrows( WorkflowNotFoundException.class, - () -> workflowStub.startUpdate("update", Void.class, "some-value")); + () -> + workflowStub.startUpdate( + "update", WorkflowUpdateStage.ACCEPTED, Void.class, "some-value")); } @Test @@ -131,6 +135,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx .setUpdateName("update") .setUpdateId(updateId) .setFirstExecutionRunId(execution.getRunId()) + .setWaitForStage(WorkflowUpdateStage.ACCEPTED) .build(), 0, "some-value") @@ -146,6 +151,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx .setUpdateName("update") .setUpdateId(updateId) .setFirstExecutionRunId(execution.getRunId()) + .setWaitForStage(WorkflowUpdateStage.ACCEPTED) .build(), "some-other-value") .getResultAsync() diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java index 1ab7be531..f5219d6f8 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTestTimeout.java @@ -30,6 +30,7 @@ import io.temporal.client.UpdateHandle; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowUpdateStage; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.*; @@ -62,7 +63,8 @@ public void closeWorkflowWhileUpdateIsRunning() throws ExecutionException, Inter SDKTestWorkflowRule.waitForOKQuery(workflowStub); // Send an update that is accepted, but will not complete. UpdateHandle handle = - workflowStub.startUpdate("update", String.class, 10_000, "some-value"); + workflowStub.startUpdate( + "update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value"); // Complete workflow, since the update is accepted it will not block completion workflowStub.update("complete", void.class); @@ -81,7 +83,8 @@ public void LongRunningWorkflowUpdateId() throws ExecutionException, Interrupted workflowStub.start(); SDKTestWorkflowRule.waitForOKQuery(workflowStub); UpdateHandle handle = - workflowStub.startUpdate("update", String.class, 65_000, "some-value"); + workflowStub.startUpdate( + "update", WorkflowUpdateStage.ACCEPTED, String.class, 65_000, "some-value"); assertEquals("some-value", handle.getResultAsync().get()); workflowStub.update("complete", void.class); @@ -101,7 +104,8 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup SDKTestWorkflowRule.waitForOKQuery(workflowStub); UpdateHandle handle = - workflowStub.startUpdate("update", String.class, 10_000, "some-value"); + workflowStub.startUpdate( + "update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value"); CompletableFuture result = handle.getResultAsync(2, TimeUnit.SECONDS); // Verify get throws the correct exception in around the right amount of time diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java index 9d424d37b..8e8c1b670 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTest.java @@ -118,7 +118,8 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException // send an update through the sync path assertEquals("Execute-Hello", workflowStub.update("update", String.class, 0, "Hello")); // send an update through the async path - UpdateHandle updateRef = workflowStub.startUpdate("update", String.class, 0, "World"); + UpdateHandle updateRef = + workflowStub.startUpdate("update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "World"); assertEquals("Execute-World", updateRef.getResultAsync().get()); // send a bad update that will be rejected through the sync path assertThrows( @@ -133,7 +134,9 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException // send a bad update that will be rejected through the sync path assertThrows( WorkflowUpdateException.class, - () -> workflowStub.startUpdate("update", String.class, 0, "Bad Update")); + () -> + workflowStub.startUpdate( + "update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "Bad Update")); workflowStub.update("complete", void.class); @@ -163,7 +166,10 @@ public void testUpdateHandleNotReturnedUntilCompleteWhenAsked() () -> { UpdateHandle handle = workflowStub.startUpdate( - UpdateOptions.newBuilder(String.class).setUpdateName("update").build(), + UpdateOptions.newBuilder(String.class) + .setUpdateName("update") + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), "Enchi"); updateCompletedLast.set(true); try { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTestContinueAsNew.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTestContinueAsNew.java index 2bc9ddf42..74d448b2c 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTestContinueAsNew.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateTestContinueAsNew.java @@ -83,7 +83,7 @@ public void testContinueAsNewInAUpdate() { // Send an update to continue as new, must be async since the update won't complete WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); - workflowStub.startUpdate("update", String.class, 0, ""); + workflowStub.startUpdate("update", WorkflowUpdateStage.ACCEPTED, String.class, 0, ""); testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId()); testWorkflowRule.invalidateWorkflowCache(); diff --git a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java index a75a72883..7660f7415 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java @@ -21,10 +21,7 @@ package io.temporal.testing; import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.client.UpdateHandle; -import io.temporal.client.UpdateOptions; -import io.temporal.client.WorkflowOptions; -import io.temporal.client.WorkflowStub; +import io.temporal.client.*; import io.temporal.common.interceptors.WorkflowClientInterceptorBase; import io.temporal.serviceclient.TestServiceStubs; import java.lang.reflect.Type; @@ -240,8 +237,8 @@ public R update(String updateName, Class resultClass, Object... args) { @Override public UpdateHandle startUpdate( - String updateName, Class resultClass, Object... args) { - return next.startUpdate(updateName, resultClass, args); + String updateName, WorkflowUpdateStage waitForStage, Class resultClass, Object... args) { + return next.startUpdate(updateName, waitForStage, resultClass, args); } @Override