Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Revoked State #63

Merged
merged 4 commits into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions celery_progress/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from celery.result import EagerResult, allow_join_result
from celery.backends.base import DisabledBackend
from celery import states


PROGRESS_STATE = 'PROGRESS'
Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(self, result):

def get_info(self):
response = {'state': self.result.state}
if self.result.ready():
if self.result.state in [states.SUCCESS, states.FAILURE]:
success = self.result.successful()
with allow_join_result():
response.update({
Expand All @@ -68,26 +69,27 @@ def get_info(self):
'progress': _get_completed_progress(),
'result': self.result.get(self.result.id) if success else str(self.result.info),
})
elif self.result.state == 'RETRY':
retry = self.result.info
when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str(
datetime.datetime.now() + datetime.timedelta(seconds=retry.when))
elif self.result.state in states.EXCEPTION_STATES:
if self.result.state == 'RETRY':
retry = self.result.info
when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str(
datetime.datetime.now() + datetime.timedelta(seconds=retry.when))
result = {'when': when, 'message': retry.message or str(retry.exc)}
else:
result = 'Task ' + str(self.result.info)
response.update({
'complete': True,
'success': False,
'progress': _get_completed_progress(),
'result': {
'when': when,
'message': retry.message or str(retry.exc)
},
'result': result,
})
elif self.result.state == PROGRESS_STATE:
response.update({
'complete': False,
'success': None,
'progress': self.result.info,
})
elif self.result.state in ['PENDING', 'STARTED']:
elif self.result.state in states.UNREADY_STATES:
response.update({
'complete': False,
'success': None,
Expand Down
14 changes: 13 additions & 1 deletion celery_progress/websockets/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from celery.signals import task_postrun
from celery.signals import task_postrun, task_revoked

from .backend import WebSocketProgressRecorder
from celery_progress.backend import KnownResult, Progress
Expand All @@ -12,3 +12,15 @@ def task_postrun_handler(task_id, **kwargs):
result = KnownResult(task_id, kwargs.pop('retval'), kwargs.pop('state'))
data = Progress(result).get_info()
WebSocketProgressRecorder.push_update(task_id, data=data, final=True)


@task_revoked.connect(retry=True)
def task_revoked_handler(request, **kwargs):
"""Runs if a task has been revoked. This will be used to push a websocket update for revoked events.

If the websockets version of this package is not installed, this will fail silently."""
_result = ('terminated' if kwargs.pop('terminated') else None) or ('expired' if kwargs.pop('expired') else None) \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the value used in line 79 in get_info? How is it populated for HTTP? I just want to ensure WS and HTTP behave the same because this looks hand-crafted but the HTTP code uses a single source (result.info).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the result.info value that gets used at that line, yes. HTTP populates this value by requesting the backend, which has the same values. The handler for revoked tasks does not pass the result, so I had to manually re-create the intended result in order to maintain parity with HTTP. All of the same data is there, it's just that rather than making a trip to the backend, I've opted to use what I've already been provided to do largely the same thing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Unfortunate that result.info is not well documented in celery.

or 'revoked'
result = KnownResult(request.id, _result, 'REVOKED')
data = Progress(result).get_info()
WebSocketProgressRecorder.push_update(request.id, data=data, final=True)