diff --git a/django_rq/metrics_collector.py b/django_rq/metrics_collector.py new file mode 100644 index 00000000..2e211810 --- /dev/null +++ b/django_rq/metrics_collector.py @@ -0,0 +1,59 @@ +from rq.job import JobStatus + +from .queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs +from .workers import get_worker_class + +try: + from prometheus_client import Summary + from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily + + class RQCollector: + """RQ stats collector""" + + summary = Summary('rq_request_processing_seconds', 'Time spent collecting RQ data') + + def collect(self): + from .settings import QUEUES + + with self.summary.time(): + rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues']) + rq_workers_success = CounterMetricFamily('rq_workers_success', 'RQ workers success count', labels=['name', 'queues']) + rq_workers_failed = CounterMetricFamily('rq_workers_failed', 'RQ workers fail count', labels=['name', 'queues']) + rq_workers_working_time = CounterMetricFamily('rq_workers_working_time', 'RQ workers spent seconds', labels=['name', 'queues']) + + rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status']) + + worker_class = get_worker_class() + unique_configs = get_unique_connection_configs() + connections = {} + for queue_name, config in QUEUES.items(): + index = unique_configs.index(filter_connection_params(config)) + if index not in connections: + connections[index] = connection = get_connection(queue_name) + + for worker in worker_class.all(connection): + name = worker.name + label_queues = ','.join(worker.queue_names()) + rq_workers.add_metric([name, worker.get_state(), label_queues], 1) + rq_workers_success.add_metric([name, label_queues], worker.successful_job_count) + rq_workers_failed.add_metric([name, label_queues], worker.failed_job_count) + rq_workers_working_time.add_metric([name, label_queues], worker.total_working_time) + else: + connection = connections[index] + + queue = get_queue(queue_name, connection=connection) + rq_jobs.add_metric([queue_name, JobStatus.QUEUED], queue.count) + rq_jobs.add_metric([queue_name, JobStatus.STARTED], queue.started_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.FINISHED], queue.finished_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.FAILED], queue.failed_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.DEFERRED], queue.deferred_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.SCHEDULED], queue.scheduled_job_registry.count) + + yield rq_workers + yield rq_workers_success + yield rq_workers_failed + yield rq_workers_working_time + yield rq_jobs + +except ImportError: + RQCollector = None diff --git a/django_rq/urls.py b/django_rq/urls.py index 1aff8d9c..d60c19d3 100644 --- a/django_rq/urls.py +++ b/django_rq/urls.py @@ -1,10 +1,16 @@ from django.urls import re_path from . import views +from .metrics_collector import RQCollector + +metrics_view = [ + re_path(r'^metrics/?$', views.prometheus_metrics, name='rq_metrics'), +] if RQCollector else [] urlpatterns = [ re_path(r'^$', views.stats, name='rq_home'), re_path(r'^stats.json/(?P[\w]+)?/?$', views.stats_json, name='rq_home_json'), + *metrics_view, re_path(r'^queues/(?P[\d]+)/$', views.jobs, name='rq_jobs'), re_path(r'^workers/(?P[\d]+)/$', views.workers, name='rq_workers'), re_path(r'^workers/(?P[\d]+)/(?P[-\w\.\:\$]+)/$', views.worker_details, name='rq_worker_details'), diff --git a/django_rq/views.py b/django_rq/views.py index 95f5a43f..01a1ca6e 100644 --- a/django_rq/views.py +++ b/django_rq/views.py @@ -4,7 +4,7 @@ from django.contrib import admin, messages from django.contrib.admin.views.decorators import staff_member_required -from django.http import Http404, JsonResponse +from django.http import Http404, HttpResponse, JsonResponse from django.shortcuts import redirect, render from django.urls import reverse from django.views.decorators.cache import never_cache @@ -27,6 +27,15 @@ from .settings import API_TOKEN, QUEUES_MAP from .utils import get_jobs, get_scheduler_statistics, get_statistics, stop_jobs +try: + import prometheus_client + + from .metrics_collector import RQCollector +except ImportError: + prometheus_client, RQCollector = None + +registry = None + @never_cache @staff_member_required @@ -48,6 +57,25 @@ def stats_json(request, token=None): ) +@never_cache +@staff_member_required +def prometheus_metrics(request): + global registry + + if not RQCollector: + raise Http404 + + if not registry: + registry = CollectorRegistry(auto_describe=True) + registry.register(RQCollector()) + + encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', '')) + if 'name[]' in request.GET: + registry = registry.restricted_registry(request.GET.getlist('name[]')) + + return HttpResponse(encoder(registry), headers={'Content-Type': content_type}) + + @never_cache @staff_member_required def jobs(request, queue_index): diff --git a/setup.py b/setup.py index fdf0f065..8bc95887 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ package_data={'': ['README.rst']}, install_requires=['django>=3.2', 'rq>=1.14', 'redis>=3'], extras_require={ + 'prometheus-metrics': ['prometheus_client>=0.4.0'], 'Sentry': ['raven>=6.1.0'], 'testing': ['mock>=2.0.0'], },