Honor all non-completion commands #569

Merged
18 changes: 16 additions & 2 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default.

28 changes: 9 additions & 19 deletions temporalio/worker/_workflow_instance.py
@@ -403,27 +403,17 @@ def activate(
f"Failed converting activation exception: {inner_err}"
)

# If there are successful commands, we must remove all
# non-query-responses after terminal workflow commands. We must do this
# in place to avoid the copy-on-write that occurs when you reassign.
seen_completion = False
i = 0
while i < len(self._current_completion.successful.commands):
command = self._current_completion.successful.commands[i]
if not seen_completion:
seen_completion = (
command.HasField("complete_workflow_execution")
or command.HasField("continue_as_new_workflow_execution")
or command.HasField("fail_workflow_execution")
or command.HasField("cancel_workflow_execution")
)
elif not command.HasField("respond_to_query"):
del self._current_completion.successful.commands[i]
continue
i += 1
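For illustration, the removed truncation logic can be modeled with plain dicts standing in for protobuf `Command` messages (`truncate_after_terminal` and the `field` key are hypothetical stand-ins; real commands use protobuf's `HasField`, and the field names in the sample list are illustrative):

```python
# Simplified sketch of the removed truncation logic. Dicts with a "field"
# key stand in for protobuf Command messages; illustrative only.

TERMINAL_FIELDS = {
    "complete_workflow_execution",
    "continue_as_new_workflow_execution",
    "fail_workflow_execution",
    "cancel_workflow_execution",
}


def truncate_after_terminal(commands: list) -> None:
    """Delete, in place, every non-query-response command that follows
    the first terminal (workflow-completing) command."""
    seen_completion = False
    i = 0
    while i < len(commands):
        command = commands[i]
        if not seen_completion:
            seen_completion = command["field"] in TERMINAL_FIELDS
        elif command["field"] != "respond_to_query":
            # Post-terminal command that is not a query response: drop it.
            del commands[i]
            continue
        i += 1


cmds = [
    {"field": "update_accepted"},              # kept: precedes terminal command
    {"field": "complete_workflow_execution"},  # first terminal command
    {"field": "update_completed"},             # dropped: follows terminal command
    {"field": "respond_to_query"},             # kept: query responses survive
]
truncate_after_terminal(cmds)
# cmds now: update_accepted, complete_workflow_execution, respond_to_query
```

The in-place `del` mirrors the original's note about avoiding the copy-on-write that occurs when a protobuf repeated field is reassigned.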
Comment on lines -406 to -423

@cretz (Member) commented on Jul 15, 2024:

Any concerns that removing this is backwards incompatible w/ already-completed workflows, or can you confirm that in no-flag-replaying situations the core behavior was always the same (sans query stuff)? One thing you can do is make a workflow that has a post-complete command, run it in an older SDK, grab the JSON history, and run the replayer in tests here with the new code.

The PR author (Contributor) replied:

I'm not quite following this bit of the question:

> or can you confirm that in no-flag-replaying situations the core behavior was always the same

Here's how I am thinking of it:

@cretz (Member) replied on Jul 23, 2024:

> I'm not quite following this bit of the question:

Think about a user with an old workflow (i.e. sans flag). If you remove the old Python behavior that runs sans flag, it now relies on the old Core behavior sans flag. If that old behavior doesn't match Python's old behavior, they will get a non-determinism error. So we need to confirm that old Core code does the same thing as old Python code before removing old Python code. Did they drop post-terminal commands the same way? If so, we're all good here.

> The new SDK code drops post-terminal commands when replaying without the flag set, and there is test coverage for this

IMO you should grab a workflow history JSON or two from a workflow that had post-terminal commands from a Python SDK before this change, then run them through a replayer in the tests on this version. There are a couple of other JSON files in the test suite whose tests show how to do this. Also, I assume the test in this PR is verifying that commands after workflow completion are now properly included?

The PR author (Contributor) replied:

> Think about a user with an old workflow (i.e. sans flag). If you remove the old Python behavior that runs sans flag, it now relies on the old Core behavior sans flag. If that old behavior doesn't match Python's old behavior, they will get a non-determinism error. So we need to confirm that old Core code does the same thing as old Python code before removing old Python code. Did they drop post-terminal commands the same way? If so, we're all good here.

Personally I would substitute s/old Core/new Core/ throughout this paragraph, since we're never going to be running old Core code: rather, it's new Core code which, when replaying without the flag, is intended to behave as old Core did (i.e. truncating at the first terminal command). This is tested in two different ways in the Core test suite, but I agree that SDK-specific tests replaying old workflows with post-terminal commands would be good too.

@cretz (Member) replied:

👍 Makes sense; yeah, whatever the terms are that mean "workflows with post-complete commands on previous Python SDK versions work exactly the same with this PR".

The PR author (Contributor) replied:

Added the replay backward-compatibility test. This should be ready to go.

def is_completion(command):
return (
command.HasField("complete_workflow_execution")
or command.HasField("continue_as_new_workflow_execution")
or command.HasField("fail_workflow_execution")
or command.HasField("cancel_workflow_execution")
)

if any(map(is_completion, self._current_completion.successful.commands)):
self._warn_if_unfinished_handlers()

return self._current_completion

def _apply(
75 changes: 75 additions & 0 deletions tests/worker/test_replayer.py
@@ -310,3 +310,78 @@ def new_say_hello_worker(client: Client) -> Worker:
workflows=[SayHelloWorkflow],
activities=[say_hello],
)


@workflow.defn
class UpdateCompletionAfterWorkflowReturn:
def __init__(self) -> None:
self.workflow_returned = False

@workflow.run
async def run(self) -> str:
self.workflow_returned = True
return "workflow-result"

@workflow.update
async def my_update(self) -> str:
await workflow.wait_condition(lambda: self.workflow_returned)
return "update-result"


async def test_replayer_command_reordering_backward_compatibility() -> None:
"""
The UpdateCompletionAfterWorkflowReturn workflow above features an update handler that returns
after the main workflow coroutine has exited. It will (if an update is sent in the first WFT)
generate a raw command sequence (before sending to core) of
Comment on lines +333 to +335
A maintainer (Member) commented: Yeah, great comment, makes sense 👍


[UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted].

Prior to https://github.com/temporalio/sdk-python/pull/569, Python truncated this command
sequence to

[UpdateAccepted, CompleteWorkflowExecution].

With #569, Python performs no truncation, and Core changes it to

[UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution].

This test takes a history generated using pre-#569 SDK code and replays it; the replay succeeds.
The history is

1 WorkflowExecutionStarted
2 WorkflowTaskScheduled
3 WorkflowTaskStarted
4 WorkflowTaskCompleted
5 WorkflowExecutionUpdateAccepted
6 WorkflowExecutionCompleted

Note that the history lacks a WorkflowExecutionUpdateCompleted event.

If Core's logic (which involves a flag) incorrectly allowed this history to be replayed
using Core's post-#569 implementation, then a non-determinism error would result. Specifically,
Core would, at some point during replay, do the following:

Receive [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted] from lang,
change that to [UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution]
and create an UpdateMachine instance (the WorkflowTaskMachine instance already exists).
Then continue to consume history events.

Event 5 WorkflowExecutionUpdateAccepted would apply to the UpdateMachine associated with
the UpdateAccepted command, but event 6 WorkflowExecutionCompleted would not, since
core is expecting an event that can be applied to the UpdateMachine corresponding to
UpdateCompleted. If we modify core to incorrectly apply its new logic then we do see that:

[TMPRL1100] Nondeterminism error: Update machine does not handle this event: HistoryEvent(id: 6, WorkflowExecutionCompleted)

The test passes because core in fact (because the history lacks the flag) uses its old logic
and changes the command sequence from [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted]
to [UpdateAccepted, CompleteWorkflowExecution], and events 5 and 6 can be applied to the
corresponding state machines.
"""
with Path(__file__).with_name(
"test_replayer_command_reordering_backward_compatibility.json"
).open() as f:
history = f.read()
await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
WorkflowHistory.from_json("fake", history)
)
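The docstring's truncation-vs-reordering distinction can be sketched as a pure function over command names. This is an illustrative model only: `process_commands`, the `history_has_flag` parameter, and the plain string command names are stand-ins, not Core's actual Rust implementation, and the sketch ignores the query-response exception of the old Python behavior:

```python
# Simplified model of the two server-bound command transformations:
# pre-#569 truncation vs. post-#569 reordering, gated on the history flag.

TERMINAL_COMMANDS = {
    "CompleteWorkflowExecution",
    "ContinueAsNewWorkflowExecution",
    "FailWorkflowExecution",
    "CancelWorkflowExecution",
}


def process_commands(commands: list[str], history_has_flag: bool) -> list[str]:
    """Return the command sequence as the server would receive it."""
    try:
        t = next(i for i, c in enumerate(commands) if c in TERMINAL_COMMANDS)
    except StopIteration:
        return list(commands)  # no terminal command; nothing to do
    if not history_has_flag:
        # Old behavior: truncate at the first terminal command.
        return commands[: t + 1]
    # New behavior: keep every command, moving the terminal one to the end.
    return commands[:t] + commands[t + 1 :] + [commands[t]]


raw = ["UpdateAccepted", "CompleteWorkflowExecution", "UpdateCompleted"]
print(process_commands(raw, history_has_flag=False))
# ['UpdateAccepted', 'CompleteWorkflowExecution']
print(process_commands(raw, history_has_flag=True))
# ['UpdateAccepted', 'UpdateCompleted', 'CompleteWorkflowExecution']
```

The flag-off branch is what makes the replay in this test deterministic: the recorded history lacks the flag, so the command sequence collapses to exactly the two commands matched by events 5 and 6.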
113 changes: 113 additions & 0 deletions tests/worker/test_replayer_command_reordering_backward_compatibility.json
@@ -0,0 +1,113 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-08-02T23:35:00.061520Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1049558",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "UpdateCompletionAfterWorkflowReturn"
},
"taskQueue": {
"name": "tq",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
"identity": "[email protected]",
"firstExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"workflowId": "wf-dd1e2267-d1bf-4822-be38-2a97a499331e"
}
},
{
"eventId": "2",
"eventTime": "2024-08-02T23:35:00.070867Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1049559",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "tq",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-08-02T23:35:00.155562Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1049564",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]",
"requestId": "b03f25fb-b2ab-4b93-b2ad-0f6899f6e2e2",
"historySizeBytes": "260"
}
},
{
"eventId": "4",
"eventTime": "2024-08-02T23:35:00.224744Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1049568",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "[email protected]",
"workerVersion": {
"buildId": "17647b02191ec9e4e58b623a9c71f20a"
},
"sdkMetadata": {
"coreUsedFlags": [
1,
2
]
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-08-02T23:35:00.242507Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
"taskId": "1049569",
"workflowExecutionUpdateAcceptedEventAttributes": {
"protocolInstanceId": "my-update",
"acceptedRequestMessageId": "my-update/request",
"acceptedRequestSequencingEventId": "2",
"acceptedRequest": {
"meta": {
"updateId": "my-update",
"identity": "[email protected]"
},
"input": {
"name": "my_update"
}
}
}
},
{
"eventId": "6",
"eventTime": "2024-08-02T23:35:00.258465Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
"taskId": "1049570",
"workflowExecutionCompletedEventAttributes": {
"result": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg==",
"encodingDecoded": "json/plain"
},
"data": "workflow-result"
}
]
},
"workflowTaskCompletedEventId": "4"
}
}
]
}