Skip to content

Commit

Permalink
IWF-253: code for reset recordings
Browse files Browse the repository at this point in the history
  • Loading branch information
Katie Atrops committed Dec 4, 2024
1 parent 31d3b31 commit e7fd5c0
Show file tree
Hide file tree
Showing 11 changed files with 526 additions and 0 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ dependencies {
compileOnly 'org.immutables:value:2.9.2'
annotationProcessor 'org.immutables:value:2.9.2'

testAnnotationProcessor 'org.immutables:value:2.9.2'
testCompileOnly 'org.immutables:value:2.9.2'

// cron utils
implementation 'com.cronutils:cron-utils:9.2.0'
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/iworkflow/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public ServiceApiRetryConfig getServiceApiRetryConfig() {
.maximumAttempts(10)
.build();
}
//Use "http://localhost:8080" when using iwf-java-sdk as the worker (src/test/java/io/iworkflow/spring/SpringMainApplication.java)
public static final String defaultWorkerUrl = "http://localhost:8802";

public static final String workerUrlFromDocker = "http://host.docker.internal:8802";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.iworkflow.integ.reset;

import io.iworkflow.core.Context;
import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.RPC;
import io.iworkflow.core.StateDef;
import io.iworkflow.core.StateMovement;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.communication.CommunicationMethodDef;
import io.iworkflow.core.communication.InternalChannelDef;
import io.iworkflow.core.communication.SignalChannelDef;
import io.iworkflow.core.persistence.DataAttributeDef;
import io.iworkflow.core.persistence.Persistence;
import io.iworkflow.core.persistence.PersistenceFieldDef;
import io.iworkflow.core.persistence.SearchAttributeDef;
import io.iworkflow.gen.models.PersistenceLoadingType;
import io.iworkflow.gen.models.SearchAttributeValueType;
import io.iworkflow.integ.rpc.RpcLockingWorkflowState1;
import io.iworkflow.integ.rpc.RpcLockingWorkflowState2;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

import static io.iworkflow.integ.RpcTest.HARDCODED_STR;
import static io.iworkflow.integ.RpcTest.RPC_OUTPUT;

@Component
public class RpcLockingWorkflowReset implements ObjectWorkflow {

public static final String RPC_INTERNAL_CHANNEL_NAME = "rpc-channel-1";
public static final String TEST_DATA_OBJECT_KEY = "data-obj-1";
public static final String TEST_SEARCH_ATTRIBUTE_KEYWORD = "CustomKeywordField";
public static final String TEST_SEARCH_ATTRIBUTE_INT = "CustomIntField";

@Override
public List<CommunicationMethodDef> getCommunicationSchema() {
return Arrays.asList(
InternalChannelDef.create(Void.class, RPC_INTERNAL_CHANNEL_NAME)
);
}

@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new RpcLockingWorkflowStateReset1()),
StateDef.nonStartingState(new RpcLockingWorkflowStateReset2())
);
}

@Override
public List<PersistenceFieldDef> getPersistenceSchema() {
return Arrays.asList(
DataAttributeDef.create(String.class, TEST_DATA_OBJECT_KEY),
SearchAttributeDef.create(SearchAttributeValueType.INT, TEST_SEARCH_ATTRIBUTE_INT),
SearchAttributeDef.create(SearchAttributeValueType.KEYWORD, TEST_SEARCH_ATTRIBUTE_KEYWORD)
);
}

@RPC(
dataAttributesLoadingType = PersistenceLoadingType.PARTIAL_WITH_EXCLUSIVE_LOCK,
dataAttributesPartialLoadingKeys = {TEST_DATA_OBJECT_KEY}
)
public void testRpcWithLocking(Context context, Persistence persistence, Communication communication) {
if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty()) {
throw new RuntimeException("invalid context");
}
persistence.setDataAttribute(TEST_DATA_OBJECT_KEY, HARDCODED_STR+"+locking");
persistence.setSearchAttributeInt64(TEST_SEARCH_ATTRIBUTE_INT, RPC_OUTPUT);
communication.publishInternalChannel(RPC_INTERNAL_CHANNEL_NAME, null);
communication.triggerStateMovements(StateMovement.create(RpcLockingWorkflowState2.class));
}

