Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prometheus endpoints #628

Merged
merged 4 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions spec/api/prometheus_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@ describe LavinMQ::HTTP::ConsumersController do
response.body.lines.any?(&.starts_with? "telemetry_scrape_duration_seconds").should be_true
end

# Declares two queues (deleting one of them) and checks that the overview
# metrics report two declarations in total and one currently existing queue.
it "should perform sanity check on sampled metrics" do
  vhost = Server.vhosts.create("pmths")
  vhost.declare_queue("test1", true, false)
  vhost.delete_queue("test1")
  vhost.declare_queue("test2", true, false)
  raw = get("/metrics").body
  parsed_metrics = parse_prometheus(raw)
  expectations = {
    "lavinmq_queues_declared_total" => 2,
    "lavinmq_queues"                => 1,
  }
  expectations.each do |name, expected|
    samples = parsed_metrics.select { |metric| metric[:key] == name }
    # Without this check the spec would pass even if the metric disappeared
    # from the output, because no expectation would ever be evaluated.
    samples.should_not be_empty
    samples.each { |metric| metric[:value].should eq expected }
  end
end

it "should support specifying prefix" do
prefix = "testing"
response = get("/metrics?prefix=#{prefix}")
Expand Down Expand Up @@ -39,5 +56,17 @@ describe LavinMQ::HTTP::ConsumersController do
lines = response.body.lines
lines.any?(&.starts_with? "lavinmq_detailed_connections_opened_total").should be_false
end

# Opens two client connections, closes one, and checks that the detailed
# connection churn metrics report two opened and one closed connection.
it "should perform sanity check on sampled metrics" do
  Server.vhosts.create("pmths")
  conn1 = AMQP::Client.new.connect
  conn2 = AMQP::Client.new.connect
  begin
    conn1.close(no_wait: true)
    raw = get("/metrics/detailed?family=connection_churn_metrics").body
    parsed_metrics = parse_prometheus(raw)
    parsed_metrics.find! { |metric| metric[:key] == "lavinmq_detailed_connections_opened_total" }[:value].should eq 2
    parsed_metrics.find! { |metric| metric[:key] == "lavinmq_detailed_connections_closed_total" }[:value].should eq 1
  ensure
    # Close the second connection even when an expectation above fails, so a
    # failing spec does not leave a client connection open.
    conn2.close(no_wait: true)
  end
end
end
end
57 changes: 57 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "file_utils"
require "../src/lavinmq/config"
require "http/client"
require "amqp-client"
require "string_scanner"

Log.setup_from_env

Expand Down Expand Up @@ -160,3 +161,59 @@ Spec.after_each do
FileUtils.rm_rf("/tmp/lavinmq-spec")
Server.restart
end


# Error raised by the Prometheus text parsing helpers in this file
# (`parse_prometheus` and `parse_attrs`) when the input cannot be scanned.
class Invalid < Exception
  # Fixed message carried by every instance.
  MESSAGE = "invalid input"

  def initialize
    super(MESSAGE)
  end
end

# Sample (metric) name: word characters and colons.
KEY_RE = /[\w:]+/
# Sample value: an optionally signed decimal with an optional exponent
# (upper- or lowercase, optionally signed, e.g. `1.0e+16`), signed infinity
# (`+Inf` / `-Inf`), or `NaN`.
VALUE_RE = /[+-]?(?:\d+\.?\d*(?:[eE][+-]?\d+)?|Inf)|NaN/
# Label name. Surrounding spaces are tolerated here; `parse_attrs` strips them.
ATTR_KEY_RE = /[ \w-]+/
# Double-quoted label value with optional surrounding whitespace. Group 1 is
# the raw text between the quotes; any character is accepted and
# backslash escapes (such as `\"`) are kept as written.
ATTR_VALUE_RE = %r{\s*"((?:[^"\\]|\\.)*)"\s*}

