Skip to content

Commit

Permalink
Merge pull request #150 from tarunps/feat/agent-rq-monitor
Browse files Browse the repository at this point in the history
feat(exporter): Add endpoint for RQ metrics
  • Loading branch information
adityahase authored Feb 10, 2025
2 parents c5a80f4 + 89f9ceb commit 9650bfb
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 0 deletions.
4 changes: 4 additions & 0 deletions agent/bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,9 @@ def start(self):
ssh_port = self.bench_config.get("ssh_port", self.bench_config["web_port"] + 4000)
ssh_ip = self.bench_config.get("private_ip", "127.0.0.1")

rq_port = self.bench_config.get("rq_port")
rq_port_mapping = f"-p 127.0.0.1:{rq_port}:11000 "

bench_directory = "/home/frappe/frappe-bench"
mounts = self.prepare_mounts_on_host(bench_directory)

Expand All @@ -703,6 +706,7 @@ def start(self):
f"-p 127.0.0.1:{self.bench_config['web_port']}:8000 "
f"-p 127.0.0.1:{self.bench_config['socketio_port']}:9000 "
f"-p 127.0.0.1:{self.bench_config['codeserver_port']}:8088 "
f"{rq_port_mapping if rq_port else ''}"
f"-p {ssh_ip}:{ssh_port}:2200 "
f"-v {self.sites_directory}:{bench_directory}/sites "
f"-v {self.logs_directory}:{bench_directory}/logs "
Expand Down
109 changes: 109 additions & 0 deletions agent/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
from prometheus_client.registry import Collector
from redis import Redis
from rq import Queue, Worker
from rq.job import JobStatus


def get_metrics(name: str, port: int):
from prometheus_client.exposition import generate_latest

return generate_latest(RQCollector(name, port))


def get_workers_stats(connection: Redis):
workers = Worker.all(connection=connection)

return [
{
"name": w.name,
"queues": w.queue_names(),
"state": w.get_state(),
"successful_job_count": w.successful_job_count,
"failed_job_count": w.failed_job_count,
"total_working_time": w.total_working_time,
}
for w in workers
]


def get_jobs_by_queue(connection: Redis):
return {
queue.name: {
JobStatus.QUEUED: queue.count,
JobStatus.STARTED: queue.started_job_registry.count,
JobStatus.FINISHED: queue.finished_job_registry.count,
JobStatus.FAILED: queue.failed_job_registry.count,
JobStatus.DEFERRED: queue.deferred_job_registry.count,
JobStatus.SCHEDULED: queue.scheduled_job_registry.count,
}
for queue in Queue.all(connection=connection)
}


class RQCollector(Collector):
def __init__(self, name: str, port: int):
self.name = name
self.port = port
self.conn = Redis(port=port)
super().__init__()

def collect(self):
rq_workers = GaugeMetricFamily(
"rq_workers",
"RQ workers",
labels=["bench", "name", "state", "queues"],
)
rq_workers_success = CounterMetricFamily(
"rq_workers_success",
"RQ workers success count",
labels=["bench", "name", "queues"],
)
rq_workers_failed = CounterMetricFamily(
"rq_workers_failed",
"RQ workers fail count",
labels=["bench", "name", "queues"],
)
rq_workers_working_time = CounterMetricFamily(
"rq_workers_working_time",
"RQ workers spent seconds",
labels=["bench", "name", "queues"],
)

rq_jobs = GaugeMetricFamily("rq_jobs", "RQ jobs by state", labels=["bench", "queue", "status"])

workers = get_workers_stats(self.conn)
for worker in workers:
label_queues = ",".join(worker["queues"])
rq_workers.add_metric(
[
self.name,
worker["name"],
worker["state"],
label_queues,
],
1,
)
rq_workers_success.add_metric(
[self.name, worker["name"], label_queues],
worker["successful_job_count"],
)
rq_workers_failed.add_metric(
[self.name, worker["name"], label_queues],
worker["failed_job_count"],
)
rq_workers_working_time.add_metric(
[self.name, worker["name"], label_queues],
worker["total_working_time"],
)

yield rq_workers
yield rq_workers_success
yield rq_workers_failed
yield rq_workers_working_time

for queue_name, jobs in get_jobs_by_queue(self.conn).items():
for status, count in jobs.items():
rq_jobs.add_metric([self.name, queue_name, status], count)

yield rq_jobs
8 changes: 8 additions & 0 deletions agent/templates/agent/nginx.conf.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ server {
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Port $server_port;

location /agent/benches/metrics {
return 301 /metrics/rq;
}

proxy_pass http://127.0.0.1:{{ web_port }}/;
}

Expand Down Expand Up @@ -135,6 +139,10 @@ server {
proxy_pass http://127.0.0.1:9114/metrics;
}

location /metrics/rq {
proxy_pass http://127.0.0.1:{{ web_port }}/benches/metrics;
}

}

{% if registry %}
Expand Down
28 changes: 28 additions & 0 deletions agent/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def wrapper(*args, **kwargs):

@application.before_request
def validate_access_token():
exempt_endpoints = ["get_metrics"]
if request.endpoint in exempt_endpoints:
return None

try:
if application.debug:
return None
Expand Down Expand Up @@ -225,12 +229,36 @@ def get_benches():
return {name: bench.dump() for name, bench in Server().benches.items()}


@application.route("/benches/metrics")
def get_metrics():
from agent.exporter import get_metrics

benches_metrics = [
get_metrics(name, rq_port)
for name, bench in Server().benches.items()
if (rq_port := bench.bench_config.get("rq_port")) is not None
]
return Response(benches_metrics, mimetype="text/plain")


@application.route("/benches/<string:bench>")
@validate_bench
def get_bench(bench):
return Server().benches[bench].dump()


@application.route("/benches/<string:bench_str>/metrics", methods=["GET"])
def get_bench_metrics(bench_str):
from agent.exporter import get_metrics

bench = Server().benches[bench_str]
rq_port = bench.bench_config.get("rq_port")
if rq_port:
return Response(get_metrics(bench_str, rq_port), mimetype="text/plain")

return Response("Unavailable", status=400, mimetype="text/plain")


@application.route("/benches/<string:bench>/info", methods=["POST", "GET"])
@validate_bench
def fetch_sites_info(bench):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Jinja2==2.10.3
MarkupSafe==1.1.1
passlib==1.7.2
peewee==3.13.1
prometheus_client==0.20.0
PyMySQL==0.9.3
python-crontab==2.5.1
python-dateutil==2.8.2
Expand Down

0 comments on commit 9650bfb

Please sign in to comment.