Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
isra17 committed Dec 25, 2019
1 parent 2164f1b commit de4efea
Show file tree
Hide file tree
Showing 18 changed files with 768 additions and 0 deletions.
7 changes: 7 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
include pyproject.toml

# Include the README
include *.md

# Include the license file
include LICENSE
Empty file added README.md
Empty file.
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[build-system]
requires = ["setuptools>=40.8.0", "wheel"]
build-backend = "setuptools.build_meta"

[tool.black]
line-length = 88
target-version = ['py35', 'py36', 'py37', 'py38']
56 changes: 56 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
[metadata]
license_files = LICENSE

[tool:pytest]
testpaths = tests
addopts = --cov dramatiq_abort --cov-report html

[pep8]
max-line-length = 88

[flake8]
max-complexity = 18
max-line-length = 80
inline-quotes = double
multiline-quotes = double
ignore = E203, E266, E501, W503
select = B,C,E,F,W,T4,B9


[isort]
not_skip = __init__.py
known_first_party = dramatiq_abort
order_by_type = true
multi_line_output=3
include_trailing_comma=True
force_grid_wrap=0
use_parentheses=True
line_length=88

[mypy]
python_version=3.7
platform=linux

# flake8-mypy expects the two following for sensible formatting
show_column_numbers=True

# show error messages from unrelated files
follow_imports=normal

# suppress errors about unsatisfied imports
ignore_missing_imports=True

# be strict
disallow_untyped_calls=True
warn_return_any=True
strict_optional=True
warn_no_return=True
warn_redundant_casts=True
warn_unused_ignores=True
disallow_any_generics=True

# The following are off by default. Flip them on if you feel
# adventurous.
disallow_untyped_defs=True
check_untyped_defs=True

95 changes: 95 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Dramatiq-abort is a middleware to abort Dramatiq tasks.
# Copyright (C) 2019 Flare Systems Inc. <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import os

from setuptools import find_packages, setup

here = os.path.abspath(os.path.dirname(__file__))


def rel(*xs: str) -> str:
return os.path.join(here, *xs)


with open(rel("README.md")) as f:
long_description = f.read()


with open(rel("src", "dramatiq_abort", "__init__.py"), "r") as f:
version_marker = "__version__ = "
for line in f:
if line.startswith(version_marker):
_, version = line.split(version_marker)
version = version.strip().strip('"')
break
else:
raise RuntimeError("Version marker not found.")


dependencies = [
"dramatiq",
]

extra_dependencies = {
"redis": ["redis>=2.0,<4.0"],
}

extra_dependencies["all"] = list(set(sum(extra_dependencies.values(), [])))
extra_dependencies["dev"] = extra_dependencies["all"] + [
# Tools
"black",
# Linting
"flake8",
"flake8-bugbear",
"flake8-quotes",
"isort",
"mypy",
# Testing
"pytest",
"pytest-cov",
"tox",
]

setup(
name="dramatiq-abort",
version=version,
author="Flare Systems Inc.",
author_email="[email protected]",
description="Dramatiq middleware to abort tasks.",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/flared/dramatiq_abort",
packages=find_packages(where="src"),
package_dir={"": "src"},
include_package_data=True,
install_requires=dependencies,
python_requires=">=3.5",
extras_require=extra_dependencies,
zip_safe=False,
classifiers=[
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3 :: Only",
"Development Status :: 4 - Beta",
"Topic :: System :: Distributed Computing",
(
"License :: OSI Approved :: "
"GNU Lesser General Public License v3 or later (LGPLv3+)",
),
],
)
6 changes: 6 additions & 0 deletions src/dramatiq_abort/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
__version__ = "0.1beta"

from .backend import EventBackend
from .middleware import Abort, Abortable, abort

__all__ = ["EventBackend", "Abortable", "Abort", "abort"]
21 changes: 21 additions & 0 deletions src/dramatiq_abort/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import abc
from typing import List, Optional


class EventBackend(abc.ABC):
"""ABC for event backends.
"""

@abc.abstractmethod
def wait_many(
self, keys: List[bytes], timeout: int
) -> Optional[bytes]: # pragma: no cover
raise NotImplementedError

@abc.abstractmethod
def poll(self, key: bytes) -> bool: # pragma: no cover
raise NotImplementedError

@abc.abstractmethod
def notify(self, key: bytes, ttl: int) -> None: # pragma: no cover
raise NotImplementedError
15 changes: 15 additions & 0 deletions src/dramatiq_abort/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import warnings

from .stub import StubBackend

try:
from .redis import RedisBackend
except ImportError: # pragma: no cover
warnings.warn(
"RedisBackend is not available. Run `pip install dramatiq[redis]` "
"to add support for that backend.",
ImportWarning,
)


__all__ = ["StubBackend", "RedisBackend"]
49 changes: 49 additions & 0 deletions src/dramatiq_abort/backends/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Any, List, Optional

import redis

from ..backend import EventBackend


class RedisBackend(EventBackend):
"""A event backend for Redis_.
Parameters:
client(Redis): An optional client. If this is passed,
then all other parameters are ignored.
url(str): An optional connection URL. If both a URL and
connection paramters are provided, the URL is used.
**parameters(dict): Connection parameters are passed directly
to :class:`redis.Redis`.
.. _redis: https://redis.io
"""

def __init__(self, *, client: Any) -> None:
self.client = client

@classmethod
def from_url(cls, url: str) -> "RedisBackend":
return cls(
client=redis.StrictRedis(connection_pool=redis.ConnectionPool.from_url(url))
)

def wait_many(self, keys: List[bytes], timeout: int) -> Optional[bytes]:
assert timeout is None or timeout >= 1000, "wait timeouts must be >= 1000"
event = self.client.blpop(keys, (timeout or 0) // 1000)
if event is None:
return None
key, value = event
if value != b"x":
return None
return key

def poll(self, key: bytes) -> bool:
event = self.client.lpop(key)
return event == b"x"

def notify(self, key: bytes, ttl: int) -> None:
with self.client.pipeline() as pipe:
pipe.rpush(key, b"x")
pipe.pexpire(key, ttl)
pipe.execute()
36 changes: 36 additions & 0 deletions src/dramatiq_abort/backends/stub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from threading import Condition
from typing import List, Optional, Set

from ..backend import EventBackend


class StubBackend(EventBackend):
def __init__(self) -> None:
self.condition = Condition()
self.events: Set[bytes] = set()

def wait_many(self, keys: List[bytes], timeout: int) -> Optional[bytes]:
with self.condition:
if self.condition.wait_for(
lambda: self._anyset(keys), timeout=timeout / 1000
):
for key in keys:
if key in self.events:
self.events.remove(key)
return key
return None

def poll(self, key: bytes) -> bool:
with self.condition:
if key in self.events:
self.events.remove(key)
return True
return False

def notify(self, key: bytes, ttl: int) -> None:
with self.condition:
self.events.add(key)
self.condition.notify_all()

def _anyset(self, keys: List[bytes]) -> bool:
return any(k in self.events for k in keys)
Loading

0 comments on commit de4efea

Please sign in to comment.