Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve WaitForStateCompletion API with strongly typing experience, and rename getSimpleWorkflowResultWithWait to waitForWorkflowCompletion #216

Merged
merged 4 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 86 additions & 10 deletions src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.iworkflow.core.mapper.StateMovementMapper;
import io.iworkflow.core.persistence.PersistenceOptions;
import io.iworkflow.gen.models.ErrorSubStatus;
import io.iworkflow.gen.models.ExecuteApiFailurePolicy;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.SearchAttribute;
Expand Down Expand Up @@ -251,16 +252,38 @@ private List<SearchAttribute> convertToSearchAttributeList(final Map<String, Sea
}

/**
* A long poll API to wait for the workflow completion
* If the workflow is not COMPLETED, throw the {@link WorkflowUncompletedException}.
*
* @param workflowId required, the workflowId
*/
public void waitForWorkflowCompletion(
final String workflowId) {
this.getSimpleWorkflowResultWithWait(Void.class, workflowId);
}

/**
* A long poll API to wait for the workflow completion
* For most cases, a workflow only has one result(one completion state).
* Use this API to retrieve the output of the state with waiting for the workflow to complete.
* If the workflow is not COMPLETED, throw the {@link feign.FeignException.FeignClientException}.
* If the workflow is not COMPLETED, throw the {@link WorkflowUncompletedException}.
*
* @param valueClass required, the type class of the output
* @param workflowId required, the workflowId
* @param workflowRunId optional, can be empty
* @param <T> type of the output
* @return the output result
*/
public <T> T waitForWorkflowCompletion(
final Class<T> valueClass,
final String workflowId) {
return this.getSimpleWorkflowResultWithWait(valueClass, workflowId);
}

