Skip to content

Commit

Permalink
add deliver_get to message stats (#793)
Browse files Browse the repository at this point in the history
Adds deliver_get to message_stats as a total of all messages sent (get + deliver).
  • Loading branch information
viktorerlingsson authored Sep 30, 2024
1 parent 8e78c73 commit f0c8695
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added cluster_status to lavinmqctl [#787](https://github.com/cloudamqp/lavinmq/pull/787)
- Added deliver_get to message_stats [#793](https://github.com/cloudamqp/lavinmq/pull/793)

## [2.0.0-rc.4] - 2024-08-21

Expand Down
29 changes: 29 additions & 0 deletions spec/api/http_api_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ describe LavinMQ::HTTP::Server do
response = http.get("/api/overview")
before_ack_count = JSON.parse(response.body).dig("message_stats", "ack")
before_deliver_count = JSON.parse(response.body).dig("message_stats", "deliver")
before_deliver_get_count = JSON.parse(response.body).dig("message_stats", "deliver_get")

with_channel(s) do |ch|
q1 = ch.queue("stats_q1", exclusive: true)
Expand All @@ -110,6 +111,8 @@ describe LavinMQ::HTTP::Server do
count.should eq(before_ack_count.as_i + 5)
count = JSON.parse(response.body).dig("message_stats", "deliver")
count.should eq(before_deliver_count.as_i + 5)
count = JSON.parse(response.body).dig("message_stats", "deliver_get")
count.should eq(before_deliver_get_count.as_i + 5)
end
end

Expand Down Expand Up @@ -143,6 +146,7 @@ describe LavinMQ::HTTP::Server do
with_http_server do |http, s|
response = http.get("/api/overview")
before_count = JSON.parse(response.body).dig("message_stats", "get")
before_count_deliver_get = JSON.parse(response.body).dig("message_stats", "deliver_get")

with_channel(s) do |ch|
q1 = ch.queue("stats_q1", exclusive: true)
Expand All @@ -157,9 +161,33 @@ describe LavinMQ::HTTP::Server do
response = http.get("/api/overview")
count = JSON.parse(response.body).dig("message_stats", "get")
count.should eq(before_count.as_i + 5)
count = JSON.parse(response.body).dig("message_stats", "deliver_get")
count.should eq(before_count_deliver_get.as_i + 5)
end
end
end

it "should return the number of message deliver_gets" do
with_http_server do |http, s|
response = http.get("/api/overview")
before_count = JSON.parse(response.body).dig("message_stats", "deliver_get")

with_channel(s) do |ch|
q1 = ch.queue("stats_q1", exclusive: true)
10.times do
q1.publish_confirm("m")
end
5.times do
q1.get.not_nil!
end
end

response = http.get("/api/overview")
count = JSON.parse(response.body).dig("message_stats", "deliver_get")
count.should eq(before_count.as_i + 5)
end
end

describe "GET /api/aliveness-test/vhost" do
it "should run aliveness-test" do
with_http_server do |http, _|
Expand All @@ -170,6 +198,7 @@ describe LavinMQ::HTTP::Server do
end
end
end

describe "Pagination" do
it "should page results" do
with_http_server do |http, _|
Expand Down
22 changes: 21 additions & 1 deletion spec/api/queues_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,32 @@ describe LavinMQ::HTTP::QueuesController do
with_http_server do |http, s|
with_channel(s) do |ch|
q = ch.queue("stats_q")
q.publish "m1"
3.times { q.publish "m1" }
end

msgs = Channel(Int32).new
with_channel(s) do |ch|
q = ch.queue("stats_q")
1.times { q.get }

ch.prefetch 1
q.subscribe(no_ack: false) do |msg|
msgs.send 1
msg.ack
end
end
msgs.receive

body = %({ "count": 1, "ack_mode": "get", "encoding": "auto" })
http.post("/api/queues/%2f/stats_q/get", body: body)

response = http.get("/api/queues/%2f/stats_q")
response.status_code.should eq 200
body = JSON.parse(response.body)
body["message_stats"]["publish_details"]["rate"].nil?.should be_false
body["message_stats"]["get"].should eq 2
body["message_stats"]["deliver"].should eq 1
body["message_stats"]["deliver_get"].should eq 3
end
end

Expand Down
4 changes: 3 additions & 1 deletion src/lavinmq/amqp/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module LavinMQ
@tx = false
@next_msg_body_tmp = IO::Memory.new

rate_stats({"ack", "get", "publish", "deliver", "redeliver", "reject", "confirm", "return_unroutable"})
rate_stats({"ack", "get", "publish", "deliver", "deliver_get", "redeliver", "reject", "confirm", "return_unroutable"})

Log = ::Log.for "AMQP.channel"

Expand Down Expand Up @@ -330,6 +330,7 @@ module LavinMQ
@client.vhost.event_tick(EventType::ClientRedeliver)
else
@deliver_count += 1
@deliver_get_count += 1
@client.vhost.event_tick(EventType::ClientDeliver)
end
end
Expand Down Expand Up @@ -384,6 +385,7 @@ module LavinMQ
@client.send_not_implemented(frame, "Stream queues does not support basic_get")
else
@get_count += 1
@deliver_get_count += 1
@client.vhost.event_tick(EventType::ClientGet)
ok = q.basic_get(frame.no_ack) do |env|
delivery_tag = next_delivery_tag(q, env.segment_position, frame.no_ack, nil)
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/http/controller/main.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module LavinMQ
class MainController < Controller
include StatsHelpers

OVERVIEW_STATS = {"ack", "deliver", "get", "publish", "confirm", "redeliver", "reject"}
OVERVIEW_STATS = {"ack", "deliver", "get", "deliver_get", "publish", "confirm", "redeliver", "reject"}
EXCHANGE_TYPES = {"direct", "fanout", "topic", "headers", "x-federation-upstream", "x-consistent-hash"}
CHURN_STATS = {"connection_created", "connection_closed", "channel_created", "channel_closed",
"queue_declared", "queue_deleted"}
Expand Down
10 changes: 8 additions & 2 deletions src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ module LavinMQ

# Creates @[x]_count and @[x]_rate and @[y]_log
rate_stats(
{"ack", "deliver", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"},
{"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable"},
{"message_count", "unacked_count"})

getter name, arguments, vhost, consumers, last_get_time
Expand Down Expand Up @@ -666,6 +666,7 @@ module LavinMQ
@last_get_time = RoughTime.monotonic
@queue_expiration_ttl_change.try_send? nil
@get_count += 1
@deliver_get_count += 1
get(no_ack) do |env|
yield env
end
Expand All @@ -675,7 +676,12 @@ module LavinMQ
def consume_get(consumer, & : Envelope -> Nil) : Bool
get(consumer.no_ack?) do |env|
yield env
env.redelivered ? (@redeliver_count += 1) : (@deliver_count += 1)
if env.redelivered
@redeliver_count += 1
else
@deliver_count += 1
@deliver_get_count += 1
end
end
end

Expand Down
7 changes: 6 additions & 1 deletion src/lavinmq/queue/stream_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ module LavinMQ
def consume_get(consumer : AMQP::StreamConsumer, & : Envelope -> Nil) : Bool
get(consumer) do |env|
yield env
env.redelivered ? (@redeliver_count += 1) : (@deliver_count += 1)
if env.redelivered
@redeliver_count += 1
else
@deliver_count += 1
@deliver_get_count += 1
end
end
end

Expand Down
12 changes: 8 additions & 4 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ module LavinMQ
include Stats

rate_stats({"channel_closed", "channel_created", "connection_closed", "connection_created",
"queue_declared", "queue_deleted", "ack", "deliver", "get", "publish", "confirm",
"queue_declared", "queue_deleted", "ack", "deliver", "deliver_get", "get", "publish", "confirm",
"redeliver", "reject", "consumer_added", "consumer_removed"})

getter name, exchanges, queues, data_dir, operator_policies, policies, parameters, shovels,
Expand Down Expand Up @@ -220,13 +220,14 @@ module LavinMQ

def message_details
ready = unacked = 0_u64
ack = confirm = deliver = get = get_no_ack = publish = redeliver = return_unroutable = 0_u64
ack = confirm = deliver = get = get_no_ack = publish = redeliver = return_unroutable = deliver_get = 0_u64
@queues.each_value do |q|
ready += q.message_count
unacked += q.unacked_count
ack += q.ack_count
confirm += q.confirm_count
deliver += q.deliver_count
deliver_get += q.deliver_get_count
get += q.get_count
get_no_ack += q.get_no_ack_count
publish += q.publish_count
Expand All @@ -243,6 +244,7 @@ module LavinMQ
deliver: deliver,
get: get,
get_no_ack: get_no_ack,
deliver_get: deliver_get,
publish: publish,
redeliver: redeliver,
return_unroutable: return_unroutable,
Expand Down Expand Up @@ -718,14 +720,16 @@ module LavinMQ
in EventType::QueueDeclared then @queue_declared_count += 1
in EventType::QueueDeleted then @queue_deleted_count += 1
in EventType::ClientAck then @ack_count += 1
in EventType::ClientDeliver then @deliver_count += 1
in EventType::ClientGet then @get_count += 1
in EventType::ClientPublish then @publish_count += 1
in EventType::ClientPublishConfirm then @confirm_count += 1
in EventType::ClientRedeliver then @redeliver_count += 1
in EventType::ClientReject then @reject_count += 1
in EventType::ConsumerAdded then @consumer_added_count += 1
in EventType::ConsumerRemoved then @consumer_removed_count += 1
in EventType::ClientGet then @get_count += 1
in EventType::ClientDeliver
@deliver_count += 1
@deliver_get_count += 1
end
end

Expand Down

0 comments on commit f0c8695

Please sign in to comment.