Skip to content

Commit

Permalink
Fix UnsupportedOperationException in handleSingleEventLookahead (#2061)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns authored May 10, 2024
1 parent a41c64e commit 9cdff7a
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void setReplaying(boolean replaying) {
}

public void setMessages(List<Message> messages) {
this.messages = messages;
this.messages = new ArrayList<>(messages);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.UpdateMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.*;
import org.junit.AfterClass;
import org.junit.Test;

Expand Down Expand Up @@ -98,6 +95,12 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
builder
.add(
(r) -> {
if (message.getMessage().getId().startsWith("reject")) {
message
.getCallbacks()
.reject(converter.exceptionToFailure(new RuntimeException()));
return;
}
message.getCallbacks().accept();
})
.<HistoryEvent>add1(
Expand All @@ -122,8 +125,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
5: EVENT_TYPE_TIMER_STARTED
6: EVENT_TYPE_WORKFLOW_EXECUTION_ACCEPTED
5: EVENT_TYPE_WORKFLOW_EXECUTION_ACCEPTED
6: EVENT_TYPE_TIMER_STARTED
7: EVENT_TYPE_TIMER_FIRED
8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
9: EVENT_TYPE_WORKFLOW_TASK_STARTED
Expand Down Expand Up @@ -194,6 +197,27 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines);
assertEquals(0, commands.size());
}
{
TestEntityManagerListenerBase listener = new TestUpdateListener();
stateMachines = newStateMachines(listener);
stateMachines.setMessages(
Collections.unmodifiableList(
Arrays.asList(
Message.newBuilder()
.setProtocolInstanceId("reject_update_id")
.setId("reject")
.setEventId(9)
.setBody(
Any.pack(
Request.newBuilder()
.setInput(Input.newBuilder().setName("updateName").build())
.build()))
.build())));
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
assertEquals(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE, commands.get(0).getCommandType());
assertEquals(
CommandType.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, commands.get(1).getCommandType());
}
}

@Test
Expand All @@ -205,17 +229,19 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder.add(
(r) -> {
stateMachines.setMessages(
Arrays.asList(
Message.newBuilder()
.setProtocolInstanceId("protocol_id")
.setId("id")
.setEventId(2)
.setBody(
Any.pack(
Request.newBuilder()
.setInput(Input.newBuilder().setName("updateName").build())
.build()))
.build()));
Collections.unmodifiableList(
Arrays.asList(
Message.newBuilder()
.setProtocolInstanceId("protocol_id")
.setId("id")
.setEventId(2)
.setBody(
Any.pack(
Request.newBuilder()
.setInput(
Input.newBuilder().setName("updateName").build())
.build()))
.build())));
});
}

Expand Down Expand Up @@ -387,15 +413,16 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
.setArgs(converter.toPayloads("arg").get()))
.build());
stateMachines.setMessages(
Arrays.asList(
new Message[] {
Message.newBuilder()
.setProtocolInstanceId("protocol_id")
.setId("id")
.setEventId(0)
.setBody(messageBody)
.build(),
}));
Collections.unmodifiableList(
Arrays.asList(
new Message[] {
Message.newBuilder()
.setProtocolInstanceId("protocol_id")
.setId("id")
.setEventId(0)
.setBody(messageBody)
.build(),
})));
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1);
assertEquals(0, commands.size());
}
Expand Down Expand Up @@ -494,17 +521,19 @@ public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder.add(
(r) -> {
stateMachines.setMessages(
Arrays.asList(
Message.newBuilder()
.setProtocolInstanceId("message_updateID")
.setId("message_updateID/request")
.setEventId(6)
.setBody(
Any.pack(
Request.newBuilder()
.setInput(Input.newBuilder().setName("updateName").build())
.build()))
.build()));
Collections.unmodifiableList(
Arrays.asList(
Message.newBuilder()
.setProtocolInstanceId("message_updateID")
.setId("message_updateID/request")
.setEventId(6)
.setBody(
Any.pack(
Request.newBuilder()
.setInput(
Input.newBuilder().setName("updateName").build())
.build()))
.build())));
});
}

Expand All @@ -528,8 +557,8 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
5: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
6: EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED
5: EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED
6: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
7: EVENT_TYPE_WORKFLOW_TASK_STARTED
*/
TestHistoryBuilder h = new TestHistoryBuilder();
Expand All @@ -551,6 +580,12 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
h.addWorkflowTaskScheduled();
h.addWorkflowTaskStarted();
}
{
// Full replay
TestEntityManagerListenerBase listener = new TestUpdateListener();
stateMachines = newStateMachines(listener);
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
}
{
// Full replay
TestEntityManagerListenerBase listener = new TestUpdateListener();
Expand Down Expand Up @@ -840,17 +875,18 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
TestEntityManagerListenerBase listener = new TestUpdateListener();
stateMachines = newStateMachines(listener);
stateMachines.setMessages(
Arrays.asList(
Message.newBuilder()
.setProtocolInstanceId("message_update")
.setId("message_update/request")
.setEventId(7)
.setBody(
Any.pack(
Request.newBuilder()
.setInput(Input.newBuilder().setName("updateName").build())
.build()))
.build()));
Collections.unmodifiableList(
Arrays.asList(
Message.newBuilder()
.setProtocolInstanceId("message_update")
.setId("message_update/request")
.setEventId(7)
.setBody(
Any.pack(
Request.newBuilder()
.setInput(Input.newBuilder().setName("updateName").build())
.build()))
.build())));
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
assertEquals(6, commands.size());
assertEquals(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE, commands.get(0).getCommandType());
Expand Down

0 comments on commit 9cdff7a

Please sign in to comment.