From 5c03d01e7fbb496fcd9b2a89e07ac7a0b15a04a6 Mon Sep 17 00:00:00 2001 From: Romain Criton Date: Thu, 28 Oct 2021 22:54:07 +0200 Subject: [PATCH 1/2] Fetch the task state only once at the beginning of the method and store it in local variable --- celery_progress/backend.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/celery_progress/backend.py b/celery_progress/backend.py index bf3cbd1..f711b5c 100644 --- a/celery_progress/backend.py +++ b/celery_progress/backend.py @@ -60,8 +60,9 @@ def __init__(self, result): self.result = result def get_info(self): - response = {'state': self.result.state} - if self.result.state in ['SUCCESS', 'FAILURE']: + state = self.result.state + response = {'state': state} + if state in ['SUCCESS', 'FAILURE']: success = self.result.successful() with allow_join_result(): response.update({ @@ -70,8 +71,8 @@ def get_info(self): 'progress': _get_completed_progress(), 'result': self.result.get(self.result.id) if success else str(self.result.info), }) - elif self.result.state in ['RETRY', 'REVOKED']: - if self.result.state == 'RETRY': + elif state in ['RETRY', 'REVOKED']: + if state == 'RETRY': retry = self.result.info when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str( datetime.datetime.now() + datetime.timedelta(seconds=retry.when)) @@ -84,32 +85,32 @@ def get_info(self): 'progress': _get_completed_progress(), 'result': result, }) - elif self.result.state == 'IGNORED': + elif state == 'IGNORED': response.update({ 'complete': True, 'success': None, 'progress': _get_completed_progress(), 'result': str(self.result.info) }) - elif self.result.state == PROGRESS_STATE: + elif state == PROGRESS_STATE: response.update({ 'complete': False, 'success': None, 'progress': self.result.info, }) - elif self.result.state in ['PENDING', 'STARTED']: + elif state in ['PENDING', 'STARTED']: response.update({ 'complete': False, 'success': None, - 'progress': _get_unknown_progress(self.result.state), + 'progress': _get_unknown_progress(state), }) else: - logger.error('Task %s has unknown state %s with metadata %s', self.result.id, self.result.state, self.result.info) + logger.error('Task %s has unknown state %s with metadata %s', self.result.id, state, self.result.info) response.update({ 'complete': True, 'success': False, - 'progress': _get_unknown_progress(self.result.state), - 'result': 'Unknown state {}'.format(self.result.state), + 'progress': _get_unknown_progress(state), + 'result': 'Unknown state {}'.format(state), }) return response From 7db17638b61079656996e12e26bef59af0a72b58 Mon Sep 17 00:00:00 2001 From: Romain Criton Date: Tue, 2 Nov 2021 00:31:25 +0100 Subject: [PATCH 2/2] Address another possible race condition where the task status might change between the time when we fetch "state" and the time we fetch "info" --- celery_progress/backend.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/celery_progress/backend.py b/celery_progress/backend.py index f711b5c..9cc1fbe 100644 --- a/celery_progress/backend.py +++ b/celery_progress/backend.py @@ -60,7 +60,9 @@ def __init__(self, result): self.result = result def get_info(self): - state = self.result.state + task_meta = self.result._get_task_meta() + state = task_meta["status"] + info = task_meta["result"] response = {'state': state} if state in ['SUCCESS', 'FAILURE']: success = self.result.successful() @@ -69,16 +71,16 @@ def get_info(self): 'complete': True, 'success': success, 'progress': _get_completed_progress(), - 'result': self.result.get(self.result.id) if success else str(self.result.info), + 'result': self.result.get(self.result.id) if success else str(info), }) elif state in ['RETRY', 'REVOKED']: if state == 'RETRY': - retry = self.result.info + retry = info when = str(retry.when) if isinstance(retry.when, datetime.datetime) else str( datetime.datetime.now() + datetime.timedelta(seconds=retry.when)) result = {'when': when, 'message': retry.message or str(retry.exc)} else: - result = 'Task ' + str(self.result.info) + result = 'Task ' + str(info) response.update({ 'complete': True, 'success': False, @@ -90,13 +92,13 @@ def get_info(self): 'complete': True, 'success': None, 'progress': _get_completed_progress(), - 'result': str(self.result.info) + 'result': str(info) }) elif state == PROGRESS_STATE: response.update({ 'complete': False, 'success': None, - 'progress': self.result.info, + 'progress': info, }) elif state in ['PENDING', 'STARTED']: response.update({ @@ -105,7 +107,7 @@ def get_info(self): 'progress': _get_unknown_progress(state), }) else: - logger.error('Task %s has unknown state %s with metadata %s', self.result.id, state, self.result.info) + logger.error('Task %s has unknown state %s with metadata %s', self.result.id, state, info) response.update({ 'complete': True, 'success': False,