Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introducing KnownResult for WS #52

Merged
merged 15 commits into from
Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions celery_progress/backend.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from abc import ABCMeta, abstractmethod
from decimal import Decimal

from celery.result import AsyncResult, allow_join_result
from celery.result import EagerResult, allow_join_result
from celery.backends.base import DisabledBackend


PROGRESS_STATE = 'PROGRESS'
Expand Down Expand Up @@ -64,9 +65,12 @@ def stop_task(self, current, total, exc):

class Progress(object):

def __init__(self, task_id):
self.task_id = task_id
self.result = AsyncResult(task_id)
def __init__(self, result):
'''
result:
an AsyncResult or an object that mimics it to a degree
'''
self.result = result

def get_info(self):
if self.result.ready():
Expand All @@ -76,7 +80,7 @@ def get_info(self):
'complete': True,
'success': success,
'progress': _get_completed_progress(),
'result': self.result.get(self.task_id) if success else str(self.result.info),
'result': self.result.get(self.result.id) if success else str(self.result.info),
}
elif self.result.state == PROGRESS_STATE:
return {
Expand All @@ -93,6 +97,25 @@ def get_info(self):
return self.result.info


class KnownResult(EagerResult):
'''Like EagerResult but supports non-ready states.'''
def __init__(self, id, ret_value, state, traceback=None):
'''
ret_value:
result, exception, or progress metadata
'''
# set backend to get state groups (like READY_STATES in ready())
self.backend = DisabledBackend
super().__init__(id, ret_value, state, traceback)

def ready(self):
return super(EagerResult, self).ready()

def __del__(self):
# throws an exception if not overridden
pass


def _get_completed_progress():
return {
'current': 100,
Expand Down
3 changes: 2 additions & 1 deletion celery_progress/views.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
from django.http import HttpResponse
from celery.result import AsyncResult
from celery_progress.backend import Progress


def get_progress(request, task_id):
progress = Progress(task_id)
progress = Progress(AsyncResult(task_id))
return HttpResponse(json.dumps(progress.get_info()), content_type='application/json')
3 changes: 2 additions & 1 deletion celery_progress/websockets/backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from celery.result import AsyncResult
from celery_progress.backend import ProgressRecorder, Progress

try:
Expand All @@ -24,7 +25,7 @@ def push_update(task_id):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
task_id,
{'type': 'update_task_progress', 'data': {**Progress(task_id).get_info()}}
{'type': 'update_task_progress', 'data': {**Progress(AsyncResult(task_id)).get_info()}}
)
except AttributeError: # No channel layer to send to, so ignore it
pass
Expand Down
3 changes: 2 additions & 1 deletion celery_progress/websockets/consumers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from channels.generic.websocket import AsyncWebsocketConsumer
import json

from celery.result import AsyncResult
from celery_progress.backend import Progress


Expand Down Expand Up @@ -30,7 +31,7 @@ async def receive(self, text_data):
self.task_id,
{
'type': 'update_task_progress',
'data': {**Progress(self.task_id).get_info()}
'data': {**Progress(AsyncResult(self.task_id)).get_info()}
}
)

Expand Down