diff --git a/dramatiq/middleware/middleware.py b/dramatiq/middleware/middleware.py index 774addfc..5ace255c 100644 --- a/dramatiq/middleware/middleware.py +++ b/dramatiq/middleware/middleware.py @@ -138,18 +138,22 @@ def after_worker_shutdown(self, broker, worker): """Called after the worker process shuts down. """ + def after_consumer_thread_boot(self, broker, thread): + """Called from a consumer thread after it starts but before it starts its run loop. + """ + def before_consumer_thread_shutdown(self, broker, thread): """Called before a consumer thread shuts down. This may be used to clean up thread-local resources (such as Django database connections). + """ - There is no ``after_consumer_thread_boot``. + def after_worker_thread_boot(self, broker, thread): + """Called from a worker thread after it starts but before it starts its run loop. """ def before_worker_thread_shutdown(self, broker, thread): """Called before a worker thread shuts down. This may be used to clean up thread-local resources (such as Django database connections). - - There is no ``after_worker_thread_boot``. """ diff --git a/dramatiq/worker.py b/dramatiq/worker.py index 46e9f0c1..a3058420 100644 --- a/dramatiq/worker.py +++ b/dramatiq/worker.py @@ -247,6 +247,7 @@ def __init__(self, *, broker, queue_name, prefetch, work_queue, worker_timeout): def run(self): self.logger.debug("Running consumer thread...") self.running = True + self.broker.emit_after("consumer_thread_boot", self) while self.running: if self.paused: self.logger.debug("Consumer is paused. Sleeping for %.02fms...", self.worker_timeout) @@ -448,6 +449,7 @@ def __init__(self, *, broker, consumers, work_queue, worker_timeout): def run(self): self.logger.debug("Running worker thread...") self.running = True + self.broker.emit_after("worker_thread_boot", self) while self.running: if self.paused: self.logger.debug("Worker is paused. Sleeping for %.02f...", self.timeout)