Skip to content

Commit

Permalink
Ensure compatibility with old approach & add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Dec 30, 2024
1 parent a901b4d commit c292f59
Show file tree
Hide file tree
Showing 5 changed files with 996 additions and 26 deletions.
25 changes: 19 additions & 6 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,23 +358,36 @@ def activate(
activation_err: Optional[Exception] = None
try:
# Apply every job, running the loop afterward
is_query = False
no_queries = True
for job in act.jobs:
if job.HasField("initialize_workflow"):
self._workflow_input = self._make_workflow_input(
job.initialize_workflow
)
elif job.HasField("query_workflow"):
is_query = True
no_queries = False
# Let errors bubble out of these to the caller to fail the task
self._apply(job)

# Conditions are not checked on query activations. Query activations always come without
# any other jobs.
self._run_once(check_conditions=not is_query)
# Ensure the main workflow function is called on first task, and called last.
if self._primary_task_initter is not None and self._primary_task is None:
self._primary_task_initter()
first_task = False
try:
self._run_once(check_conditions=no_queries)
finally:
# Ensure the main workflow function task is initialized after a first run of the
# event loop, which might execute before-start signals/updates. This is behind
                # a finally because if those handlers fail, we still need the main routine
# to be initialized in order to fail tasks properly.
if (
self._primary_task_initter is not None
and self._primary_task is None
):
self._primary_task_initter()
first_task = True
# Because we want any before-start signals/updates to fully process before running
# the main routine for the first time, we run the loop again if this is the first time.
if first_task:
self._run_once(check_conditions=True)
except Exception as err:
# We want some errors during activation, like those that can happen
Expand Down
88 changes: 86 additions & 2 deletions tests/worker/test_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Dict
from typing import Any, Dict, Optional, Type

import pytest

from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError, WorkflowHistory
from temporalio.exceptions import ApplicationError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Replayer, Worker
from temporalio.worker import (
ExecuteWorkflowInput,
Interceptor,
Replayer,
Worker,
WorkflowInboundInterceptor,
WorkflowInterceptorClassInput,
)
from tests.helpers import assert_eq_eventually
from tests.worker.test_workflow import SignalsActivitiesTimersUpdatesTracingWorkflow


@activity.defn
Expand Down Expand Up @@ -385,3 +393,79 @@ async def test_replayer_command_reordering_backward_compatibility() -> None:
await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
WorkflowHistory.from_json("fake", history)
)


workflow_res = None


class WorkerWorkflowResultInterceptor(Interceptor):
def workflow_interceptor_class(
self, input: WorkflowInterceptorClassInput
) -> Optional[Type[WorkflowInboundInterceptor]]:
return WorkflowResultInterceptor


class WorkflowResultInterceptor(WorkflowInboundInterceptor):
async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
global workflow_res
res = await super().execute_workflow(input)
workflow_res = res
return res


async def test_replayer_async_ordering() -> None:
"""
This test verifies that the order that asyncio tasks/coroutines are woken up matches the
order they were before changes to apply all jobs and then run the event loop, where previously
the event loop was ran after each "batch" of jobs.
"""
histories_and_expecteds = [
(
"test_replayer_event_tracing.json",
[
"sig-before-sync",
"sig-before-1",
"sig-before-2",
"timer-sync",
"act-sync",
"act-1",
"act-2",
"sig-1-sync",
"sig-1-1",
"sig-1-2",
"update-1-sync",
"update-1-1",
"update-1-2",
"timer-1",
"timer-2",
],
),
(
"test_replayer_event_tracing_double_sig_at_start.json",
[
"sig-before-sync",
"sig-before-1",
"sig-1-sync",
"sig-1-1",
"sig-before-2",
"sig-1-2",
"timer-sync",
"act-sync",
"update-1-sync",
"update-1-1",
"update-1-2",
"act-1",
"act-2",
"timer-1",
"timer-2",
],
),
]
for history, expected in histories_and_expecteds:
with Path(__file__).with_name(history).open() as f:
history = f.read()
await Replayer(
workflows=[SignalsActivitiesTimersUpdatesTracingWorkflow],
interceptors=[WorkerWorkflowResultInterceptor()],
).replay_workflow(WorkflowHistory.from_json("fake", history))
assert workflow_res == expected
Loading

0 comments on commit c292f59

Please sign in to comment.