Skip to content

Commit

Permalink
Merge pull request #63
Browse files Browse the repository at this point in the history
Support Revoked State
  • Loading branch information
EJH2 authored Oct 27, 2020
2 parents e243ad1 + 50c3e3e commit 7de1298
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
19 changes: 10 additions & 9 deletions celery_progress/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(self, result):

def get_info(self):
response = {'state': self.result.state}
if self.result.ready():
if self.result.state in ['SUCCESS', 'FAILURE']:
success = self.result.successful()
with allow_join_result():
response.update({
Expand All @@ -68,18 +68,19 @@ def get_info(self):
'progress': _get_completed_progress(),
'result': self.result.get(self.result.id) if success else str(self.result.info),
})
elif self.result.state == 'RETRY':
retry = self.result.info
when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str(
datetime.datetime.now() + datetime.timedelta(seconds=retry.when))
elif self.result.state in ['RETRY', 'REVOKED']:
if self.result.state == 'RETRY':
retry = self.result.info
when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str(
datetime.datetime.now() + datetime.timedelta(seconds=retry.when))
result = {'when': when, 'message': retry.message or str(retry.exc)}
else:
result = 'Task ' + str(self.result.info)
response.update({
'complete': True,
'success': False,
'progress': _get_completed_progress(),
'result': {
'when': when,
'message': retry.message or str(retry.exc)
},
'result': result,
})
elif self.result.state == PROGRESS_STATE:
response.update({
Expand Down
14 changes: 13 additions & 1 deletion celery_progress/websockets/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from celery.signals import task_postrun
from celery.signals import task_postrun, task_revoked

from .backend import WebSocketProgressRecorder
from celery_progress.backend import KnownResult, Progress
Expand All @@ -12,3 +12,15 @@ def task_postrun_handler(task_id, **kwargs):
result = KnownResult(task_id, kwargs.pop('retval'), kwargs.pop('state'))
data = Progress(result).get_info()
WebSocketProgressRecorder.push_update(task_id, data=data, final=True)


@task_revoked.connect(retry=True)
def task_revoked_handler(request, **kwargs):
"""Runs if a task has been revoked. This will be used to push a websocket update for revoked events.
If the websockets version of this package is not installed, this will fail silently."""
_result = ('terminated' if kwargs.pop('terminated') else None) or ('expired' if kwargs.pop('expired') else None) \
or 'revoked'
result = KnownResult(request.id, _result, 'REVOKED')
data = Progress(result).get_info()
WebSocketProgressRecorder.push_update(request.id, data=data, final=True)

0 comments on commit 7de1298

Please sign in to comment.