From 45524daae67520f1c371c238d8ddbf7d44540ddb Mon Sep 17 00:00:00 2001 From: Christina Date: Wed, 31 Jan 2024 16:01:41 +0100 Subject: [PATCH 1/4] let vhosts be an array so we can iterate over it multiple times --- src/lavinmq/http/controller/prometheus.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/http/controller/prometheus.cr b/src/lavinmq/http/controller/prometheus.cr index 377878761a..bf5f6e1867 100644 --- a/src/lavinmq/http/controller/prometheus.cr +++ b/src/lavinmq/http/controller/prometheus.cr @@ -57,7 +57,7 @@ module LavinMQ vhosts = vhosts(u) selected = context.request.query_params.fetch_all("vhost") vhosts = vhosts.select { |vhost| selected.includes? vhost.name } unless selected.empty? - vhosts + vhosts.to_a end private def register_routes From 63c5168818321a76931b822cff3179d641395c6d Mon Sep 17 00:00:00 2001 From: Christina Date: Thu, 1 Feb 2024 08:49:52 +0100 Subject: [PATCH 2/4] make sure all metrics have a type so datadog can scrape them --- src/lavinmq/http/controller/prometheus.cr | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/lavinmq/http/controller/prometheus.cr b/src/lavinmq/http/controller/prometheus.cr index bf5f6e1867..35c36f3072 100644 --- a/src/lavinmq/http/controller/prometheus.cr +++ b/src/lavinmq/http/controller/prometheus.cr @@ -109,9 +109,11 @@ module LavinMQ end writer = PrometheusWriter.new(io, "telemetry") writer.write({name: "scrape_duration_seconds", + type: "counter", value: elapsed.total_seconds, help: "Duration for metrics collection in seconds"}) writer.write({name: "scrape_mem", + type: "gauge", value: mem, help: "Memory used for metrics collections in bytes"}) end @@ -119,6 +121,7 @@ module LavinMQ private def overview_broker_metrics(vhosts, writer) stats = vhost_stats(vhosts) writer.write({name: "identity_info", + type: "gauge", value: 1, help: "System information", labels: { @@ -159,9 +162,11 @@ module LavinMQ type: "gauge", help: "Open TCP sockets"}) writer.write({name: "process_resident_memory_bytes", + type: "gauge", value: @amqp_server.rss, help: "Memory used in bytes"}) writer.write({name: "disk_space_available_bytes", + type: "gauge", value: @amqp_server.disk_free, help: "Disk space available in bytes"}) writer.write({name: "process_max_fds", @@ -221,6 +226,7 @@ module LavinMQ private def custom_metrics(vhosts, writer) writer.write({name: "uptime", value: @amqp_server.uptime.to_i, + type: "counter", help: "Server uptime in seconds"}) writer.write({name: "cpu_system_time_total", value: @amqp_server.sys_time, @@ -230,7 +236,9 @@ module LavinMQ value: @amqp_server.user_time, type: "counter", help: "Total CPU user time"}) - writer.write({name: "rss_bytes", value: @amqp_server.rss, + writer.write({name: "rss_bytes", + type: "gauge", + value: @amqp_server.rss, help: "Memory RSS in bytes"}) writer.write({name: "stats_collection_duration_seconds_total", value: @amqp_server.stats_collection_duration_seconds_total.to_f, From 546045d65d4a91c9bca324850d679a3e27760edf Mon Sep 17 00:00:00 2001 From: Christina Date: Wed, 7 Feb 2024 11:54:08 +0100 Subject: [PATCH 3/4] add specs that sample metrics and validates them, also add prometheus parser to spec_helper --- spec/api/prometheus_spec.cr | 29 ++++++++++++ spec/spec_helper.cr | 57 +++++++++++++++++++++++ src/lavinmq/http/controller/prometheus.cr | 14 +++--- 3 files changed, 93 insertions(+), 7 deletions(-) diff --git a/spec/api/prometheus_spec.cr b/spec/api/prometheus_spec.cr index c952d03334..520067b122 100644 --- a/spec/api/prometheus_spec.cr +++ b/spec/api/prometheus_spec.cr @@ -8,6 +8,23 @@ describe LavinMQ::HTTP::ConsumersController do response.body.lines.any?(&.starts_with? "telemetry_scrape_duration_seconds").should be_true end + it "should perform sanity check on sampled metrics" do + vhost = Server.vhosts.create("pmths") + vhost.declare_queue("test1", true, false) + vhost.delete_queue("test1") + vhost.declare_queue("test2", true, false) + raw = get("/metrics").body + parsed_metrics = parse_prometheus(raw) + parsed_metrics.each do |metric| + case metric[:key] + when "lavinmq_queues_declared_total" + metric[:value].should eq 2 + when "lavinmq_queues" + metric[:value].should eq 1 + end + end + end + it "should support specifying prefix" do prefix = "testing" response = get("/metrics?prefix=#{prefix}") @@ -39,5 +56,17 @@ describe LavinMQ::HTTP::ConsumersController do lines = response.body.lines lines.any?(&.starts_with? "lavinmq_detailed_connections_opened_total").should be_false end + + it "should perform sanity check on sampled metrics" do + vhost = Server.vhosts.create("pmths") + conn1 = AMQP::Client.new.connect + conn2 = AMQP::Client.new.connect + conn1.close(no_wait: true) + raw = get("/metrics/detailed?family=connection_churn_metrics").body + parsed_metrics = parse_prometheus(raw) + parsed_metrics.find! { |metric| metric[:key] == "lavinmq_detailed_connections_opened_total" }[:value].should eq 2 + parsed_metrics.find! { |metric| metric[:key] == "lavinmq_detailed_connections_closed_total" }[:value].should eq 1 + conn2.close(no_wait: true) + end end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 85584f3b12..a66e3233db 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -3,6 +3,7 @@ require "file_utils" require "../src/lavinmq/config" require "http/client" require "amqp-client" +require "string_scanner" Log.setup_from_env @@ -160,3 +161,59 @@ Spec.after_each do FileUtils.rm_rf("/tmp/lavinmq-spec") Server.restart end + + +class Invalid < Exception + def initialize + super("invalid input") + end +end + +KEY_RE = /[\w:]+/ +VALUE_RE = /-?\d+\.?\d*E?-?\d*|NaN/ +ATTR_KEY_RE = /[ \w-]+/ +ATTR_VALUE_RE = %r{\s*"([\\"'\sa-zA-Z0-9\-_/.+]*)"\s*} + +def parse_prometheus(raw) + s = StringScanner.new(raw) + res = [] of NamedTuple(key: String, attrs: Hash(String, String), value: Float64) + until s.eos? + if s.peek(1) == "#" + s.scan(/.*\n/) + next + end + key = s.scan KEY_RE + raise Invalid.new unless key + attrs = parse_attrs(s) + value = s.scan VALUE_RE + raise Invalid.new unless value + value = value.to_f + s.scan(/\n/) + res.push({key: key, attrs: attrs, value: value}) + end + res +end + +def parse_attrs(s) + attrs = Hash(String, String).new + if s.scan(/\s|{/) == "{" + loop do + if s.peek(1) == "}" + s.scan(/}/) + break + end + key = s.scan ATTR_KEY_RE + raise Invalid.new unless key + key = key.strip + s.scan(/=/) + s.scan ATTR_VALUE_RE + + value = s[1] + raise Invalid.new unless value + attrs[key] = value + break if s.scan(/,|}/) == "}" + end + s.scan(/\s/) + end + attrs +end diff --git a/src/lavinmq/http/controller/prometheus.cr b/src/lavinmq/http/controller/prometheus.cr index 35c36f3072..c7d3c6cc93 100644 --- a/src/lavinmq/http/controller/prometheus.cr +++ b/src/lavinmq/http/controller/prometheus.cr @@ -109,11 +109,11 @@ module LavinMQ end writer = PrometheusWriter.new(io, "telemetry") writer.write({name: "scrape_duration_seconds", - type: "counter", + type: "counter", value: elapsed.total_seconds, help: "Duration for metrics collection in seconds"}) writer.write({name: "scrape_mem", - type: "gauge", + type: "gauge", value: mem, help: "Memory used for metrics collections in bytes"}) end @@ -162,11 +162,11 @@ module LavinMQ type: "gauge", help: "Open TCP sockets"}) writer.write({name: "process_resident_memory_bytes", - type: "gauge", + type: "gauge", value: @amqp_server.rss, help: "Memory used in bytes"}) writer.write({name: "disk_space_available_bytes", - type: "gauge", + type: "gauge", value: @amqp_server.disk_free, help: "Disk space available in bytes"}) writer.write({name: "process_max_fds", @@ -236,10 +236,10 @@ module LavinMQ value: @amqp_server.user_time, type: "counter", help: "Total CPU user time"}) - writer.write({name: "rss_bytes", - type: "gauge", + writer.write({name: "rss_bytes", + type: "gauge", value: @amqp_server.rss, - help: "Memory RSS in bytes"}) + help: "Memory RSS in bytes"}) writer.write({name: "stats_collection_duration_seconds_total", value: @amqp_server.stats_collection_duration_seconds_total.to_f, type: "gauge", From 69711025162e464c4c273cf84a04c8182942755b Mon Sep 17 00:00:00 2001 From: Christina Date: Wed, 7 Feb 2024 16:40:26 +0100 Subject: [PATCH 4/4] format --- spec/api/prometheus_spec.cr | 2 +- spec/spec_helper.cr | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/spec/api/prometheus_spec.cr b/spec/api/prometheus_spec.cr index 520067b122..ff52fb3099 100644 --- a/spec/api/prometheus_spec.cr +++ b/spec/api/prometheus_spec.cr @@ -19,7 +19,7 @@ describe LavinMQ::HTTP::ConsumersController do case metric[:key] when "lavinmq_queues_declared_total" metric[:value].should eq 2 - when "lavinmq_queues" + when "lavinmq_queues" metric[:value].should eq 1 end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index a66e3233db..3301d19b58 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -162,7 +162,6 @@ Spec.after_each do Server.restart end - class Invalid < Exception def initialize super("invalid input")