From abe36c3e4a418c5f7d8b620c9482e7a9919d2e1f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 17 Oct 2023 10:30:22 -0700 Subject: [PATCH] Update core / additional failure path tests --- temporalio/bridge/Cargo.lock | 77 ++++++++++++++----------- temporalio/bridge/sdk-core | 2 +- temporalio/worker/_workflow_instance.py | 32 +++++----- temporalio/workflow.py | 2 +- tests/worker/test_workflow.py | 57 ++++++++++++++++++ 5 files changed, 118 insertions(+), 52 deletions(-) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index 7a73f865..79e0c7ae 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -90,9 +90,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -196,9 +196,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "block-buffer" @@ -457,9 +457,12 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +dependencies = [ + "powerfmt", +] [[package]] name = "derive_builder" @@ -626,9 +629,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", "miniz_oxide", @@ -1234,7 +1237,7 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "cfg-if", "libc", ] @@ -1524,6 +1527,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1848,14 +1857,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.0" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d119d7c7ca818f8a53c300863d4f87566aac09943aef5b355bb83969dae75d87" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.1", - "regex-syntax 0.8.1", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -1869,13 +1878,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.1" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465c6fc0621e4abc4187a2bda0937bfd4f722c2730b29562e19689ea796c9a4b" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.1", + "regex-syntax 0.8.2", ] [[package]] @@ -1886,9 +1895,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56d84fdd47036b038fc80dd333d10b6aab10d5d31f4a366e20014def75328d33" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" @@ -1996,11 +2005,11 @@ version = "0.1.0" [[package]] name = "rustix" -version = "0.38.18" +version = "0.38.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a74ee2d7c2581cd139b42447d7d9389b889bdaad3a73f1ebb16f2a3237bb19c" +checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "errno", "libc", "linux-raw-sys", @@ -2118,18 +2127,18 @@ checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", @@ -2534,11 +2543,12 @@ dependencies = [ [[package]] name = "time" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ "deranged", + "powerfmt", "serde", "time-core", ] @@ -2718,11 +2728,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" dependencies = [ - "cfg-if", "log", "pin-project-lite", "tracing-attributes", @@ -2731,9 +2740,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -2742,9 +2751,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index dd610947..45d2bc99 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit dd610947468ec760601b57c93f76532d08739168 +Subproject commit 45d2bc997fd25bf24d347b04d519e7279851aea4 diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 46279b1b..1417b98f 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -456,7 +456,8 @@ async def run_update( try: if defn.validator is not None: # Run the validator - await self._inbound.handle_update_validator(handler_input) + with self._as_read_only(): + await self._inbound.handle_update_validator(handler_input) # Accept the update command.update_response.accepted.SetInParent() @@ -474,7 +475,6 @@ async def run_update( job.protocol_instance_id ) command.update_response.completed.CopyFrom(result_payloads[0]) - # TODO: Dedupe exception handling if it makes sense except (Exception, asyncio.CancelledError) as err: logger.debug( f"Update raised failure with run ID {self._info.run_id}", @@ -485,21 +485,21 @@ async def run_update( err = temporalio.exceptions.CancelledError( f"Cancellation raised within update {err}" ) - if isinstance(err, temporalio.exceptions.FailureError): - # All other failure errors fail the update - if command is None: - command = self._add_command() - command.update_response.protocol_instance_id = ( - job.protocol_instance_id - ) - self._failure_converter.to_failure( - err, - self._payload_converter, - command.update_response.rejected.cause, - ) - else: - # All other exceptions fail the task + # Read-only issues during validation should fail the task + if isinstance(err, temporalio.workflow.ReadOnlyContextError): self._current_activation_error = err + return + # All other errors fail the update + if command is None: + command = self._add_command() + command.update_response.protocol_instance_id = ( + job.protocol_instance_id + ) + self._failure_converter.to_failure( + err, + self._payload_converter, + command.update_response.rejected.cause, + ) except BaseException as err: # During tear down, generator exit and no-runtime exceptions can appear if not self._deleting: diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 4e1d448b..6b8acbd9 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -1068,7 +1068,7 @@ def _apply_to_class( raise ValueError("Class already contains workflow definition") issues: List[str] = [] - # Collect run fn and all signal/query fns + # Collect run fn and all signal/query/update fns members = inspect.getmembers(cls) run_fn: Optional[Callable[..., Awaitable[Any]]] = None seen_run_attr = False diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 6fded7d5..0a8c68a9 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -3506,6 +3506,9 @@ async def test_workflow_buffered_metrics(client: Client): ) +bad_validator_fail_ct = 0 + + @workflow.defn class UpdateHandlersWorkflow: def __init__(self) -> None: @@ -3548,6 +3551,20 @@ async def runs_activity(self, name: str) -> str: await act return "done" + @workflow.update + async def bad_validator(self) -> str: + return "done" + + @bad_validator.validator + def bad_validator_validator(self) -> None: + global bad_validator_fail_ct + # Run a command which should not be allowed the first few tries, then "fix" it as if new code was deployed + if bad_validator_fail_ct < 2: + bad_validator_fail_ct += 1 + workflow.start_activity( + say_hello, "boo", schedule_to_close_timeout=timedelta(seconds=5) + ) + # @workflow.signal # def set_signal_handler(self, signal_name: str) -> None: # def new_handler(arg: str) -> None: @@ -3644,3 +3661,43 @@ async def test_workflow_update_handlers_unhappy(client: Client): with pytest.raises(WorkflowUpdateFailedError) as err: await handle.update(UpdateHandlersWorkflow.runs_activity, "foo") assert isinstance(err.value.cause, CancelledError) + + # Incorrect args for handler + with pytest.raises(WorkflowUpdateFailedError) as err: + await handle.update("last_event", args=[121, "badarg"]) + assert isinstance(err.value.cause, ApplicationError) + assert ( + "UpdateHandlersWorkflow.last_event_validator() takes 2 positional arguments but 3 were given" + == err.value.cause.message + ) + + # Un-deserializeable nonsense + with pytest.raises(WorkflowUpdateFailedError) as err: + await handle.update( + "last_event", + arg=RawValue( + payload=Payload( + metadata={"encoding": b"u-dont-know-me"}, data=b"enchi-cat" + ) + ), + ) + assert isinstance(err.value.cause, ApplicationError) + assert "Failed decoding arguments" == err.value.cause.message + + +async def test_workflow_update_command_in_validator(client: Client): + # Need to not sandbox so behavior of validator can change based on global + async with new_worker( + client, UpdateHandlersWorkflow, workflow_runner=UnsandboxedWorkflowRunner() + ) as worker: + handle = await client.start_workflow( + UpdateHandlersWorkflow.run, + id=f"update-handlers-command-in-validator-{uuid.uuid4()}", + task_queue=worker.task_queue, + task_timeout=timedelta(seconds=1), + ) + + # This will produce a WFT failure which will eventually resolve and then this + # update will return + res = await handle.update(UpdateHandlersWorkflow.bad_validator) + assert res == "done"