Send NACK if a task exhausts retries
catileptic committed Jun 6, 2024
1 parent 8dcfab3 commit 00de8e1
Showing 1 changed file with 21 additions and 16 deletions.
37 changes: 21 additions & 16 deletions servicelayer/taskqueue.py
@@ -494,20 +494,21 @@ def handle(self, task: Task, channel):
dataset = Dataset(
conn=self.conn, name=dataset_from_collection_id(task.collection_id)
)
if dataset.should_execute(task.task_id):
task_retry_count = task.get_retry_count(self.conn)
# The worker will attempt to complete a task a number of times
# defined by WORKER_RETRY.
task_retry_count = task.get_retry_count(self.conn)
# The worker will only attempt to run a task if the task_id
# exists in Redis in the "running" or "pending" lists.
should_execute = dataset.should_execute(task.task_id)
if should_execute or task_retry_count <= settings.WORKER_RETRY:
# Increase the counter of tasks that have failed but will be re-attempted
if task_retry_count:
metrics.TASKS_FAILED.labels(
stage=task.operation,
retries=task_retry_count,
failed_permanently=False,
).inc()

if task_retry_count > settings.WORKER_RETRY:
raise MaxRetriesExceededError(
f"Max retries reached for task {task.task_id}. Aborting."
)

dataset.checkout_task(task.task_id, task.operation)
task.increment_retry_count(self.conn)

@@ -528,19 +528,23 @@ def handle(self, task: Task, channel):
stage=task.operation, retries=task_retry_count
).inc()
else:
reason = ""
if not should_execute:
reason = f"Task {task.task_id} was not in found either the Running nor Pending lists"
elif task_retry_count > settings.WORKER_RETRY:
reason = f"Task {task.task_id} was attempted more than {settings.WORKER_RETRY} times"
# This task can be tracked as having failed permanently.
metrics.TASKS_FAILED.labels(
stage=task.operation,
retries=0,
failed_permanently=True,
).inc()
log.info(
f"Sending a NACK for message {task.delivery_tag}"
f" for task_id {task.task_id}."
f"Message will be requeued."
f"{reason}. Message will be requeued."
)
# In this case, a task ID was found neither in the
# list of Pending, nor the list of Running tasks
# in Redis. It was never attempted.
metrics.TASKS_FAILED.labels(
stage=task.operation,
retries=0,
failed_permanently=True,
).inc()

if channel.is_open:
channel.basic_nack(task.delivery_tag)
except Exception:
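For context, a minimal sketch of the decision the changed handle() code now makes instead of raising MaxRetriesExceededError: whether to re-attempt a task or record it as permanently failed and NACK the message. The names below (should_reattempt, permanent_failure_reason, the WORKER_RETRY value) are illustrative stand-ins under assumed values, not the servicelayer API.

WORKER_RETRY = 3  # assumed limit; servicelayer reads this from settings.WORKER_RETRY

def should_reattempt(retry_count: int, in_redis_lists: bool) -> bool:
    # Re-attempt if Redis still tracks the task_id in the "running" or
    # "pending" lists, or if it has not yet exhausted its retries.
    return in_redis_lists or retry_count <= WORKER_RETRY

def permanent_failure_reason(task_id: str, retry_count: int, in_redis_lists: bool) -> str:
    # Mirrors the reasons logged right before the NACK is sent.
    if not in_redis_lists:
        return f"Task {task_id} was not found in either the Running or the Pending list"
    return f"Task {task_id} was attempted more than {WORKER_RETRY} times"

if __name__ == "__main__":
    # A task that is no longer tracked in Redis and has exhausted its retries
    # is counted as failed permanently and its message is NACKed (and requeued).
    print(should_reattempt(retry_count=5, in_redis_lists=False))  # False
    print(permanent_failure_reason("abc", retry_count=5, in_redis_lists=False))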
