From de4efeaeb9afb2c7eb517ec915db310c825cf9e2 Mon Sep 17 00:00:00 2001 From: isra17 Date: Wed, 25 Dec 2019 15:14:29 -0500 Subject: [PATCH] Initial commit --- MANIFEST.in | 7 + README.md | 0 pyproject.toml | 7 + setup.cfg | 56 +++++++ setup.py | 95 +++++++++++ src/dramatiq_abort/__init__.py | 6 + src/dramatiq_abort/backend.py | 21 +++ src/dramatiq_abort/backends/__init__.py | 15 ++ src/dramatiq_abort/backends/redis.py | 49 ++++++ src/dramatiq_abort/backends/stub.py | 36 +++++ src/dramatiq_abort/middleware.py | 152 +++++++++++++++++ tests/__init__.py | 0 tests/backends/__init__.py | 0 tests/backends/test_backends.py | 14 ++ tests/backends/test_redis.py | 7 + tests/conftest.py | 77 +++++++++ tests/test_abortable.py | 207 ++++++++++++++++++++++++ tox.ini | 19 +++ 18 files changed, 768 insertions(+) create mode 100644 MANIFEST.in create mode 100644 README.md create mode 100644 pyproject.toml create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 src/dramatiq_abort/__init__.py create mode 100644 src/dramatiq_abort/backend.py create mode 100644 src/dramatiq_abort/backends/__init__.py create mode 100644 src/dramatiq_abort/backends/redis.py create mode 100644 src/dramatiq_abort/backends/stub.py create mode 100644 src/dramatiq_abort/middleware.py create mode 100644 tests/__init__.py create mode 100644 tests/backends/__init__.py create mode 100644 tests/backends/test_backends.py create mode 100644 tests/backends/test_redis.py create mode 100644 tests/conftest.py create mode 100644 tests/test_abortable.py create mode 100644 tox.ini diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..346766b --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,7 @@ +include pyproject.toml + +# Include the README +include *.md + +# Include the license file +include LICENSE diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..3f55061 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +[build-system] +requires = ["setuptools>=40.8.0", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.black] +line-length = 88 +target-version = ['py35', 'py36', 'py37', 'py38'] diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..688ae40 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,56 @@ +[metadata] +license_files = LICENSE + +[tool:pytest] +testpaths = tests +addopts = --cov dramatiq_abort --cov-report html + +[pep8] +max-line-length = 88 + +[flake8] +max-complexity = 18 +max-line-length = 80 +inline-quotes = double +multiline-quotes = double +ignore = E203, E266, E501, W503 +select = B,C,E,F,W,T4,B9 + + +[isort] +not_skip = __init__.py +known_first_party = dramatiq_abort +order_by_type = true +multi_line_output=3 +include_trailing_comma=True +force_grid_wrap=0 +use_parentheses=True +line_length=88 + +[mypy] +python_version=3.7 +platform=linux + +# flake8-mypy expects the two following for sensible formatting +show_column_numbers=True + +# show error messages from unrelated files +follow_imports=normal + +# suppress errors about unsatisfied imports +ignore_missing_imports=True + +# be strict +disallow_untyped_calls=True +warn_return_any=True +strict_optional=True +warn_no_return=True +warn_redundant_casts=True +warn_unused_ignores=True +disallow_any_generics=True + +# The following are off by default. Flip them on if you feel +# adventurous. +disallow_untyped_defs=True +check_untyped_defs=True + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..1d03c31 --- /dev/null +++ b/setup.py @@ -0,0 +1,95 @@ +# Dramatiq-abort is a middleware to abort Dramatiq tasks. +# Copyright (C) 2019 Flare Systems Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +import os + +from setuptools import find_packages, setup + +here = os.path.abspath(os.path.dirname(__file__)) + + +def rel(*xs: str) -> str: + return os.path.join(here, *xs) + + +with open(rel("README.md")) as f: + long_description = f.read() + + +with open(rel("src", "dramatiq_abort", "__init__.py"), "r") as f: + version_marker = "__version__ = " + for line in f: + if line.startswith(version_marker): + _, version = line.split(version_marker) + version = version.strip().strip('"') + break + else: + raise RuntimeError("Version marker not found.") + + +dependencies = [ + "dramatiq", +] + +extra_dependencies = { + "redis": ["redis>=2.0,<4.0"], +} + +extra_dependencies["all"] = list(set(sum(extra_dependencies.values(), []))) +extra_dependencies["dev"] = extra_dependencies["all"] + [ + # Tools + "black", + # Linting + "flake8", + "flake8-bugbear", + "flake8-quotes", + "isort", + "mypy", + # Testing + "pytest", + "pytest-cov", + "tox", +] + +setup( + name="dramatiq-abort", + version=version, + author="Flare Systems Inc.", + author_email="oss@flare.systems", + description="Dramatiq middleware to abort tasks.", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/flared/dramatiq_abort", + packages=find_packages(where="src"), + package_dir={"": "src"}, + include_package_data=True, + install_requires=dependencies, + python_requires=">=3.5", + extras_require=extra_dependencies, + zip_safe=False, + classifiers=[ + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3 :: Only", + "Development Status :: 4 - Beta", + "Topic :: System :: Distributed Computing", + ( + "License :: OSI Approved :: " + "GNU Lesser General Public License v3 or later (LGPLv3+)", + ), + ], +) diff --git a/src/dramatiq_abort/__init__.py b/src/dramatiq_abort/__init__.py new file mode 100644 index 0000000..6e8c39f --- /dev/null +++ b/src/dramatiq_abort/__init__.py @@ -0,0 +1,6 @@ +__version__ = "0.1beta" + +from .backend import EventBackend +from .middleware import Abort, Abortable, abort + +__all__ = ["EventBackend", "Abortable", "Abort", "abort"] diff --git a/src/dramatiq_abort/backend.py b/src/dramatiq_abort/backend.py new file mode 100644 index 0000000..ce530e4 --- /dev/null +++ b/src/dramatiq_abort/backend.py @@ -0,0 +1,21 @@ +import abc +from typing import List, Optional + + +class EventBackend(abc.ABC): + """ABC for event backends. + """ + + @abc.abstractmethod + def wait_many( + self, keys: List[bytes], timeout: int + ) -> Optional[bytes]: # pragma: no cover + raise NotImplementedError + + @abc.abstractmethod + def poll(self, key: bytes) -> bool: # pragma: no cover + raise NotImplementedError + + @abc.abstractmethod + def notify(self, key: bytes, ttl: int) -> None: # pragma: no cover + raise NotImplementedError diff --git a/src/dramatiq_abort/backends/__init__.py b/src/dramatiq_abort/backends/__init__.py new file mode 100644 index 0000000..9ad6a7d --- /dev/null +++ b/src/dramatiq_abort/backends/__init__.py @@ -0,0 +1,15 @@ +import warnings + +from .stub import StubBackend + +try: + from .redis import RedisBackend +except ImportError: # pragma: no cover + warnings.warn( + "RedisBackend is not available. Run `pip install dramatiq[redis]` " + "to add support for that backend.", + ImportWarning, + ) + + +__all__ = ["StubBackend", "RedisBackend"] diff --git a/src/dramatiq_abort/backends/redis.py b/src/dramatiq_abort/backends/redis.py new file mode 100644 index 0000000..92f0934 --- /dev/null +++ b/src/dramatiq_abort/backends/redis.py @@ -0,0 +1,49 @@ +from typing import Any, List, Optional + +import redis + +from ..backend import EventBackend + + +class RedisBackend(EventBackend): + """A event backend for Redis_. + + Parameters: + client(Redis): An optional client. If this is passed, + then all other parameters are ignored. + url(str): An optional connection URL. If both a URL and + connection paramters are provided, the URL is used. + **parameters(dict): Connection parameters are passed directly + to :class:`redis.Redis`. + + .. _redis: https://redis.io + """ + + def __init__(self, *, client: Any) -> None: + self.client = client + + @classmethod + def from_url(cls, url: str) -> "RedisBackend": + return cls( + client=redis.StrictRedis(connection_pool=redis.ConnectionPool.from_url(url)) + ) + + def wait_many(self, keys: List[bytes], timeout: int) -> Optional[bytes]: + assert timeout is None or timeout >= 1000, "wait timeouts must be >= 1000" + event = self.client.blpop(keys, (timeout or 0) // 1000) + if event is None: + return None + key, value = event + if value != b"x": + return None + return key + + def poll(self, key: bytes) -> bool: + event = self.client.lpop(key) + return event == b"x" + + def notify(self, key: bytes, ttl: int) -> None: + with self.client.pipeline() as pipe: + pipe.rpush(key, b"x") + pipe.pexpire(key, ttl) + pipe.execute() diff --git a/src/dramatiq_abort/backends/stub.py b/src/dramatiq_abort/backends/stub.py new file mode 100644 index 0000000..bffad15 --- /dev/null +++ b/src/dramatiq_abort/backends/stub.py @@ -0,0 +1,36 @@ +from threading import Condition +from typing import List, Optional, Set + +from ..backend import EventBackend + + +class StubBackend(EventBackend): + def __init__(self) -> None: + self.condition = Condition() + self.events: Set[bytes] = set() + + def wait_many(self, keys: List[bytes], timeout: int) -> Optional[bytes]: + with self.condition: + if self.condition.wait_for( + lambda: self._anyset(keys), timeout=timeout / 1000 + ): + for key in keys: + if key in self.events: + self.events.remove(key) + return key + return None + + def poll(self, key: bytes) -> bool: + with self.condition: + if key in self.events: + self.events.remove(key) + return True + return False + + def notify(self, key: bytes, ttl: int) -> None: + with self.condition: + self.events.add(key) + self.condition.notify_all() + + def _anyset(self, keys: List[bytes]) -> bool: + return any(k in self.events for k in keys) diff --git a/src/dramatiq_abort/middleware.py b/src/dramatiq_abort/middleware.py new file mode 100644 index 0000000..9f29d47 --- /dev/null +++ b/src/dramatiq_abort/middleware.py @@ -0,0 +1,152 @@ +import threading +import time +import warnings +from threading import Thread +from typing import Any, Dict, Optional, Set + +import dramatiq +from dramatiq import get_broker +from dramatiq.logging import get_logger +from dramatiq.middleware import Middleware, SkipMessage +from dramatiq.middleware.threading import ( + Interrupt, + current_platform, + raise_thread_exception, + supported_platforms, +) + +from .backend import EventBackend + + +class Abort(Interrupt): + """Exception used to interrupt worker threads when their worker + processes have been signaled to abort. + """ + + +class Abortable(Middleware): + """Middleware that interrupts actors whose job has been signaled for + termination. + Currently, this is only available on CPython. + + Note: + This works by setting an async exception in the worker thread + that runs the actor. This means that the exception will only get + called the next time that thread acquires the GIL. Concretely, + this means that this middleware can't cancel system calls. + + Parameters: + abortable(bool): When true, the actor will be interrupted + if the task was aborted. + """ + + def __init__(self, *, backend: EventBackend, abortable: bool = True): + self.logger = get_logger(__name__, type(self)) + self.abortable = abortable + self.backend = backend + self.wait_timeout = 1000 + self.abort_ttl = 90000 + self.abortables: Dict[str, int] = {} + # This lock avoid race between the monitor and a task cleaning up. + self.lock = threading.Lock() + + @property + def actor_options(self) -> Set[str]: + return {"abortable"} + + def is_abortable(self, actor: dramatiq.Actor, message: dramatiq.Message) -> bool: + abortable = message.options.get("abortable") + if abortable is None: + abortable = actor.options.get("abortable") + if abortable is None: + abortable = self.abortable + return bool(abortable) + + def after_process_boot(self, broker: dramatiq.Broker) -> None: + if current_platform in supported_platforms: + thread = Thread(target=self._watcher, daemon=True) + thread.start() + else: # pragma: no cover + msg = "Abortable cannot kill threads on your current platform (%r)." + warnings.warn(msg % current_platform, category=RuntimeWarning, stacklevel=2) + + def before_process_message( + self, broker: dramatiq.Broker, message: dramatiq.Message + ) -> None: + actor = broker.get_actor(message.actor_name) + if not self.is_abortable(actor, message): + return + + if self.backend.poll(self.id_to_key(message.message_id)): + raise SkipMessage() + + self.abortables[message.message_id] = threading.get_ident() + + def after_process_message( + self, + broker: dramatiq.Broker, + message: dramatiq.Message, + *, + result: Optional[Any] = None, + exception: Optional[BaseException] = None + ) -> None: + with self.lock: + self.abortables.pop(message.message_id, None) + + after_skip_message = after_process_message + + def abort(self, message_id: str) -> None: + self.backend.notify(self.id_to_key(message_id), ttl=self.abort_ttl) + + def _handle(self) -> None: + message_ids = self.abortables.keys() + if not message_ids: + time.sleep(self.wait_timeout / 1000) + return + + abort_keys = [self.id_to_key(id_) for id_ in message_ids] + key = self.backend.wait_many(abort_keys, self.wait_timeout) + if not key: + return + + message_id = self.key_to_id(key) + with self.lock: + thread_id = self.abortables.pop(message_id, None) + # In case the task was done in between the polling and now. + if thread_id is None: + return # pragma: no cover + + self.logger.info( + "Aborting task. Raising exception in worker thread %r.", thread_id + ) + raise_thread_exception(thread_id, Abort) + + def _watcher(self) -> None: + while True: + try: + print("_handle") + self._handle() + except Exception: # pragma: no cover + self.logger.exception( + "Unhandled error while running the time limit handler." + ) + + @staticmethod + def id_to_key(message_id: str) -> bytes: + return ("abort:" + message_id).encode() + + @staticmethod + def key_to_id(key: bytes) -> str: + return key.decode()[6:] + + +def abort(message_id: str, middleware: Optional[Abortable] = None) -> None: + if not middleware: + broker = get_broker() + for middleware in broker.middleware: + if isinstance(middleware, Abortable): + break + else: + raise RuntimeError("The default broker doesn't have an abortable backend.") + + middleware.abort(message_id) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/backends/__init__.py b/tests/backends/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/backends/test_backends.py b/tests/backends/test_backends.py new file mode 100644 index 0000000..f238934 --- /dev/null +++ b/tests/backends/test_backends.py @@ -0,0 +1,14 @@ +from dramatiq_abort import EventBackend + + +def test_backend(event_backend: EventBackend) -> None: + assert event_backend.wait_many([b"test"], timeout=1000) is None + + event_backend.notify(b"test-a", 1000) + event_backend.notify(b"test-b", 1000) + event_backend.notify(b"test-c", 1000) + + assert event_backend.wait_many([b"test-a", b"test-b"], timeout=1000) == b"test-a" + assert event_backend.wait_many([b"test-a", b"test-b"], timeout=1000) == b"test-b" + assert event_backend.poll(b"test-c") is True + assert event_backend.poll(b"test-d") is False diff --git a/tests/backends/test_redis.py b/tests/backends/test_redis.py new file mode 100644 index 0000000..df27a55 --- /dev/null +++ b/tests/backends/test_redis.py @@ -0,0 +1,7 @@ +from dramatiq_abort.backends import RedisBackend + + +def test_redis_backend_wait_many_error(redis_event_backend: RedisBackend) -> None: + # A key without the sentinel value returns None. + redis_event_backend.client.rpush(b"test", b"not-x") + assert redis_event_backend.wait_many([b"test"], timeout=1000) is None diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..3838b7b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,77 @@ +import logging +import os +import random +from typing import Any, Dict + +import dramatiq +import pytest +import redis +from dramatiq import Worker +from dramatiq.brokers.stub import StubBroker + +from dramatiq_abort import EventBackend +from dramatiq_abort import backends as evt_backends + +logfmt = "[%(asctime)s] [%(threadName)s] [%(name)s] [%(levelname)s] %(message)s" +logging.basicConfig(level=logging.INFO, format=logfmt) +logging.getLogger("pika").setLevel(logging.WARN) + +random.seed(1337) + +CI: bool = os.getenv("GITHUB_ACTION") is not None + + +def check_redis(client: redis.StrictRedis) -> None: + try: + client.ping() + except redis.ConnectionError as e: + raise e if CI else pytest.skip("No connection to Redis server.") + + +@pytest.fixture() +def stub_broker() -> dramatiq.Broker: + broker = StubBroker() + broker.emit_after("process_boot") + dramatiq.set_broker(broker) + yield broker + broker.flush_all() + broker.close() + + +@pytest.fixture() +def stub_worker(stub_broker: dramatiq.Broker) -> dramatiq.Worker: + worker = Worker(stub_broker, worker_timeout=100, worker_threads=32) + worker.start() + yield worker + worker.stop() + + +@pytest.fixture +def redis_event_backend() -> evt_backends.RedisBackend: + backend = evt_backends.RedisBackend.from_url("redis://localhost:6379") + check_redis(backend.client) + backend.client.flushall() + return backend + + +@pytest.fixture +def stub_event_backend() -> evt_backends.StubBackend: + return evt_backends.StubBackend() + + +@pytest.fixture +def event_backends( + redis_event_backend: evt_backends.RedisBackend, + stub_event_backend: evt_backends.StubBackend, +) -> Dict[str, EventBackend]: + return { + "redis": redis_event_backend, + "stub": stub_event_backend, + } + + +@pytest.fixture(params=["redis", "stub"]) +def event_backend( + request: Any, event_backends: Dict[str, EventBackend] +) -> EventBackend: + return event_backends[request.param] diff --git a/tests/test_abortable.py b/tests/test_abortable.py new file mode 100644 index 0000000..4cc599a --- /dev/null +++ b/tests/test_abortable.py @@ -0,0 +1,207 @@ +import time +from typing import Optional + +import dramatiq +import pytest +from dramatiq.middleware import threading + +from dramatiq_abort import Abort, Abortable, EventBackend, abort + +not_supported = threading.current_platform not in threading.supported_platforms + + +@pytest.mark.skipif(not_supported, reason="Threading not supported on this platform.") +def test_abort_notifications_are_received( + stub_broker: dramatiq.Broker, + stub_worker: dramatiq.Worker, + event_backend: EventBackend, +) -> None: + # Given that I have a database + aborts, successes = [], [] + + abortable = Abortable(backend=event_backend) + stub_broker.add_middleware(abortable) + + # And an actor that handles shutdown interrupts + @dramatiq.actor(abortable=True, max_retries=0) + def do_work() -> None: + try: + for _ in range(10): + time.sleep(0.1) + except Abort: + aborts.append(1) + raise + successes.append(1) + + stub_broker.emit_after("process_boot") + + # If I send it a message + message = do_work.send() + + # Then wait and signal the task to terminate + time.sleep(0.1) + abort(message.message_id) + + # Then join on the queue + stub_broker.join(do_work.queue_name) + stub_worker.join() + + +def test_not_abortable( + stub_broker: dramatiq.Broker, + stub_worker: dramatiq.Worker, + stub_event_backend: EventBackend, +) -> None: + aborts, successes = [], [] + abortable = Abortable(backend=stub_event_backend) + stub_broker.add_middleware(abortable) + + @dramatiq.actor(abortable=False) + def not_abortable() -> None: + try: + for _ in range(10): + time.sleep(0.1) + except Abort: + aborts.append(1) + raise + successes.append(1) + + stub_broker.emit_after("process_boot") + + # If I send it a message + message = not_abortable.send() + + # Then wait and signal the task to terminate + time.sleep(0.1) + abort(message.message_id) + + # Then join on the queue + stub_broker.join(not_abortable.queue_name) + stub_worker.join() + + # I expect it to shutdown + assert sum(aborts) == 0 + assert sum(successes) == 1 + + +def test_abort_before_processing( + stub_broker: dramatiq.Broker, stub_event_backend: EventBackend +) -> None: + calls = [] + abortable = Abortable(backend=stub_event_backend) + stub_broker.add_middleware(abortable) + + @dramatiq.actor(abortable=True, max_retries=0) + def do_work() -> None: + calls.append(1) + + stub_broker.emit_after("process_boot") + + # If I send it a message + message = do_work.send() + # And abort right after. + abort(message.message_id) + + # Then start the worker. + worker = dramatiq.Worker(stub_broker, worker_timeout=100, worker_threads=1) + worker.start() + + stub_broker.join(do_work.queue_name) + worker.join() + worker.stop() + + # I expect the task to not have been called. + assert sum(calls) == 0 + + +@pytest.mark.parametrize( + "middleware_abortable,actor_abortable,message_abortable,is_abortable", + [ + (True, None, None, True), + (True, False, None, False), + (True, True, None, True), + (True, None, False, False), + (True, False, False, False), + (True, True, False, False), + (True, None, True, True), + (True, False, True, True), + (True, True, True, True), + (False, None, None, False), + (False, False, None, False), + (False, True, None, True), + (False, None, False, False), + (False, False, False, False), + (False, True, False, False), + (False, None, True, True), + (False, False, True, True), + (False, True, True, True), + ], +) +def test_abortable_configs( + stub_event_backend: EventBackend, + middleware_abortable: bool, + actor_abortable: Optional[bool], + message_abortable: Optional[bool], + is_abortable: bool, +) -> None: + abortable = Abortable(backend=stub_event_backend, abortable=middleware_abortable) + + message = dramatiq.Message( + queue_name="some-queue", + actor_name="some-actor", + args=(), + kwargs={}, + options={"abortable": message_abortable}, + ) + + @dramatiq.actor(abortable=actor_abortable) + def actor() -> None: + pass + + assert abortable.is_abortable(actor, message) == is_abortable + + +def test_abort_polling( + stub_broker: dramatiq.Broker, + stub_worker: dramatiq.Worker, + stub_event_backend: EventBackend, +) -> None: + sentinel = [] + abortable = Abortable(backend=stub_event_backend) + stub_broker.add_middleware(abortable) + + @dramatiq.actor(abortable=True, max_retries=0) + def abort_with_delay() -> None: + try: + sentinel.append(True) + time.sleep(5) + abortable.abort(message.message_id) + for _ in range(20): + time.sleep(0.1) + sentinel.append(True) + except Abort: + sentinel.append(False) + raise + sentinel.append(True) + + stub_broker.emit_after("process_boot") + + # If I send it a message + message = abort_with_delay.send() + + # Then join on the queue + stub_broker.join(abort_with_delay.queue_name) + stub_worker.join() + + # I expect it to shutdown + assert sentinel == [True, False] + + +def test_abort_with_no_middleware( + stub_broker: dramatiq.Broker, stub_worker: dramatiq.Worker +) -> None: + try: + abort("foo") + raise AssertionError("Exception not raised") + except RuntimeError: + assert True diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..4183354 --- /dev/null +++ b/tox.ini @@ -0,0 +1,19 @@ +[tox] +envlist= + py37 +minversion = 3.3.0 +isolated_build = true + +[testenv] +extras= + dev +deps= + pytest +commands= + python setup.py check -m -s + isort -rc -c . + black --check . + flake8 . + pytest tests {posargs} +passenv= + GITHUB_ACTION