Skip to content

Commit

Permalink
wip: adds tests for message metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
LoisSotoLopez committed Aug 9, 2024
1 parent c3fcdf8 commit 39c7332
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 3 deletions.
4 changes: 4 additions & 0 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
backing_queue_state = BQS,
dlx = DLX,
dlx_routing_key = RK}) ->
rabbit_log:info("### maybe message ~p", [mc:size(Message)]),
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
Expand Down Expand Up @@ -723,6 +724,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid},
Delivered,
State = #q{q = Q, backing_queue = BQ}) ->
rabbit_log:info("### got message ~p", [mc:size(Message)]),
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
case attempt_delivery(Delivery, Props, Delivered, State1) of
Expand All @@ -737,8 +739,10 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC, amqqueue:get_name(Q)),
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
{undelivered, State2 = #q{backing_queue_state = BQS}} ->
rabbit_log:info("### Size: ~p bytes", [mc:size(Message)]),
ok = rabbit_core_metrics:messages_stats(amqqueue:get_name(Q), Props#message_properties.size),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
rabbit_log:info("### got here ~p", [mc:size(Message)]),
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
maybe_drop_head(State2#q{backing_queue_state = BQS1}),
QLen = BQ:len(BQS2),
Expand Down
5 changes: 5 additions & 0 deletions deps/rabbit/src/rabbit_core_metrics_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ gc_local_queues() ->
GbSetDown = gb_sets:from_list(QueuesDown),
gc_queue_metrics(GbSet, GbSetDown),
gc_entity(queue_coarse_metrics, GbSet),
gc_entity(historic_message_sizes_metrics, GbSet),
Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]),
gc_leader_data(Followers).

Expand Down Expand Up @@ -149,6 +150,7 @@ gc_queue_metrics(GbSet, GbSetDown) ->
end, none, Table).

gc_entity(Table, GbSet) ->
rabbit_log:info("gcing entity ~p GbSet ~p", [Table, GbSet]),
ets:foldl(fun({{_, Id} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _}, none) ->
Expand All @@ -162,6 +164,9 @@ gc_entity(Table, GbSet) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _, _, _, _, _}, none)
when Table == queue_delivery_metrics ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _, _, _, _, _, _, _, _, _, _}, none)
when Table == historic_message_sizes_metrics ->
gc_entity(Id, Table, Key, GbSet)
end, none, Table).

Expand Down
93 changes: 92 additions & 1 deletion deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ groups() ->
channel_metrics,
node_metrics,
gen_server2_metrics,
consumer_metrics
consumer_metrics,
historic_message_size_metrics
]
}
].
Expand Down Expand Up @@ -308,6 +309,96 @@ consumer_metrics(Config) ->
dead_pid() ->
spawn(fun() -> ok end).

historic_message_size_metrics(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, A),

CQName = <<"classic_queue">>,
QQName = <<"quorum_queue">>,
SName = <<"stream_queue">>,

CQResource = q(CQName),
QQResource = q(QQName),
SQResource = q(SName),

[] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),

amqp_channel:call(Ch, #'queue.declare'{
queue = CQName
}),
amqp_channel:call(Ch, #'queue.declare'{
queue = QQName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]
}),
amqp_channel:call(Ch, #'queue.declare'{
queue = SName,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]
}),

[] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),


InsertFun = fun(QueueNames, PayloadSize) ->
Payload = crypto:strong_rand_bytes(PayloadSize),
AmqpMsg = #amqp_msg{payload = Payload},

lists:foreach(
fun(QName) ->
ok = amqp_channel:call(
Ch,
#'basic.publish'{
exchange = <<"">>,
routing_key = QName
},
AmqpMsg)
end,
QueueNames),
timer:sleep(500)
end,

% First insert
InsertFun([CQName, QQName, SName], 1024),

Res0 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
{CQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(CQResource, 1, Res0),
{QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(QQResource, 1, Res0),
{SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(SQResource, 1, Res0),
{amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(amqp091, 1, Res0),

InsertFun([CQName, QQName, SName], 8 * 1024 * 1024),
Res1 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
{CQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} =
lists:keyfind(CQResource, 1, Res1),
{QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} =
lists:keyfind(QQResource, 1, Res1),
{SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} =
lists:keyfind(SQResource, 1, Res1),
{amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0} =
lists:keyfind(amqp091, 1, Res1),

InsertFun([CQName, QQName, SName], 16 * 1024 * 1024),
amqp_channel:call(Ch, #'queue.delete'{queue = CQName}),
timer:sleep(500),
rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]),

Res2 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
false =
lists:keyfind(CQResource, 1, Res2),
{QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0} =
lists:keyfind(QQResource, 1, Res2),
{SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0} =
lists:keyfind(SQResource, 1, Res2),
{amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 3, 3, 0, 0, 0} =
lists:keyfind(amqp091, 1, Res2),

ok.

q(Name) ->
#resource{ virtual_host = <<"/">>,
kind = queue,
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit_common/include/rabbit_core_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@
% Message metrics include the number of messages ever received for each range of possible message sizes.
% Each possible range of sizes is refered to as a "bucket".
% First bucket goes from 0 to 256 bytes, second from 257 to 1024 bytes, and so on.
-define(MSG_SIZE_BUCKETS_LIMITS, [{2,256}, {3,1024}, {4, 4096}, {5, 16384}, {6,65536}, {7, 262144}, {8, 1048576}, {9, 4194304}, {10, 16777216}, {11, 67108864}, {12, 268435456}, {13, 1073741824}, {14, 4294967296}, {15, infinity}]).
-define(MSG_SIZE_BUCKETS_DEFAULT, {'_', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}).
-define(MSG_SIZE_BUCKETS_LIMITS, [{2,256}, {3,1024}, {4, 4096}, {5, 16384}, {6,65536}, {7, 262144}, {8, 1048576}, {9, 4194304}, {10, 16777216}, {11, 67108864}, {12, 134217728}, {13, 536870912}, {14, infinity}]).
-define(MSG_SIZE_BUCKETS_DEFAULT, {'_', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}).

3 changes: 3 additions & 0 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ build_match_spec_conditions_to_delete_all_queues([Queue|Queues]) ->
build_match_spec_conditions_to_delete_all_queues([]) ->
true.

messages_stats(_Domain, []) ->
ok;
messages_stats(Domain, MessagesSizes) when is_list(MessagesSizes) ->
[LargerMessageSize | _] = lists:sort(
fun(A, B) -> A > B end,
Expand All @@ -391,6 +393,7 @@ messages_stats(Domain, MessagesSizes) when is_list(MessagesSizes) ->
false ->
ok
end,

BucketIncrsAsMap = lists:foldl(
fun(MessageSize, Acc) ->
{value, {Bucket, _MaxSize}} = lists:search(
Expand Down

0 comments on commit 39c7332

Please sign in to comment.