From edb3c7ee9c17792439e23a57f0b1f0301abd3d6d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 16 Dec 2024 09:27:38 -0500 Subject: [PATCH 1/4] Add test that update respects first_execution_run_id --- tests/worker/test_workflow.py | 47 +++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 2611ecd5..ee7ce4f1 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -4425,6 +4425,53 @@ async def test_workflow_update_task_fails(client: Client, env: WorkflowEnvironme assert bad_validator_fail_ct == 2 +@workflow.defn +class UpdateRespectsFirstExecutionRunIdWorkflow: + def __init__(self) -> None: + self.update_received = False + + @workflow.run + async def run(self) -> None: + await workflow.wait_condition(lambda: self.update_received) + + @workflow.update + async def update(self) -> None: + self.update_received = True + + +async def test_workflow_update_respects_first_execution_run_id( + client: Client, env: WorkflowEnvironment +): + # Start one workflow, obtain the run ID (r1), and let it complete. Start a second + # workflow with the same workflow ID, and try to send an update using the handle from + # r1. + workflow_id = f"update-respects-first-execution-run-id-{uuid.uuid4()}" + async with new_worker(client, UpdateRespectsFirstExecutionRunIdWorkflow) as worker: + + async def start_workflow(workflow_id: str) -> WorkflowHandle: + return await client.start_workflow( + UpdateRespectsFirstExecutionRunIdWorkflow.run, + id=workflow_id, + task_queue=worker.task_queue, + ) + + wf_execution_1_handle = await start_workflow(workflow_id) + await wf_execution_1_handle.execute_update( + UpdateRespectsFirstExecutionRunIdWorkflow.update + ) + await wf_execution_1_handle.result() + await start_workflow(workflow_id) + + # Execution 1 has closed. This would succeed if the update incorrectly targets + # the second execution + with pytest.raises(RPCError) as exc_info: + await wf_execution_1_handle.execute_update( + UpdateRespectsFirstExecutionRunIdWorkflow.update + ) + assert exc_info.value.status == RPCStatusCode.NOT_FOUND + assert "workflow execution not found" in str(exc_info.value) + + @workflow.defn class ImmediatelyCompleteUpdateAndWorkflow: def __init__(self) -> None: From c0b06b1aec9f8c34ac0a38036966671319d27b7f Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 16 Dec 2024 09:16:45 -0500 Subject: [PATCH 2/4] Send first_execution_run_id with Update requests Fixes #682 --- temporalio/client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/temporalio/client.py b/temporalio/client.py index 765c0071..e526c9cf 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -2060,6 +2060,7 @@ async def _start_update( StartWorkflowUpdateInput( id=self._id, run_id=self._run_id, + first_execution_run_id=self.first_execution_run_id, update_id=id, update=update_name, args=temporalio.common._arg_or_args(arg, args), @@ -4728,6 +4729,7 @@ class StartWorkflowUpdateInput: id: str run_id: Optional[str] + first_execution_run_id: Optional[str] update_id: Optional[str] update: str args: Sequence[Any] @@ -5360,6 +5362,7 @@ async def start_workflow_update( workflow_id=input.id, run_id=input.run_id or "", ), + first_execution_run_id=input.first_execution_run_id or "", request=temporalio.api.update.v1.Request( meta=temporalio.api.update.v1.Meta( update_id=input.update_id or str(uuid.uuid4()), From 7f0908e0aa46a22c890f145d17c427e4262e7786 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 16 Dec 2024 21:57:46 -0500 Subject: [PATCH 3/4] Remove warning notices --- temporalio/client.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index e526c9cf..461ba51c 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1882,11 +1882,6 @@ async def execute_update( .. warning:: This API is experimental - .. warning:: - WorkflowHandles created as a result of :py:meth:`Client.start_workflow` will - send updates to the latest workflow with the same workflow ID even if it is - unrelated to the started workflow. - Args: update: Update function or name on the workflow. arg: Single argument to the update. @@ -1994,11 +1989,6 @@ async def start_update( .. warning:: This API is experimental - .. warning:: - WorkflowHandles created as a result of :py:meth:`Client.start_workflow` will - send updates to the latest workflow with the same workflow ID even if it is - unrelated to the started workflow. - Args: update: Update function or name on the workflow. arg: Single argument to the update. From a3f08200672a64eef421375841d1386a0423aad4 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 19 Dec 2024 09:12:01 -0500 Subject: [PATCH 4/4] Skip the test under Java test server --- tests/worker/test_workflow.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index ee7ce4f1..01b9b6ba 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -4442,6 +4442,10 @@ async def update(self) -> None: async def test_workflow_update_respects_first_execution_run_id( client: Client, env: WorkflowEnvironment ): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) # Start one workflow, obtain the run ID (r1), and let it complete. Start a second # workflow with the same workflow ID, and try to send an update using the handle from # r1.