@RPC
public void testRpcWithoutLocking(Context context, Persistence persistence, Communication communication) {
if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty()) {
throw new RuntimeException("invalid context");
}
persistence.setSearchAttributeKeyword(TEST_SEARCH_ATTRIBUTE_KEYWORD, HARDCODED_STR+"+nonlocking");
persistence.setSearchAttributeInt64(TEST_SEARCH_ATTRIBUTE_INT, RPC_OUTPUT);
communication.publishInternalChannel(RPC_INTERNAL_CHANNEL_NAME, null);
communication.triggerStateMovements(StateMovement.create(RpcLockingWorkflowState2.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.iworkflow.integ.reset;

import io.iworkflow.core.Context;
import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.RPC;
import io.iworkflow.core.StateDef;
import io.iworkflow.core.StateMovement;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.communication.CommunicationMethodDef;
import io.iworkflow.core.communication.InternalChannelDef;
import io.iworkflow.core.communication.SignalChannelDef;
import io.iworkflow.core.persistence.DataAttributeDef;
import io.iworkflow.core.persistence.Persistence;
import io.iworkflow.core.persistence.PersistenceFieldDef;
import io.iworkflow.core.persistence.SearchAttributeDef;
import io.iworkflow.gen.models.PersistenceLoadingType;
import io.iworkflow.gen.models.SearchAttributeValueType;
import io.iworkflow.integ.rpc.RpcLockingWorkflowState1;
import io.iworkflow.integ.rpc.RpcLockingWorkflowState2;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

import static io.iworkflow.integ.RpcTest.HARDCODED_STR;
import static io.iworkflow.integ.RpcTest.RPC_OUTPUT;

@Component
public class RpcLockingWorkflowResetWithStateExecutionId implements ObjectWorkflow {

public static final String RPC_INTERNAL_CHANNEL_NAME = "rpc-channel-1";
public static final String SIGNAL_CHANNEL_NAME = "signal-channel-1";
public static final String TEST_DATA_OBJECT_KEY = "data-obj-1";
public static final String TEST_SEARCH_ATTRIBUTE_KEYWORD = "CustomKeywordField";
public static final String TEST_SEARCH_ATTRIBUTE_INT = "CustomIntField";

@Override
public List<CommunicationMethodDef> getCommunicationSchema() {
return Arrays.asList(
InternalChannelDef.create(Void.class, RPC_INTERNAL_CHANNEL_NAME),
SignalChannelDef.create(Void.class, SIGNAL_CHANNEL_NAME)
);
}

@Override
public List<StateDef> getWorkflowStates() {
return Arrays.asList(
StateDef.startingState(new RpcLockingWorkflowStateResetWithStateExecutionId1()),
StateDef.nonStartingState(new RpcLockingWorkflowStateResetWithStateExecutionId2())
);
}

@Override
public List<PersistenceFieldDef> getPersistenceSchema() {
return Arrays.asList(
DataAttributeDef.create(String.class, TEST_DATA_OBJECT_KEY),
SearchAttributeDef.create(SearchAttributeValueType.INT, TEST_SEARCH_ATTRIBUTE_INT),
SearchAttributeDef.create(SearchAttributeValueType.KEYWORD, TEST_SEARCH_ATTRIBUTE_KEYWORD)
);
}

@RPC(
dataAttributesLoadingType = PersistenceLoadingType.PARTIAL_WITH_EXCLUSIVE_LOCK,
dataAttributesPartialLoadingKeys = {TEST_DATA_OBJECT_KEY}
)
public void testRpcWithLocking(Context context, Persistence persistence, Communication communication) {
if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty()) {
throw new RuntimeException("invalid context");
}
persistence.setDataAttribute(TEST_DATA_OBJECT_KEY, HARDCODED_STR+"+locking");
persistence.setSearchAttributeInt64(TEST_SEARCH_ATTRIBUTE_INT, RPC_OUTPUT);
communication.publishInternalChannel(RPC_INTERNAL_CHANNEL_NAME, null);
communication.triggerStateMovements(StateMovement.create(RpcLockingWorkflowState2.class));
}

@RPC
public void testRpcWithoutLocking(Context context, Persistence persistence, Communication communication) {
if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty()) {
throw new RuntimeException("invalid context");
}
persistence.setSearchAttributeKeyword(TEST_SEARCH_ATTRIBUTE_KEYWORD, HARDCODED_STR+"+nonlocking");
persistence.setSearchAttributeInt64(TEST_SEARCH_ATTRIBUTE_INT, RPC_OUTPUT);
communication.publishInternalChannel(RPC_INTERNAL_CHANNEL_NAME, null);
communication.triggerStateMovements(StateMovement.create(RpcLockingWorkflowState2.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.iworkflow.integ.reset;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandRequest;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.communication.InternalChannelCommand;
import io.iworkflow.core.persistence.Persistence;
import io.iworkflow.integ.rpc.RpcLockingWorkflowState2;

import static io.iworkflow.integ.reset.RpcLockingWorkflowReset.RPC_INTERNAL_CHANNEL_NAME;

public class RpcLockingWorkflowStateReset1 implements WorkflowState<Void> {
@Override
public Class<Void> getInputType() {
return Void.class;
}

@Override
public CommandRequest waitUntil(
Context context,
Void input,
Persistence persistence,
final Communication communication) {
return CommandRequest.forAllCommandCompleted(
InternalChannelCommand.create(RPC_INTERNAL_CHANNEL_NAME), InternalChannelCommand.create(RPC_INTERNAL_CHANNEL_NAME)
);
}

@Override
public StateDecision execute(
Context context,
Void input,
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
return StateDecision.singleNextState(RpcLockingWorkflowState2.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.iworkflow.integ.reset;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandRequest;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;

public class RpcLockingWorkflowStateReset2 implements WorkflowState<Void> {

private static int counter = 0;

@Override
public Class<Void> getInputType() {
return Void.class;
}

@Override
public CommandRequest waitUntil(
Context context,
Void input,
Persistence persistence,
final Communication communication) {
return CommandRequest.empty;
}

@Override
public StateDecision execute(
Context context,
Void input,
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
counter++;
return StateDecision.gracefulCompleteWorkflow("The execute method was executed " + counter + " times");
}

// reset counter so that new test can use it
public static int resetCounter() {
final int old = counter;
counter = 0;
return old;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.iworkflow.integ.reset;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandRequest;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.communication.SignalCommand;
import io.iworkflow.core.persistence.Persistence;
import io.iworkflow.integ.rpc.RpcLockingWorkflowState2;

import static io.iworkflow.integ.reset.RpcLockingWorkflowResetWithStateExecutionId.SIGNAL_CHANNEL_NAME;

public class RpcLockingWorkflowStateResetWithStateExecutionId1 implements WorkflowState<Void> {
@Override
public Class<Void> getInputType() {
return Void.class;
}

@Override
public CommandRequest waitUntil(
Context context,
Void input,
Persistence persistence,
final Communication communication) {
return CommandRequest.forAllCommandCompleted(SignalCommand.create(SIGNAL_CHANNEL_NAME));
}

@Override
public StateDecision execute(
Context context,
Void input,
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
return StateDecision.singleNextState(RpcLockingWorkflowState2.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.iworkflow.integ.reset;

import io.iworkflow.core.Context;
import io.iworkflow.core.StateDecision;
import io.iworkflow.core.WorkflowState;
import io.iworkflow.core.command.CommandRequest;
import io.iworkflow.core.command.CommandResults;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.Persistence;

public class RpcLockingWorkflowStateResetWithStateExecutionId2 implements WorkflowState<Void> {

private static int counter = 0;

@Override
public Class<Void> getInputType() {
return Void.class;
}

@Override
public CommandRequest waitUntil(
Context context,
Void input,
Persistence persistence,
final Communication communication) {
return CommandRequest.empty;
}

@Override
public StateDecision execute(
Context context,
Void input,
CommandResults commandResults,
Persistence persistence,
final Communication communication) {
counter++;
if (counter > 4) {
return StateDecision.gracefulCompleteWorkflow("The execute method was executed " + counter + " times");
}
return StateDecision.singleNextState(RpcLockingWorkflowStateResetWithStateExecutionId2.class);

}

// reset counter so that new test can use it
public static int resetCounter() {
final int old = counter;
counter = 0;
return old;
}
}
Loading

0 comments on commit e7fd5c0

Please sign in to comment.