Skip to content

Commit

Permalink
export RQ status as prometheus metrics
Browse files Browse the repository at this point in the history
fixes: rq#503
  • Loading branch information
terencehonles committed Aug 16, 2024
1 parent 87dd8cf commit 79f6394
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 1 deletion.
59 changes: 59 additions & 0 deletions django_rq/metrics_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from rq.job import JobStatus

from .queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs
from .workers import get_worker_class

try:
from prometheus_client import Summary
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily

class RQCollector:
"""RQ stats collector"""

summary = Summary('rq_request_processing_seconds', 'Time spent collecting RQ data')

def collect(self):
from .settings import QUEUES

with self.summary.time():
rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues'])
rq_workers_success = CounterMetricFamily('rq_workers_success', 'RQ workers success count', labels=['name', 'queues'])
rq_workers_failed = CounterMetricFamily('rq_workers_failed', 'RQ workers fail count', labels=['name', 'queues'])
rq_workers_working_time = CounterMetricFamily('rq_workers_working_time', 'RQ workers spent seconds', labels=['name', 'queues'])

rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status'])

worker_class = get_worker_class()
unique_configs = get_unique_connection_configs()
connections = {}
for queue_name, config in QUEUES.items():
index = unique_configs.index(filter_connection_params(config))
if index not in connections:
connections[index] = connection = get_connection(queue_name)

for worker in worker_class.all(connection):
name = worker.name
label_queues = ','.join(worker.queue_names())
rq_workers.add_metric([name, worker.get_state(), label_queues], 1)
rq_workers_success.add_metric([name, label_queues], worker.successful_job_count)
rq_workers_failed.add_metric([name, label_queues], worker.failed_job_count)
rq_workers_working_time.add_metric([name, label_queues], worker.total_working_time)
else:
connection = connections[index]

queue = get_queue(queue_name, connection=connection)
rq_jobs.add_metric([queue_name, JobStatus.QUEUED], queue.count)
rq_jobs.add_metric([queue_name, JobStatus.STARTED], queue.started_job_registry.count)
rq_jobs.add_metric([queue_name, JobStatus.FINISHED], queue.finished_job_registry.count)
rq_jobs.add_metric([queue_name, JobStatus.FAILED], queue.failed_job_registry.count)
rq_jobs.add_metric([queue_name, JobStatus.DEFERRED], queue.deferred_job_registry.count)
rq_jobs.add_metric([queue_name, JobStatus.SCHEDULED], queue.scheduled_job_registry.count)

yield rq_workers
yield rq_workers_success
yield rq_workers_failed
yield rq_workers_working_time
yield rq_jobs

except ImportError:
RQCollector = None
6 changes: 6 additions & 0 deletions django_rq/urls.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from django.urls import re_path

from . import views
from .metrics_collector import RQCollector

metrics_view = [
re_path(r'^metrics/?$', views.prometheus_metrics, name='rq_metrics'),
] if RQCollector else []

urlpatterns = [
re_path(r'^$', views.stats, name='rq_home'),
re_path(r'^stats.json/(?P<token>[\w]+)?/?$', views.stats_json, name='rq_home_json'),
*metrics_view,
re_path(r'^queues/(?P<queue_index>[\d]+)/$', views.jobs, name='rq_jobs'),
re_path(r'^workers/(?P<queue_index>[\d]+)/$', views.workers, name='rq_workers'),
re_path(r'^workers/(?P<queue_index>[\d]+)/(?P<key>[-\w\.\:\$]+)/$', views.worker_details, name='rq_worker_details'),
Expand Down
30 changes: 29 additions & 1 deletion django_rq/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from django.contrib import admin, messages
from django.contrib.admin.views.decorators import staff_member_required
from django.http import Http404, JsonResponse
from django.http import Http404, HttpResponse, JsonResponse
from django.shortcuts import redirect, render
from django.urls import reverse
from django.views.decorators.cache import never_cache
Expand All @@ -27,6 +27,15 @@
from .settings import API_TOKEN, QUEUES_MAP
from .utils import get_jobs, get_scheduler_statistics, get_statistics, stop_jobs

try:
import prometheus_client

from .metrics_collector import RQCollector
except ImportError:
prometheus_client = RQCollector = None

registry = None


@never_cache
@staff_member_required
Expand All @@ -48,6 +57,25 @@ def stats_json(request, token=None):
)


@never_cache
@staff_member_required
def prometheus_metrics(request):
global registry

if not RQCollector:
raise Http404

if not registry:
registry = prometheus_client.CollectorRegistry(auto_describe=True)
registry.register(RQCollector())

encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', ''))
if 'name[]' in request.GET:
registry = registry.restricted_registry(request.GET.getlist('name[]'))

return HttpResponse(encoder(registry), headers={'Content-Type': content_type})


@never_cache
@staff_member_required
def jobs(request, queue_index):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package_data={'': ['README.rst']},
install_requires=['django>=3.2', 'rq>=1.14', 'redis>=3'],
extras_require={
'prometheus-metrics': ['prometheus_client>=0.4.0'],
'Sentry': ['sentry-sdk>=1.0.0'],
'testing': [],
},
Expand Down

0 comments on commit 79f6394

Please sign in to comment.