From d879e0819214d941477dbbb8de4f5eb4817287a3 Mon Sep 17 00:00:00 2001 From: Alex Stefanescu Date: Tue, 4 Jun 2024 16:29:54 +0200 Subject: [PATCH] Extract Prom. metrics to separate module --- servicelayer/metrics.py | 52 +++++++++++++++++++++++++++ servicelayer/taskqueue.py | 75 +++++++++------------------------------ servicelayer/worker.py | 69 ++++++----------------------------- 3 files changed, 79 insertions(+), 117 deletions(-) create mode 100644 servicelayer/metrics.py diff --git a/servicelayer/metrics.py b/servicelayer/metrics.py new file mode 100644 index 0000000..ad28cd7 --- /dev/null +++ b/servicelayer/metrics.py @@ -0,0 +1,52 @@ +from prometheus_client import ( + start_http_server, + Counter, + Histogram, + REGISTRY, + GC_COLLECTOR, + PROCESS_COLLECTOR, +) + +REGISTRY.unregister(GC_COLLECTOR) +REGISTRY.unregister(PROCESS_COLLECTOR) + +TASKS_STARTED = Counter( + "servicelayer_tasks_started_total", + "Number of tasks that a worker started processing", + ["stage"], +) + +TASKS_SUCCEEDED = Counter( + "servicelayer_tasks_succeeded_total", + "Number of successfully processed tasks", + ["stage", "retries"], +) + +TASKS_FAILED = Counter( + "servicelayer_tasks_failed_total", + "Number of failed tasks", + ["stage", "retries", "failed_permanently"], +) + +TASK_DURATION = Histogram( + "servicelayer_task_duration_seconds", + "Task duration in seconds", + ["stage"], + # The bucket sizes are a rough guess right now, we might want to adjust + # them later based on observed runtimes + buckets=[ + 0.25, + 0.5, + 1, + 5, + 15, + 30, + 60, + 60 * 15, + 60 * 30, + 60 * 60, + 60 * 60 * 2, + 60 * 60 * 6, + 60 * 60 * 24, + ], +) diff --git a/servicelayer/taskqueue.py b/servicelayer/taskqueue.py index 153e64a..ce30fbb 100644 --- a/servicelayer/taskqueue.py +++ b/servicelayer/taskqueue.py @@ -20,19 +20,11 @@ import pika from banal import ensure_list -from prometheus_client import ( - start_http_server, - Counter, - Histogram, - REGISTRY, - GC_COLLECTOR, - PROCESS_COLLECTOR, -) - from servicelayer.cache import get_redis, make_key from servicelayer.util import pack_now, unpack_int from servicelayer import settings from servicelayer.util import service_retries, backoff +from servicelayer import metrics log = logging.getLogger(__name__) local = threading.local() @@ -42,50 +34,6 @@ TIMEOUT = 5 -REGISTRY.unregister(GC_COLLECTOR) -REGISTRY.unregister(PROCESS_COLLECTOR) - -TASKS_STARTED = Counter( - "servicelayer_tasks_started_total", - "Number of tasks that a worker started processing", - ["stage"], -) - -TASKS_SUCCEEDED = Counter( - "servicelayer_tasks_succeeded_total", - "Number of successfully processed tasks", - ["stage", "retries"], -) - -TASKS_FAILED = Counter( - "servicelayer_tasks_failed_total", - "Number of failed tasks", - ["stage", "retries", "failed_permanently"], -) - -TASK_DURATION = Histogram( - "servicelayer_task_duration_seconds", - "Task duration in seconds", - ["stage"], - # The bucket sizes are a rough guess right now, we might want to adjust - # them later based on observed runtimes - buckets=[ - 0.25, - 0.5, - 1, - 5, - 15, - 30, - 60, - 60 * 15, - 60 * 30, - 60 * 60, - 60 * 60 * 2, - 60 * 60 * 6, - 60 * 60 * 24, - ], -) - @dataclass class Task: @@ -440,6 +388,17 @@ def __init__( version=None, prefetch_count_mapping=defaultdict(lambda: 1), ): + if settings.SENTRY_DSN: + import sentry_sdk + + sentry_sdk.init( + dsn=settings.SENTRY_DSN, + traces_sample_rate=0, + release=settings.SENTRY_RELEASE, + environment=settings.SENTRY_ENVIRONMENT, + send_default_pii=False, + ) + self.conn = conn or get_redis() self.num_threads = num_threads self.queues = ensure_list(queues) @@ -523,7 +482,7 @@ def handle(self, task: Task, channel): if dataset.should_execute(task.task_id): task_retry_count = task.get_retry_count(self.conn) if task_retry_count: - TASKS_FAILED.labels( + metrics.TASKS_FAILED.labels( stage=task.operation, retries=task_retry_count, failed_permanently=False, @@ -538,7 +497,7 @@ def handle(self, task: Task, channel): task.increment_retry_count(self.conn) # Emit Prometheus metrics - TASKS_STARTED.labels(stage=task.operation).inc() + metrics.TASKS_STARTED.labels(stage=task.operation).inc() start_time = default_timer() log.info( f"Dispatching task {task.task_id} from job {task.job_id}" @@ -549,8 +508,8 @@ def handle(self, task: Task, channel): # Emit Prometheus metrics duration = max(0, default_timer() - start_time) - TASK_DURATION.labels(stage=task.operation).observe(duration) - TASKS_SUCCEEDED.labels( + metrics.TASK_DURATION.labels(stage=task.operation).observe(duration) + metrics.TASKS_SUCCEEDED.labels( stage=task.operation, retries=task_retry_count ).inc() else: @@ -562,7 +521,7 @@ def handle(self, task: Task, channel): # In this case, a task ID was found neither in the # list of Pending, nor the list of Running tasks # in Redis. It was never attempted. - TASKS_FAILED.labels( + metrics.TASKS_FAILED.labels( stage=task.operation, retries=0, failed_permanently=True, diff --git a/servicelayer/worker.py b/servicelayer/worker.py index 46904ec..ae83657 100644 --- a/servicelayer/worker.py +++ b/servicelayer/worker.py @@ -6,19 +6,12 @@ from banal import ensure_list from abc import ABC, abstractmethod -from prometheus_client import ( - start_http_server, - Counter, - Histogram, - REGISTRY, - GC_COLLECTOR, - PROCESS_COLLECTOR, -) - from servicelayer import settings from servicelayer.jobs import Stage from servicelayer.cache import get_redis from servicelayer.util import unpack_int +from servicelayer import metrics + log = logging.getLogger(__name__) @@ -29,50 +22,6 @@ INTERVAL = 2 TASK_FETCH_RETRY = 60 / INTERVAL -REGISTRY.unregister(GC_COLLECTOR) -REGISTRY.unregister(PROCESS_COLLECTOR) - -TASKS_STARTED = Counter( - "servicelayer_tasks_started_total", - "Number of tasks that a worker started processing", - ["stage"], -) - -TASKS_SUCCEEDED = Counter( - "servicelayer_tasks_succeeded_total", - "Number of successfully processed tasks", - ["stage", "retries"], -) - -TASKS_FAILED = Counter( - "servicelayer_tasks_failed_total", - "Number of failed tasks", - ["stage", "retries", "failed_permanently"], -) - -TASK_DURATION = Histogram( - "servicelayer_task_duration_seconds", - "Task duration in seconds", - ["stage"], - # The bucket sizes are a rough guess right now, we might want to adjust - # them later based on observed runtimes - buckets=[ - 0.25, - 0.5, - 1, - 5, - 15, - 30, - 60, - 60 * 15, - 60 * 30, - 60 * 60, - 60 * 60 * 2, - 60 * 60 * 6, - 60 * 60 * 24, - ], -) - class Worker(ABC): """Workers of all microservices, unite!""" @@ -108,12 +57,14 @@ def handle_safe(self, task): retries = unpack_int(task.context.get("retries")) try: - TASKS_STARTED.labels(stage=task.stage.stage).inc() + metrics.TASKS_STARTED.labels(stage=task.stage.stage).inc() start_time = default_timer() self.handle(task) duration = max(0, default_timer() - start_time) - TASK_DURATION.labels(stage=task.stage.stage).observe(duration) - TASKS_SUCCEEDED.labels(stage=task.stage.stage, retries=retries).inc() + metrics.TASK_DURATION.labels(stage=task.stage.stage).observe(duration) + metrics.TASKS_SUCCEEDED.labels( + stage=task.stage.stage, retries=retries + ).inc() except SystemExit as exc: self.exit_code = exc.code self.retry(task) @@ -140,7 +91,7 @@ def run_prometheus_server(self): def run_server(): port = settings.PROMETHEUS_PORT log.info(f"Running Prometheus metrics server on port {port}") - start_http_server(port) + metrics.start_http_server(port) thread = Thread(target=run_server) thread.start() @@ -153,7 +104,7 @@ def retry(self, task): log.warning( f"Queueing failed task for retry #{retry_count}/{settings.WORKER_RETRY}..." # noqa ) - TASKS_FAILED.labels( + metrics.TASKS_FAILED.labels( stage=task.stage.stage, retries=retries, failed_permanently=False, @@ -164,7 +115,7 @@ def retry(self, task): log.warning( f"Failed task, exhausted retry count of {settings.WORKER_RETRY}" ) - TASKS_FAILED.labels( + metrics.TASKS_FAILED.labels( stage=task.stage.stage, retries=retries, failed_permanently=True,