diff --git a/temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java b/temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java index 6ca4794e6..e75400979 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java +++ b/temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; /** * UpdateWithStartWorkflowOperation is an update workflow request that can be executed together with @@ -284,26 +285,27 @@ public static Builder newBuilder(String updateName, Class resultClass, private final CompletableFuture> handle; - private final Functions.Proc request; + @Nullable private final Functions.Proc updateRequest; private UpdateWithStartWorkflowOperation( - UpdateOptions options, Functions.Proc request, Object[] updateArgs) { + UpdateOptions options, Functions.Proc updateRequest, Object[] updateArgs) { this.options = options; this.updateArgs = updateArgs; this.handle = new CompletableFuture<>(); - this.request = request; + this.updateRequest = updateRequest; } - WorkflowUpdateHandle invoke(Functions.Proc workflow) { + WorkflowUpdateHandle invoke(Functions.Proc workflowRequest) { WorkflowInvocationHandler.initAsyncInvocation( WorkflowInvocationHandler.InvocationType.UPDATE_WITH_START, this); try { - // invokes `prepareUpdate` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler - request.apply(); - // invokes `prepareStart` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler - workflow.apply(); + workflowRequest.apply(); + if (updateRequest != null) { // only present when using typed API + // invokes `prepareUpdate` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler + updateRequest.apply(); + } stub.updateWithStart(this, this.workflowArgs); return this.handle.get(); } catch (InterruptedException e) { @@ -365,8 +367,8 @@ public Object[] getUpdateArgs() { public String toString() { StringBuilder sb = new StringBuilder(); sb.append("UpdateWithStartWorkflowOperation{options=").append(options); - if (request != null) { - sb.append(", request=").append(request); + if (updateRequest != null) { + sb.append(", updateRequest=").append(updateRequest); } if (updateArgs != null) { sb.append(", updateArgs=").append(Arrays.toString(updateArgs)); diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java index 8424ef23c..cd727e33e 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java @@ -444,14 +444,14 @@ public R getResult(Class resultClass) { private static class UpdateWithStartInvocationHandler implements SpecificInvocationHandler { enum State { - NOT_STARTED, + INIT, START_RECEIVED, UPDATE_RECEIVED, } private final UpdateWithStartWorkflowOperation operation; - private State state = State.NOT_STARTED; + private State state = State.INIT; public UpdateWithStartInvocationHandler(UpdateWithStartWorkflowOperation operation) { this.operation = operation; @@ -471,7 +471,15 @@ public void invoke( POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method); - if (state == State.NOT_STARTED) { + if (state == State.INIT) { + WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class); + if (workflowMethod == null) { + throw new IllegalArgumentException( + "Method '" + method.getName() + "' is not a WorkflowMethod"); + } + this.operation.prepareStart(untyped, args); + state = State.START_RECEIVED; + } else if (state == State.START_RECEIVED) { UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class); if (updateMethod == null) { throw new IllegalArgumentException( @@ -483,14 +491,6 @@ public void invoke( method.getReturnType(), method.getGenericReturnType(), args); - state = State.START_RECEIVED; - } else if (state == State.START_RECEIVED) { - WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class); - if (workflowMethod == null) { - throw new IllegalArgumentException( - "Method '" + method.getName() + "' is not a WorkflowMethod"); - } - this.operation.prepareStart(untyped, args); state = State.UPDATE_RECEIVED; } else { throw new IllegalArgumentException( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java index f33985bb8..692437d48 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java @@ -84,8 +84,35 @@ public void startAndSendUpdateTogether() throws ExecutionException, InterruptedE assertEquals(options.getWorkflowId(), handle1.getExecution().getWorkflowId()); assertEquals("Hello Update", handle1.getResultAsync().get()); - WorkflowUpdateHandle updHandle = updateOp.getUpdateHandle().get(); - assertEquals(updateOp.getResult(), updHandle.getResultAsync().get()); + WorkflowUpdateHandle handle2 = updateOp.getUpdateHandle().get(); + assertEquals(updateOp.getResult(), handle2.getResultAsync().get()); + + workflow.complete(); + + assertEquals("Hello Update complete", WorkflowStub.fromTyped(workflow).getResult(String.class)); + } + + @Test + public void startAndSendUpdateTogetherUsingUntypedWorkflowOperation() + throws ExecutionException, InterruptedException { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + + WorkflowOptions options = createOptions(); + TestWorkflows.WorkflowWithUpdate workflow = + workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); + + UpdateWithStartWorkflowOperation updateOp = + UpdateWithStartWorkflowOperation.newBuilder( + "update", String.class, new Object[] {1, "Hello Update"}) // untyped! + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(); + + WorkflowUpdateHandle handle1 = + WorkflowClient.updateWithStart(workflow::execute, updateOp); + assertEquals("Hello Update", handle1.getResultAsync().get()); + + WorkflowUpdateHandle handle2 = updateOp.getUpdateHandle().get(); + assertEquals(updateOp.getResult(), handle2.getResultAsync().get()); workflow.complete(); @@ -110,8 +137,8 @@ public void startAndSendUpdateTogetherWithNullUpdateResult() WorkflowClient.updateWithStart(workflow::execute, updateOp); assertNull(handle1.getResultAsync().get()); - WorkflowUpdateHandle updHandle = updateOp.getUpdateHandle().get(); - assertEquals(updateOp.getResult(), updHandle.getResultAsync().get()); + WorkflowUpdateHandle handle2 = updateOp.getUpdateHandle().get(); + assertEquals(updateOp.getResult(), handle2.getResultAsync().get()); assertEquals("Hello Update", WorkflowStub.fromTyped(workflow).getResult(String.class)); }