Enable Prometheus metrics on the RabbitMQ Worker
catileptic committed Jun 4, 2024
1 parent 397acbb commit d0e4679
Showing 1 changed file with 100 additions and 1 deletion.
101 changes: 100 additions & 1 deletion servicelayer/taskqueue.py
@@ -11,13 +11,24 @@
from queue import Queue, Empty
import platform
from collections import defaultdict
from threading import Thread
from timeit import default_timer

import pika.spec

from structlog.contextvars import clear_contextvars, bind_contextvars
import pika
from banal import ensure_list

from prometheus_client import (
    start_http_server,
    Counter,
    Histogram,
    REGISTRY,
    GC_COLLECTOR,
    PROCESS_COLLECTOR,
)

from servicelayer.cache import get_redis, make_key
from servicelayer.util import pack_now, unpack_int
from servicelayer import settings
@@ -31,6 +42,50 @@

TIMEOUT = 5

# Unregister prometheus_client's default GC and process metric collectors so
# their output isn't exposed alongside the task metrics defined below.
REGISTRY.unregister(GC_COLLECTOR)
REGISTRY.unregister(PROCESS_COLLECTOR)

TASKS_STARTED = Counter(
    "servicelayer_tasks_started_total",
    "Number of tasks that a worker started processing",
    ["stage"],
)

TASKS_SUCCEEDED = Counter(
    "servicelayer_tasks_succeeded_total",
    "Number of successfully processed tasks",
    ["stage", "retries"],
)

TASKS_FAILED = Counter(
    "servicelayer_tasks_failed_total",
    "Number of failed tasks",
    ["stage", "retries", "failed_permanently"],
)

TASK_DURATION = Histogram(
    "servicelayer_task_duration_seconds",
    "Task duration in seconds",
    ["stage"],
    # The bucket sizes are a rough guess right now; we might want to adjust
    # them later based on observed runtimes.
    buckets=[
        0.25,
        0.5,
        1,
        5,
        15,
        30,
        60,
        60 * 15,
        60 * 30,
        60 * 60,
        60 * 60 * 2,
        60 * 60 * 6,
        60 * 60 * 24,
    ],
)
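
As a rough illustration (a sketch for this write-up, not part of the commit), the collectors defined above can be exercised and rendered in the Prometheus text exposition format; the stage value "ingest" is an assumed example:

# Sketch only: increment a labelled counter, record one histogram
# observation, and print what a Prometheus scrape would return.
from prometheus_client import generate_latest

TASKS_STARTED.labels(stage="ingest").inc()
TASK_DURATION.labels(stage="ingest").observe(42.0)

# Output includes lines such as:
#   servicelayer_tasks_started_total{stage="ingest"} 1.0
#   servicelayer_task_duration_seconds_bucket{stage="ingest",le="60.0"} 1.0
print(generate_latest(REGISTRY).decode("utf-8"))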


@dataclass
class Task:
@@ -392,6 +447,19 @@ def __init__(
        self.local_queue = Queue()
        self.prefetch_count_mapping = prefetch_count_mapping

    def run_prometheus_server(self):
        if not settings.PROMETHEUS_ENABLED:
            return

        def run_server():
            port = settings.PROMETHEUS_PORT
            log.info(f"Running Prometheus metrics server on port {port}")
            start_http_server(port)

        thread = Thread(target=run_server)
        thread.start()
        thread.join()

    def on_signal(self, signal, _):
        log.warning(f"Shutting down worker (signal {signal})")
        # Exit eagerly without waiting for current task to finish running
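
A note on the threading in run_prometheus_server above: start_http_server spawns its own daemon thread and returns immediately, so the join() returns right away rather than blocking the worker. Once the server is up, the endpoint can be fetched like any HTTP resource; a hypothetical smoke test (the port value 9100 is assumed, not taken from this diff):

# Hypothetical check that the metrics endpoint is reachable:
import urllib.request

body = urllib.request.urlopen("http://localhost:9100/metrics").read()
print(body.decode("utf-8"))  # Prometheus text exposition format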
@@ -453,23 +521,52 @@ def handle(self, task: Task, channel):
                conn=self.conn, name=dataset_from_collection_id(task.collection_id)
            )
            if dataset.should_execute(task.task_id):
                if task.get_retry_count(self.conn) > settings.WORKER_RETRY:
                task_retry_count = task.get_retry_count(self.conn)
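                # A non-zero retry count means an earlier attempt at this task
                # did not complete; count it as a non-permanent failure.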
                if task_retry_count:
                    TASKS_FAILED.labels(
                        stage=task.operation,
                        retries=task_retry_count,
                        failed_permanently=False,
                    ).inc()

                if task_retry_count > settings.WORKER_RETRY:
                    raise MaxRetriesExceededError(
                        f"Max retries reached for task {task.task_id}. Aborting."
                    )

                dataset.checkout_task(task.task_id, task.operation)
                task.increment_retry_count(self.conn)

                # Emit Prometheus metrics
                TASKS_STARTED.labels(stage=task.operation).inc()
                start_time = default_timer()
                log.info(
                    f"Dispatching task {task.task_id} from job {task.job_id}"
                    f" to worker {platform.node()}"
                )

                task = self.dispatch_task(task)

                # Emit Prometheus metrics
                duration = max(0, default_timer() - start_time)
                TASK_DURATION.labels(stage=task.operation).observe(duration)
                TASKS_SUCCEEDED.labels(
                    stage=task.operation, retries=task_retry_count
                ).inc()
            else:
                log.info(
                    f"Sending a NACK for message {task.delivery_tag}"
                    f" for task_id {task.task_id}."
                    f" Message will be requeued."
                )
                # In this case, a task ID was found neither in the
                # list of Pending, nor the list of Running tasks
                # in Redis. It was never attempted.
                TASKS_FAILED.labels(
                    stage=task.operation,
                    retries=0,
                    failed_permanently=True,
                ).inc()
                if channel.is_open:
                    channel.basic_nack(task.delivery_tag)
        except Exception:
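
Because the counters live in the process-wide registry, the failure accounting above can be exercised directly; a minimal sketch (label values assumed for illustration; note that prometheus_client stringifies label values, so retries=2 is stored as "2"):

# Sketch only: record one non-permanent failure and read it back.
TASKS_FAILED.labels(stage="index", retries=2, failed_permanently=False).inc()

value = REGISTRY.get_sample_value(
    "servicelayer_tasks_failed_total",
    {"stage": "index", "retries": "2", "failed_permanently": "False"},
)
assert value == 1.0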
@@ -520,6 +617,8 @@ def run(self):
        signal.signal(signal.SIGINT, self.on_signal)
        signal.signal(signal.SIGTERM, self.on_signal)

        self.run_prometheus_server()

        # worker threads
        def process():
            return self.process(blocking=True)
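
One small ordering detail worth noting: the signal handlers are installed before the metrics server and the worker threads start, so a SIGINT or SIGTERM that arrives during startup is already routed to on_signal.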
