Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add Task Registry #329

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions dramatiq/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from .actor import Actor, _queue_name_re


class Registry:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great to see this! I nearly have the same class in my projects.

I have implemented this method as well:

    def enqueue(self, message, *, delay=None):
        raise RuntimeError(
            "ActorCollector has not transferred its actors to an actual broker. "
            "Ensure transfer_actors() is called."
        )

It ensures that the user is told what is going on when trying to call send() on an actor before doing the bind.

"""
A Registry allows defining a collection of Actors not directly bound to a Broker.

This allows your code to declar Actors before configuring a Broker.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This allows your code to declar Actors before configuring a Broker.
This allows your code to declare actors before configuring a broker.

"""
def __init__(self):
self.actors = {}
self.broker = None

def actor(self, fn=None, *, actor_class=Actor, actor_name=None, queue_name="default", priority=0, **options):
"""
Mimics `actor.actor` decorator, but skips the actor options check, and passes `self` as broker.
"""

def decorator(fn):
if not _queue_name_re.fullmatch(queue_name):
raise ValueError(
"Queue names must start with a letter or an underscore followed "
"by any number of letters, digits, dashes or underscores."
)

return actor_class(
fn,
actor_name=actor_name or fn.__name__,
queue_name=queue_name,
priority=priority,
broker=self,
options=options,
)

if fn is None:
return decorator
return decorator(fn)

def __getattr__(self, name):
# Proxy everything else to our Broker, if set.
return getattr(self.broker, name)

def declare_actor(self, actor):
"""
Intercept when Actor class tries to register itself.
"""
if self.broker:
self.broker.declare_actor(actor)
else:
self.actors[actor.actor_name] = actor

def bind_broker(self, broker):
self.broker = broker

for actor_name, actor in self.actors.items():
invalid_options = set(actor.options) - broker.actor_options
if invalid_options:
invalid_options_list = ", ".join(invalid_options)
raise ValueError((
"Actor %s specified the following options which are not "
"supported by this broker: %s. Did you forget to add a "
"middleware to your Broker?"
) % (actor_name, invalid_options_list))

broker.declar_actor(actor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my code I have an additional helper method to allow binding multople collectors (here it would be registries) at once.

This is handy when linking up independet code from multiple packages.

def transfer_actors(broker, collectors: List[ActorCollector]):
    for ac in collectors:
        ac.transfer_actors(broker)

22 changes: 22 additions & 0 deletions tests/test_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import dramatiq
from dramatiq.registry import Registry


def test_registry_can_be_declared():

Registry()


def test_actor_can_be_declared_on_registry():
# When I have a Registry
reg = Registry()

# Given that I've decorated a function with @registry.actor
@reg.actor
def add(x, y):
return x + y

# I expect that function to become an instance of Actor
assert isinstance(add, dramatiq.Actor)
assert add.broker is reg
assert len(reg.actors) == 1