Skip to content

Commit

Permalink
Add support for SIGNAL_ENQUEUED.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Dec 12, 2024
1 parent 13a377c commit f66ce06
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 21 deletions.
35 changes: 30 additions & 5 deletions docs/signals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The following signals are implemented by Huey:
* ``SIGNAL_CANCELED``: task was canceled due to a pre-execute hook raising
a :py:class:`CancelExecution` exception.
* ``SIGNAL_COMPLETE``: task has been executed successfully.
* ``SIGNAL_ENQUEUED``: task has been enqueued (**see note**).
* ``SIGNAL_ERROR``: task failed due to an unhandled exception.
* ``SIGNAL_EXECUTING``: task is about to be executed.
* ``SIGNAL_EXPIRED``: task expired.
Expand All @@ -33,6 +34,14 @@ The following signals will include additional arguments:
* ``SIGNAL_ERROR``: includes a third argument ``exc``, which is the
``Exception`` that was raised while executing the task.

.. note::
Signals are run within the context of the consumer **except** that the
``SIGNAL_ENQUEUED`` signal will also run within the context of your
application code (since your application code will typically enqueue
tasks). Recall that signal handlers are run sequentially and synchronously,
so be careful about introducing overhead in them -- particularly when they
may be run by the application process.

To register a signal handler, use the :py:meth:`Huey.signal` method:

.. code-block:: python
Expand Down Expand Up @@ -88,8 +97,10 @@ Here is a simple example of a task execution we would expect to succeed:
>>> result = add(1, 2)
>>> result.get(blocking=True)
The consumer would send the following signals:
The following signals would be fired:

* ``SIGNAL_ENQUEUED`` - the task has been enqueued (happens in the application
process).
* ``SIGNAL_EXECUTING`` - the task has been dequeued and will be executed.
* ``SIGNAL_COMPLETE`` - the task has finished successfully.

Expand All @@ -102,10 +113,13 @@ Here is an example of scheduling a task for execution after a short delay:
The following signals would be sent:

* ``SIGNAL_ENQUEUED`` - the task has been enqueued (happens in the **application**
process).
* ``SIGNAL_SCHEDULED`` - the task is not yet ready to run, so it has been added
to the schedule.
* After 10 seconds, the consumer will run the task and send
the ``SIGNAL_EXECUTING`` signal.
* After 10 seconds, the consumer will re-enqueue the task as it is now ready to
run, sending the ``SIGNAL_ENQUEUED`` (in the **consumer** process!).
* Then the consumer will run the task and send the ``SIGNAL_EXECUTING`` signal.
* ``SIGNAL_COMPLETE``.

Here is an example that may fail, in which case it will be retried
Expand All @@ -124,11 +138,14 @@ automatically with a delay of 10 seconds.
Assuming the task failed the first time and succeeded the second time, we would
see the following signals being sent:

* ``SIGNAL_ENQUEUED`` - task has been enqueued.
* ``SIGNAL_EXECUTING`` - the task is being executed.
* ``SIGNAL_ERROR`` - the task raised an unhandled exception.
* ``SIGNAL_RETRYING`` - the task will be retried.
* ``SIGNAL_SCHEDULED`` - the task has been added to the schedule for execution
in ~10 seconds.
* ``SIGNAL_ENQUEUED`` - 10s have elapsed and the task is ready to run and has
been re-enqueued.
* ``SIGNAL_EXECUTING`` - second try running task.
* ``SIGNAL_COMPLETE`` - task succeeded.

Expand All @@ -141,6 +158,7 @@ What happens if we revoke the ``add()`` task and then attempt to execute it:
The following signal will be sent:

* ``SIGNAL_ENQUEUED`` - the task has been enqueued for execution.
* ``SIGNAL_REVOKED`` - this is sent before the task enters the "executing"
state. When a task is revoked, no other signals will be sent.

Expand Down Expand Up @@ -169,8 +187,9 @@ Performance considerations
--------------------------

Signal handlers are executed **synchronously** by the consumer as it processes
tasks. It is important to use care when implementing signal handlers, as one
slow signal handler can impact the overall responsiveness of the consumer.
tasks (with the exception of ``SIGNAL_ENQUEUED``). It is important to use care
when implementing signal handlers, as one slow signal handler can impact the
overall responsiveness of the consumer.