# Parses Prometheus text exposition output into a list of samples.
#
# Each element of the returned array is a named tuple holding the sample name
# (`key`), its labels (`attrs`, as parsed by `parse_attrs`) and its numeric
# value (`value`). Lines starting with `#` and empty lines are skipped.
#
# Raises `Invalid` when a sample name or a sample value cannot be scanned.
def parse_prometheus(raw)
  s = StringScanner.new(raw)
  res = [] of NamedTuple(key: String, attrs: Hash(String, String), value: Float64)
  until s.eos?
    if s.peek(1) == "#"
      # The newline is optional so that a comment on the very last line
      # (no trailing "\n") is still consumed; requiring it would leave the
      # scanner in place and loop forever.
      s.scan(/.*\n?/)
      next
    end
    # Empty lines carry no sample.
    next if s.scan(/\n/)
    key = s.scan KEY_RE
    raise Invalid.new unless key
    attrs = parse_attrs(s)
    value = s.scan VALUE_RE
    raise Invalid.new unless value
    s.scan(/\n/)
    res.push({key: key, attrs: attrs, value: value.to_f})
  end
  res
end

# Parses the optional `{name="value",...}` label set that follows a sample
# name and returns it as a hash of label name to raw label value.
#
# `s` is the scanner positioned right after the sample name. One character is
# consumed: either the whitespace separating name and value (no labels, an
# empty hash is returned) or the opening `{`. After a label set, one trailing
# whitespace character is consumed as well.
#
# Raises `Invalid` when a label name, the `=`, the quoted value, or the
# `,` / `}` separator is missing.
def parse_attrs(s)
  attrs = Hash(String, String).new
  return attrs unless s.scan(/\s|{/) == "{"

  loop do
    # Handles both an empty label set and a trailing comma before "}".
    if s.peek(1) == "}"
      s.scan(/}/)
      break
    end
    key = s.scan ATTR_KEY_RE
    raise Invalid.new unless key
    raise Invalid.new unless s.scan(/=/)
    # Check the scan result itself: indexing the scanner after a failed scan
    # raises a generic error, so `Invalid` would never be reported.
    raise Invalid.new unless s.scan(ATTR_VALUE_RE)
    attrs[key.strip] = s[1]
    separator = s.scan(/,|}/)
    raise Invalid.new unless separator
    break if separator == "}"
  end
  s.scan(/\s/)
  attrs
end
14 changes: 11 additions & 3 deletions src/lavinmq/http/controller/prometheus.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ module LavinMQ
vhosts = vhosts(u)
selected = context.request.query_params.fetch_all("vhost")
vhosts = vhosts.select { |vhost| selected.includes? vhost.name } unless selected.empty?
vhosts
vhosts.to_a
end

private def register_routes
Expand Down Expand Up @@ -109,16 +109,19 @@ module LavinMQ
end
writer = PrometheusWriter.new(io, "telemetry")
writer.write({name: "scrape_duration_seconds",
type: "counter",
value: elapsed.total_seconds,
help: "Duration for metrics collection in seconds"})
writer.write({name: "scrape_mem",
type: "gauge",
value: mem,
help: "Memory used for metrics collections in bytes"})
end

private def overview_broker_metrics(vhosts, writer)
stats = vhost_stats(vhosts)
writer.write({name: "identity_info",
type: "gauge",
value: 1,
help: "System information",
labels: {
Expand Down Expand Up @@ -159,9 +162,11 @@ module LavinMQ
type: "gauge",
help: "Open TCP sockets"})
writer.write({name: "process_resident_memory_bytes",
type: "gauge",
value: @amqp_server.rss,
help: "Memory used in bytes"})
writer.write({name: "disk_space_available_bytes",
type: "gauge",
value: @amqp_server.disk_free,
help: "Disk space available in bytes"})
writer.write({name: "process_max_fds",
Expand Down Expand Up @@ -221,6 +226,7 @@ module LavinMQ

private def custom_metrics(vhosts, writer)
writer.write({name: "uptime", value: @amqp_server.uptime.to_i,
type: "counter",
help: "Server uptime in seconds"})
writer.write({name: "cpu_system_time_total",
value: @amqp_server.sys_time,
Expand All @@ -230,8 +236,10 @@ module LavinMQ
value: @amqp_server.user_time,
type: "counter",
help: "Total CPU user time"})
writer.write({name: "rss_bytes", value: @amqp_server.rss,
help: "Memory RSS in bytes"})
writer.write({name: "rss_bytes",
type: "gauge",
value: @amqp_server.rss,
help: "Memory RSS in bytes"})
writer.write({name: "stats_collection_duration_seconds_total",
value: @amqp_server.stats_collection_duration_seconds_total.to_f,
type: "gauge",
Expand Down
Loading