diff --git a/.vscode/launch.json b/.vscode/launch.json index 9b034e6..a649d76 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,7 +12,7 @@ "env": {}, "args": [ "-x", - "tests/scheduler/test_scheduler.py::test_disabled_execution" + "tests/scheduler/test_scheduler.py::test_async_handler" ], "debugOptions": [ "RedirectOutput" diff --git a/docs/reference/dispatchers.md b/docs/reference/dispatchers.md new file mode 100644 index 0000000..41862b7 --- /dev/null +++ b/docs/reference/dispatchers.md @@ -0,0 +1,13 @@ +# Event Dispatchers + +A set of classes for dispatching events, they can be imported from `fluid.utils.dispatcher`: + +```python +from fluid.utils.dispatcher import Dispatcher +``` + +::: fluid.utils.dispatcher.BaseDispatcher + +::: fluid.utils.dispatcher.Dispatcher + +::: fluid.utils.dispatcher.AsyncDispatcher diff --git a/docs/reference/workers.md b/docs/reference/workers.md index d4dbad4..04d43fc 100644 --- a/docs/reference/workers.md +++ b/docs/reference/workers.md @@ -9,8 +9,18 @@ from fastapi.utils.worker import StoppingWorker ::: fluid.utils.worker.Worker +::: fluid.utils.worker.RunningWorker + ::: fluid.utils.worker.StoppingWorker ::: fluid.utils.worker.WorkerFunction +::: fluid.utils.worker.QueueConsumer + +::: fluid.utils.worker.QueueConsumerWorker + +::: fluid.utils.worker.AsyncConsumer + ::: fluid.utils.worker.Workers + +::: fluid.utils.worker.DynamicWorkers diff --git a/docs/tutorials/dispatchers.md b/docs/tutorials/dispatchers.md new file mode 100644 index 0000000..a012360 --- /dev/null +++ b/docs/tutorials/dispatchers.md @@ -0,0 +1,11 @@ +# Event Dispatchers + +Event dispatchers are a way to decouple the event source from the event handler. This is useful when you want to have multiple handlers for the same event, or when you want to have a single handler for multiple events. + +```python +from fluid.utils.dispatcher import SimpleDispatcher + +simple = SimpleDispatcher[Any]() + +simple.dispatch("you can dispatch anything to this generic dispatcher") +``` diff --git a/fluid/scheduler/consumer.py b/fluid/scheduler/consumer.py index 392c9ec..ff4bd63 100644 --- a/fluid/scheduler/consumer.py +++ b/fluid/scheduler/consumer.py @@ -5,14 +5,15 @@ from collections import defaultdict, deque from contextlib import AsyncExitStack from functools import partial -from typing import Any, Callable, Coroutine, Self +from typing import Any, Awaitable, Callable, Self import async_timeout from inflection import underscore +from typing_extensions import Annotated, Doc from fluid.utils import log -from fluid.utils.dispatcher import Dispatcher -from fluid.utils.worker import WorkerFunction, Workers +from fluid.utils.dispatcher import AsyncDispatcher, Dispatcher, Event +from fluid.utils.worker import AsyncConsumer, WorkerFunction, Workers from .broker import TaskBroker, TaskRegistry from .errors import TaskAbortedError, TaskRunError, UnknownTaskError @@ -31,13 +32,19 @@ TaskManagerCLI = None # type: ignore[assignment,misc] -AsyncExecutor = Callable[..., Coroutine[Any, Any, None]] -AsyncMessage = tuple[AsyncExecutor, tuple[Any, ...]] +AsyncHandler = Callable[[TaskRun], Awaitable[None]] logger = log.get_logger(__name__) class TaskDispatcher(Dispatcher[TaskRun]): + """The task dispatcher is responsible for dispatching task run messages""" + + def message_type(self, message: TaskRun) -> str: + return message.state + + +class AsyncTaskDispatcher(AsyncDispatcher[TaskRun]): def message_type(self, message: TaskRun) -> str: return message.state @@ -49,7 +56,16 @@ class TaskManager: def __init__(self, **kwargs: Any) -> None: self.state: dict[str, Any] = {} self.config: TaskManagerConfig = TaskManagerConfig(**kwargs) - self.dispatcher = TaskDispatcher() + self.dispatcher: Annotated[ + TaskDispatcher, + Doc( + """ + A dispatcher of task run events. + + Register handlers to listen for task run events. + """ + ), + ] = TaskDispatcher() self.broker = TaskBroker.from_url(self.config.broker_url) self._stack = AsyncExitStack() @@ -83,9 +99,7 @@ async def on_shutdown(self) -> None: await self.broker.close() def execute_sync(self, task: Task | str, **params: Any) -> TaskRun: - return asyncio.get_event_loop().run_until_complete( - self._execute_and_exit(task, **params) - ) + return asyncio.run(self._execute_and_exit(task, **params)) def register_task(self, task: Task) -> None: """Register a task with the task manager @@ -139,6 +153,19 @@ def register_from_module(self, module: Any) -> None: if isinstance(obj := getattr(module, name), Task): self.register_task(obj) + def register_async_handler(self, event: str, handler: AsyncHandler) -> None: + """Register an async handler for a given event + + This method is a no op for a TaskManager that is not a worker + """ + + def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None: + """Unregister an async handler for a given event + + This method is a no op for a TaskManager that is not a worker + """ + return None + def cli(self, **kwargs: Any) -> Any: """Create the task manager command line interface""" try: @@ -163,6 +190,7 @@ class TaskConsumer(TaskManager, Workers): def __init__(self, **config: Any) -> None: super().__init__(**config) Workers.__init__(self) + self._async_dispatcher_worker = AsyncConsumer(AsyncTaskDispatcher()) self._concurrent_tasks: dict[str, dict[str, TaskRun]] = defaultdict(dict) self._task_to_queue: deque[str | Task] = deque() self._priority_task_run_queue: deque[TaskRun] = deque() @@ -170,6 +198,7 @@ def __init__(self, **config: Any) -> None: self._queue_task, name="queue-task-worker" ) self.add_workers(self._queue_tasks_worker) + self.add_workers(self._async_dispatcher_worker) for i in range(self.config.max_concurrent_tasks): worker_name = f"task-worker-{i+1}" self.add_workers( @@ -200,6 +229,17 @@ async def queue_and_wait( with TaskRunWaiter(self) as waiter: return await waiter.wait(await self.queue(task, **params), timeout=timeout) + def register_async_handler(self, event: Event | str, handler: AsyncHandler) -> None: + event = Event.from_string_or_event(event) + self.dispatcher.register_handler( + f"{event.type}.async_dispatch", + self._async_dispatcher_worker.send, + ) + self._async_dispatcher_worker.dispatcher.register_handler(event, handler) + + def unregister_async_handler(self, event: Event | str) -> AsyncHandler | None: + return self._async_dispatcher_worker.dispatcher.unregister_handler(event) + # Internals # process tasks from the internal queue diff --git a/fluid/scheduler/models.py b/fluid/scheduler/models.py index a450c45..b6fddf7 100644 --- a/fluid/scheduler/models.py +++ b/fluid/scheduler/models.py @@ -18,7 +18,7 @@ from fluid import settings from fluid.utils import kernel, log from fluid.utils.data import compact_dict -from fluid.utils.dates import utcnow +from fluid.utils.dates import as_utc from fluid.utils.text import create_uid, trim_docstring from .crontab import Scheduler @@ -223,7 +223,7 @@ def set_state( ) -> None: if self.state == state: return - state_time = state_time or utcnow() + state_time = as_utc(state_time) match (self.state, state): case (TaskState.init, TaskState.queued): self.queued = state_time @@ -262,7 +262,7 @@ def lock(self, timeout: float | None) -> Lock: return self.task_manager.broker.lock(self.name, timeout=timeout) def _dispatch(self) -> None: - self.task_manager.dispatcher.dispatch(self) + self.task_manager.dispatcher.dispatch(self.model_copy()) @dataclass diff --git a/fluid/utils/dates.py b/fluid/utils/dates.py index 66f4df1..79e7860 100644 --- a/fluid/utils/dates.py +++ b/fluid/utils/dates.py @@ -1,4 +1,4 @@ -from datetime import datetime, timezone +from datetime import date, datetime, timezone from typing import Any from zoneinfo import ZoneInfo @@ -10,8 +10,13 @@ def utcnow() -> datetime: return datetime.now(tz=UTC) -def as_utc(dt: datetime) -> datetime: - return dt.replace(tzinfo=UTC) +def as_utc(dt: date | None) -> datetime: + if dt is None: + return utcnow() + elif isinstance(dt, datetime): + return dt.replace(tzinfo=UTC) + else: + return datetime(dt.year, dt.month, dt.day, tzinfo=UTC) def isoformat(dt: datetime, **kwargs: Any) -> str: diff --git a/fluid/utils/dispatcher.py b/fluid/utils/dispatcher.py index 48f5e99..d32be2a 100644 --- a/fluid/utils/dispatcher.py +++ b/fluid/utils/dispatcher.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio from abc import ABC, abstractmethod from collections import defaultdict @@ -11,6 +13,12 @@ class Event(NamedTuple): type: str tag: str + @classmethod + def from_string_or_event(cls, event: str | Self) -> Self: + if isinstance(event, str): + return cls.from_string(event) + return event + @classmethod def from_string(cls, event: str) -> Self: bits = event.split(".") @@ -27,23 +35,23 @@ def __init__(self) -> None: def register_handler( self, - message_type: str, + event: Event | str, handler: MessageHandlerType, ) -> MessageHandlerType | None: - event = Event.from_string(message_type) + event = Event.from_string_or_event(event) previous = self._msg_handlers[event.type].get(event.tag) self._msg_handlers[event.type][event.tag] = handler return previous - def unregister_handler(self, message_type: str) -> MessageHandlerType | None: - event = Event.from_string(message_type) + def unregister_handler(self, event: Event | str) -> MessageHandlerType | None: + event = Event.from_string_or_event(event) return self._msg_handlers[event.type].pop(event.tag, None) def get_handlers( self, message: MessageType, ) -> dict[str, MessageHandlerType] | None: - message_type = self.message_type(message) + message_type = str(self.message_type(message)) return self._msg_handlers.get(message_type) @abstractmethod @@ -52,6 +60,8 @@ def message_type(self, message: MessageType) -> str: class Dispatcher(BaseDispatcher[MessageType, Callable[[MessageType], None]]): + """Dispatcher for sync handlers""" + def dispatch(self, message: MessageType) -> int: """dispatch the message""" handlers = self.get_handlers(message) @@ -64,8 +74,10 @@ def dispatch(self, message: MessageType) -> int: class AsyncDispatcher( BaseDispatcher[MessageType, Callable[[MessageType], Awaitable[None]]], ): + """Dispatcher for async handlers""" + async def dispatch(self, message: MessageType) -> int: - """Dispatch the message""" + """Dispatch the message and wait for all handlers to complete""" handlers = self.get_handlers(message) if handlers: await asyncio.gather(*[handler(message) for handler in handlers.values()]) diff --git a/fluid/utils/waiter.py b/fluid/utils/waiter.py index 635512f..22f7250 100644 --- a/fluid/utils/waiter.py +++ b/fluid/utils/waiter.py @@ -1,11 +1,9 @@ import asyncio from typing import Callable -import async_timeout - async def wait_for(assertion: Callable[[], bool], timeout: float = 1.0) -> None: - async with async_timeout.timeout(timeout): + async with asyncio.timeout(timeout): while True: if assertion(): return diff --git a/fluid/utils/worker.py b/fluid/utils/worker.py index e14d93a..a3e1f18 100644 --- a/fluid/utils/worker.py +++ b/fluid/utils/worker.py @@ -41,10 +41,12 @@ def __init__(self, name: str = "") -> None: @property def worker_name(self) -> str: + """The name of the worker""" return self._name @property def num_workers(self) -> int: + """The number of workers in this worker""" return 1 @abstractmethod @@ -71,6 +73,7 @@ async def run(self) -> None: class RunningWorker(Worker): + """A Worker that can be started""" def __init__(self, name: str = "") -> None: super().__init__(name) @@ -110,6 +113,8 @@ async def status(self) -> dict: class WorkerFunction(StoppingWorker): + """A Worker that runs a coroutine function""" + def __init__( self, run_function: Callable[[], Awaitable[None]], @@ -135,7 +140,10 @@ def send(self, message: T | None) -> None: ... class QueueConsumer(StoppingWorker, MessageProducer[MessageType]): - """A Worker that can receive messages""" + """A Worker that can receive messages + + This worker can receive messages but not consume them. + """ def __init__(self, name: str = "") -> None: super().__init__(name=name) @@ -143,7 +151,7 @@ def __init__(self, name: str = "") -> None: async def get_message(self, timeout: float = 0.5) -> MessageType | None: try: - async with async_timeout.timeout(timeout): + async with asyncio.timeout(timeout): return await self._queue.get() except asyncio.TimeoutError: return None @@ -166,6 +174,8 @@ def send(self, message: MessageType | None) -> None: class QueueConsumerWorker(QueueConsumer[MessageType]): + """A Worker that can receive and consume messages""" + def __init__( self, on_message: Callable[[MessageType], Awaitable[None]], diff --git a/mkdocs.yml b/mkdocs.yml index feae73e..db06fcc 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -19,6 +19,8 @@ theme: toggle: icon: material/lightbulb-outline name: Switch to light mode + features: + - content.code.copy plugins: search: null mkdocstrings: @@ -41,7 +43,12 @@ plugins: show_symbol_type_heading: true show_symbol_type_toc: true markdown_extensions: - toc: - permalink: true - markdown.extensions.codehilite: - guess_lang: false + - pymdownx.highlight: + anchor_linenums: true + line_spans: __span + pygments_lang_class: true + - pymdownx.inlinehilite + - pymdownx.snippets + - pymdownx.superfences + - toc: + permalink: true diff --git a/poetry.lock b/poetry.lock index 00f5ffb..1c4e1e9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2903,4 +2903,4 @@ log = ["python-json-logger"] [metadata] lock-version = "2.0" python-versions = ">=3.11" -content-hash = "e39bfea99e09f0c8640b81b462346f4a034a8635766e6041e2febd8637eedbfa" +content-hash = "25352f66a5395461a6d79406ca9d67149640f892bacdbbff2202e2981909ef3e" diff --git a/pyproject.toml b/pyproject.toml index 4e4f271..c971da5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aio-fluid" -version = "1.1.1" +version = "1.1.2" description = "Tools for backend python services" license = "BSD" authors = ["Luca "] @@ -34,6 +34,7 @@ asyncpg = {version = "^0.29.0", optional = true} psycopg2-binary = {version = "^2.9.9", optional = true} httpx = {version = "^0.27.2", optional = true} prometheus-client = {version = "^0.21.0", optional = true} +typing-extensions = "^4.12.2" [tool.poetry.group.dev.dependencies] pytest = "^8.1.1" diff --git a/readme.md b/readme.md index aea5288..f8288c0 100644 --- a/readme.md +++ b/readme.md @@ -5,6 +5,7 @@ Async utilities for backend python services developed by [Quantmind](https://qua [![PyPI version](https://badge.fury.io/py/aio-fluid.svg)](https://badge.fury.io/py/aio-fluid) [![Python versions](https://img.shields.io/pypi/pyversions/aio-fluid.svg)](https://pypi.org/project/aio-fluid) [![build](https://github.com/quantmind/fluid/workflows/build/badge.svg)](https://github.com/quantmind/aio-fluid/actions?query=workflow%3Abuild) +[![codecov](https://codecov.io/gh/quantmind/aio-fluid/graph/badge.svg?token=81oWUoyEVp)](https://codecov.io/gh/quantmind/aio-fluid) **Documentation**: [https://quantmind.github.io/aio-fluid](https://quantmind.github.io/aio-fluid) diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py index 427b56e..350fc5d 100644 --- a/tests/scheduler/test_scheduler.py +++ b/tests/scheduler/test_scheduler.py @@ -108,3 +108,23 @@ async def test_disabled_execution(task_scheduler: TaskScheduler) -> None: assert task_run.name == "disabled" assert task_run.end assert task_run.state == TaskState.aborted.name + + +@dataclass +class AsynHandler: + task_run: TaskRun | None = None + + async def __call__(self, task_run: TaskRun) -> None: + await asyncio.sleep(0.1) + self.task_run = task_run + + +async def test_async_handler(task_scheduler: TaskScheduler) -> None: + handler = AsynHandler() + task_scheduler.register_async_handler("running.test", handler) + task_run = await task_scheduler.queue_and_wait("dummy") + assert task_run.state == TaskState.success + await wait_for(lambda: handler.task_run is not None) + assert handler.task_run + assert handler.task_run.state == TaskState.running + assert task_scheduler.unregister_async_handler("running.test") is handler