From ba4bda70ab39184114fac2a82c483aeec5be58b5 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 25 Oct 2023 09:31:26 -0700 Subject: [PATCH] Ensure otel test will wait until after task fails to send update --- tests/contrib/test_opentelemetry.py | 33 ++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/test_opentelemetry.py index bd986f2d..79a71282 100644 --- a/tests/contrib/test_opentelemetry.py +++ b/tests/contrib/test_opentelemetry.py @@ -19,7 +19,7 @@ from temporalio.contrib.opentelemetry import TracingInterceptor from temporalio.contrib.opentelemetry import workflow as otel_workflow from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Worker +from temporalio.worker import UnsandboxedWorkflowRunner, Worker @dataclass @@ -48,6 +48,7 @@ class TracingWorkflowAction: activity: Optional[TracingWorkflowActionActivity] = None continue_as_new: Optional[TracingWorkflowActionContinueAsNew] = None wait_until_signal_count: int = 0 + wait_and_do_update: bool = False @dataclass @@ -71,10 +72,14 @@ class TracingWorkflowActionContinueAsNew: param: TracingWorkflowParam +ready_for_update = asyncio.Semaphore(0) + + @workflow.defn class TracingWorkflow: def __init__(self) -> None: self._signal_count = 0 + self._did_update = False @workflow.run async def run(self, param: TracingWorkflowParam) -> None: @@ -126,6 +131,9 @@ async def run(self, param: TracingWorkflowParam) -> None: await workflow.wait_condition( lambda: self._signal_count >= action.wait_until_signal_count ) + if action.wait_and_do_update: + ready_for_update.release() + await workflow.wait_condition(lambda: self._did_update) async def _raise_on_non_replay(self) -> None: replaying = workflow.unsafe.is_replaying() @@ -144,13 +152,11 @@ def signal(self) -> None: self._signal_count += 1 @workflow.update - def update(self) -> int: - self._signal_count += 1 - return self._signal_count + def update(self) -> None: + self._did_update = True @update.validator def update_validator(self) -> None: - print("Actually in validator") pass @@ -176,6 +182,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): task_queue=task_queue, workflows=[TracingWorkflow], activities=[tracing_activity], + # Needed so we can wait to send update at the right time + workflow_runner=UnsandboxedWorkflowRunner(), ): # Run workflow with various actions workflow_id = f"workflow_{uuid.uuid4()}" @@ -185,8 +193,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): actions=[ # First fail on replay TracingWorkflowAction(fail_on_non_replay=True), - # Wait for a signal & update - TracingWorkflowAction(wait_until_signal_count=2), + # Wait for a signal + TracingWorkflowAction(wait_until_signal_count=1), # Exec activity that fails task before complete TracingWorkflowAction( activity=TracingWorkflowActionActivity( @@ -194,6 +202,8 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): fail_on_non_replay_before_complete=True, ), ), + # Wait for update + TracingWorkflowAction(wait_and_do_update=True), # Exec child workflow that fails task before complete TracingWorkflowAction( child_workflow=TracingWorkflowActionChildWorkflow( @@ -240,7 +250,10 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): # Send query, then signal to move it along assert "some query" == await handle.query(TracingWorkflow.query) await handle.signal(TracingWorkflow.signal) - await handle.execute_update(TracingWorkflow.update) + # Wait to send the update until after the things that fail tasks are over, as failing a task while the update + # is running can mean we execute it twice, which will mess up our spans. + async with ready_for_update: + await handle.execute_update(TracingWorkflow.update) await handle.result() # Dump debug with attributes, but do string assertion test without @@ -253,11 +266,11 @@ async def test_opentelemetry_tracing(client: Client, env: WorkflowEnvironment): " RunWorkflow:TracingWorkflow", " MyCustomSpan", " HandleSignal:signal (links: SignalWorkflow:signal)", - " ValidateUpdate:update (links: StartWorkflowUpdate:update)", - " HandleUpdate:update (links: StartWorkflowUpdate:update)", " StartActivity:tracing_activity", " RunActivity:tracing_activity", " RunActivity:tracing_activity", + " ValidateUpdate:update (links: StartWorkflowUpdate:update)", + " HandleUpdate:update (links: StartWorkflowUpdate:update)", " StartChildWorkflow:TracingWorkflow", " RunWorkflow:TracingWorkflow", " MyCustomSpan",