/**
* Use {@link #waitForWorkflowCompletion(Class, String)} instead
* It's just a renaming.
*/
@Deprecated
public <T> T getSimpleWorkflowResultWithWait(
final Class<T> valueClass,
final String workflowId,
Expand All @@ -269,15 +292,10 @@ public <T> T getSimpleWorkflowResultWithWait(
}

/**
* For most cases, a workflow only has one result(one completion state).
* Use this API to retrieve the output of the state with waiting for the workflow to complete.
* If the workflow is not COMPLETED, throw the {@link feign.FeignException.FeignClientException}.
*
* @param valueClass required, the type class of the output
* @param workflowId required, the workflowId
* @param <T> type of the output
* @return the output result
* Use {@link #waitForWorkflowCompletion(Class, String)} instead
* It's just a renaming.
*/
@Deprecated
public <T> T getSimpleWorkflowResultWithWait(
final Class<T> valueClass,
final String workflowId) {
Expand Down Expand Up @@ -922,6 +940,64 @@ public void skipTimer(
unregisteredClient.skipTimer(workflowId, workflowRunId, workflowStateId, stateExecutionNumber, timerCommandIndex);
}

/**
* A long poll API to wait for the completion of the state. This only waits for the first completion.
* Note 1 The stateCompletion to wait for is needed to registered on starting workflow due to limitation in https://github.com/indeedeng/iwf/issues/349
* Note 2 The max polling time is configured as clientOptions as the Feign client timeout(default to 10s)
* If the state is not COMPLETED, throw the {@link ClientSideException} with the sub status of {@link ErrorSubStatus#LONG_POLL_TIME_OUT_SUB_STATUS}
* @param workflowId the workflowId
* @param stateClass the state class.
*/
public void waitForStateExecutionCompletion(
final String workflowId,
final Class<? extends WorkflowState> stateClass) {
this.waitForStateExecutionCompletion(Void.class, workflowId, stateClass, 1);
}

/**
* A long poll API to wait for the completion of the state. This only waits for the first completion.
* Note 1 The stateCompletion to wait for is needed to registered on starting workflow due to limitation in https://github.com/indeedeng/iwf/issues/349
* Note 2 The max polling time is configured as clientOptions as the Feign client timeout(default to 10s)
* If the state is not COMPLETED, throw the {@link ClientSideException} with the sub status of {@link ErrorSubStatus#LONG_POLL_TIME_OUT_SUB_STATUS}
* @param valueClass the result of the state completion. Could be Void if not interested
* @param workflowId the workflowId
* @param stateClass the state class.
* @return the result of the state completion
* @param <T> the result type of the state completion
*/
public <T> T waitForStateExecutionCompletion(
final Class<T> valueClass,
final String workflowId,
final Class<? extends WorkflowState> stateClass) {
return this.waitForStateExecutionCompletion(valueClass, workflowId, stateClass, 1);
}

/**
* A long poll API to wait for the completion of the state. This only waits for the first completion.
* Note 1 The stateCompletion and stateExecutionNumber to wait for must be registered on starting workflow due to limitation in https://github.com/indeedeng/iwf/issues/349
* Note 2 The max polling time is configured as clientOptions as the Feign client timeout(default to 10s)
* If the state is not COMPLETED, throw the {@link ClientSideException} with the sub status of {@link ErrorSubStatus#LONG_POLL_TIME_OUT_SUB_STATUS}
* @param valueClass the result of the state completion. Could be Void if not interested
* @param workflowId the workflowId
* @param stateClass the state class
* @param stateExecutionNumber the state execution number. E.g. if it's 2, it means the 2nd execution of the state
* @return the result of the state completion
* @param <T> the result type of the state completion
*/
public <T> T waitForStateExecutionCompletion(
final Class<T> valueClass,
final String workflowId,
final Class<? extends WorkflowState> stateClass,
final int stateExecutionNumber) {
final String stateExecutionId= WorkflowState.getStateExecutionId(stateClass, stateExecutionNumber);
return unregisteredClient.waitForStateExecutionCompletion(valueClass, workflowId, stateExecutionId);
}


/**
* This method is deprecated, use the method with stateClass instead for strongly typing experience
*/
@Deprecated
public <T> T waitForStateExecutionCompletion(
final Class<T> valueClass,
final String workflowId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.iworkflow.core;

import java.util.Arrays;

/**
* This class is for extending {@link ImmutableWorkflowOptions.Builder} to provide a
* better experience with strongly typing.
*/
public class WorkflowOptionBuilderExtension {
private ImmutableWorkflowOptions.Builder builder = ImmutableWorkflowOptions.builder();

/**
* Add a state to wait for completion. This only waiting for the first completion of the state
* @param states The states to wait for completion. O
* @return The builder.
*/
public WorkflowOptionBuilderExtension WaitForCompletionStates(Class<? extends WorkflowState> ...states) {
Arrays.stream(states).forEach(
state -> builder.addWaitForCompletionStateExecutionIds(
WorkflowState.getStateExecutionId(state,1)
));
return this;
}

/**
* Add a state to wait for completion. This can wait for any times completion of the state
* @param state The state to wait for completion.
* @param number The number of the state completion to wait for. E.g. when it's 2, it's waiting for the second completion of the state.
* @return The builder.
*/
public WorkflowOptionBuilderExtension WaitForCompletionStateWithNumber(Class<? extends WorkflowState> state, int number) {
builder.addWaitForCompletionStateExecutionIds(
WorkflowState.getStateExecutionId(state, number)
);
return this;
}

public ImmutableWorkflowOptions.Builder getBuilder() {
return builder;
}
}
8 changes: 8 additions & 0 deletions src/main/java/io/iworkflow/core/WorkflowOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,12 @@ public abstract class WorkflowOptions {
public abstract Optional<WorkflowConfig> getWorkflowConfigOverride();

public abstract List<String> getWaitForCompletionStateExecutionIds();

public static WorkflowOptionBuilderExtension extendedBuilder() {
return new WorkflowOptionBuilderExtension();
}

public static ImmutableWorkflowOptions.Builder basicBuilder() {
return ImmutableWorkflowOptions.builder();
}
}
4 changes: 4 additions & 0 deletions src/main/java/io/iworkflow/core/WorkflowState.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ static boolean shouldSkipWaitUntil(final WorkflowState state) {
}
return false;
}

static String getStateExecutionId(Class<? extends WorkflowState> state, int number) {
return String.format("%s-%d", state.getSimpleName(), number);
}
}


11 changes: 8 additions & 3 deletions src/test/java/io/iworkflow/integ/TimerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.core.ImmutableWorkflowOptions;
import io.iworkflow.core.WorkflowOptionBuilderExtension;
import io.iworkflow.core.WorkflowOptions;
import io.iworkflow.integ.timer.BasicTimerWorkflow;
import io.iworkflow.integ.timer.BasicTimerWorkflowState1;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
Expand All @@ -28,10 +31,12 @@ public void testBasicTimerWorkflow() throws InterruptedException {

client.startWorkflow(
BasicTimerWorkflow.class, wfId, 10, input,
ImmutableWorkflowOptions.builder().addWaitForCompletionStateExecutionIds("BasicTimerWorkflowState1-1").build());
WorkflowOptions.extendedBuilder()
.WaitForCompletionStates(BasicTimerWorkflowState1.class)
.getBuilder().build());

client.waitForStateExecutionCompletion(Void.class, wfId, "BasicTimerWorkflowState1-1");
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
client.waitForStateExecutionCompletion(wfId, BasicTimerWorkflowState1.class);
client.waitForWorkflowCompletion(wfId);
final long elapsed = System.currentTimeMillis() - startTs;
Assertions.assertTrue(elapsed >= 4000 && elapsed <= 7000, String.format("actual duration: %d", elapsed));
}
Expand Down
Loading