For example, if you implement a signal handler that posts some data to REST
API, everything might work fine until the REST API goes down or stops being
Expand All @@ -183,3 +202,9 @@ handles. Signal handlers are called by the consumer workers, which (depending
on how you are running the consumer) may be separate processes, threads or
greenlets. As a result, care should be taken to ensure proper initialization
and cleanup of any resources you plan to use in signal handlers.

Lastly, take care when implementing ``SIGNAL_ENQUEUED`` handlers, as these may
run in your application-code (e.g. whenever your application enqueues a task),
**or** by the consumer process (e.g. when re-enqueueing a task for retry, or
when enqueueing periodic tasks, when moving a task from the schedule to the
queue, etc).
2 changes: 2 additions & 0 deletions huey/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ def enqueue(self, task):
if task.expires:
task.resolve_expires(self.utc)

self._emit(S.SIGNAL_ENQUEUED, task)

if self._immediate:
self.execute(task)
else:
Expand Down
3 changes: 2 additions & 1 deletion huey/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
SIGNAL_RETRYING = 'retrying'
SIGNAL_REVOKED = 'revoked'
SIGNAL_SCHEDULED = 'scheduled'
SIGNAL_INTERRUPTED = "interrupted"
SIGNAL_INTERRUPTED = 'interrupted'
SIGNAL_ENQUEUED = 'enqueued'


class Signal(object):
Expand Down
35 changes: 20 additions & 15 deletions huey/tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ def task_a(n):
return n + 1

r = task_a(3)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertEqual(self.execute_next(), 4)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])

r = task_a.schedule((2,), delay=60)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next() is None)
self.assertSignals([SIGNAL_SCHEDULED])

r = task_a(None)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next() is None)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR])

Expand All @@ -58,9 +58,10 @@ def task_a(n):
return n + 1

r = task_a(None)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next() is None)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR, SIGNAL_RETRYING])
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR, SIGNAL_RETRYING,
SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next() is None)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR])

Expand All @@ -69,7 +70,7 @@ def task_b(n):
return n + 1

r = task_b(None)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next() is None)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_ERROR, SIGNAL_RETRYING,
SIGNAL_SCHEDULED])
Expand All @@ -81,13 +82,14 @@ def task_a(n):

task_a.revoke(revoke_once=True)
r = task_a(2)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next() is None)
self.assertSignals([SIGNAL_REVOKED])

r = task_a(3)
self.assertEqual(self.execute_next(), 4)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])
self.assertSignals([SIGNAL_ENQUEUED, SIGNAL_EXECUTING,
SIGNAL_COMPLETE])

def test_signals_locked(self):
@self.huey.task()
Expand All @@ -96,13 +98,13 @@ def task_a(n):
return n + 1

r = task_a(1)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertEqual(self.execute_next(), 2)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])

with self.huey.lock_task('lock-a'):
r = task_a(2)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next() is None)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_LOCKED])

Expand All @@ -114,12 +116,12 @@ def task_a(n):
now = datetime.datetime.now()
expires = now + datetime.timedelta(seconds=15)
r = task_a(2)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next(expires) is None)
self.assertSignals([SIGNAL_EXPIRED])

r = task_a(3)
self.assertSignals([])
self.assertSignals([SIGNAL_ENQUEUED])
self.assertTrue(self.execute_next(), 4)
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])

Expand All @@ -138,18 +140,21 @@ def task_a(n):
self.assertEqual(extra_state, [])
self.assertEqual(self.execute_next(), 4)
self.assertEqual(extra_state, [3])
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])
self.assertSignals([SIGNAL_ENQUEUED, SIGNAL_EXECUTING,
SIGNAL_COMPLETE])

r2 = task_a(1)
self.assertEqual(self.execute_next(), 2)
self.assertEqual(extra_state, [3, 1])
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])
self.assertSignals([SIGNAL_ENQUEUED, SIGNAL_EXECUTING,
SIGNAL_COMPLETE])

self.huey.disconnect_signal(extra_handler, SIGNAL_EXECUTING)
r3 = task_a(2)
self.assertEqual(self.execute_next(), 3)
self.assertEqual(extra_state, [3, 1])
self.assertSignals([SIGNAL_EXECUTING, SIGNAL_COMPLETE])
self.assertSignals([SIGNAL_ENQUEUED, SIGNAL_EXECUTING,
SIGNAL_COMPLETE])

def test_multi_handlers(self):
state1 = []
Expand Down

0 comments on commit f66ce06

Please sign in to comment.