From 9ac1af3d1ab3ad5a888e395ad3dd76d44c6f3bc8 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 5 Dec 2024 16:36:57 -0800 Subject: [PATCH] New Update-with-Start API (#2337) --- .../io/temporal/client/UpdateOptions.java | 7 + .../UpdateWithStartWorkflowOperation.java | 428 -------- .../client/WithStartWorkflowOperation.java | 330 ++++++ .../io/temporal/client/WorkflowClient.java | 673 +++++++++--- .../client/WorkflowClientInternalImpl.java | 31 + .../client/WorkflowInvocationHandler.java | 113 ++- .../java/io/temporal/client/WorkflowStub.java | 28 +- .../io/temporal/client/WorkflowStubImpl.java | 90 +- .../WorkflowClientCallsInterceptor.java | 16 +- .../client/RootWorkflowClientInvoker.java | 63 +- .../client/functional/UpdateTest.java | 87 +- .../java/io/temporal/workflow/SagaTest.java | 2 +- ...arentWorkflowInfoInChildWorkflowsTest.java | 2 +- .../shared/TestMultiArgWorkflowFunctions.java | 105 +- .../shared/TestNoArgsWorkflowFuncParent.java | 2 +- .../updateTest/UpdateWithStartTest.java | 954 ++++++++++++------ .../temporal/serviceclient/StatusUtils.java | 2 +- .../testservice/TestWorkflowService.java | 59 +- .../functional/MultiOperationTest.java | 18 +- .../testing/TimeLockingInterceptor.java | 12 +- 20 files changed, 1905 insertions(+), 1117 deletions(-) delete mode 100644 temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java create mode 100644 temporal-sdk/src/main/java/io/temporal/client/WithStartWorkflowOperation.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java b/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java index e8acd8532..ebc4d47f7 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/UpdateOptions.java @@ -154,6 +154,13 @@ public void validate() { } } + void validateWaitForCompleted() { + if (waitForStage != null && waitForStage != WorkflowUpdateStage.COMPLETED) { + throw new IllegalArgumentException( + "waitForStage must be unspecified or " + WorkflowUpdateStage.COMPLETED); + } + } + public static final class Builder { private String updateName; private String updateId; diff --git a/temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java b/temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java deleted file mode 100644 index e75400979..000000000 --- a/temporal-sdk/src/main/java/io/temporal/client/UpdateWithStartWorkflowOperation.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this material except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.temporal.client; - -import io.temporal.common.Experimental; -import io.temporal.workflow.Functions; -import java.lang.reflect.Type; -import java.util.Arrays; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nullable; - -/** - * UpdateWithStartWorkflowOperation is an update workflow request that can be executed together with - * a start workflow request. - */ -@Experimental -public final class UpdateWithStartWorkflowOperation { - - private final AtomicBoolean invoked = new AtomicBoolean(false); - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a zero argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - */ - public static Builder newBuilder(Functions.Func request) { - return new Builder<>( - () -> { - request.apply(); - }); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a one argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - */ - public static Builder newBuilder(Functions.Func1 request, A1 arg1) { - return new Builder<>(() -> request.apply(arg1)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a two argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - */ - public static Builder newBuilder( - Functions.Func2 request, A1 arg1, A2 arg2) { - return new Builder<>(() -> request.apply(arg1, arg2)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a three argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - */ - public static Builder newBuilder( - Functions.Func3 request, A1 arg1, A2 arg2, A3 arg3) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a four argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - * @param arg4 fourth request function parameter - */ - public static Builder newBuilder( - Functions.Func4 request, A1 arg1, A2 arg2, A3 arg3, A4 arg4) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3, arg4)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a five argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - * @param arg4 fourth request function parameter - * @param arg5 fifth request function parameter - */ - public static Builder newBuilder( - Functions.Func5 request, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3, arg4, arg5)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a six argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - * @param arg4 fourth request function parameter - * @param arg5 fifth request function parameter - * @param arg6 sixth request function parameter - */ - public static Builder newBuilder( - Functions.Func6 request, - A1 arg1, - A2 arg2, - A3 arg3, - A4 arg4, - A5 arg5, - A6 arg6) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3, arg4, arg5, arg6)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a zero argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - */ - public static Builder newBuilder(Functions.Proc request) { - return new Builder<>( - () -> { - request.apply(); - }); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a one argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - */ - public static Builder newBuilder(Functions.Proc1 request, A1 arg1) { - return new Builder<>(() -> request.apply(arg1)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a two argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - */ - public static Builder newBuilder( - Functions.Proc2 request, A1 arg1, A2 arg2) { - return new Builder<>(() -> request.apply(arg1, arg2)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a three argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - */ - public static Builder newBuilder( - Functions.Proc3 request, A1 arg1, A2 arg2, A3 arg3) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a four argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - * @param arg4 fourth request function parameter - */ - public static Builder newBuilder( - Functions.Proc4 request, A1 arg1, A2 arg2, A3 arg3, A4 arg4) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3, arg4)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a five argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - * @param arg4 fourth request function parameter - * @param arg5 fifth request function parameter - */ - public static Builder newBuilder( - Functions.Proc5 request, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3, arg4, arg5)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a six argument - * request. - * - * @param request method reference annotated with @UpdateMethod of a proxy created through {@link - * WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first request function parameter - * @param arg2 second request function parameter - * @param arg3 third request function parameter - * @param arg4 fourth request function parameter - * @param arg5 fifth request function parameter - * @param arg6 sixth request function parameter - */ - public static Builder newBuilder( - Functions.Proc6 request, - A1 arg1, - A2 arg2, - A3 arg3, - A4 arg4, - A5 arg5, - A6 arg6) { - return new Builder<>(() -> request.apply(arg1, arg2, arg3, arg4, arg5, arg6)); - } - - /** - * Returns a new builder for an {@link UpdateWithStartWorkflowOperation}. - * - * @param updateName name of the update handler - * @param resultClass class of the update return value - * @param args update request arguments - */ - public static Builder newBuilder(String updateName, Class resultClass, Object[] args) { - return new Builder<>( - UpdateOptions.newBuilder().setResultClass(resultClass).setUpdateName(updateName), args); - } - - private WorkflowStub stub; - - private UpdateOptions options; - - // set by constructor (untyped) or `prepareUpdate` (typed) - private Object[] updateArgs; - - // set by `prepareStart` - private Object[] workflowArgs; - - private final CompletableFuture> handle; - - @Nullable private final Functions.Proc updateRequest; - - private UpdateWithStartWorkflowOperation( - UpdateOptions options, Functions.Proc updateRequest, Object[] updateArgs) { - this.options = options; - this.updateArgs = updateArgs; - this.handle = new CompletableFuture<>(); - this.updateRequest = updateRequest; - } - - WorkflowUpdateHandle invoke(Functions.Proc workflowRequest) { - WorkflowInvocationHandler.initAsyncInvocation( - WorkflowInvocationHandler.InvocationType.UPDATE_WITH_START, this); - try { - // invokes `prepareStart` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler - workflowRequest.apply(); - - if (updateRequest != null) { // only present when using typed API - // invokes `prepareUpdate` via WorkflowInvocationHandler.UpdateWithStartInvocationHandler - updateRequest.apply(); - } - stub.updateWithStart(this, this.workflowArgs); - return this.handle.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - throw (cause instanceof RuntimeException - ? (RuntimeException) cause - : new RuntimeException(cause)); - } finally { - WorkflowInvocationHandler.closeAsyncInvocation(); - } - } - - /** Invoked by {@link WorkflowInvocationHandler.UpdateWithStartInvocationHandler}. */ - void prepareUpdate( - WorkflowStub stub, String updateName, Class resultClass, Type resultType, Object[] args) { - setStub(stub); - this.updateArgs = args; - this.options = - this.options.toBuilder() - .setUpdateName(updateName) - .setResultClass(resultClass) - .setResultType(resultType) - .build(); - } - - /** Invoked by {@link WorkflowInvocationHandler.UpdateWithStartInvocationHandler}. */ - void prepareStart(WorkflowStub stub, Object[] args) { - setStub(stub); - this.workflowArgs = args; - } - - /** Returns the result of the update request. */ - public R getResult() throws ExecutionException, InterruptedException { - return this.getUpdateHandle().get().getResultAsync().get(); - } - - public Future> getUpdateHandle() { - return this.handle; - } - - void setUpdateHandle(WorkflowUpdateHandle updateHandle) { - this.handle.complete(updateHandle); - } - - public UpdateOptions getOptions() { - return this.options; - } - - public Object[] getUpdateArgs() { - return this.updateArgs; - } - - // equals/hashCode intentionally left as default - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("UpdateWithStartWorkflowOperation{options=").append(options); - if (updateRequest != null) { - sb.append(", updateRequest=").append(updateRequest); - } - if (updateArgs != null) { - sb.append(", updateArgs=").append(Arrays.toString(updateArgs)); - } - return sb.toString(); - } - - private void setStub(WorkflowStub stub) { - if (this.stub != null && stub != this.stub) { - throw new IllegalArgumentException( - "UpdateWithStartWorkflowOperation invoked on different workflow stubs"); - } - this.stub = stub; - } - - /** - * Mark the operation as having been invoked. - * - * @return false if the operation was already invoked - */ - boolean markInvoked() { - return invoked.compareAndSet(false, true); - } - - public static final class Builder { - private UpdateOptions.Builder options; - private Functions.Proc request; - private Object[] args; - - private Builder() {} - - private Builder(UpdateOptions.Builder options, Object[] args) { - this.options = options; - this.request = null; - this.args = args; - } - - private Builder(Functions.Proc request) { - this.options = UpdateOptions.newBuilder(); - this.request = request; - } - - public Builder setWaitForStage(WorkflowUpdateStage waitForStage) { - this.options.setWaitForStage(waitForStage); - return this; - } - - public Builder setUpdateId(String updateId) { - this.options.setUpdateId(updateId); - return this; - } - - public UpdateWithStartWorkflowOperation build() { - return new UpdateWithStartWorkflowOperation<>(this.options.build(), this.request, this.args); - } - } -} diff --git a/temporal-sdk/src/main/java/io/temporal/client/WithStartWorkflowOperation.java b/temporal-sdk/src/main/java/io/temporal/client/WithStartWorkflowOperation.java new file mode 100644 index 000000000..99d4e1a0d --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/client/WithStartWorkflowOperation.java @@ -0,0 +1,330 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.client; + +import io.temporal.common.Experimental; +import io.temporal.workflow.Functions; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; + +/** + * WithStartWorkflowOperation is a start workflow request that can be executed together with an + * update workflow request. See {@link WorkflowClient#startUpdateWithStart} and {@link + * WorkflowClient#executeUpdateWithStart}. + * + * @param type of the workflow result + */ +@Experimental +public final class WithStartWorkflowOperation { + + private final AtomicBoolean invoked = new AtomicBoolean(false); + private WorkflowStub stub; + private Object[] args; + @Nullable private Functions.Proc startMethod; + private Class resultClass; + + private WithStartWorkflowOperation() {} + + /** + * Creates a new {@link WithStartWorkflowOperation} for a zero argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + */ + public WithStartWorkflowOperation(Functions.Func startMethod) { + this.startMethod = + () -> { + startMethod.apply(); + }; + this.args = new Object[] {}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a one argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + */ + public WithStartWorkflowOperation(Functions.Func1 startMethod, A1 arg1) { + this.startMethod = () -> startMethod.apply(arg1); + this.args = new Object[] {arg1}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a two argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Func2 startMethod, A1 arg1, A2 arg2) { + this.startMethod = () -> startMethod.apply(arg1, arg2); + this.args = new Object[] {arg1, arg2}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a three argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Func3 startMethod, A1 arg1, A2 arg2, A3 arg3) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3); + this.args = new Object[] {arg1, arg2, arg3}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a four argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + * @param arg4 fourth workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Func4 startMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3, arg4); + this.args = new Object[] {arg1, arg2, arg3, arg4}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a five argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + * @param arg4 fourth workflow method parameter + * @param arg5 fifth workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Func5 startMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3, arg4, arg5); + this.args = new Object[] {arg1, arg2, arg3, arg4, arg5}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a six argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + * @param arg4 fourth workflow method parameter + * @param arg5 fifth workflow method parameter + * @param arg6 sixth workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Func6 startMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6); + this.args = new Object[] {arg1, arg2, arg3, arg4, arg5, arg6}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a zero argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + */ + public WithStartWorkflowOperation(Functions.Proc startMethod) { + this.startMethod = + () -> { + startMethod.apply(); + }; + this.args = new Object[] {}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a one argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + */ + public WithStartWorkflowOperation(Functions.Proc1 startMethod, A1 arg1) { + this.startMethod = () -> startMethod.apply(arg1); + this.args = new Object[] {arg1}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a two argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Proc2 startMethod, A1 arg1, A2 arg2) { + this.startMethod = () -> startMethod.apply(arg1, arg2); + this.args = new Object[] {arg1, arg2}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a three argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Proc3 startMethod, A1 arg1, A2 arg2, A3 arg3) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3); + this.args = new Object[] {arg1, arg2, arg3}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a four argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + * @param arg4 fourth workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Proc4 startMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3, arg4); + this.args = new Object[] {arg1, arg2, arg3, arg4}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a five argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + * @param arg4 fourth workflow method parameter + * @param arg5 fifth workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Proc5 startMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3, arg4, arg5); + this.args = new Object[] {arg1, arg2, arg3, arg4, arg5}; + } + + /** + * Creates a new {@link WithStartWorkflowOperation} for a six argument workflow method. + * + * @param startMethod method reference annotated with @WorkflowMethod of a proxy created through + * {@link WorkflowClient#newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow method parameter + * @param arg2 second workflow method parameter + * @param arg3 third workflow method parameter + * @param arg4 fourth workflow method parameter + * @param arg5 fifth workflow method parameter + * @param arg6 sixth workflow method parameter + */ + public WithStartWorkflowOperation( + Functions.Proc6 startMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6) { + this.startMethod = () -> startMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6); + this.args = new Object[] {arg1, arg2, arg3, arg4, arg5, arg6}; + } + + /** + * Obtains workflow result. + * + * @return the result of the workflow + */ + public R getResult() { + return this.stub.getResult(this.resultClass); + } + + /** + * Mark the operation as having been invoked. + * + * @return false if the operation was already invoked + */ + boolean markInvoked() { + return invoked.compareAndSet(false, true); + } + + @Nullable + Functions.Proc getStartMethod() { + return startMethod; + } + + void setResultClass(Class resultClass) { + this.resultClass = resultClass; + } + + // equals/hashCode intentionally left as default + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("WithStartWorkflowOperation{args=").append(Arrays.toString(args)); + if (stub != null) { + sb.append(", stub=").append(stub); + } + if (startMethod != null) { + sb.append(", startMethod=").append(startMethod); + } + if (resultClass != null) { + sb.append(", resultClass=").append(resultClass); + } + sb.append("}"); + return sb.toString(); + } + + void setStub(WorkflowStub stub) { + this.stub = stub; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java index f3295a069..07cb66530 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java @@ -858,299 +858,666 @@ static WorkflowUpdateHandle startUpdate( } /** - * Executes zero argument workflow with void return type together with an update workflow request. + * Start a zero argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Proc workflow, @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(workflow); + static WorkflowUpdateHandle startUpdateWithStart( + Proc updateMethod, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart(updateMethod, options, startOperation); } /** - * Executes one argument workflow with void return type together with an update workflow request. + * Start a one argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Proc1 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Proc1 updateMethod, A1 arg1, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1), options, startOperation); } /** - * Executes two argument workflow with void return type together with an update workflow request. + * Start a two argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Proc2 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Proc2 updateMethod, A1 arg1, A2 arg2, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2), options, startOperation); } /** - * Executes three argument workflow with void return type together with an update workflow + * Start a three argument update workflow request asynchronously, along with a workflow start * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Proc3 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Proc3 updateMethod, A1 arg1, A2 arg2, A3 arg3, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3), options, startOperation); } /** - * Executes four argument workflow with void return type together with an update workflow request. + * Start a four argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param arg4 fourth workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Proc4 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Proc4 updateMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3, arg4)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4), options, startOperation); } /** - * Executes five argument workflow with void return type together with an update workflow request. + * Start a five argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param arg4 fourth workflow function parameter - * @param arg5 fifth workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Proc5 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Proc5 updateMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5), options, startOperation); } /** - * Executes six argument workflow with void return type together with an update workflow request. + * Start a six argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param arg4 fourth workflow function parameter - * @param arg5 fifth workflow function parameter - * @param arg6 sixth workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param arg6 sixth update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Proc6 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Proc6 updateMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5, A6 arg6, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6), options, startOperation); } /** - * Executes zero argument workflow. + * Start a zero argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Func workflow, @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke( - () -> { - workflow.apply(); - }); + static WorkflowUpdateHandle startUpdateWithStart( + Func updateMethod, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + updateMethod::apply, options, startOperation); } /** - * Executes one argument workflow together with an update workflow request. + * Start a one argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. * @param arg1 first workflow argument - * @param updateOperation update workflow operation + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Func1 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Func1 updateMethod, A1 arg1, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1), options, startOperation); } /** - * Executes two argument workflow together with an update workflow request. + * Start a two argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Func2 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Functions.Func2 updateMethod, A1 arg1, A2 arg2, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2), options, startOperation); } /** - * Executes three argument workflow together with an update workflow request. + * Start a three argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Func3 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Functions.Func3 updateMethod, A1 arg1, A2 arg2, A3 arg3, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3), options, startOperation); } /** - * Executes four argument workflow together with an update workflow request. + * Start a four argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param arg4 fourth workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Func4 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Functions.Func4 updateMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3, arg4)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4), options, startOperation); } /** - * Executes five argument workflow together with an update workflow request. + * Start a five argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. - * @param arg1 first workflow function parameter - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param arg4 fourth workflow function parameter - * @param arg5 fifth workflow function parameter - * @param updateOperation update workflow operation + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Func5 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Functions.Func5 updateMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5), options, startOperation); } /** - * Executes six argument workflow together with an update workflow request. + * Start a six argument update workflow request asynchronously, along with a workflow start + * request. * - * @param workflow The only supported value is method reference to a proxy created through {@link - * #newWorkflowStub(Class, WorkflowOptions)}. + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. * @param arg1 first workflow argument - * @param arg2 second workflow function parameter - * @param arg3 third workflow function parameter - * @param arg4 fourth workflow function parameter - * @param arg5 fifth workflow function parameter - * @param arg6 sixth workflow function parameter - * @param updateOperation update workflow operation + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param arg6 sixth update function parameter + * @param startOperation start workflow operation * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - static WorkflowUpdateHandle updateWithStart( - Functions.Func6 workflow, + static WorkflowUpdateHandle startUpdateWithStart( + Functions.Func6 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.startUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6), options, startOperation); + } + + /** + * Execute a zero argument update workflow request synchronously, along with a workflow start + * request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param startOperation start workflow operation + * @return WorkflowUpdateHandle that can be used to get the result of the update + */ + @Experimental + static R executeUpdateWithStart( + Functions.Proc updateMethod, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart(updateMethod, options, startOperation); + } + + /** + * Execute a one argument update workflow request synchronously, along with a workflow start + * request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Proc1 updateMethod, + A1 arg1, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1), options, startOperation); + } + + /** + * Execute a two argument update workflow request synchronously, along with a workflow start + * request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Proc2 updateMethod, + A1 arg1, + A2 arg2, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2), options, startOperation); + } + + /** + * Execute a three argument update workflow request synchronously, along with a workflow start + * request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Proc3 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3), options, startOperation); + } + + /** + * Execute a four argument update workflow request synchronously, along with a workflow start + * request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Proc4 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4), options, startOperation); + } + + /** + * Execute a five argument update workflow request synchronously, along with a workflow start + * request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Proc5 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5), options, startOperation); + } + + /** + * Execute a six argument update workflow request synchronously, along with a workflow start + * request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param arg6 sixth update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Proc6 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + A6 arg6, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6), options, startOperation); + } + + /** + * Executes zero argument workflow. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Func updateMethod, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + updateMethod::apply, options, startOperation); + } + + /** + * Executes one argument workflow together with an update workflow request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Func1 updateMethod, + A1 arg1, + @Nonnull UpdateOptions updateOptions, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1), updateOptions, startOperation); + } + + /** + * Executes two argument workflow together with an update workflow request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Functions.Func2 updateMethod, + A1 arg1, + A2 arg2, + @Nonnull UpdateOptions updateOptions, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2), updateOptions, startOperation); + } + + /** + * Executes three argument workflow together with an update workflow request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Functions.Func3 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3), options, startOperation); + } + + /** + * Executes four argument workflow together with an update workflow request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Functions.Func4 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4), options, startOperation); + } + + /** + * Executes five argument workflow together with an update workflow request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first update function parameter + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Functions.Func5 updateMethod, + A1 arg1, + A2 arg2, + A3 arg3, + A4 arg4, + A5 arg5, + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5), options, startOperation); + } + + /** + * Executes six argument workflow together with an update workflow request. + * + * @param updateMethod The only supported value is method reference to a proxy created through + * {@link #newWorkflowStub(Class, WorkflowOptions)}. + * @param arg1 first workflow argument + * @param arg2 second update function parameter + * @param arg3 third update function parameter + * @param arg4 fourth update function parameter + * @param arg5 fifth update function parameter + * @param arg6 sixth update function parameter + * @param startOperation start workflow operation + * @return update result + */ + @Experimental + static R executeUpdateWithStart( + Functions.Func6 updateMethod, A1 arg1, A2 arg2, A3 arg3, A4 arg4, A5 arg5, A6 arg6, - @Nonnull UpdateWithStartWorkflowOperation updateOperation) { - return updateOperation.invoke(() -> workflow.apply(arg1, arg2, arg3, arg4, arg5, arg6)); + @Nonnull UpdateOptions options, + @Nonnull WithStartWorkflowOperation startOperation) { + return WorkflowClientInternalImpl.executeUpdateWithStart( + () -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6), options, startOperation); } /** diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index 1060bd87f..b9a8fe996 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -658,6 +658,37 @@ public static WorkflowUpdateHandle startUpdate( return startUpdate(() -> updateMethod.apply(arg1, arg2, arg3, arg4, arg5, arg6), options); } + public static WorkflowUpdateHandle startUpdateWithStart( + Functions.Proc updateMethod, + UpdateOptions updateOptions, + WithStartWorkflowOperation startOp) { + enforceNonWorkflowThread(); + WorkflowInvocationHandler.initAsyncInvocation( + InvocationType.UPDATE_WITH_START, + new WorkflowInvocationHandler.UpdateWithStartOptions(updateOptions, startOp)); + try { + updateMethod.apply(); + + if (startOp.getStartMethod() != null) { // only present when using typed API + startOp.getStartMethod().apply(); + } + + return WorkflowInvocationHandler.getAsyncInvocationResult(WorkflowUpdateHandle.class); + } finally { + WorkflowInvocationHandler.closeAsyncInvocation(); + } + } + + public static R executeUpdateWithStart( + Functions.Proc updateMethod, + UpdateOptions updateOptions, + WithStartWorkflowOperation startOp) { + updateOptions.validateWaitForCompleted(); + UpdateOptions optionsWithWaitStageCompleted = + updateOptions.toBuilder().setWaitForStage(WorkflowUpdateStage.COMPLETED).build(); + return startUpdateWithStart(updateMethod, optionsWithWaitStageCompleted, startOp).getResult(); + } + Stream streamHistory(WorkflowExecution execution) { Preconditions.checkNotNull(execution, "execution is required"); diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java index 09643dcc3..0f123ac17 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowInvocationHandler.java @@ -36,10 +36,7 @@ import io.temporal.common.metadata.WorkflowMethodType; import io.temporal.internal.client.NexusStartWorkflowRequest; import io.temporal.internal.sync.StubMarker; -import io.temporal.workflow.QueryMethod; -import io.temporal.workflow.SignalMethod; -import io.temporal.workflow.UpdateMethod; -import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.*; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.*; @@ -97,11 +94,13 @@ static void initAsyncInvocation(InvocationType type, T value) { NexusStartWorkflowRequest request = (NexusStartWorkflowRequest) value; invocationContext.set(new StartNexusOperationInvocationHandler(request)); } else if (type == InvocationType.UPDATE) { - UpdateOptions updateOptions = (UpdateOptions) value; + UpdateOptions updateOptions = (UpdateOptions) value; invocationContext.set(new UpdateInvocationHandler(updateOptions)); } else if (type == InvocationType.UPDATE_WITH_START) { - UpdateWithStartWorkflowOperation operation = (UpdateWithStartWorkflowOperation) value; - invocationContext.set(new UpdateWithStartInvocationHandler(operation)); + UpdateWithStartOptions updateWithStartOptions = (UpdateWithStartOptions) value; + invocationContext.set( + new UpdateWithStartInvocationHandler( + updateWithStartOptions.options, updateWithStartOptions.startOp)); } else { throw new IllegalArgumentException("Unexpected InvocationType: " + type); } @@ -115,7 +114,7 @@ static R getAsyncInvocationResult(Class resultClass) { return invocation.getResult(resultClass); } - /** Closes async invocation created through {@link #initAsyncInvocation(InvocationType)} */ + /** Closes async invocation created through {@link #initAsyncInvocation} */ static void closeAsyncInvocation() { invocationContext.remove(); } @@ -475,8 +474,20 @@ public void invoke( throw new IllegalArgumentException( "Only a method annotated with @UpdateMethod can be used to start an Update."); } + result = untyped.startUpdate(mergeUpdateOptions(options, workflowMetadata, method), args); + } + + @Override + @SuppressWarnings("unchecked") + public R getResult(Class resultClass) { + return (R) result; + } + + static UpdateOptions mergeUpdateOptions( + UpdateOptions options, POJOWorkflowInterfaceMetadata workflowMetadata, Method method) { POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method); UpdateOptions.Builder builder = UpdateOptions.newBuilder(options); + if (Strings.isNullOrEmpty(options.getUpdateName())) { builder.setUpdateName(methodMetadata.getName()); } else if (!options.getUpdateName().equals(methodMetadata.getName())) { @@ -492,14 +503,7 @@ public void invoke( if (options.getResultClass() == null) { builder.setResultClass(method.getReturnType()); } - - result = untyped.startUpdate(builder.build(), args); - } - - @Override - @SuppressWarnings("unchecked") - public R getResult(Class resultClass) { - return (R) result; + return builder.build(); } } @@ -511,12 +515,21 @@ enum State { UPDATE_RECEIVED, } - private final UpdateWithStartWorkflowOperation operation; - + private final UpdateOptions userProvidedUpdateOptions; + private Object[] updateArgs; + private Object[] startArgs; + private UpdateOptions updateOptions; + private final WithStartWorkflowOperation startOp; private State state = State.INIT; + private WorkflowUpdateHandle result; + private WorkflowStub stub; - public UpdateWithStartInvocationHandler(UpdateWithStartWorkflowOperation operation) { - this.operation = operation; + public UpdateWithStartInvocationHandler( + UpdateOptions options, WithStartWorkflowOperation startOp) { + Preconditions.checkNotNull(options, "options"); + Preconditions.checkNotNull(startOp, "startOp"); + this.userProvidedUpdateOptions = options; + this.startOp = startOp; } @Override @@ -531,29 +544,34 @@ public void invoke( Method method, Object[] args) { - POJOWorkflowMethodMetadata methodMetadata = workflowMetadata.getMethodMetadata(method); - if (state == State.INIT) { - WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class); - if (workflowMethod == null) { - throw new IllegalArgumentException( - "Method '" + method.getName() + "' is not a WorkflowMethod"); - } - this.operation.prepareStart(untyped, args); - state = State.START_RECEIVED; - } else if (state == State.START_RECEIVED) { UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class); if (updateMethod == null) { throw new IllegalArgumentException( - "Method '" + method.getName() + "' is not an UpdateMethod"); + "Method '" + method.getName() + "' is not an @UpdateMethod"); } - this.operation.prepareUpdate( - untyped, - methodMetadata.getName(), - method.getReturnType(), - method.getGenericReturnType(), - args); + this.setStub(untyped); + this.updateArgs = args; + this.updateOptions = + UpdateInvocationHandler.mergeUpdateOptions( + userProvidedUpdateOptions, workflowMetadata, method); state = State.UPDATE_RECEIVED; + } else if (state == State.UPDATE_RECEIVED) { + WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class); + if (workflowMethod == null) { + throw new IllegalArgumentException( + "Method '" + method.getName() + "' is not a @WorkflowMethod"); + } + if (!startOp.markInvoked()) { + throw new IllegalStateException("WithStartWorkflowOperation was already executed"); + } + this.setStub(untyped); + this.startArgs = args; + this.startOp.setStub(untyped); + this.startOp.setResultClass(method.getReturnType()); + state = State.START_RECEIVED; + + this.result = untyped.startUpdateWithStart(updateOptions, updateArgs, this.startArgs); } else { throw new IllegalArgumentException( "UpdateWithStartInvocationHandler called too many times"); @@ -561,8 +579,27 @@ public void invoke( } @Override + @SuppressWarnings("unchecked") public R getResult(Class resultClass) { - throw new IllegalStateException("No result is expected"); + return (R) result; + } + + private void setStub(WorkflowStub stub) { + if (this.stub != null && stub != this.stub) { + throw new IllegalArgumentException( + "WithStartWorkflowOperation invoked on different workflow stubs"); + } + this.stub = stub; + } + } + + static class UpdateWithStartOptions { + UpdateOptions options; + WithStartWorkflowOperation startOp; + + public UpdateWithStartOptions(UpdateOptions options, WithStartWorkflowOperation startOp) { + this.options = options; + this.startOp = startOp; } } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java index 208d1d97d..a4d9f2e77 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStub.java @@ -22,6 +22,7 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.QueryRejectCondition; +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.common.Experimental; import io.temporal.failure.CanceledFailure; import io.temporal.failure.TerminatedFailure; @@ -157,16 +158,35 @@ WorkflowUpdateHandle getUpdateHandle( WorkflowExecution start(Object... args); /** - * Execute a workflow together with an update workflow request. + * Asynchronously update a workflow execution by invoking its update handler, and start the + * workflow according to the option's {@link WorkflowIdConflictPolicy}. It returns a handle to the + * update request. If {@link WorkflowUpdateStage#COMPLETED} is specified, in the options, the + * handle will not be returned until the update is completed. * - * @param updateOperation update workflow operation + * @param updateOptions options that will be used to configure and start a new update request + * @param updateArgs update method arguments * @param startArgs workflow start arguments * @param type of the update workflow result * @return WorkflowUpdateHandle that can be used to get the result of the update */ @Experimental - WorkflowUpdateHandle updateWithStart( - UpdateWithStartWorkflowOperation updateOperation, Object... startArgs); + WorkflowUpdateHandle startUpdateWithStart( + UpdateOptions updateOptions, Object[] updateArgs, Object[] startArgs); + + /** + * Synchronously update a workflow execution by invoking its update handler, and start the + * workflow according to the option's {@link WorkflowIdConflictPolicy}. It returns the update + * result. + * + * @param updateOptions options that will be used to configure and start a new update request + * @param updateArgs update method arguments + * @param startArgs workflow start arguments + * @param type of the update workflow result + * @return update result + */ + @Experimental + R executeUpdateWithStart( + UpdateOptions updateOptions, Object[] updateArgs, Object[] startArgs); WorkflowExecution signalWithStart(String signalName, Object[] signalArgs, Object[] startArgs); diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 58519e0ec..d644cbaae 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -131,45 +131,63 @@ public WorkflowExecution start(Object... args) { } @Override - public WorkflowUpdateHandle updateWithStart( - UpdateWithStartWorkflowOperation updateOperation, Object... startArgs) { + public WorkflowUpdateHandle startUpdateWithStart( + UpdateOptions updateOptions, Object[] updateArgs, Object[] startArgs) { if (options == null) { - throw new IllegalStateException("Required parameter WorkflowOptions is missing"); + throw new IllegalStateException( + "Required parameter WorkflowOptions is missing in WorkflowStub"); } - if (!updateOperation.markInvoked()) { - throw new IllegalStateException("UpdateWithStartWorkflowOperation was already executed"); + if (options.getWorkflowIdConflictPolicy() == null) { + throw new IllegalStateException( + "WorkflowIdConflictPolicy is required in WorkflowOptions for Update-With-Start"); } - checkExecutionIsNotStarted(); + updateOptions.validate(); + String workflowId = getWorkflowIdForStart(options); WorkflowExecution workflowExecution = null; try { + // gather inputs WorkflowClientCallsInterceptor.WorkflowStartInput startInput = new WorkflowClientCallsInterceptor.WorkflowStartInput( workflowId, workflowType.get(), Header.empty(), startArgs, options); + WorkflowClientCallsInterceptor.StartUpdateInput updateInput = + startUpdateInput( + updateOptions, + updateArgs, + WorkflowExecution.newBuilder().setWorkflowId(workflowId).build()); WorkflowClientCallsInterceptor.WorkflowUpdateWithStartInput input = new WorkflowClientCallsInterceptor.WorkflowUpdateWithStartInput<>( - startInput, updateOperation); + startInput, updateInput); + WorkflowClientCallsInterceptor.WorkflowUpdateWithStartOutput output = workflowClientInvoker.updateWithStart(input); + + // gather outputs workflowExecution = output.getWorkflowStartOutput().getWorkflowExecution(); populateExecutionAfterStart(workflowExecution); - WorkflowUpdateHandle updateHandle = output.getUpdateHandle(); - updateOperation.setUpdateHandle(updateHandle); - return updateHandle; + return output.getUpdateHandle(); } catch (StatusRuntimeException e) { throw wrapStartException(workflowId, workflowType.orElse(null), e); } catch (Exception e) { if (workflowExecution == null) { - // if start failed with exception - there could be no valid workflow execution populated - // from the server. - // WorkflowServiceException requires not null workflowExecution, so we have to provide - // an WorkflowExecution instance with just a workflowId + // If start failed with exception there could be no valid workflow execution populated + // from the server. WorkflowServiceException requires not null WorkflowExecution, so we have + // to provide a WorkflowExecution instance with just a workflowId. workflowExecution = WorkflowExecution.newBuilder().setWorkflowId(workflowId).build(); } throw new WorkflowServiceException(workflowExecution, workflowType.orElse(null), e); } } + @Override + public R executeUpdateWithStart( + UpdateOptions updateOptions, Object[] updateArgs, Object[] startArgs) { + updateOptions.validateWaitForCompleted(); + UpdateOptions optionsWithWaitStageCompleted = + updateOptions.toBuilder().setWaitForStage(WorkflowUpdateStage.COMPLETED).build(); + return startUpdateWithStart(optionsWithWaitStageCompleted, updateArgs, startArgs).getResult(); + } + private WorkflowExecution signalWithStartWithOptions( WorkflowOptions options, String signalName, Object[] signalArgs, Object[] startArgs) { checkExecutionIsNotStarted(); @@ -372,30 +390,38 @@ public WorkflowUpdateHandle startUpdate(UpdateOptions options, Object. options.validate(); WorkflowExecution targetExecution = execution.get(); try { - String updateId = - Strings.isNullOrEmpty(options.getUpdateId()) - ? UUID.randomUUID().toString() - : options.getUpdateId(); - return workflowClientInvoker.startUpdate( - new WorkflowClientCallsInterceptor.StartUpdateInput<>( - targetExecution, - workflowType, - options.getUpdateName(), - Header.empty(), - updateId, - args, - options.getResultClass(), - options.getResultType(), - options.getFirstExecutionRunId(), - WaitPolicy.newBuilder() - .setLifecycleStage(options.getWaitForStage().getProto()) - .build())); + WorkflowClientCallsInterceptor.StartUpdateInput input = + startUpdateInput(options, args, targetExecution); + return workflowClientInvoker.startUpdate(input); } catch (Exception e) { Throwable throwable = throwAsWorkflowFailureException(e, targetExecution); throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable); } } + private WorkflowClientCallsInterceptor.StartUpdateInput startUpdateInput( + UpdateOptions options, Object[] args, WorkflowExecution targetExecution) { + String updateId = + Strings.isNullOrEmpty(options.getUpdateId()) + ? UUID.randomUUID().toString() + : options.getUpdateId(); + WorkflowClientCallsInterceptor.StartUpdateInput input = + new WorkflowClientCallsInterceptor.StartUpdateInput<>( + targetExecution, + workflowType, + options.getUpdateName(), + Header.empty(), + updateId, + args, + options.getResultClass(), + options.getResultType(), + options.getFirstExecutionRunId(), + WaitPolicy.newBuilder() + .setLifecycleStage(options.getWaitForStage().getProto()) + .build()); + return input; + } + @Override public WorkflowUpdateHandle getUpdateHandle(String updateId, Class resultClass) { return new LazyWorkflowUpdateHandleImpl<>( diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 1360d63c5..e089e4194 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -62,8 +62,9 @@ public interface WorkflowClientCallsInterceptor { WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input); /** - * Intercepts calls from {@link WorkflowStub#updateWithStart} and {@link - * WorkflowClient#updateWithStart}. + * Intercepts calls from {@link WorkflowStub#startUpdateWithStart} and {@link + * WorkflowStub#executeUpdateWithStart} as well as {@link WorkflowClient#startUpdateWithStart} and + * {@link WorkflowClient#executeUpdateWithStart}. */ @Experimental WorkflowUpdateWithStartOutput updateWithStart(WorkflowUpdateWithStartInput input); @@ -233,21 +234,20 @@ public WorkflowStartOutput getWorkflowStartOutput() { final class WorkflowUpdateWithStartInput { private final WorkflowStartInput workflowStartInput; - private final UpdateWithStartWorkflowOperation updateOperation; + private final StartUpdateInput workflowUpdateInput; public WorkflowUpdateWithStartInput( - WorkflowStartInput workflowStartInput, - UpdateWithStartWorkflowOperation updateOperation) { + WorkflowStartInput workflowStartInput, StartUpdateInput workflowUpdateInput) { this.workflowStartInput = workflowStartInput; - this.updateOperation = updateOperation; + this.workflowUpdateInput = workflowUpdateInput; } public WorkflowStartInput getWorkflowStartInput() { return workflowStartInput; } - public UpdateWithStartWorkflowOperation getUpdateOperation() { - return updateOperation; + public StartUpdateInput getStartUpdateInput() { + return workflowUpdateInput; } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 98399c86e..0162e8e1d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -25,7 +25,6 @@ import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData; -import com.google.common.base.Strings; import io.grpc.Deadline; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -40,7 +39,6 @@ import io.temporal.api.workflowservice.v1.*; import io.temporal.client.*; import io.temporal.common.converter.DataConverter; -import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.internal.client.external.GenericWorkflowClient; import io.temporal.internal.common.HeaderUtils; @@ -184,32 +182,6 @@ public WorkflowUpdateWithStartOutput updateWithStart( .withContext( new WorkflowSerializationContext( clientOptions.getNamespace(), startInput.getWorkflowId())); - StartWorkflowExecutionRequest startRequest = - toStartRequest(dataConverterWithWorkflowContext, startInput).build(); - - UpdateWithStartWorkflowOperation updateOperation = input.getUpdateOperation(); - UpdateOptions updateOptions = updateOperation.getOptions(); - updateOptions.validate(); - String updateId = - Strings.isNullOrEmpty(updateOptions.getUpdateId()) - ? UUID.randomUUID().toString() - : updateOptions.getUpdateId(); - StartUpdateInput updateInput = - new StartUpdateInput<>( - WorkflowExecution.newBuilder().setWorkflowId(startInput.getWorkflowId()).build(), - Optional.of(startInput.getWorkflowType()), - updateOptions.getUpdateName(), - Header.empty(), - updateId, - updateOperation.getUpdateArgs(), - updateOptions.getResultClass(), - updateOptions.getResultType(), - "", // firstExecutionRunId is always empty - WaitPolicy.newBuilder() - .setLifecycleStage(updateOptions.getWaitForStage().getProto()) - .build()); - UpdateWorkflowExecutionRequest updateRequest = - toUpdateWorkflowExecutionRequest(updateInput, dataConverterWithWorkflowContext); ExecuteMultiOperationRequest request = ExecuteMultiOperationRequest.newBuilder() @@ -217,13 +189,14 @@ public WorkflowUpdateWithStartOutput updateWithStart( .addOperations( 0, ExecuteMultiOperationRequest.Operation.newBuilder() - .setStartWorkflow(startRequest) + .setStartWorkflow(toStartRequest(dataConverterWithWorkflowContext, startInput)) .build()) .addOperations( 1, ExecuteMultiOperationRequest.Operation.newBuilder() - .setUpdateWorkflow(updateRequest) - .build()) + .setUpdateWorkflow( + toUpdateWorkflowExecutionRequest( + input.getStartUpdateInput(), dataConverterWithWorkflowContext))) .build(); ExecuteMultiOperationResponse response; @@ -261,9 +234,9 @@ public WorkflowUpdateWithStartOutput updateWithStart( if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED || e.getStatus().getCode() == Status.Code.CANCELLED) { throw new WorkflowUpdateTimeoutOrCancelledException( - updateInput.getWorkflowExecution(), - updateInput.getUpdateName(), - updateInput.getUpdateId(), + input.getStartUpdateInput().getWorkflowExecution(), + input.getStartUpdateInput().getUpdateName(), + input.getStartUpdateInput().getUpdateId(), e); } @@ -283,32 +256,38 @@ public WorkflowUpdateWithStartOutput updateWithStart( } MultiOperationExecutionFailure.OperationStatus startStatus = failure.getStatuses(0); - if (startStatus.getDetailsCount() == 0 - || !startStatus.getDetails(0).is(MultiOperationExecutionAborted.class)) { + if (startStatus.getCode() != Status.Code.OK.value() + && (startStatus.getDetailsCount() == 0 + || !startStatus.getDetails(0).is(MultiOperationExecutionAborted.class))) { throw Status.fromCodeValue(startStatus.getCode()) .withDescription(startStatus.getMessage()) .asRuntimeException(); } MultiOperationExecutionFailure.OperationStatus updateStatus = failure.getStatuses(1); - if (updateStatus.getDetailsCount() == 0 - || !updateStatus.getDetails(0).is(MultiOperationExecutionAborted.class)) { + if (updateStatus.getCode() != Status.Code.OK.value() + && (updateStatus.getDetailsCount() == 0 + || !updateStatus.getDetails(0).is(MultiOperationExecutionAborted.class))) { throw Status.fromCodeValue(updateStatus.getCode()) - .withDescription("Invalid StartOperation: " + updateStatus.getMessage()) + .withDescription(updateStatus.getMessage()) .asRuntimeException(); } throw e; // no detailed failure was found } - } while (updateNotYetDurable(updateInput, updateResponse)); + } while (updateNotYetDurable(input.getStartUpdateInput(), updateResponse)); WorkflowUpdateHandle updateHandle = - toUpdateHandle(updateInput, updateResponse, dataConverterWithWorkflowContext); + toUpdateHandle( + input.getStartUpdateInput(), updateResponse, dataConverterWithWorkflowContext); WorkflowExecution execution = WorkflowExecution.newBuilder() .setRunId(startResponse.getRunId()) - .setWorkflowId(startRequest.getWorkflowId()) + .setWorkflowId( + toStartRequest(dataConverterWithWorkflowContext, startInput) + .build() + .getWorkflowId()) .build(); return new WorkflowUpdateWithStartOutput<>(new WorkflowStartOutput(execution), updateHandle); } diff --git a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java index 3bdc35906..62b82ec43 100644 --- a/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java +++ b/temporal-sdk/src/test/java/io/temporal/client/functional/UpdateTest.java @@ -188,7 +188,7 @@ public void updateWorkflowReuseOptions() throws ExecutionException, InterruptedE WorkflowExecution execution = workflowStub.start(); SDKTestWorkflowRule.waitForOKQuery(workflowStub); - UpdateOptions updateOptions = + UpdateOptions updateOptions = UpdateOptions.newBuilder(String.class) .setUpdateName("update") .setFirstExecutionRunId(execution.getRunId()) @@ -210,36 +210,35 @@ public void updateWorkflowReuseOptions() throws ExecutionException, InterruptedE } @Test - public void updateWithStart() throws ExecutionException, InterruptedException { + public void startUpdateWithStartWithUntypedStub() { String workflowId = UUID.randomUUID().toString(); String workflowType = TestWorkflows.WorkflowWithUpdate.class.getSimpleName(); WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - // first update-with-start - UpdateWithStartWorkflowOperation update1 = - UpdateWithStartWorkflowOperation.newBuilder( - "update", String.class, new Object[] {0, "Hello Update"}) + UpdateOptions updateOptions = + UpdateOptions.newBuilder(String.class) + .setUpdateName("update") + .setResultClass(String.class) .setWaitForStage(WorkflowUpdateStage.COMPLETED) .build(); + + // send first update-with-start WorkflowStub workflowStub1 = workflowClient.newUntypedWorkflowStub( workflowType, SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) .toBuilder() + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) .setWorkflowId(workflowId) .build()); WorkflowUpdateHandle updateHandle1 = - workflowStub1.updateWithStart(update1, new String[] {"some-value"}, new String[] {}); + workflowStub1.startUpdateWithStart( + updateOptions, new Object[] {0, "Hello Update 1"}, new Object[] {}); - assertEquals(updateHandle1, update1.getUpdateHandle().get()); - assertEquals("Hello Update", update1.getResult()); + assertEquals("Hello Update 1", updateHandle1.getResult()); - // second update-with-start - UpdateWithStartWorkflowOperation update2 = - UpdateWithStartWorkflowOperation.newBuilder( - "update", String.class, new Object[] {0, "Hello Update 2"}) - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + // send second update-with-start WorkflowStub workflowStub2 = workflowClient.newUntypedWorkflowStub( workflowType, @@ -250,40 +249,68 @@ public void updateWithStart() throws ExecutionException, InterruptedException { .setWorkflowId(workflowId) .build()); WorkflowUpdateHandle updateHandle2 = - workflowStub2.updateWithStart(update2, new String[] {"some-value"}, new String[] {}); + workflowStub2.startUpdateWithStart( + updateOptions, new Object[] {0, "Hello Update 2"}, new Object[] {}); - assertEquals(updateHandle2, update2.getUpdateHandle().get()); - assertEquals("Hello Update 2", update2.getResult()); + assertEquals("Hello Update 2", updateHandle2.getResult()); + // send update workflowStub2.update("complete", void.class); + + assertEquals("complete", workflowStub1.getResult(String.class)); assertEquals("complete", workflowStub2.getResult(String.class)); } @Test - public void updateWithStartOperationSingleUse() { + public void executeUpdateWithStartWithUntypedStub() { String workflowId = UUID.randomUUID().toString(); + String workflowType = TestWorkflows.WorkflowWithUpdate.class.getSimpleName(); WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - UpdateWithStartWorkflowOperation update = - UpdateWithStartWorkflowOperation.newBuilder( - "update", String.class, new Object[] {0, "Hello Update"}) + UpdateOptions updateOptions = + UpdateOptions.newBuilder(String.class) + .setUpdateName("update") + .setResultClass(String.class) .setWaitForStage(WorkflowUpdateStage.COMPLETED) .build(); - WorkflowStub workflowStub = + + // send first update-with-start + WorkflowStub workflowStub1 = + workflowClient.newUntypedWorkflowStub( + workflowType, + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) + .toBuilder() + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) + .setWorkflowId(workflowId) + .build()); + String updateResult1 = + workflowStub1.executeUpdateWithStart( + updateOptions, new Object[] {0, "Hello Update 1"}, new Object[] {}); + + assertEquals("Hello Update 1", updateResult1); + + // send second update-with-start + WorkflowStub workflowStub2 = workflowClient.newUntypedWorkflowStub( - TestWorkflows.WorkflowWithUpdate.class.getSimpleName(), + workflowType, SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) .toBuilder() + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) .setWorkflowId(workflowId) .build()); + String updateResult2 = + workflowStub2.executeUpdateWithStart( + updateOptions, new Object[] {0, "Hello Update 2"}, new Object[] {}); - workflowStub.updateWithStart(update, new String[] {"some-value"}, new String[] {}); + assertEquals("Hello Update 2", updateResult2); - try { - workflowStub.updateWithStart(update, new String[] {"some-value"}, new String[] {}); - } catch (IllegalStateException e) { - assertEquals(e.getMessage(), "UpdateWithStartWorkflowOperation was already executed"); - } + // send update + workflowStub2.update("complete", void.class); + + assertEquals("complete", workflowStub1.getResult(String.class)); + assertEquals("complete", workflowStub2.getResult(String.class)); } public static class QuickWorkflowWithUpdateImpl implements TestWorkflows.TestUpdatedWorkflow { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java index c049337d2..bb157a844 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java @@ -144,7 +144,7 @@ public String func() { } @Override - public String update(Integer i) { + public String update() { throw new UnsupportedOperationException(); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ParentWorkflowInfoInChildWorkflowsTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ParentWorkflowInfoInChildWorkflowsTest.java index e5a2630de..76a89f2cd 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ParentWorkflowInfoInChildWorkflowsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/childWorkflowTests/ParentWorkflowInfoInChildWorkflowsTest.java @@ -69,7 +69,7 @@ public String func2(String s, int i) { } @Override - public String update(Integer i) { + public String update2(String a1, int a2) { throw new UnsupportedOperationException(); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java index 55c4c3387..a6fb147a0 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestMultiArgWorkflowFunctions.java @@ -27,56 +27,92 @@ public class TestMultiArgWorkflowFunctions { - public interface TestUpdateFunc { + public interface TestNoArgsUpdateFunc { @UpdateMethod - String update(Integer i); + String update(); + } + + public interface Test1ArgUpdateFunc { + + @UpdateMethod + String update1(String input); + } + + public interface Test2ArgsUpdateFunc { + + @UpdateMethod + String update2(String a1, int a2); + } + + public interface Test3ArgsUpdateFunc { + + @UpdateMethod + String update3(String a1, int a2, int a3); + } + + public interface Test4ArgsUpdateFunc { + + @UpdateMethod + String update4(String a1, int a2, int a3, int a4); + } + + public interface Test5ArgsUpdateFunc { + + @UpdateMethod + String update5(String a1, int a2, int a3, int a4, int a5); + } + + public interface Test6ArgsUpdateFunc { + + @UpdateMethod + String update6(String a1, int a2, int a3, int a4, int a5, int a6); } @WorkflowInterface - public interface TestNoArgsWorkflowFunc extends TestUpdateFunc { + public interface TestNoArgsWorkflowFunc extends TestNoArgsUpdateFunc { @WorkflowMethod String func(); } @WorkflowInterface - public interface Test1ArgWorkflowFunc extends TestUpdateFunc { + public interface Test1ArgWorkflowFunc extends Test1ArgUpdateFunc { @WorkflowMethod(name = "func1") String func1(String input); } @WorkflowInterface - public interface Test2ArgWorkflowFunc extends TestUpdateFunc { + public interface Test2ArgWorkflowFunc extends Test2ArgsUpdateFunc { @WorkflowMethod String func2(String a1, int a2); } @WorkflowInterface - public interface Test3ArgWorkflowFunc extends TestUpdateFunc { + public interface Test3ArgWorkflowFunc extends Test3ArgsUpdateFunc { @WorkflowMethod String func3(String a1, int a2, int a3); } @WorkflowInterface - public interface Test4ArgWorkflowFunc extends TestUpdateFunc { + public interface Test4ArgWorkflowFunc extends Test4ArgsUpdateFunc { @WorkflowMethod String func4(String a1, int a2, int a3, int a4); } @WorkflowInterface - public interface Test5ArgWorkflowFunc extends TestUpdateFunc { + public interface Test5ArgWorkflowFunc extends Test5ArgsUpdateFunc { @WorkflowMethod String func5(String a1, int a2, int a3, int a4, int a5); } @WorkflowInterface - public interface Test6ArgWorkflowFunc extends TestUpdateFunc { + public interface Test6ArgWorkflowFunc extends Test6ArgsUpdateFunc { @WorkflowMethod String func6(String a1, int a2, int a3, int a4, int a5, int a6); @@ -89,57 +125,56 @@ public interface ProcInvocationQueryable { } @WorkflowInterface - public interface TestNoArgsWorkflowProc extends ProcInvocationQueryable, TestUpdateFunc { + public interface TestNoArgsWorkflowProc extends ProcInvocationQueryable, TestNoArgsUpdateFunc { @WorkflowMethod void proc(); } @WorkflowInterface - public interface Test1ArgWorkflowProc extends ProcInvocationQueryable, TestUpdateFunc { + public interface Test1ArgWorkflowProc extends ProcInvocationQueryable, Test1ArgUpdateFunc { @WorkflowMethod void proc1(String input); } @WorkflowInterface - public interface Test2ArgWorkflowProc extends ProcInvocationQueryable, TestUpdateFunc { + public interface Test2ArgWorkflowProc extends ProcInvocationQueryable, Test2ArgsUpdateFunc { @WorkflowMethod void proc2(String a1, int a2); } @WorkflowInterface - public interface Test3ArgWorkflowProc extends ProcInvocationQueryable, TestUpdateFunc { + public interface Test3ArgWorkflowProc extends ProcInvocationQueryable, Test3ArgsUpdateFunc { @WorkflowMethod void proc3(String a1, int a2, int a3); } @WorkflowInterface - public interface Test4ArgWorkflowProc extends ProcInvocationQueryable, TestUpdateFunc { + public interface Test4ArgWorkflowProc extends ProcInvocationQueryable, Test4ArgsUpdateFunc { @WorkflowMethod void proc4(String a1, int a2, int a3, int a4); } @WorkflowInterface - public interface Test5ArgWorkflowProc extends ProcInvocationQueryable, TestUpdateFunc { + public interface Test5ArgWorkflowProc extends ProcInvocationQueryable, Test5ArgsUpdateFunc { @WorkflowMethod void proc5(String a1, int a2, int a3, int a4, int a5); } @WorkflowInterface - public interface Test6ArgWorkflowProc extends ProcInvocationQueryable, TestUpdateFunc { + public interface Test6ArgWorkflowProc extends ProcInvocationQueryable, Test6ArgsUpdateFunc { @WorkflowMethod void proc6(String a1, int a2, int a3, int a4, int a5, int a6); } public static class TestMultiArgWorkflowImpl - implements TestUpdateFunc, - TestNoArgsWorkflowFunc, + implements TestNoArgsWorkflowFunc, Test1ArgWorkflowFunc, Test2ArgWorkflowFunc, Test3ArgWorkflowFunc, @@ -232,8 +267,38 @@ public String query() { } @Override - public String update(Integer i) { - return i.toString(); + public String update() { + return "update"; + } + + @Override + public String update1(String a1) { + return a1; + } + + @Override + public String update2(String a1, int a2) { + return a1 + a2; + } + + @Override + public String update3(String a1, int a2, int a3) { + return a1 + a2 + a3; + } + + @Override + public String update4(String a1, int a2, int a3, int a4) { + return a1 + a2 + a3 + a4; + } + + @Override + public String update5(String a1, int a2, int a3, int a4, int a5) { + return a1 + a2 + a3 + a4 + a5; + } + + @Override + public String update6(String a1, int a2, int a3, int a4, int a5, int a6) { + return a1 + a2 + a3 + a4 + a5 + a6; } } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestNoArgsWorkflowFuncParent.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestNoArgsWorkflowFuncParent.java index a6bd2b641..baad4a1ae 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestNoArgsWorkflowFuncParent.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/TestNoArgsWorkflowFuncParent.java @@ -46,7 +46,7 @@ public String func() { } @Override - public String update(Integer i) { + public String update() { throw new UnsupportedOperationException(); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java index c3efeb1de..818942010 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithStartTest.java @@ -20,10 +20,10 @@ package io.temporal.workflow.updateTest; +import static io.temporal.workflow.shared.TestMultiArgWorkflowFunctions.*; import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import com.uber.m3.tally.Scope; import io.grpc.Context; @@ -33,16 +33,17 @@ import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.api.enums.v1.WorkflowIdReusePolicy; +import io.temporal.api.errordetails.v1.MultiOperationExecutionFailure; import io.temporal.api.update.v1.UpdateRef; import io.temporal.api.workflowservice.v1.*; import io.temporal.client.*; +import io.temporal.serviceclient.StatusUtils; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.WorkerOptions; import io.temporal.workflow.*; -import io.temporal.workflow.shared.TestMultiArgWorkflowFunctions; import io.temporal.workflow.shared.TestWorkflows; import java.util.*; import java.util.concurrent.ExecutionException; @@ -51,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Function; import org.junit.Rule; import org.junit.Test; @@ -63,83 +65,296 @@ public class UpdateWithStartTest { .setWorkflowTypes( WorkflowWithUpdateImpl.class, TestUpdatedWorkflowImpl.class, - TestMultiArgWorkflowFunctions.TestMultiArgWorkflowImpl.class) + TestMultiArgWorkflowImpl.class) .build(); - @Test - public void startAndSendUpdateTogether() throws ExecutionException, InterruptedException { - WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + private Results assertUpdateWithStart( + Class stubClass, + Object[] args, + Function> startOperationProvider, + BiFunction, WorkflowUpdateHandle> + updateHandleProvider, + BiFunction, String> updateResultProvider) + throws ExecutionException, InterruptedException { - WorkflowOptions options = createOptions(); - TestWorkflows.WorkflowWithUpdate workflow = - workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); + WorkflowClient client = testWorkflowRule.getWorkflowClient(); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, 1, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.COMPLETED) + String updateName = "update"; + if (args.length > 0) { + updateName = updateName + args.length; + } + UpdateOptions untypedUpdateOptions = + createUpdateOptions().toBuilder() + .setResultClass(String.class) + .setUpdateName(updateName) .build(); - WorkflowUpdateHandle handle1 = - WorkflowClient.updateWithStart(workflow::execute, updateOp); - assertEquals(options.getWorkflowId(), handle1.getExecution().getWorkflowId()); - assertEquals("Hello Update", handle1.getResultAsync().get()); - - WorkflowUpdateHandle handle2 = updateOp.getUpdateHandle().get(); - assertEquals(updateOp.getResult(), handle2.getResultAsync().get()); - - workflow.complete(); - - assertEquals("Hello Update complete", WorkflowStub.fromTyped(workflow).getResult(String.class)); + // === typed + + // startUpdateWithStart + T typedStub = client.newWorkflowStub(stubClass, createWorkflowOptions()); + WithStartWorkflowOperation typedStartOp = startOperationProvider.apply(typedStub); + WorkflowUpdateHandle updHandle = updateHandleProvider.apply(typedStub, typedStartOp); + + // these will serve as the canonical results + final String theWorkflowResult = typedStartOp.getResult(); + final String theUpdateResult = updHandle.getResult(); + assertEquals(theWorkflowResult, WorkflowStub.fromTyped(typedStub).getResult(String.class)); + + // executeUpdateWithStart + typedStub = client.newWorkflowStub(stubClass, createWorkflowOptions()); + typedStartOp = startOperationProvider.apply(typedStub); + String updResult = updateResultProvider.apply(typedStub, typedStartOp); + assertEquals(theUpdateResult, updResult); + assertEquals(theWorkflowResult, typedStartOp.getResult()); + + // === untyped + + // startUpdateWithStart + typedStub = client.newWorkflowStub(stubClass, createWorkflowOptions()); + WorkflowStub untypedStub = WorkflowStub.fromTyped(typedStub); + updHandle = untypedStub.startUpdateWithStart(untypedUpdateOptions, args, args); + assertEquals(theUpdateResult, updHandle.getResultAsync().get()); + assertEquals(theUpdateResult, updHandle.getResult()); + + // executeUpdateWithStart + typedStub = client.newWorkflowStub(stubClass, createWorkflowOptions()); + untypedStub = WorkflowStub.fromTyped(typedStub); + updResult = untypedStub.executeUpdateWithStart(untypedUpdateOptions, args, args); + assertEquals(theUpdateResult, updResult); + + return new Results(theWorkflowResult, theUpdateResult); } @Test - public void startAndSendUpdateTogetherUsingUntypedWorkflowOperation() - throws ExecutionException, InterruptedException { - WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + public void startWorkflowAndUpdate() throws ExecutionException, InterruptedException { + // no arg + Results results = + assertUpdateWithStart( + TestNoArgsWorkflowFunc.class, + new Object[] {}, + (TestNoArgsWorkflowFunc stub) -> new WithStartWorkflowOperation<>(stub::func), + (TestNoArgsWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart(stub::update, createUpdateOptions(), startOp), + (TestNoArgsWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update, createUpdateOptions(), startOp)); + assertEquals("update", results.updateResult); + assertEquals("func", results.workflowResult); + + results = + assertUpdateWithStart( + TestNoArgsWorkflowProc.class, + new Object[] {}, + (TestNoArgsWorkflowProc stub) -> new WithStartWorkflowOperation<>(stub::proc), + (TestNoArgsWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart(stub::update, createUpdateOptions(), startOp), + (TestNoArgsWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update, createUpdateOptions(), startOp)); + assertEquals("update", results.updateResult); + assertNull(results.workflowResult); - WorkflowOptions options = createOptions(); - TestWorkflows.WorkflowWithUpdate workflow = - workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); + // 1 arg + results = + assertUpdateWithStart( + Test1ArgWorkflowFunc.class, + new Object[] {"1"}, + (Test1ArgWorkflowFunc stub) -> new WithStartWorkflowOperation<>(stub::func1, "1"), + (Test1ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update1, "1", createUpdateOptions(), startOp), + (Test1ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update1, "1", createUpdateOptions(), startOp)); + assertEquals("1", results.updateResult); + assertEquals("1", results.workflowResult); + + results = + assertUpdateWithStart( + Test1ArgWorkflowProc.class, + new Object[] {"1"}, + (Test1ArgWorkflowProc stub) -> new WithStartWorkflowOperation<>(stub::proc1, "1"), + (Test1ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update1, "1", createUpdateOptions(), startOp), + (Test1ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update1, "1", createUpdateOptions(), startOp)); + assertEquals("1", results.updateResult); + assertNull(results.workflowResult); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder( - "update", String.class, new Object[] {1, "Hello Update"}) // untyped! - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + // 2 args + results = + assertUpdateWithStart( + Test2ArgWorkflowFunc.class, + new Object[] {"1", 2}, + (Test2ArgWorkflowFunc stub) -> new WithStartWorkflowOperation<>(stub::func2, "1", 2), + (Test2ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update2, "1", 2, createUpdateOptions(), startOp), + (Test2ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update2, "1", 2, createUpdateOptions(), startOp)); + assertEquals("12", results.updateResult); + assertEquals("12", results.workflowResult); + + results = + assertUpdateWithStart( + Test2ArgWorkflowProc.class, + new Object[] {"1", 2}, + (Test2ArgWorkflowProc stub) -> new WithStartWorkflowOperation<>(stub::proc2, "1", 2), + (Test2ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update2, "1", 2, createUpdateOptions(), startOp), + (Test2ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update2, "1", 2, createUpdateOptions(), startOp)); + assertEquals("12", results.updateResult); + assertNull(results.workflowResult); - WorkflowUpdateHandle handle1 = - WorkflowClient.updateWithStart(workflow::execute, updateOp); - assertEquals("Hello Update", handle1.getResultAsync().get()); + // 3 args + results = + assertUpdateWithStart( + Test3ArgWorkflowFunc.class, + new Object[] {"1", 2, 3}, + (Test3ArgWorkflowFunc stub) -> new WithStartWorkflowOperation<>(stub::func3, "1", 2, 3), + (Test3ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update3, "1", 2, 3, createUpdateOptions(), startOp), + (Test3ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update3, "1", 2, 3, createUpdateOptions(), startOp)); + assertEquals("123", results.updateResult); + assertEquals("123", results.workflowResult); + + results = + assertUpdateWithStart( + Test3ArgWorkflowProc.class, + new Object[] {"1", 2, 3}, + (Test3ArgWorkflowProc stub) -> new WithStartWorkflowOperation<>(stub::proc3, "1", 2, 3), + (Test3ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update3, "1", 2, 3, createUpdateOptions(), startOp), + (Test3ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update3, "1", 2, 3, createUpdateOptions(), startOp)); + assertEquals("123", results.updateResult); + assertNull(results.workflowResult); - WorkflowUpdateHandle handle2 = updateOp.getUpdateHandle().get(); - assertEquals(updateOp.getResult(), handle2.getResultAsync().get()); + // 4 args + results = + assertUpdateWithStart( + Test4ArgWorkflowFunc.class, + new Object[] {"1", 2, 3, 4}, + (Test4ArgWorkflowFunc stub) -> + new WithStartWorkflowOperation<>(stub::func4, "1", 2, 3, 4), + (Test4ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update4, "1", 2, 3, 4, createUpdateOptions(), startOp), + (Test4ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update4, "1", 2, 3, 4, createUpdateOptions(), startOp)); + assertEquals("1234", results.updateResult); + assertEquals("1234", results.workflowResult); + + results = + assertUpdateWithStart( + Test4ArgWorkflowProc.class, + new Object[] {"1", 2, 3, 4}, + (Test4ArgWorkflowProc stub) -> + new WithStartWorkflowOperation<>(stub::proc4, "1", 2, 3, 4), + (Test4ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update4, "1", 2, 3, 4, createUpdateOptions(), startOp), + (Test4ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update4, "1", 2, 3, 4, createUpdateOptions(), startOp)); + assertEquals("1234", results.updateResult); + assertNull(results.workflowResult); - workflow.complete(); + // 5 args + results = + assertUpdateWithStart( + Test5ArgWorkflowFunc.class, + new Object[] {"1", 2, 3, 4, 5}, + (Test5ArgWorkflowFunc stub) -> + new WithStartWorkflowOperation<>(stub::func5, "1", 2, 3, 4, 5), + (Test5ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update5, "1", 2, 3, 4, 5, createUpdateOptions(), startOp), + (Test5ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update5, "1", 2, 3, 4, 5, createUpdateOptions(), startOp)); + assertEquals("12345", results.updateResult); + assertEquals("12345", results.workflowResult); + + results = + assertUpdateWithStart( + Test5ArgWorkflowProc.class, + new Object[] {"1", 2, 3, 4, 5}, + (Test5ArgWorkflowProc stub) -> + new WithStartWorkflowOperation<>(stub::proc5, "1", 2, 3, 4, 5), + (Test5ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update5, "1", 2, 3, 4, 5, createUpdateOptions(), startOp), + (Test5ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update5, "1", 2, 3, 4, 5, createUpdateOptions(), startOp)); + assertEquals("12345", results.updateResult); + assertNull(results.workflowResult); - assertEquals("Hello Update complete", WorkflowStub.fromTyped(workflow).getResult(String.class)); + // 6 args + results = + assertUpdateWithStart( + Test6ArgWorkflowFunc.class, + new Object[] {"1", 2, 3, 4, 5, 6}, + (Test6ArgWorkflowFunc stub) -> + new WithStartWorkflowOperation<>(stub::func6, "1", 2, 3, 4, 5, 6), + (Test6ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update6, "1", 2, 3, 4, 5, 6, createUpdateOptions(), startOp), + (Test6ArgWorkflowFunc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update6, "1", 2, 3, 4, 5, 6, createUpdateOptions(), startOp)); + assertEquals("123456", results.updateResult); + assertEquals("123456", results.workflowResult); + + results = + assertUpdateWithStart( + Test6ArgWorkflowProc.class, + new Object[] {"1", 2, 3, 4, 5, 6}, + (Test6ArgWorkflowProc stub) -> + new WithStartWorkflowOperation<>(stub::proc6, "1", 2, 3, 4, 5, 6), + (Test6ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.startUpdateWithStart( + stub::update6, "1", 2, 3, 4, 5, 6, createUpdateOptions(), startOp), + (Test6ArgWorkflowProc stub, WithStartWorkflowOperation startOp) -> + WorkflowClient.executeUpdateWithStart( + stub::update6, "1", 2, 3, 4, 5, 6, createUpdateOptions(), startOp)); + assertEquals("123456", results.updateResult); + assertNull(results.workflowResult); } @Test - public void startAndSendUpdateTogetherWithNullUpdateResult() - throws ExecutionException, InterruptedException { + public void nullUpdateResult() throws ExecutionException, InterruptedException { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - WorkflowOptions options = createOptions(); + WorkflowOptions options = createWorkflowOptions(); TestWorkflows.TestUpdatedWorkflow workflow = workflowClient.newWorkflowStub(TestWorkflows.TestUpdatedWorkflow.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); - WorkflowUpdateHandle handle1 = - WorkflowClient.updateWithStart(workflow::execute, updateOp); - assertNull(handle1.getResultAsync().get()); + WorkflowUpdateHandle updHandle = + WorkflowClient.startUpdateWithStart( + workflow::update, "Hello Update", createUpdateOptions(), startOp); - WorkflowUpdateHandle handle2 = updateOp.getUpdateHandle().get(); - assertEquals(updateOp.getResult(), handle2.getResultAsync().get()); + assertNull(updHandle.getResult()); + assertNull(updHandle.getResultAsync().get()); + assertEquals("Hello Update", startOp.getResult()); assertEquals("Hello Update", WorkflowStub.fromTyped(workflow).getResult(String.class)); } @@ -149,191 +364,31 @@ public void onlySendUpdateWhenWorkflowIsAlreadyRunning() WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); // first, start workflow - WorkflowOptions options1 = createOptions(); - TestWorkflows.WorkflowWithUpdate workflow1 = - workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options1); - WorkflowExecution execution1 = WorkflowClient.start(workflow1::execute); - - // then, send Update - WorkflowOptions options2 = - createOptions().toBuilder() + WorkflowOptions options = + createWorkflowOptions().toBuilder() .setWorkflowIdConflictPolicy( WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) - .setWorkflowId(options1.getWorkflowId()) .build(); - TestWorkflows.WorkflowWithUpdate workflow2 = - workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options2); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow2::update, 0, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); - - WorkflowUpdateHandle updHandle = - WorkflowClient.updateWithStart(workflow2::execute, updateOp); - assertEquals(execution1.getRunId(), updHandle.getExecution().getRunId()); - assertEquals(updateOp.getResult(), updHandle.getResultAsync().get()); - - workflow2.complete(); - assertEquals( - "Hello Update complete", WorkflowStub.fromTyped(workflow2).getResult(String.class)); - } - - @Test - public void startVariousFuncs() throws ExecutionException, InterruptedException { - WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - - BiFunction, Integer, UpdateWithStartWorkflowOperation> - newUpdateOp = - (request, input) -> - UpdateWithStartWorkflowOperation.newBuilder(request, input) - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); - - // no arg - TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc stubF = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.TestNoArgsWorkflowFunc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp0 = newUpdateOp.apply(stubF::update, 0); - WorkflowUpdateHandle handle0 = WorkflowClient.updateWithStart(stubF::func, updateOp0); - - // 1 arg - TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc stubF1 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test1ArgWorkflowFunc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp1 = newUpdateOp.apply(stubF1::update, 1); - WorkflowUpdateHandle handle1 = - WorkflowClient.updateWithStart(stubF1::func1, "1", updateOp1); - - // 2 args - TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc stubF2 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test2ArgWorkflowFunc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp2 = newUpdateOp.apply(stubF2::update, 2); - WorkflowUpdateHandle handle2 = - WorkflowClient.updateWithStart(stubF2::func2, "1", 2, updateOp2); - - // 3 args - TestMultiArgWorkflowFunctions.Test3ArgWorkflowFunc stubF3 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test3ArgWorkflowFunc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp3 = newUpdateOp.apply(stubF3::update, 3); - WorkflowUpdateHandle handle3 = - WorkflowClient.updateWithStart(stubF3::func3, "1", 2, 3, updateOp3); - - // 4 args - TestMultiArgWorkflowFunctions.Test4ArgWorkflowFunc stubF4 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test4ArgWorkflowFunc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp4 = newUpdateOp.apply(stubF4::update, 4); - WorkflowUpdateHandle handle4 = - WorkflowClient.updateWithStart(stubF4::func4, "1", 2, 3, 4, updateOp4); - - // 5 args - TestMultiArgWorkflowFunctions.Test5ArgWorkflowFunc stubF5 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test5ArgWorkflowFunc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp5 = newUpdateOp.apply(stubF5::update, 5); - WorkflowUpdateHandle handle5 = - WorkflowClient.updateWithStart(stubF5::func5, "1", 2, 3, 4, 5, updateOp5); - - // 6 args - TestMultiArgWorkflowFunctions.Test6ArgWorkflowFunc stubF6 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test6ArgWorkflowFunc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp6 = newUpdateOp.apply(stubF6::update, 6); - WorkflowUpdateHandle handle6 = - WorkflowClient.updateWithStart(stubF6::func6, "1", 2, 3, 4, 5, 6, updateOp6); - - assertEquals("0", handle0.getResultAsync().get()); - assertEquals("func", WorkflowStub.fromTyped(stubF).getResult(String.class)); - assertEquals("1", handle1.getResultAsync().get()); - assertEquals("1", WorkflowStub.fromTyped(stubF1).getResult(String.class)); - assertEquals("2", handle2.getResultAsync().get()); - assertEquals("12", WorkflowStub.fromTyped(stubF2).getResult(String.class)); - assertEquals("3", handle3.getResultAsync().get()); - assertEquals("123", WorkflowStub.fromTyped(stubF3).getResult(String.class)); - assertEquals("4", handle4.getResultAsync().get()); - assertEquals("1234", WorkflowStub.fromTyped(stubF4).getResult(String.class)); - assertEquals("5", handle5.getResultAsync().get()); - assertEquals("12345", WorkflowStub.fromTyped(stubF5).getResult(String.class)); - assertEquals("6", handle6.getResultAsync().get()); - assertEquals("123456", WorkflowStub.fromTyped(stubF6).getResult(String.class)); - } - - @Test - public void startVariousProcs() throws ExecutionException, InterruptedException { - WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - - BiFunction, Integer, UpdateWithStartWorkflowOperation> - newUpdateOp = - (request, input) -> - UpdateWithStartWorkflowOperation.newBuilder(request, input) - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); - - // no arg - TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc stubProc = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.TestNoArgsWorkflowProc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp0 = newUpdateOp.apply(stubProc::update, 0); - WorkflowUpdateHandle handle0 = - WorkflowClient.updateWithStart(stubProc::proc, updateOp0); - - // 1 arg - TestMultiArgWorkflowFunctions.Test1ArgWorkflowProc stubProc1 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test1ArgWorkflowProc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp1 = newUpdateOp.apply(stubProc1::update, 1); - WorkflowUpdateHandle handle1 = - WorkflowClient.updateWithStart(stubProc1::proc1, "1", updateOp1); + TestWorkflows.WorkflowWithUpdate workflow = + workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); - // 2 args - TestMultiArgWorkflowFunctions.Test2ArgWorkflowProc stubProc2 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test2ArgWorkflowProc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp2 = newUpdateOp.apply(stubProc2::update, 2); - WorkflowUpdateHandle handle2 = - WorkflowClient.updateWithStart(stubProc2::proc2, "1", 2, updateOp2); + // then, send update-with-start + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); - // 3 args - TestMultiArgWorkflowFunctions.Test3ArgWorkflowProc stubProc3 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test3ArgWorkflowProc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp3 = newUpdateOp.apply(stubProc3::update, 3); - WorkflowUpdateHandle handle3 = - WorkflowClient.updateWithStart(stubProc3::proc3, "1", 2, 3, updateOp3); + WorkflowUpdateHandle updHandle = + WorkflowClient.startUpdateWithStart( + workflow::update, 0, "Hello Update", createUpdateOptions(), startOp); - // 4 args - TestMultiArgWorkflowFunctions.Test4ArgWorkflowProc stubProc4 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test4ArgWorkflowProc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp4 = newUpdateOp.apply(stubProc4::update, 4); - WorkflowUpdateHandle handle4 = - WorkflowClient.updateWithStart(stubProc4::proc4, "1", 2, 3, 4, updateOp4); + assertEquals(execution.getRunId(), updHandle.getExecution().getRunId()); + assertEquals("Hello Update", updHandle.getResult()); + assertEquals("Hello Update", updHandle.getResultAsync().get()); - // 5 args - TestMultiArgWorkflowFunctions.Test5ArgWorkflowProc stubProc5 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test5ArgWorkflowProc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp5 = newUpdateOp.apply(stubProc5::update, 5); - WorkflowUpdateHandle handle5 = - WorkflowClient.updateWithStart(stubProc5::proc5, "1", 2, 3, 4, 5, updateOp5); + workflow.complete(); - // 6 args - TestMultiArgWorkflowFunctions.Test6ArgWorkflowProc stubProc6 = - workflowClient.newWorkflowStub( - TestMultiArgWorkflowFunctions.Test6ArgWorkflowProc.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp6 = newUpdateOp.apply(stubProc6::update, 6); - WorkflowUpdateHandle handle6 = - WorkflowClient.updateWithStart(stubProc6::proc6, "1", 2, 3, 4, 5, 6, updateOp6); - - assertEquals("0", handle0.getResultAsync().get()); - assertEquals("1", handle1.getResultAsync().get()); - assertEquals("2", handle2.getResultAsync().get()); - assertEquals("3", handle3.getResultAsync().get()); - assertEquals("4", handle4.getResultAsync().get()); - assertEquals("5", handle5.getResultAsync().get()); - assertEquals("6", handle6.getResultAsync().get()); + assertEquals("Hello Update complete", startOp.getResult()); + assertEquals("Hello Update complete", WorkflowStub.fromTyped(workflow).getResult(String.class)); } @Test @@ -384,23 +439,71 @@ public void retryUntilDurable() { WorkflowClient workflowClient = WorkflowClient.newInstance(client, WorkflowClientOptions.newBuilder().build()); - String workflowId = UUID.randomUUID().toString(); - WorkflowOptions options = - WorkflowOptions.newBuilder() - .setTaskQueue(testWorkflowRule.getTaskQueue()) - .setWorkflowId(workflowId) - .build(); TestWorkflows.WorkflowWithUpdate workflow = - workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, 0, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + workflowClient.newWorkflowStub( + TestWorkflows.WorkflowWithUpdate.class, createWorkflowOptions()); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); + WorkflowUpdateHandle updHandle = + WorkflowClient.startUpdateWithStart( + workflow::update, 0, "Hello Update", createUpdateOptions(), startOp); + + assertEquals("run_id", updHandle.getExecution().getRunId()); + verify(blockingStub, times(2)).executeMultiOperation(any()); + } + + @Test + public void handleSuccessfulStartButUpdateOnlyErr() { + WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub = + mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); + when(blockingStub.withDeadline(any())).thenReturn(blockingStub); + + Scope scope = mock(Scope.class); + when(scope.tagged(any())).thenReturn(scope); + + WorkflowServiceStubs client = mock(WorkflowServiceStubs.class); + when(client.getServerCapabilities()) + .thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + when(client.blockingStub()).thenReturn(blockingStub); + when(client.getOptions()) + .thenReturn(WorkflowServiceStubsOptions.newBuilder().setMetricsScope(scope).build()); + + // This is expected to be very rare, but possible. It occurs when one step was successful, + // but the step had an unexpected server-side error that could not be remedied by retries. + // Using "Unimplemented" to skip client retries + when(blockingStub.executeMultiOperation(any())) + .thenThrow( + // successful start, failed update + StatusUtils.newException( + Status.UNIMPLEMENTED.withDescription("MultiOperation could not be executed"), + MultiOperationExecutionFailure.newBuilder() + .addStatuses( + MultiOperationExecutionFailure.OperationStatus.newBuilder() + .setMessage("") + .setCode(Status.Code.OK.value())) + .addStatuses( + MultiOperationExecutionFailure.OperationStatus.newBuilder() + .setMessage("internal error") + .setCode(Status.Code.UNIMPLEMENTED.value())) + .build(), + MultiOperationExecutionFailure.getDescriptor())); - WorkflowUpdateHandle updateHandle = - WorkflowClient.updateWithStart(workflow::execute, updateOp); + WorkflowClient workflowClient = + WorkflowClient.newInstance(client, WorkflowClientOptions.newBuilder().build()); + WorkflowOptions options = createWorkflowOptions(); + TestWorkflows.WorkflowWithUpdate workflow = + workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); - assertEquals("run_id", updateHandle.getExecution().getRunId()); + try { + WorkflowClient.startUpdateWithStart( + workflow::update, 0, "Hello Update", createUpdateOptions(), startOp); + fail("unreachable"); + } catch (WorkflowServiceException e) { + assertEquals(e.getCause().getMessage(), "UNIMPLEMENTED: internal error"); + } } @Test @@ -409,18 +512,11 @@ public void timeoutError() { testWorkflowRule.getTestEnvironment().awaitTermination(5, TimeUnit.SECONDS); WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - String workflowId = UUID.randomUUID().toString(); - WorkflowOptions options = - WorkflowOptions.newBuilder() - .setTaskQueue(testWorkflowRule.getTaskQueue()) - .setWorkflowId(workflowId) - .build(); + WorkflowOptions options = createWorkflowOptions(); TestWorkflows.WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, 0, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); final AtomicReference exception = new AtomicReference<>(); ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); @@ -431,11 +527,19 @@ public void timeoutError() { exception.set( assertThrows( WorkflowServiceException.class, - () -> WorkflowClient.updateWithStart(workflow::execute, updateOp)))); - assertEquals(workflowId, exception.get().getExecution().getWorkflowId()); + () -> + WorkflowClient.startUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp)))); + assertEquals(options.getWorkflowId(), exception.get().getExecution().getWorkflowId()); WorkflowServiceException cause = (WorkflowUpdateTimeoutOrCancelledException) exception.get().getCause(); - assertEquals(workflowId, cause.getExecution().getWorkflowId()); + assertEquals(options.getWorkflowId(), cause.getExecution().getWorkflowId()); } @Test @@ -443,25 +547,31 @@ public void failWhenWorkflowAlreadyRunning() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); // first, start workflow - WorkflowOptions options1 = createOptions(); + WorkflowOptions options1 = createWorkflowOptions(); TestWorkflows.WorkflowWithUpdate workflow1 = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options1); WorkflowClient.start(workflow1::execute); - // then, send Update-with-Start + // then, send update-with-start WorkflowOptions options2 = - createOptions().toBuilder().setWorkflowId(options1.getWorkflowId()).build(); + createWorkflowOptions().toBuilder().setWorkflowId(options1.getWorkflowId()).build(); TestWorkflows.WorkflowWithUpdate workflow2 = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options2); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow2::update, 0, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow2::execute); WorkflowServiceException exception = assertThrows( WorkflowServiceException.class, - () -> WorkflowClient.updateWithStart(workflow2::execute, updateOp)); + () -> + WorkflowClient.startUpdateWithStart( + workflow2::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp)); StatusRuntimeException cause = (StatusRuntimeException) exception.getCause(); assertEquals(Status.ALREADY_EXISTS.getCode(), cause.getStatus().getCode()); } @@ -471,43 +581,91 @@ public void failWhenUpdatedIsRejected() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); TestWorkflows.WorkflowWithUpdate workflow = - workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, -1, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.ACCEPTED) - .build(); + workflowClient.newWorkflowStub( + TestWorkflows.WorkflowWithUpdate.class, createWorkflowOptions()); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); assertThrows( WorkflowUpdateException.class, - () -> WorkflowClient.updateWithStart(workflow::execute, updateOp).getResult()); + () -> + WorkflowClient.startUpdateWithStart( + workflow::update, + -1, // cause for rejection + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp) + .getResult()); } @Test - public void failWhenUpdateOperationUsedAgain() { + public void failWhenStartOperationUsedAgain() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); TestWorkflows.WorkflowWithUpdate workflow = - workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, createOptions()); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, 0, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.ACCEPTED) - .build(); - WorkflowClient.updateWithStart(workflow::execute, updateOp); + workflowClient.newWorkflowStub( + TestWorkflows.WorkflowWithUpdate.class, createWorkflowOptions()); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); + WorkflowClient.startUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp); try { - WorkflowClient.updateWithStart(workflow::execute, updateOp); + WorkflowClient.startUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp); // re-use same `startOp` fail("unreachable"); } catch (IllegalStateException e) { - assertEquals(e.getMessage(), "UpdateWithStartWorkflowOperation was already executed"); + assertEquals(e.getMessage(), "WithStartWorkflowOperation was already executed"); + } + } + + @Test + public void failWhenUpdateNamesDoNotMatch() { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + + WorkflowOptions options = createWorkflowOptions(); + TestWorkflows.TestUpdatedWorkflow workflow = + workflowClient.newWorkflowStub(TestWorkflows.TestUpdatedWorkflow.class, options); + + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); + + try { + WorkflowClient.startUpdateWithStart( + workflow::update, + "Hello Update", + createUpdateOptions().toBuilder() + .setUpdateName("custom_update_name") // custom name! + .build(), + startOp); + fail("unreachable"); + } catch (IllegalArgumentException e) { + assertEquals( + e.getMessage(), + "Update name in the options doesn't match the method name: custom_update_name != testUpdate"); } } @Test - public void failServerSideWhenStartOptionIsInvalid() { + public void failServerSideWhenStartIsInvalid() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); WorkflowOptions options = // using invalid reuse/conflict policies - createOptions().toBuilder() + createWorkflowOptions().toBuilder() .setWorkflowIdConflictPolicy( WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) .setWorkflowIdReusePolicy( @@ -515,13 +673,18 @@ public void failServerSideWhenStartOptionIsInvalid() { .build(); TestWorkflows.WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, 0, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.ACCEPTED) - .build(); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); try { - WorkflowClient.updateWithStart(workflow::execute, updateOp); + WorkflowClient.startUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.ACCEPTED) + .build(), + startOp); fail("unreachable"); } catch (WorkflowServiceException e) { assertTrue( @@ -532,42 +695,176 @@ public void failServerSideWhenStartOptionIsInvalid() { } @Test - public void failClientSideWhenUpdateOptionIsInvalid() { + public void failServerSideWhenUpdateIsInvalid() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - WorkflowOptions options = createOptions(); + WorkflowOptions options = createWorkflowOptions().toBuilder().build(); TestWorkflows.WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); - UpdateWithStartWorkflowOperation updateOp = // without wait stage - UpdateWithStartWorkflowOperation.newBuilder(workflow::update, 0, "Hello Update").build(); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); try { - WorkflowClient.updateWithStart(workflow::execute, updateOp); + WorkflowClient.startUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.ACCEPTED) + .setFirstExecutionRunId(UUID.randomUUID().toString()) // using invalid option + .build(), + startOp); fail("unreachable"); } catch (WorkflowServiceException e) { - assertEquals(e.getCause().getMessage(), "waitForStage must not be null"); + assertTrue(e.getCause().getMessage().contains("FirstExecutionRunId")); } ensureNoWorkflowStarted(workflowClient, options.getWorkflowId()); } @Test - public void failWhenUsingNonUpdateMethod() { + public void failClientSideWhenUpdateIsInvalid() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - WorkflowOptions options = createOptions(); + WorkflowOptions options = createWorkflowOptions(); TestWorkflows.WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(workflow::execute) // incorrect! - .setWaitForStage(WorkflowUpdateStage.ACCEPTED) + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); + + try { + WorkflowClient.startUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class).build(), // invalid + startOp); + fail("unreachable"); + } catch (IllegalStateException e) { + assertEquals(e.getMessage(), "waitForStage must not be null"); + } + + ensureNoWorkflowStarted(workflowClient, options.getWorkflowId()); + } + + @Test + public void failWhenWorkflowOptionsIsMissing() { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + + WorkflowOptions options = createWorkflowOptions(); + WorkflowStub workflow = + workflowClient.newUntypedWorkflowStub("workflow-id"); // no WorkflowOptions! + UpdateOptions updateOptions = + UpdateOptions.newBuilder(String.class) + .setUpdateName("update") + .setResultClass(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(); + + try { + workflow.startUpdateWithStart( + updateOptions, new Object[] {0, "Hello Update"}, new Object[] {}); + } catch (IllegalStateException e) { + assertEquals(e.getMessage(), "Required parameter WorkflowOptions is missing in WorkflowStub"); + } + + ensureNoWorkflowStarted(workflowClient, options.getWorkflowId()); + } + + @Test + public void failWhenConflictPolicyIsMissing() { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + + WorkflowStub workflowStub = + workflowClient.newUntypedWorkflowStub( + TestWorkflows.WorkflowWithUpdate.class.getSimpleName(), + SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) + .toBuilder() // no WorkflowIdConflictPolicy! + .setWorkflowId(UUID.randomUUID().toString()) + .build()); + + UpdateOptions updateOptions = + UpdateOptions.newBuilder(String.class) + .setUpdateName("update") + .setResultClass(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) .build(); try { - WorkflowClient.updateWithStart(workflow::execute, updateOp); + workflowStub.startUpdateWithStart( + updateOptions, new Object[] {0, "Hello Update"}, new Object[] {}); + } catch (IllegalStateException e) { + assertEquals( + e.getMessage(), + "WorkflowIdConflictPolicy is required in WorkflowOptions for Update-With-Start"); + } + } + + @Test + public void failWhenWaitPolicyIsIncompatible() { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + + WorkflowOptions options = createWorkflowOptions(); + TestWorkflows.WorkflowWithUpdate workflow = + workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); + + // typed + try { + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); + WorkflowClient.executeUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage( + WorkflowUpdateStage.ACCEPTED) // incompatible with `executeUpdateWithStart` + .build(), + startOp); fail("unreachable"); } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Method 'execute' is not an UpdateMethod"); + assertEquals(e.getMessage(), "waitForStage must be unspecified or COMPLETED"); + } + + // untyped + try { + WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow); + workflowStub.executeUpdateWithStart( + UpdateOptions.newBuilder(String.class) + .setUpdateName("update") + .setWaitForStage( + WorkflowUpdateStage.ADMITTED) // incompatible with `executeUpdateWithStart` + .build(), + new Object[] {0, "Hello Update"}, + new Object[] {}); + fail("unreachable"); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), "waitForStage must be unspecified or COMPLETED"); + } + + ensureNoWorkflowStarted(workflowClient, options.getWorkflowId()); + } + + @Test + public void failWhenUsingNonUpdateMethod() { + WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); + + WorkflowOptions options = createWorkflowOptions(); + TestWorkflows.WorkflowWithUpdate workflow = + workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); + + try { + WorkflowClient.startUpdateWithStart( + workflow::execute, // incorrect! + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp); + fail("unreachable"); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), "Method 'execute' is not an @UpdateMethod"); } ensureNoWorkflowStarted(workflowClient, options.getWorkflowId()); @@ -577,20 +874,24 @@ public void failWhenUsingNonUpdateMethod() { public void failWhenUsingNonStartMethod() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - WorkflowOptions options = createOptions(); + WorkflowOptions options = createWorkflowOptions(); TestWorkflows.WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder( - workflow::update, 0, "Hello Update") // incorrect! - .setWaitForStage(WorkflowUpdateStage.ACCEPTED) - .build(); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::update, 0, "Hello Update"); // incorrect! try { - WorkflowClient.updateWithStart(workflow::update, 0, "Hello Update", updateOp); + WorkflowClient.startUpdateWithStart( + workflow::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp); fail("unreachable"); } catch (IllegalArgumentException e) { - assertEquals(e.getMessage(), "Method 'update' is not a WorkflowMethod"); + assertEquals(e.getMessage(), "Method 'update' is not a @WorkflowMethod"); } ensureNoWorkflowStarted(workflowClient, options.getWorkflowId()); @@ -600,22 +901,26 @@ public void failWhenUsingNonStartMethod() { public void failWhenMixingStubs() { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - WorkflowOptions options = createOptions(); + WorkflowOptions options = createWorkflowOptions(); TestWorkflows.TestUpdatedWorkflow stub1 = workflowClient.newWorkflowStub(TestWorkflows.TestUpdatedWorkflow.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder(stub1::update, "Hello Update") - .setWaitForStage(WorkflowUpdateStage.ACCEPTED) - .build(); + WithStartWorkflowOperation startOp = new WithStartWorkflowOperation<>(stub1::execute); TestWorkflows.WorkflowWithUpdate stub2 = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); try { - WorkflowClient.updateWithStart(stub2::execute, updateOp); + WorkflowClient.startUpdateWithStart( + stub2::update, + 0, + "Hello Update", + UpdateOptions.newBuilder(String.class) + .setWaitForStage(WorkflowUpdateStage.COMPLETED) + .build(), + startOp); // for stub1! fail("unreachable"); } catch (IllegalArgumentException e) { assertEquals( - e.getMessage(), "UpdateWithStartWorkflowOperation invoked on different workflow stubs"); + e.getMessage(), "WithStartWorkflowOperation invoked on different workflow stubs"); } ensureNoWorkflowStarted(workflowClient, options.getWorkflowId()); @@ -630,9 +935,14 @@ private static void ensureNoWorkflowStarted(WorkflowClient workflowClient, Strin } } - private WorkflowOptions createOptions() { + private UpdateOptions createUpdateOptions() { + return UpdateOptions.newBuilder().setWaitForStage(WorkflowUpdateStage.COMPLETED).build(); + } + + private WorkflowOptions createWorkflowOptions() { return SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()) .toBuilder() + .setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) .setWorkflowId(UUID.randomUUID().toString()) .build(); } @@ -689,4 +999,14 @@ public void update(String arg) { this.state = arg; } } + + static class Results { + final Object workflowResult; + final Object updateResult; + + public Results(Object workflowResult, Object updateResult) { + this.workflowResult = workflowResult; + this.updateResult = updateResult; + } + } } diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/StatusUtils.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/StatusUtils.java index 4df4e2a4f..a82e89cdc 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/StatusUtils.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/StatusUtils.java @@ -66,7 +66,7 @@ public static T getFailure( } } catch (InvalidProtocolBufferException e) { throw new IllegalArgumentException( - "failure getting grcp failure of " + failureType + " from " + details, e); + "Failure to construct gRPC error of type " + failureType + " from " + details, e); } return null; } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 6d26ed8c4..d455d8bc1 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -110,6 +110,16 @@ public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServi private final InProcessGRPCServer inProcessServer; private final WorkflowServiceStubs workflowServiceStubs; + private final MultiOperationExecutionFailure.OperationStatus abortedOperation = + MultiOperationExecutionFailure.OperationStatus.newBuilder() + .setCode(Status.ABORTED.getCode().value()) + .setMessage("Operation was aborted.") + .addDetails( + ProtoUtils.packAny( + MultiOperationExecutionAborted.newBuilder().build(), + MultiOperationExecutionAborted.getDescriptor())) + .build(); + TestWorkflowService( TestWorkflowStore store, TestVisibilityStore visibilityStore, @@ -1144,7 +1154,7 @@ public void executeMultiOperation( .setCode(Status.INVALID_ARGUMENT.getCode().value()) .setMessage("INVALID_ARGUMENT: CronSchedule is not allowed.") .build(), - null); + abortedOperation); // updated aborted } if (startRequest.getRequestEagerExecution()) { @@ -1153,7 +1163,16 @@ public void executeMultiOperation( .setCode(Status.INVALID_ARGUMENT.getCode().value()) .setMessage("INVALID_ARGUMENT: RequestEagerExecution is not supported.") .build(), - null); + abortedOperation); // update aborted + } + + if (startRequest.hasWorkflowStartDelay()) { + throw multiOperationExecutionFailure( + MultiOperationExecutionFailure.OperationStatus.newBuilder() + .setCode(Status.INVALID_ARGUMENT.getCode().value()) + .setMessage("INVALID_ARGUMENT: WorkflowStartDelay is not supported.") + .build(), + abortedOperation); // update aborted } UpdateWorkflowExecutionRequest updateRequest; @@ -1167,7 +1186,7 @@ public void executeMultiOperation( if (!updateRequest.getWorkflowExecution().getRunId().isEmpty()) { throw multiOperationExecutionFailure( - null, // start aborted + abortedOperation, // start aborted MultiOperationExecutionFailure.OperationStatus.newBuilder() .setCode(Status.INVALID_ARGUMENT.getCode().value()) .setMessage("INVALID_ARGUMENT: RunId is not allowed.") @@ -1176,7 +1195,7 @@ public void executeMultiOperation( if (!updateRequest.getFirstExecutionRunId().isEmpty()) { throw multiOperationExecutionFailure( - null, // start aborted + abortedOperation, // start aborted MultiOperationExecutionFailure.OperationStatus.newBuilder() .setCode(Status.INVALID_ARGUMENT.getCode().value()) .setMessage("INVALID_ARGUMENT: FirstExecutionRunId is not allowed.") @@ -1187,23 +1206,14 @@ public void executeMultiOperation( .getWorkflowId() .equals(updateRequest.getWorkflowExecution().getWorkflowId())) { throw multiOperationExecutionFailure( - null, // start aborted + abortedOperation, // start aborted MultiOperationExecutionFailure.OperationStatus.newBuilder() .setCode(Status.INVALID_ARGUMENT.getCode().value()) .setMessage( - "INVALID_ARGUMENT: WorkflowId is not consistent with previous operation(s)") + "INVALID_ARGUMENT: Update operation's WorkflowId is not consistent with Start operation's WorkflowId") .build()); } - if (startRequest.hasWorkflowStartDelay()) { - throw multiOperationExecutionFailure( - MultiOperationExecutionFailure.OperationStatus.newBuilder() - .setCode(Status.INVALID_ARGUMENT.getCode().value()) - .setMessage("INVALID_ARGUMENT: WorkflowStartDelay is not supported.") - .build(), - null); - } - @Nullable Deadline deadline = getUpdatePollDeadline(); AtomicReference updateHandle = @@ -1214,7 +1224,7 @@ public void executeMultiOperation( updateHandle.set(ms.updateWorkflowExecution(updateRequest, deadline)); } catch (StatusRuntimeException e) { throw multiOperationExecutionFailure( - null, // ie start aborted + abortedOperation, // ie start aborted MultiOperationExecutionFailure.OperationStatus.newBuilder() .setCode(e.getStatus().getCode().value()) .setMessage(e.getMessage()) @@ -1237,7 +1247,7 @@ public void executeMultiOperation( .setCode(e.getStatus().getCode().value()) .setMessage(e.getMessage()) .build(), - null); // ie update aborted + abortedOperation); // ie update aborted } // if the workflow wasn't started, only send the Update request @@ -1269,19 +1279,8 @@ public void executeMultiOperation( private StatusRuntimeException multiOperationExecutionFailure( MultiOperationExecutionFailure.OperationStatus... operationStatuses) { Status status = null; - for (int i = 0; i < operationStatuses.length; i++) { - MultiOperationExecutionFailure.OperationStatus operationStatus = operationStatuses[i]; - if (operationStatus == null) { - // convert to aborted failure - operationStatuses[i] = - MultiOperationExecutionFailure.OperationStatus.newBuilder() - .setCode(Status.ABORTED.getCode().value()) - .setMessage("Operation was aborted.") - .addDetails( - ProtoUtils.packAny( - MultiOperationExecutionAborted.newBuilder().build(), - MultiOperationExecutionAborted.getDescriptor())) - .build(); + for (MultiOperationExecutionFailure.OperationStatus operationStatus : operationStatuses) { + if (operationStatus == abortedOperation) { continue; } if (status != null) { diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/MultiOperationTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/MultiOperationTest.java index f21b5ac7e..0602e7cef 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/MultiOperationTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/MultiOperationTest.java @@ -29,6 +29,7 @@ import io.temporal.api.common.v1.WorkflowType; import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; import io.temporal.api.errordetails.v1.MultiOperationExecutionFailure; import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.update.v1.Input; @@ -60,21 +61,22 @@ public class MultiOperationTest { @Test public void startAndUpdate() throws ExecutionException, InterruptedException { WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient(); - String workflowId = UUID.randomUUID().toString(); WorkflowOptions options = WorkflowOptions.newBuilder() + .setWorkflowIdConflictPolicy(WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) .setTaskQueue(testWorkflowRule.getTaskQueue()) - .setWorkflowId(workflowId) + .setWorkflowId(UUID.randomUUID().toString()) .build(); TestWorkflows.WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdate.class, options); - UpdateWithStartWorkflowOperation updateOp = - UpdateWithStartWorkflowOperation.newBuilder( - workflow::update, TestWorkflows.UpdateType.COMPLETE) - .setWaitForStage(WorkflowUpdateStage.COMPLETED) - .build(); + WithStartWorkflowOperation startOp = + new WithStartWorkflowOperation<>(workflow::execute); WorkflowUpdateHandle updHandle = - WorkflowClient.updateWithStart(workflow::execute, updateOp); + WorkflowClient.startUpdateWithStart( + workflow::update, + TestWorkflows.UpdateType.COMPLETE, + UpdateOptions.newBuilder().setWaitForStage(WorkflowUpdateStage.COMPLETED).build(), + startOp); assertNull(updHandle.getResultAsync().get()); } diff --git a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java index da38dbdf1..9766d9e7e 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TimeLockingInterceptor.java @@ -75,9 +75,15 @@ public WorkflowExecution start(Object... args) { } @Override - public WorkflowUpdateHandle updateWithStart( - UpdateWithStartWorkflowOperation updateOperation, Object... args) { - return next.updateWithStart(updateOperation, args); + public WorkflowUpdateHandle startUpdateWithStart( + UpdateOptions options, Object[] updateArgs, Object[] startArgs) { + return next.startUpdateWithStart(options, updateArgs, startArgs); + } + + @Override + public R executeUpdateWithStart( + UpdateOptions updateOptions, Object[] updateArgs, Object[] startArgs) { + return next.executeUpdateWithStart(updateOptions, updateArgs, startArgs); } @Override