Merge branch 'events'
coleifer committed Jan 21, 2016
2 parents f3091eb + 9240593 commit 39684c3
Showing 8 changed files with 218 additions and 67 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG
@@ -1,6 +1,23 @@
Changelog
=========

v1.1.0
------

* Big changes to simplify the way ``Huey`` is instantiated. No changes should
be necessary if already using ``RedisHuey``.
* Refactored the storage APIs and simplified the public interface. There is
now a single object, whereas before there were 4 components (queue, result
store, scheduler and event emitter).
* Added methods for retrieving and introspecting the pending task queue, the
schedule, results, and errors.
* Errors can now be stored, in addition to regular task results.
* Added metadata methods for tracking task execution, errors, task duration,
and more. These will be the building blocks for tools to provide some
insight into the inner-workings of your consumers and producers.
* Many new events are emitted by the consumer, and some have parameters. These
  are documented at http://huey.readthedocs.org/en/latest/events.html.

v1.0.0
------

23 changes: 9 additions & 14 deletions docs/api.rst
@@ -17,7 +17,7 @@ lowest-level interfaces, the :py:class:`BaseQueue` and :py:class:`BaseDataStore`
Function decorators and helpers
-------------------------------

.. py:class:: Huey(queue[, result_store=None[, schedule=None[, events=None[, store_none=False[, always_eager=False]]]]])
.. py:class:: Huey(name[, result_store=True[, events=True[, store_none=False[, always_eager=False[, store_errors=True[, blocking=False[, **storage_kwargs]]]]]]])
Huey executes tasks by exposing function decorators that cause the function
call to be enqueued for execution by the consumer.
@@ -26,15 +26,16 @@ Function decorators and helpers
have as many as you like -- the only caveat is that one consumer process
must be executed for each Huey instance.

:param queue: a queue instance, e.g. :py:class:`RedisQueue`.
:param result_store: a place to store results and the task schedule,
e.g. :py:class:`RedisDataStore`.
:param schedule: scheduler implementation, e.g. an instance of :py:class:`RedisSchedule`.
:param events: event emitter implementation, e.g. an instance of :py:class:`RedisEventEmitter`.
:param boolean store_none: Flag to indicate whether tasks that return ``None``
:param name: the name of the huey instance or application.
:param bool result_store: whether the results of tasks should be stored.
:param bool events: whether events should be emitted by the consumer.
:param bool store_none: Flag to indicate whether tasks that return ``None``
should store their results in the result store.
:param always_eager: Useful for testing, this will execute all tasks
:param bool always_eager: Useful for testing, this will execute all tasks
immediately, without enqueueing them.
:param bool store_errors: whether task errors should be stored.
:param bool blocking: whether the queue will block (if False, then the queue will poll).
:param storage_kwargs: arbitrary kwargs to pass to the storage implementation.

Example usage:

@@ -44,12 +45,6 @@ Function decorators and helpers
huey = RedisHuey('my-app')
# THIS IS EQUIVALENT TO ABOVE CODE:
#queue = RedisBlockingQueue('my-app')
#result_store = RedisDataStore('my-app')
#schedule = RedisSchedule('my-app')
#huey = Huey(queue, result_store, schedule)
@huey.task()
def slow_function(some_arg):
    # ... do something ...
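The boolean flags documented above interact in a way that is easy to sketch in plain Python. The helper below is hypothetical (not huey's actual implementation) and only illustrates how ``store_none`` and ``store_errors`` might decide whether a task's outcome reaches the result store:

```python
# Hypothetical helper, not part of huey: illustrates the documented
# semantics of the store_none and store_errors flags.
def should_store(result, is_error=False, store_none=False, store_errors=True):
    if is_error:
        return store_errors      # errors stored only when store_errors=True
    if result is None:
        return store_none        # None results skipped unless store_none=True
    return True                  # ordinary results are always stored

print(should_store(42))                      # True
print(should_store(None))                    # False
print(should_store(None, store_none=True))   # True
```

With the defaults shown in the signature above, every non-``None`` result and every error is stored, matching the documented defaults of ``store_none=False`` and ``store_errors=True``.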
64 changes: 39 additions & 25 deletions docs/events.rst
@@ -5,7 +5,7 @@ Consumer Events

If you specify a :py:class:`RedisEventEmitter` when setting up your :py:class:`Huey` instance (or if you choose to use :py:class:`RedisHuey`), the consumer will publish real-time events about the status of various tasks. You can subscribe to these events in your own application.

When an event is emitted, the following information is provided (serialized as JSON):
When an event is emitted, the following information is always provided:

* ``status``: a String indicating what type of event this is.
* ``id``: the UUID of the task.
@@ -14,21 +14,35 @@ When an event is emitted, the following information is provided (serialized as J
* ``retry_delay``: how long to sleep before retrying the task in event of failure.
* ``execute_time``: A unix timestamp indicating when the task is scheduled to
execute (this may be ``None``).

If an error occurred, then the following data is also provided:

* ``error``: A boolean value indicating if there was an error.
* ``traceback``: A string traceback of the error, if one occurred.

The following events are emitted by the consumer:
When an event includes other keys, those will be noted below.
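A deserialized event carrying only the standard keys might look like the following. This is an invented example payload (the task id and name are placeholders), shown purely to make the key list above concrete:

```python
import json

# Invented example payload: the id and task name are placeholders.
raw = json.dumps({
    'status': 'EVENT_STARTED',
    'id': '8e95f0a2-d345-4b17-9f6b-0c0d5a2b9f00',
    'task': 'slow_function',
    'retries': 2,
    'retry_delay': 10,
    'execute_time': None,
})

event = json.loads(raw)
print(event['status'], event['retries'])  # EVENT_STARTED 2
```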

The following events are emitted by the consumer. I've listed the event name, and in parentheses the process that emits the event and any non-standard metadata it includes.

* ``EVENT_CHECKING_PERIODIC`` (Scheduler, ``timestamp``): emitted every minute when the scheduler checks for periodic tasks to execute.
* ``EVENT_FINISHED`` (Worker, ``duration``): emitted when a task executes successfully and cleanly returns.
* ``EVENT_RETRYING`` (Worker): emitted after a task failure, when the task will be retried.
* ``EVENT_REVOKED`` (Worker, ``timestamp``): emitted when a task is pulled from the queue but is not executed due to having been revoked.
* ``EVENT_SCHEDULED`` (Worker): emitted when a task specifies a delay or ETA and is not yet ready to run. This can also occur when a task is being retried and specifies a retry delay. The task is added to the schedule for later execution.
* ``EVENT_SCHEDULING_PERIODIC`` (Scheduler, ``timestamp``): emitted when a periodic task is scheduled for execution.
* ``EVENT_STARTED`` (Worker, ``timestamp``): emitted when a worker begins executing a task.

Error events:

* ``scheduled``: sent when a task is added to the schedule for execution in the future. For instance the worker pops off a task, sees that it should be run in an hour, and therefore schedules it.
* ``enqueued``: sent when a task is enqueued, i.e., it is pulled off the schedule.
* ``revoked``: sent when a task is not executed because it has been revoked.
* ``started``: sent when a worker begins executing a task.
* ``finished``: sent when a worker finishes executing a task and has stored the result.
* ``error``: sent when an exception occurs while executing a task.
* ``retrying``: sent when task that failed will be retried.
* ``EVENT_ERROR_DEQUEUEING`` (Worker): emitted if an error occurs reading from the backend queue.
* ``EVENT_ERROR_ENQUEUEING`` (Scheduler, Worker): emitted if an error occurs enqueueing a task for execution. This can occur when the scheduler attempts to enqueue a task that had been delayed, or when a worker attempts to retry a task after an error.
* ``EVENT_ERROR_INTERNAL`` (Worker): emitted if an unspecified error occurs. An example might be the Redis server being offline.
* ``EVENT_ERROR_SCHEDULING`` (Worker): emitted if an error occurs adding a task to the schedule.
* ``EVENT_ERROR_STORING_RESULT`` (Worker, ``duration``): emitted when an exception occurs attempting to store the result of a task. In this case the task ran to completion, but the result could not be stored.
* ``EVENT_ERROR_TASK`` (Worker, ``duration``): emitted when an unspecified error occurs in the user's task code.

Listening to events
-------------------
^^^^^^^^^^^^^^^^^^^

The easiest way to listen for events is by iterating over the ``huey.events`` object.

@@ -49,26 +63,26 @@ You can also achieve the same result with a simple loop like this:
process_event(event)
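Inside such a loop, a listener will typically dispatch or aggregate on the ``status`` field. The standalone sketch below is a hypothetical helper (hand-rolled JSON strings stand in for a live event stream) that tallies events by status:

```python
import json

def tally_events(raw_events):
    """Count events by status; raw_events yields JSON-serialized events."""
    counts = {}
    for raw in raw_events:
        event = json.loads(raw)
        counts[event['status']] = counts.get(event['status'], 0) + 1
    return counts

# Hand-rolled stand-in for a real event stream; ids are invented.
stream = [
    '{"status": "EVENT_STARTED", "id": "a1"}',
    '{"status": "EVENT_FINISHED", "id": "a1", "duration": 0.5}',
    '{"status": "EVENT_STARTED", "id": "b2"}',
]
print(tally_events(stream))  # {'EVENT_STARTED': 2, 'EVENT_FINISHED': 1}
```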
Ordering of events
------------------
^^^^^^^^^^^^^^^^^^

So, for the execution of a simple task, the events emitted would be:
For the execution of a simple task, the events emitted would be:

* ``started``
* ``finished``
* ``EVENT_STARTED``
* ``EVENT_FINISHED``

If a task was scheduled to be executed in the future, the events would be:

* ``scheduled``
* ``enqueued``
* ``started``
* ``finished``
* ``EVENT_SCHEDULED``
* ``EVENT_ENQUEUED``
* ``EVENT_STARTED``
* ``EVENT_FINISHED``

If an error occurs and the task is configured to be retried, the events would be:

* ``started``
* ``error`` (includes traceback)
* ``retrying``
* ``scheduled`` (if there is a retry delay, it will go onto the schedule)
* ``enqueued`` (pulled off schedule and sent to a worker)
* ``started``
* ``finished`` (or error if the task fails again)
* ``EVENT_STARTED``
* ``EVENT_ERROR_TASK`` (includes traceback)
* ``EVENT_RETRYING``
* ``EVENT_SCHEDULED`` (if there is a retry delay, it will go onto the schedule)
* ``EVENT_ENQUEUED`` (pulled off schedule and sent to a worker)
* ``EVENT_STARTED``
* ``EVENT_FINISHED`` if the task succeeds; otherwise the sequence repeats from ``EVENT_ERROR_TASK``.
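The retry sequence above can be simulated with a toy function. This is purely illustrative (not huey code): it emits the event names for a task that fails a given number of times before succeeding, with a retry delay routing each retry through the schedule:

```python
def simulate(fails_remaining, retry_delay=0):
    """Return the event names emitted for one chain of task attempts."""
    events = ['EVENT_STARTED']
    if fails_remaining > 0:
        events += ['EVENT_ERROR_TASK', 'EVENT_RETRYING']
        if retry_delay:
            # A retry delay puts the task back on the schedule first.
            events += ['EVENT_SCHEDULED', 'EVENT_ENQUEUED']
        events += simulate(fails_remaining - 1, retry_delay)
    else:
        events.append('EVENT_FINISHED')
    return events

print(simulate(1, retry_delay=10))
```

For one failure with a retry delay, this reproduces the seven-event sequence listed above.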
2 changes: 1 addition & 1 deletion huey/__init__.py
@@ -1,6 +1,6 @@
__author__ = 'Charles Leifer'
__license__ = 'MIT'
__version__ = '1.0.0'
__version__ = '1.1.0'

from huey.api import crontab
from huey.api import Huey
19 changes: 13 additions & 6 deletions huey/api.py
@@ -245,19 +245,25 @@ def _get_task_metadata(self, task, error=False, include_data=False):
            metadata['data'] = task.get_data()
        return metadata

    def emit_task(self, status, task, error=False):
    def emit_status(self, status, **data):
        if self.events:
            metadata = self._get_task_metadata(task, error)
            metadata['status'] = status
            metadata = {'status': status}
            metadata.update(data)
            self.emit(json.dumps(metadata))

    def emit_task(self, status, task, error=False, **data):
        if self.events:
            metadata = self._get_task_metadata(task, error)
            metadata.update(data)
            self.emit_status(status, **metadata)

    @_wrapped_operation(MetadataException)
    def write_metadata(self, key, value):
        self.storage.write_metadata(key, value)

    @_wrapped_operation(MetadataException)
    def incr_metadata(self, key):
        return self.storage.incr_metadata(key)
    def incr_metadata(self, key, value=1):
        return self.storage.incr_metadata(key, value)

    def read_metadata(self, key):
        return self.storage.read_metadata(key)
@@ -448,9 +454,10 @@ def __init__(self, data=None, task_id=None, execute_time=None, retries=0,
        self.execute_time = execute_time
        self.retries = retries
        self.retry_delay = retry_delay
        self.name = type(self).__name__

    def __repr__(self):
        rep = '%s: %s' % (type(self).__name__, self.task_id)
        rep = '%s: %s' % (self.name, self.task_id)
        if self.execute_time:
            rep += ' @%s' % self.execute_time
        if self.retries:
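The refactored ``emit_status()``/``emit_task()`` pair above can be sketched standalone. In this hypothetical sketch, ``self.emit()`` is replaced by returning the JSON string, since no pub/sub backend is assumed here; the dict-building pattern mirrors the diff:

```python
import json

def emit_status(status, **data):
    # Mirror of the pattern in the diff: a metadata dict keyed by
    # 'status', merged with any extra data, then serialized to JSON.
    metadata = {'status': status}
    metadata.update(data)
    return json.dumps(metadata, sort_keys=True)

message = emit_status('EVENT_FINISHED', id='a1', duration=0.25)
print(message)  # {"duration": 0.25, "id": "a1", "status": "EVENT_FINISHED"}
```

In the real code, ``emit_task()`` builds the task metadata dict first and then delegates to ``emit_status()``, so every emitted event flows through a single serialization path.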
