From ded3747385e4ca3b24a0deb2b13bc2ee7d1b05e2 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 6 Oct 2023 11:35:47 -0500 Subject: [PATCH] Encourage threaded activities, warn when max_workers too low, and other small changes (#387) --- README.md | 75 ++++++++++++++++++++++------------- temporalio/client.py | 2 + temporalio/runtime.py | 2 + temporalio/worker/_worker.py | 25 +++++++++--- tests/worker/test_activity.py | 75 +++++++++++++++++++++++++---------- tests/worker/test_workflow.py | 10 ----- 6 files changed, 127 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 8d70d7d8..83b99532 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,9 @@ execute asynchronous, long-running business logic in a scalable and resilient wa "Temporal Python SDK" is the framework for authoring workflows and activities using the Python programming language. Also see: -* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our [Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including information around Temporal core concepts. +* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our + [Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including + information around Temporal core concepts. * [Python Code Samples](https://github.com/temporalio/samples-python) * [API Documentation](https://python.temporal.io) - Complete Temporal Python SDK Package reference. @@ -84,10 +86,10 @@ informal introduction to the features and their implementation. 
- [Activities](#activities) - [Definition](#definition-1) - [Types of Activities](#types-of-activities) - - [Asynchronous Activities](#asynchronous-activities) - [Synchronous Activities](#synchronous-activities) - [Synchronous Multithreaded Activities](#synchronous-multithreaded-activities) - [Synchronous Multiprocess/Other Activities](#synchronous-multiprocessother-activities) + - [Asynchronous Activities](#asynchronous-activities) - [Activity Context](#activity-context) - [Heartbeating and Cancellation](#heartbeating-and-cancellation) - [Worker Shutdown](#worker-shutdown) @@ -111,7 +113,9 @@ informal introduction to the features and their implementation. # Quick Start -We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is not intended as one of the ways to use Temporal, but in reality it is very simplified and decidedly not "the only way" to use Temporal. For more information, check out the docs references in "Next Steps" below the quick start. +We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is not intended as +one of the ways to use Temporal, but in reality it is very simplified and decidedly not "the only way" to use Temporal. +For more information, check out the docs references in "Next Steps" below the quick start. ## Installation @@ -136,7 +140,7 @@ Create the following in `activities.py`: from temporalio import activity @activity.defn -async def say_hello(name: str) -> str: +def say_hello(name: str) -> str: return f"Hello, {name}!" 
``` @@ -163,6 +167,7 @@ Create the following in `run_worker.py`: ```python import asyncio +import concurrent.futures from temporalio.client import Client from temporalio.worker import Worker @@ -175,8 +180,15 @@ async def main(): client = await Client.connect("localhost:7233") # Run the worker - worker = Worker(client, task_queue="my-task-queue", workflows=[SayHello], activities=[say_hello]) - await worker.run() + with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor: + worker = Worker( + client, + task_queue="my-task-queue", + workflows=[SayHello], + activities=[say_hello], + activity_executor=activity_executor, + ) + await worker.run() if __name__ == "__main__": asyncio.run(main()) @@ -235,8 +247,9 @@ give you much more information about how Temporal works with Python: # Usage -From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around Temporal concepts. -*This section is not intended as a how-to guide* -- For more how-to oriented information, check out the links in the [Next Steps](#next-steps) section above. +From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around +Temporal concepts. *This section is not intended as a how-to guide* -- For more how-to oriented information, check out +the links in the [Next Steps](#next-steps) section above. ### Client @@ -269,6 +282,7 @@ Some things to note about the above code: does the same thing * Clients can have many more options not shown here (e.g. data converters and interceptors) * A string can be used instead of the method reference to call a workflow by name (e.g. if defined in another language) +* Clients do not work across forks Clients also provide a shallow copy of their config for use in making slightly different clients backed by the same connection. 
For instance, given the `client` above, this is how to have a client in another namespace: @@ -516,7 +530,7 @@ class GreetingInfo: name: str = "" @activity.defn -async def create_greeting_activity(info: GreetingInfo) -> str: +def create_greeting_activity(info: GreetingInfo) -> str: return f"{info.salutation}, {info.name}!" ``` @@ -1044,13 +1058,14 @@ Activities are decorated with `@activity.defn` like so: from temporalio import activity @activity.defn -async def say_hello_activity(name: str) -> str: +def say_hello_activity(name: str) -> str: return f"Hello, {name}!" ``` Some things to note about activity definitions: -* The `say_hello_activity` is `async` which is the recommended activity type (see "Types of Activities" below) +* The `say_hello_activity` is synchronous which is the recommended activity type (see "Types of Activities" below), but + it can be `async` * A custom name for the activity can be set with a decorator argument, e.g. `@activity.defn(name="my activity")` * Long running activities should regularly heartbeat and handle cancellation * Activities can only have positional arguments. Best practice is to only take a single argument that is an @@ -1066,19 +1081,8 @@ Some things to note about activity definitions: #### Types of Activities -There are 3 types of activity callables accepted and described below: asynchronous, synchronous multithreaded, and -synchronous multiprocess/other. Only positional parameters are allowed in activity callables. - -##### Asynchronous Activities - -Asynchronous activities, i.e. functions using `async def`, are the recommended activity type. When using asynchronous -activities no special worker parameters are needed. - -Cancellation for asynchronous activities is done via -[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that -`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). 
A non-local activity must -heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and -"Heartbeating and Cancellation" later). +There are 3 types of activity callables accepted and described below: synchronous multithreaded, synchronous +multiprocess/other, and asynchronous. Only positional parameters are allowed in activity callables. ##### Synchronous Activities @@ -1102,8 +1106,9 @@ will fail and shutdown. ###### Synchronous Multithreaded Activities If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities -are considered multithreaded activities. Besides `activity_executor`, no other worker parameters are required for -synchronous multithreaded activities. +are considered multithreaded activities. If `max_workers` is not set to at least the worker's +`max_concurrent_activities` setting, a warning will be issued. Besides `activity_executor`, no other worker parameters +are required for synchronous multithreaded activities. By default, cancellation of a synchronous multithreaded activity is done via a `temporalio.exceptions.CancelledError` thrown into the activity thread. Activities that do not wish to have cancellation thrown can set @@ -1118,6 +1123,8 @@ there is a return statement within, it will throw the cancellation if there was If `activity_executor` is set to an instance of `concurrent.futures.Executor` that is _not_ `concurrent.futures.ThreadPoolExecutor`, then the synchronous activities are considered multiprocess/other activities. +Users should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise +on cancellation. These require special primitives for heartbeating and cancellation. The `shared_state_manager` worker parameter must be set to an instance of `temporalio.worker.SharedStateManager`. 
The most common implementation can be created by passing a @@ -1127,6 +1134,20 @@ set to an instance of `temporalio.worker.SharedStateManager`. The most common im Also, all of these activity functions must be ["picklable"](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled). +##### Asynchronous Activities + +Asynchronous activities are functions defined with `async def`. Asynchronous activities are often much more performant +than synchronous ones. When using asynchronous activities no special worker parameters are needed. + +**⚠️ WARNING: Do not block the thread in `async def` Python functions. This can stop the processing of the rest of +Temporal.** + +Cancellation for asynchronous activities is done via +[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that +`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). A non-local activity must +heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and +"Heartbeating and Cancellation" later). + #### Activity Context During activity execution, an implicit activity context is set as a @@ -1155,7 +1176,7 @@ occurs. Synchronous activities cannot call any of the `async` functions. In order for a non-local activity to be notified of cancellation requests, it must be given a `heartbeat_timeout` at invocation time and invoke `temporalio.activity.heartbeat()` inside the activity. It is strongly recommended that all but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation -for asynchronous and synchronous activities. +for synchronous and asynchronous activities. In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server for retrieval during activity retry. 
If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and diff --git a/temporalio/client.py b/temporalio/client.py index 1a3ffde6..03db102e 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -80,6 +80,8 @@ class Client: than where it was created, make sure the event loop where it was created is captured, and then call :py:func:`asyncio.run_coroutine_threadsafe` with the client call and that event loop. + + Clients do not work across forks since runtimes do not work across forks. """ @staticmethod diff --git a/temporalio/runtime.py b/temporalio/runtime.py index eb494bcf..b2650e9d 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -25,6 +25,8 @@ class Runtime: Users are encouraged to use :py:meth:`default`. It can be set with :py:meth:`set_default`. Every time a new runtime is created, a new internal thread pool is created. + + Runtimes do not work across forks. """ @staticmethod diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index a975936e..b6205d3c 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -7,6 +7,7 @@ import hashlib import logging import sys +import warnings from datetime import timedelta from typing import Any, Awaitable, Callable, List, Optional, Sequence, Type, cast @@ -93,11 +94,14 @@ def __init__( workflows: Set of workflow classes decorated with :py:func:`@workflow.defn`. activity_executor: Concurrent executor to use for non-async - activities. This is required if any activities are non-async. If - this is a :py:class:`concurrent.futures.ProcessPoolExecutor`, - all non-async activities must be picklable. Note, a broken - executor failure from this executor will cause the worker to - fail and shutdown. + activities. This is required if any activities are non-async. + :py:class:`concurrent.futures.ThreadPoolExecutor` is + recommended. If this is a + :py:class:`concurrent.futures.ProcessPoolExecutor`, all + non-async activities must be picklable. 
``max_workers`` on the + executor should at least be ``max_concurrent_activities`` or a + warning is issued. Note, a broken-executor failure from this + executor will cause the worker to fail and shutdown. workflow_task_executor: Thread pool executor for workflow tasks. If this is not present, a new :py:class:`concurrent.futures.ThreadPoolExecutor` will be @@ -262,6 +266,17 @@ def __init__( self._activity_worker: Optional[_ActivityWorker] = None runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default() if activities: + # Issue warning here if executor max_workers is lower than max + # concurrent activities. We do this here instead of in + # _ActivityWorker so the stack level is predictable. + max_workers = getattr(activity_executor, "_max_workers", None) + if isinstance(max_workers, int) and max_workers < max_concurrent_activities: + warnings.warn( + f"Worker max_concurrent_activities is {max_concurrent_activities} " + + f"but activity_executor's max_workers is only {max_workers}", + stacklevel=2, + ) + self._activity_worker = _ActivityWorker( bridge_worker=lambda: self._bridge_worker, task_queue=task_queue, diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index ac17ee87..4e70e797 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -52,6 +52,8 @@ multiprocessing.Manager() ) +default_max_concurrent_activities = 50 + async def test_activity_hello(client: Client, worker: ExternalWorker): @activity.defn @@ -141,13 +143,19 @@ async def test_sync_activity_thread(client: Client, worker: ExternalWorker): def some_activity() -> str: return f"activity name: {activity.info().activity_type}" + # We intentionally leave max_workers by default in the thread pool executor + # to confirm that the warning is triggered with concurrent.futures.ThreadPoolExecutor() as executor: - result = await _execute_workflow_with_activity( - client, - worker, - some_activity, - worker_config={"activity_executor": 
executor}, - ) + with pytest.warns( + UserWarning, + match=f"Worker max_concurrent_activities is {default_max_concurrent_activities} but activity_executor's max_workers is only", + ): + result = await _execute_workflow_with_activity( + client, + worker, + some_activity, + worker_config={"activity_executor": executor}, + ) assert result.result == "activity name: some_activity" @@ -157,13 +165,19 @@ def picklable_activity() -> str: async def test_sync_activity_process(client: Client, worker: ExternalWorker): + # We intentionally leave max_workers by default in the process pool executor + # to confirm that the warning is triggered with concurrent.futures.ProcessPoolExecutor() as executor: - result = await _execute_workflow_with_activity( - client, - worker, - picklable_activity, - worker_config={"activity_executor": executor}, - ) + with pytest.warns( + UserWarning, + match=f"Worker max_concurrent_activities is {default_max_concurrent_activities} but activity_executor's max_workers is only", + ): + result = await _execute_workflow_with_activity( + client, + worker, + picklable_activity, + worker_config={"activity_executor": executor}, + ) assert result.result == "activity name: picklable_activity" @@ -288,7 +302,9 @@ def wait_cancel() -> str: assert activity.is_cancelled() return "Cancelled" - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: result = await _execute_workflow_with_activity( client, worker, @@ -311,7 +327,9 @@ def wait_cancel() -> NoReturn: activity.heartbeat() with pytest.raises(WorkflowFailureError) as err: - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: await _execute_workflow_with_activity( client, worker, @@ -339,7 +357,9 @@ def wait_cancel() -> str: activity.heartbeat() return "Cancelled" - with 
concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: result = await _execute_workflow_with_activity( client, worker, @@ -372,7 +392,9 @@ def wait_cancel() -> None: events.append("post1") with pytest.raises(WorkflowFailureError) as err: - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: await _execute_workflow_with_activity( client, worker, @@ -424,10 +446,13 @@ def new_worker() -> Worker: activities=[sync_activity_wait_cancel], workflows=[CancelOnWorkerShutdownWorkflow], activity_executor=executor, + max_concurrent_activities=default_max_concurrent_activities, max_cached_workflows=0, ) - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: async with new_worker(): # Start the workflow handle = await client.start_workflow( @@ -678,7 +703,9 @@ def picklable_heartbeat_details_activity() -> str: async def test_sync_activity_thread_heartbeat_details( client: Client, worker: ExternalWorker ): - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: result = await _execute_workflow_with_activity( client, worker, @@ -890,6 +917,7 @@ async def test_sync_activity_process_worker_shutdown_graceful( task_queue=act_task_queue, activities=[picklable_wait_on_event], activity_executor=executor, + max_concurrent_activities=default_max_concurrent_activities, graceful_shutdown_timeout=timedelta(seconds=2), shared_state_manager=_default_shared_state_manager, ) @@ -946,6 +974,7 @@ async def test_sync_activity_process_executor_crash( task_queue=act_task_queue, activities=[kill_my_process], activity_executor=executor, + 
max_concurrent_activities=default_max_concurrent_activities, graceful_shutdown_timeout=timedelta(seconds=2), shared_state_manager=_default_shared_state_manager, ) @@ -1110,7 +1139,9 @@ async def test_sync_activity_contextvars(client: Client, worker: ExternalWorker) def some_activity() -> str: return f"context var: {some_context_var.get()}" - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: result = await _execute_workflow_with_activity( client, worker, @@ -1191,7 +1222,9 @@ async def async_dyn_activity(args: Sequence[RawValue]) -> DynActivityValue: async def test_sync_activity_dynamic_thread(client: Client, worker: ExternalWorker): - with concurrent.futures.ThreadPoolExecutor() as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=default_max_concurrent_activities + ) as executor: result = await _execute_workflow_with_activity( client, worker, @@ -1268,6 +1301,8 @@ async def _execute_workflow_with_activity( worker_config["task_queue"] = str(uuid.uuid4()) worker_config["activities"] = [fn] + additional_activities worker_config["shared_state_manager"] = _default_shared_state_manager + if not worker_config.get("max_concurrent_activities"): + worker_config["max_concurrent_activities"] = default_max_concurrent_activities async with Worker(**worker_config): try: handle = await client.start_workflow( diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index eab77e30..666863e2 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -2267,11 +2267,6 @@ async def run(self) -> None: async def test_workflow_patch(client: Client): - # TODO(cretz): Patches have issues on older servers since core needs patch - # metadata support for some fixes. Unskip for local server only once we - # upgrade to https://github.com/temporalio/sdk-python/issues/272. 
- pytest.skip("Needs SDK metadata support") - workflow_run = PrePatchWorkflow.run task_queue = str(uuid.uuid4()) @@ -2360,11 +2355,6 @@ async def run(self) -> List[str]: async def test_workflow_patch_memoized(client: Client): - # TODO(cretz): Patches have issues on older servers since core needs patch - # metadata support for some fixes. Unskip for local server only once we - # upgrade to https://github.com/temporalio/sdk-python/issues/272. - pytest.skip("Needs SDK metadata support") - # Start a worker with the workflow unpatched and wait until halfway through. # Need to disable workflow cache since we restart the worker and don't want # to pay the sticky queue penalty.