diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java index 65b490426..413a7c92d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java @@ -260,7 +260,7 @@ public void setReplaying(boolean replaying) { } public void setMessages(List messages) { - this.messages = messages; + this.messages = new ArrayList<>(messages); } /** diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java index 35f6fb47a..402c28471 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java @@ -43,10 +43,7 @@ import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.UpdateMessage; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; +import java.util.*; import org.junit.AfterClass; import org.junit.Test; @@ -98,6 +95,12 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) builder .add( (r) -> { + if (message.getMessage().getId().startsWith("reject")) { + message + .getCallbacks() + .reject(converter.exceptionToFailure(new RuntimeException())); + return; + } message.getCallbacks().accept(); }) .add1( @@ -122,8 +125,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED 3: EVENT_TYPE_WORKFLOW_TASK_STARTED 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED - 5: EVENT_TYPE_TIMER_STARTED - 6: EVENT_TYPE_WORKFLOW_EXECUTION_ACCEPTED + 5: EVENT_TYPE_WORKFLOW_EXECUTION_ACCEPTED + 6: EVENT_TYPE_TIMER_STARTED 7: EVENT_TYPE_TIMER_FIRED 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED 9: EVENT_TYPE_WORKFLOW_TASK_STARTED @@ -194,6 +197,27 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) List commands = h.handleWorkflowTaskTakeCommands(stateMachines); assertEquals(0, commands.size()); } + { + TestEntityManagerListenerBase listener = new TestUpdateListener(); + stateMachines = newStateMachines(listener); + stateMachines.setMessages( + Collections.unmodifiableList( + Arrays.asList( + Message.newBuilder() + .setProtocolInstanceId("reject_update_id") + .setId("reject") + .setEventId(9) + .setBody( + Any.pack( + Request.newBuilder() + .setInput(Input.newBuilder().setName("updateName").build()) + .build())) + .build()))); + List commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2); + assertEquals(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE, commands.get(0).getCommandType()); + assertEquals( + CommandType.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, commands.get(1).getCommandType()); + } } @Test @@ -205,17 +229,19 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { builder.add( (r) -> { stateMachines.setMessages( - Arrays.asList( - Message.newBuilder() - .setProtocolInstanceId("protocol_id") - .setId("id") - .setEventId(2) - .setBody( - Any.pack( - Request.newBuilder() - .setInput(Input.newBuilder().setName("updateName").build()) - .build())) - .build())); + Collections.unmodifiableList( + Arrays.asList( + Message.newBuilder() + .setProtocolInstanceId("protocol_id") + .setId("id") + .setEventId(2) + .setBody( + Any.pack( + Request.newBuilder() + .setInput( + Input.newBuilder().setName("updateName").build()) + .build())) + .build()))); }); } @@ -387,15 +413,16 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) .setArgs(converter.toPayloads("arg").get())) .build()); stateMachines.setMessages( - Arrays.asList( - new Message[] { - Message.newBuilder() - .setProtocolInstanceId("protocol_id") - .setId("id") - .setEventId(0) - .setBody(messageBody) - .build(), - })); + Collections.unmodifiableList( + Arrays.asList( + new Message[] { + Message.newBuilder() + .setProtocolInstanceId("protocol_id") + .setId("id") + .setEventId(0) + .setBody(messageBody) + .build(), + }))); List commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1); assertEquals(0, commands.size()); } @@ -494,17 +521,19 @@ public void buildWorkflow(AsyncWorkflowBuilder builder) { builder.add( (r) -> { stateMachines.setMessages( - Arrays.asList( - Message.newBuilder() - .setProtocolInstanceId("message_updateID") - .setId("message_updateID/request") - .setEventId(6) - .setBody( - Any.pack( - Request.newBuilder() - .setInput(Input.newBuilder().setName("updateName").build()) - .build())) - .build())); + Collections.unmodifiableList( + Arrays.asList( + Message.newBuilder() + .setProtocolInstanceId("message_updateID") + .setId("message_updateID/request") + .setEventId(6) + .setBody( + Any.pack( + Request.newBuilder() + .setInput( + Input.newBuilder().setName("updateName").build()) + .build())) + .build()))); }); } @@ -528,8 +557,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED 3: EVENT_TYPE_WORKFLOW_TASK_STARTED 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED - 5: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED - 6: EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED + 5: EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED + 6: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED 7: EVENT_TYPE_WORKFLOW_TASK_STARTED */ TestHistoryBuilder h = new TestHistoryBuilder(); @@ -551,6 +580,12 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) h.addWorkflowTaskScheduled(); h.addWorkflowTaskStarted(); } + { + // Full replay + TestEntityManagerListenerBase listener = new TestUpdateListener(); + stateMachines = newStateMachines(listener); + List commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2); + } { // Full replay TestEntityManagerListenerBase listener = new TestUpdateListener(); @@ -840,17 +875,18 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder builder) TestEntityManagerListenerBase listener = new TestUpdateListener(); stateMachines = newStateMachines(listener); stateMachines.setMessages( - Arrays.asList( - Message.newBuilder() - .setProtocolInstanceId("message_update") - .setId("message_update/request") - .setEventId(7) - .setBody( - Any.pack( - Request.newBuilder() - .setInput(Input.newBuilder().setName("updateName").build()) - .build())) - .build())); + Collections.unmodifiableList( + Arrays.asList( + Message.newBuilder() + .setProtocolInstanceId("message_update") + .setId("message_update/request") + .setEventId(7) + .setBody( + Any.pack( + Request.newBuilder() + .setInput(Input.newBuilder().setName("updateName").build()) + .build())) + .build()))); List commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2); assertEquals(6, commands.size()); assertEquals(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE, commands.get(0).getCommandType());