From 9cabc30968f6ebb06cba6be02b8e843bc25830f5 Mon Sep 17 00:00:00 2001
From: "Terence D. Honles"
Date: Fri, 16 Aug 2024 14:32:46 +0200
Subject: [PATCH] export RQ status as prometheus metrics

fixes: https://github.com/rq/django-rq/issues/503
---
 django_rq/apps.py  | 65 ++++++++++++++++++++++++++++++++++++++++++++++
 django_rq/urls.py  | 10 +++++++
 django_rq/views.py | 21 ++++++++++++++-
 setup.py           |  1 +
 4 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/django_rq/apps.py b/django_rq/apps.py
index f3f2e3b1..6459dfce 100644
--- a/django_rq/apps.py
+++ b/django_rq/apps.py
@@ -1,6 +1,71 @@
 from django.apps import AppConfig
+
+from .queues import filter_connection_params, get_connection, get_queue_class, get_unique_connection_configs
+from .workers import get_worker_class
+
+try:
+    from rq.job import JobStatus
+    from prometheus_client import Summary, REGISTRY
+    from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
+
+    class RQCollector:
+        """RQ stats collector"""
+
+        summary = Summary('rq_request_processing_seconds', 'Time spent collecting RQ data')
+
+        def collect(self):
+            from .settings import QUEUES
+
+            with self.summary.time():
+                rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues'])
+                rq_workers_success = CounterMetricFamily('rq_workers_success', 'RQ workers success count', labels=['name', 'queues'])
+                rq_workers_failed = CounterMetricFamily('rq_workers_failed', 'RQ workers fail count', labels=['name', 'queues'])
+                rq_workers_working_time = CounterMetricFamily('rq_workers_working_time', 'RQ workers spent seconds', labels=['name', 'queues'])
+
+                rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status'])
+
+                worker_class = get_worker_class()
+                unique_configs = get_unique_connection_configs()
+                connections = {}
+                for queue_name, config in QUEUES.items():
+                    index = unique_configs.index(filter_connection_params(config))
+                    if index not in connections:
+                        connections[index] = connection = get_connection(queue_name)
+
+                        for worker in worker_class.all(connection):
+                            name = worker.name
+                            label_queues = ','.join(worker.queue_names())
+                            rq_workers.add_metric([name, worker.get_state(), label_queues], 1)
+                            rq_workers_success.add_metric([name, label_queues], worker.successful_job_count)
+                            rq_workers_failed.add_metric([name, label_queues], worker.failed_job_count)
+                            rq_workers_working_time.add_metric([name, label_queues], worker.total_working_time)
+                    else:
+                        connection = connections[index]
+
+                    queue_class = get_queue_class(config)
+                    queue = queue_class(queue_name, connection=connection)
+                    rq_jobs.add_metric([queue_name, JobStatus.QUEUED], queue.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.STARTED], queue.started_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.FINISHED], queue.finished_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.FAILED], queue.failed_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.DEFERRED], queue.deferred_job_registry.count)
+                    rq_jobs.add_metric([queue_name, JobStatus.SCHEDULED], queue.scheduled_job_registry.count)
+
+                yield rq_workers
+                yield rq_workers_success
+                yield rq_workers_failed
+                yield rq_workers_working_time
+
+                yield rq_jobs
+
+except ImportError:
+    RQCollector = None
 
 
 class DjangoRqAdminConfig(AppConfig):
     default_auto_field = "django.db.models.AutoField"
     name = "django_rq"
+
+    def ready(self):
+        if RQCollector is not None:
+            REGISTRY.register(RQCollector())
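The collector registered in DjangoRqAdminConfig.ready() above can be exercised without going through the HTTP view added below. A minimal sketch, assuming DJANGO_SETTINGS_MODULE points at a project that has django_rq installed and can reach the Redis servers configured in RQ_QUEUES; 'myproject.settings' is a placeholder, not part of the patch:

import os

import django
from prometheus_client import REGISTRY, generate_latest

# Placeholder settings module; point this at a project with django_rq in INSTALLED_APPS.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
django.setup()  # populates the app registry and runs ready(), which registers RQCollector

# generate_latest() drives RQCollector.collect(), so the Redis servers in RQ_QUEUES must be reachable.
print(generate_latest(REGISTRY).decode())  # rq_workers*, rq_jobs, rq_request_processing_seconds

This is the same code path the metrics view in views.py takes, minus the content negotiation.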
diff --git a/django_rq/urls.py b/django_rq/urls.py
index 1aff8d9c..15cce015 100644
--- a/django_rq/urls.py
+++ b/django_rq/urls.py
@@ -2,9 +2,19 @@
 
 from . import views
 
+try:
+    import prometheus_client
+
+    metrics_view = [
+        re_path(r'^metrics/?$', views.prometheus_metrics, name='rq_metrics'),
+    ]
+except ImportError:
+    metrics_view = []
+
 urlpatterns = [
     re_path(r'^$', views.stats, name='rq_home'),
     re_path(r'^stats.json/(?P<token>[\w]+)?/?$', views.stats_json, name='rq_home_json'),
+    *metrics_view,
     re_path(r'^queues/(?P<queue_index>[\d]+)/$', views.jobs, name='rq_jobs'),
     re_path(r'^workers/(?P<queue_index>[\d]+)/$', views.workers, name='rq_workers'),
     re_path(r'^workers/(?P<queue_index>[\d]+)/(?P<key>[-\w\.\:\$]+)/$', views.worker_details, name='rq_worker_details'),
diff --git a/django_rq/views.py b/django_rq/views.py
index 95f5a43f..5c5a1add 100644
--- a/django_rq/views.py
+++ b/django_rq/views.py
@@ -4,7 +4,7 @@
 
 from django.contrib import admin, messages
 from django.contrib.admin.views.decorators import staff_member_required
-from django.http import Http404, JsonResponse
+from django.http import Http404, HttpResponse, JsonResponse
 from django.shortcuts import redirect, render
 from django.urls import reverse
 from django.views.decorators.cache import never_cache
@@ -27,6 +27,11 @@
 from .settings import API_TOKEN, QUEUES_MAP
 from .utils import get_jobs, get_scheduler_statistics, get_statistics, stop_jobs
 
+try:
+    import prometheus_client
+except ImportError:
+    prometheus_client = None
+
 
 @never_cache
 @staff_member_required
@@ -48,6 +53,20 @@ def stats_json(request, token=None):
     )
 
 
+@never_cache
+@staff_member_required
+def prometheus_metrics(request):
+    if not prometheus_client:
+        raise Http404
+
+    registry = prometheus_client.REGISTRY
+    encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', ''))
+    if 'name[]' in request.GET:
+        registry = registry.restricted_registry(request.GET.getlist('name[]'))
+
+    return HttpResponse(encoder(registry), headers={'Content-Type': content_type})
+
+
 @never_cache
 @staff_member_required
 def jobs(request, queue_index):
diff --git a/setup.py b/setup.py
index fdf0f065..8bc95887 100644
--- a/setup.py
+++ b/setup.py
@@ -16,6 +16,7 @@
     package_data={'': ['README.rst']},
     install_requires=['django>=3.2', 'rq>=1.14', 'redis>=3'],
     extras_require={
+        'prometheus-metrics': ['prometheus_client>=0.4.0'],
         'Sentry': ['raven>=6.1.0'],
         'testing': ['mock>=2.0.0'],
     },
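Usage sketch, not part of the patch: with the prometheus-metrics extra installed, django_rq.urls mounted under /django-rq/, and the Redis servers in RQ_QUEUES reachable, the new endpoint can be exercised from a Django test. The mount point, user name, and test class below are assumptions of this example:

from django.contrib.auth import get_user_model
from django.test import TestCase

from prometheus_client.parser import text_string_to_metric_families


class MetricsEndpointTest(TestCase):
    def test_metrics_exposition(self):
        # The view is staff-only (staff_member_required), so log in a staff user first.
        staff = get_user_model().objects.create_user('staff', password='pw', is_staff=True)
        self.client.force_login(staff)

        # Assumes include('django_rq.urls') is mounted at /django-rq/ in the project's urlconf.
        response = self.client.get('/django-rq/metrics/')
        self.assertEqual(response.status_code, 200)

        # The default text exposition parses back into the families defined by RQCollector.
        families = {family.name for family in text_string_to_metric_families(response.content.decode())}
        self.assertIn('rq_jobs', families)

Because the view sits behind staff_member_required, a Prometheus scrape job targeting this endpoint would need session or proxy-level authentication.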