Skip to content

Commit

Permalink
Require WaitForStage in StartUpdate (#2088)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored Jun 3, 2024
1 parent 4eda239 commit 1ad1c04
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 40 deletions.
43 changes: 25 additions & 18 deletions temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public static <T> UpdateOptions.Builder<T> newBuilder(Class<T> resultClass) {
return new UpdateOptions.Builder<T>().setResultClass(resultClass);
}

public static UpdateOptions.Builder newBuilder(UpdateOptions options) {
return new UpdateOptions.Builder(options);
public static <T> UpdateOptions.Builder<T> newBuilder(UpdateOptions<T> options) {
return new UpdateOptions.Builder<T>(options);
}

public static UpdateOptions getDefaultInstance() {
Expand All @@ -50,21 +50,21 @@ public static UpdateOptions getDefaultInstance() {
private final String updateName;
private final String updateId;
private final String firstExecutionRunId;
private final WorkflowUpdateStage waitPolicy;
private final WorkflowUpdateStage waitForStage;
private final Class<T> resultClass;
private final Type resultType;

private UpdateOptions(
String updateName,
String updateId,
String firstExecutionRunId,
WorkflowUpdateStage waitPolicy,
WorkflowUpdateStage waitForStage,
Class<T> resultClass,
Type resultType) {
this.updateName = updateName;
this.updateId = updateId;
this.firstExecutionRunId = firstExecutionRunId;
this.waitPolicy = waitPolicy;
this.waitForStage = waitForStage;
this.resultClass = resultClass;
this.resultType = resultType;
}
Expand All @@ -81,8 +81,8 @@ public String getFirstExecutionRunId() {
return firstExecutionRunId;
}

public WorkflowUpdateStage getWaitPolicy() {
return waitPolicy;
public WorkflowUpdateStage getWaitForStage() {
return waitForStage;
}

public Class<T> getResultClass() {
Expand All @@ -105,15 +105,15 @@ public boolean equals(Object o) {
return Objects.equal(updateName, that.updateName)
&& updateId == that.updateId
&& firstExecutionRunId == that.firstExecutionRunId
&& waitPolicy.equals(that.waitPolicy)
&& waitForStage.equals(that.waitForStage)
&& resultClass.equals(that.resultClass)
&& resultType.equals(that.resultType);
}

@Override
public int hashCode() {
return Objects.hashCode(
updateName, updateId, firstExecutionRunId, waitPolicy, resultClass, resultType);
updateName, updateId, firstExecutionRunId, waitForStage, resultClass, resultType);
}

@Override
Expand All @@ -125,8 +125,8 @@ public String toString() {
+ updateId
+ ", firstExecutionRunId="
+ firstExecutionRunId
+ ", waitPolicy="
+ waitPolicy
+ ", waitForStage="
+ waitForStage
+ ", resultClass="
+ resultClass
+ ", resultType='"
Expand All @@ -146,13 +146,19 @@ public void validate() {
if (resultClass == null) {
throw new IllegalStateException("resultClass must not be null");
}
if (waitForStage == null) {
throw new IllegalStateException("waitForStage must not be null");
}
if (waitForStage.equals(WorkflowUpdateStage.ADMITTED)) {
throw new IllegalStateException("waitForStage cannot be ADMITTED");
}
}

public static final class Builder<T> {
private String updateName;
private String updateId;
private String firstExecutionRunId;
private WorkflowUpdateStage waitPolicy;
private WorkflowUpdateStage waitForStage;
private Class<T> resultClass;
private Type resultType;

Expand All @@ -165,7 +171,7 @@ private Builder(UpdateOptions<T> options) {
this.updateName = options.updateName;
this.updateId = options.updateId;
this.firstExecutionRunId = options.firstExecutionRunId;
this.waitPolicy = options.waitPolicy;
this.waitForStage = options.waitForStage;
this.resultClass = options.resultClass;
this.resultType = options.resultType;
}
Expand Down Expand Up @@ -200,16 +206,17 @@ public Builder<T> setFirstExecutionRunId(String firstExecutionRunId) {

/**
* Specifies at what point in the update request life cycles this request should return.
*
* <p>Default value if not set: <b>Accepted</b>
* Required to be set to one of the following values:
*
* <ul>
* <li><b>Accepted</b> Wait for the update to be accepted by the workflow.
* <li><b>Completed</b> Wait for the update to be completed by the workflow.
* </ul>
*
* Admitted is not allowed as a value.
*/
public Builder<T> setWaitPolicy(WorkflowUpdateStage waitPolicy) {
this.waitPolicy = waitPolicy;
public Builder<T> setWaitForStage(WorkflowUpdateStage waitForStage) {
this.waitForStage = waitForStage;
return this;
}

Expand Down Expand Up @@ -239,7 +246,7 @@ public UpdateOptions<T> build() {
updateName,
updateId,
firstExecutionRunId == null ? "" : firstExecutionRunId,
waitPolicy == null ? WorkflowUpdateStage.ACCEPTED : waitPolicy,
waitForStage,
resultClass,
resultType == null ? resultClass : resultType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,20 @@ static <T> WorkflowStub fromTyped(T typed) {

/**
* Asynchronously update a workflow execution by invoking its update handler and returning a
* handle to the update request. Usually a update handler is a method annotated with {@link
* handle to the update request. Usually an update handler is a method annotated with {@link
* io.temporal.workflow.UpdateMethod}.
*
* @param updateName name of the update handler. Usually it is a method name.
* @param waitForStage stage to wait for before returning the update handle. Admitted is not
* allowed as a value.
* @param resultClass class of the update return value
* @param <R> type of the update return value
* @param args update method arguments
* @return update handle that can be used to get the result of the update.
*/
@Experimental
<R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass, Object... args);
<R> UpdateHandle<R> startUpdate(
String updateName, WorkflowUpdateStage waitForStage, Class<R> resultClass, Object... args);

/**
* Asynchronously update a workflow execution by invoking its update handler and returning a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {
UpdateOptions<R> options =
UpdateOptions.<R>newBuilder()
.setUpdateName(updateName)
.setWaitPolicy(WorkflowUpdateStage.COMPLETED)
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
.setResultClass(resultClass)
.build();
return startUpdate(options, args).getResultAsync().get();
Expand All @@ -312,11 +312,12 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {
}

@Override
public <R> UpdateHandle<R> startUpdate(String updateName, Class<R> resultClass, Object... args) {
public <R> UpdateHandle<R> startUpdate(
String updateName, WorkflowUpdateStage waitForStage, Class<R> resultClass, Object... args) {
UpdateOptions<R> options =
UpdateOptions.<R>newBuilder()
.setUpdateName(updateName)
.setWaitPolicy(WorkflowUpdateStage.ACCEPTED)
.setWaitForStage(waitForStage)
.setResultClass(resultClass)
.setResultType(resultClass)
.build();
Expand All @@ -342,7 +343,7 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
options.getResultType(),
options.getFirstExecutionRunId(),
WaitPolicy.newBuilder()
.setLifecycleStage(options.getWaitPolicy().getProto())
.setLifecycleStage(options.getWaitForStage().getProto())
.build()));

if (result.hasResult()) {
Expand All @@ -360,7 +361,7 @@ public <R> UpdateHandle<R> startUpdate(UpdateOptions<R> options, Object... args)
result.getReference().getWorkflowExecution(),
options.getResultClass(),
options.getResultType());
if (options.getWaitPolicy() == WorkflowUpdateStage.COMPLETED) {
if (options.getWaitForStage() == WorkflowUpdateStage.COMPLETED) {
// Don't return the handle until completed, since that's what's been asked for
handle.waitCompleted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void updateNonExistentWorkflowUntyped() {

assertThrows(
WorkflowNotFoundException.class,
() -> workflowStub.startUpdate("update", Void.class, "some-value"));
() ->
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, Void.class, "some-value"));
}

@Test
Expand Down Expand Up @@ -103,7 +105,9 @@ public void updateCompletedWorkflowUntyped() {

assertThrows(
WorkflowNotFoundException.class,
() -> workflowStub.startUpdate("update", Void.class, "some-value"));
() ->
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, Void.class, "some-value"));
}

@Test
Expand Down Expand Up @@ -131,6 +135,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx
.setUpdateName("update")
.setUpdateId(updateId)
.setFirstExecutionRunId(execution.getRunId())
.setWaitForStage(WorkflowUpdateStage.ACCEPTED)
.build(),
0,
"some-value")
Expand All @@ -146,6 +151,7 @@ public void updateWorkflowDuplicateId() throws ExecutionException, InterruptedEx
.setUpdateName("update")
.setUpdateId(updateId)
.setFirstExecutionRunId(execution.getRunId())
.setWaitForStage(WorkflowUpdateStage.ACCEPTED)
.build(),
"some-other-value")
.getResultAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.temporal.client.UpdateHandle;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowStub;
import io.temporal.client.WorkflowUpdateStage;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
Expand Down Expand Up @@ -62,7 +63,8 @@ public void closeWorkflowWhileUpdateIsRunning() throws ExecutionException, Inter
SDKTestWorkflowRule.waitForOKQuery(workflowStub);
// Send an update that is accepted, but will not complete.
UpdateHandle<String> handle =
workflowStub.startUpdate("update", String.class, 10_000, "some-value");
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value");

// Complete workflow, since the update is accepted it will not block completion
workflowStub.update("complete", void.class);
Expand All @@ -81,7 +83,8 @@ public void LongRunningWorkflowUpdateId() throws ExecutionException, Interrupted
workflowStub.start();
SDKTestWorkflowRule.waitForOKQuery(workflowStub);
UpdateHandle<String> handle =
workflowStub.startUpdate("update", String.class, 65_000, "some-value");
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 65_000, "some-value");

assertEquals("some-value", handle.getResultAsync().get());
workflowStub.update("complete", void.class);
Expand All @@ -101,7 +104,8 @@ public void WorkflowUpdateGetResultTimeout() throws ExecutionException, Interrup
SDKTestWorkflowRule.waitForOKQuery(workflowStub);

UpdateHandle<String> handle =
workflowStub.startUpdate("update", String.class, 10_000, "some-value");
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 10_000, "some-value");

CompletableFuture<String> result = handle.getResultAsync(2, TimeUnit.SECONDS);
// Verify get throws the correct exception in around the right amount of time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
// send an update through the sync path
assertEquals("Execute-Hello", workflowStub.update("update", String.class, 0, "Hello"));
// send an update through the async path
UpdateHandle<String> updateRef = workflowStub.startUpdate("update", String.class, 0, "World");
UpdateHandle<String> updateRef =
workflowStub.startUpdate("update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "World");
assertEquals("Execute-World", updateRef.getResultAsync().get());
// send a bad update that will be rejected through the sync path
assertThrows(
Expand All @@ -133,7 +134,9 @@ public void testUpdateUntyped() throws ExecutionException, InterruptedException
// send a bad update that will be rejected through the sync path
assertThrows(
WorkflowUpdateException.class,
() -> workflowStub.startUpdate("update", String.class, 0, "Bad Update"));
() ->
workflowStub.startUpdate(
"update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "Bad Update"));

workflowStub.update("complete", void.class);

Expand Down Expand Up @@ -163,7 +166,10 @@ public void testUpdateHandleNotReturnedUntilCompleteWhenAsked()
() -> {
UpdateHandle<String> handle =
workflowStub.startUpdate(
UpdateOptions.newBuilder(String.class).setUpdateName("update").build(),
UpdateOptions.newBuilder(String.class)
.setUpdateName("update")
.setWaitForStage(WorkflowUpdateStage.COMPLETED)
.build(),
"Enchi");
updateCompletedLast.set(true);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testContinueAsNewInAUpdate() {

// Send an update to continue as new, must be async since the update won't complete
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
workflowStub.startUpdate("update", String.class, 0, "");
workflowStub.startUpdate("update", WorkflowUpdateStage.ACCEPTED, String.class, 0, "");

testWorkflowRule.waitForTheEndOfWFT(execution.getWorkflowId());
testWorkflowRule.invalidateWorkflowCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
package io.temporal.testing;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.UpdateHandle;
import io.temporal.client.UpdateOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.client.*;
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
import io.temporal.serviceclient.TestServiceStubs;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -240,8 +237,8 @@ public <R> R update(String updateName, Class<R> resultClass, Object... args) {

@Override
public <R> UpdateHandle<R> startUpdate(
String updateName, Class<R> resultClass, Object... args) {
return next.startUpdate(updateName, resultClass, args);
String updateName, WorkflowUpdateStage waitForStage, Class<R> resultClass, Object... args) {
return next.startUpdate(updateName, waitForStage, resultClass, args);
}

@Override
Expand Down

0 comments on commit 1ad1c04

Please sign in to comment.