Skip to content

Commit

Permalink
feat: test message metrics generation and gc
Browse files Browse the repository at this point in the history
  • Loading branch information
LoisSotoLopez committed Aug 12, 2024
1 parent 39c7332 commit 0c4e62f
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 42 deletions.
4 changes: 0 additions & 4 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,6 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
backing_queue_state = BQS,
dlx = DLX,
dlx_routing_key = RK}) ->
rabbit_log:info("### maybe message ~p", [mc:size(Message)]),
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
Expand Down Expand Up @@ -724,7 +723,6 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid},
Delivered,
State = #q{q = Q, backing_queue = BQ}) ->
rabbit_log:info("### got message ~p", [mc:size(Message)]),
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
case attempt_delivery(Delivery, Props, Delivered, State1) of
Expand All @@ -739,10 +737,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC, amqqueue:get_name(Q)),
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
{undelivered, State2 = #q{backing_queue_state = BQS}} ->
rabbit_log:info("### Size: ~p bytes", [mc:size(Message)]),
ok = rabbit_core_metrics:messages_stats(amqqueue:get_name(Q), Props#message_properties.size),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
rabbit_log:info("### got here ~p", [mc:size(Message)]),
{Dropped, State3 = #q{backing_queue_state = BQS2}} =
maybe_drop_head(State2#q{backing_queue_state = BQS1}),
QLen = BQ:len(BQS2),
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbit/src/rabbit_core_metrics_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ gc_local_queues() ->
GbSetDown = gb_sets:from_list(QueuesDown),
gc_queue_metrics(GbSet, GbSetDown),
gc_entity(queue_coarse_metrics, GbSet),
gc_entity(historic_message_metrics, GbSet),
gc_entity(historic_message_sizes_metrics, GbSet),
Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]),
gc_leader_data(Followers).
Expand Down Expand Up @@ -150,9 +151,11 @@ gc_queue_metrics(GbSet, GbSetDown) ->
end, none, Table).

gc_entity(Table, GbSet) ->
rabbit_log:info("gcing entity ~p GbSet ~p", [Table, GbSet]),
ets:foldl(fun({{_, Id} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = _Key, _}, none)
when Id == amqp091 ->
none;
({Id = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _}, none) ->
Expand All @@ -165,6 +168,9 @@ gc_entity(Table, GbSet) ->
({Id = Key, _, _, _, _, _, _, _, _}, none)
when Table == queue_delivery_metrics ->
gc_entity(Id, Table, Key, GbSet);
({Id = _Key, _, _, _, _, _, _, _, _, _, _, _, _, _}, none)
when Id == amqp091 ->
none;
({Id = Key, _, _, _, _, _, _, _, _, _, _, _, _, _}, none)
when Table == historic_message_sizes_metrics ->
gc_entity(Id, Table, Key, GbSet)
Expand Down
115 changes: 78 additions & 37 deletions deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ groups() ->
node_metrics,
gen_server2_metrics,
consumer_metrics,
historic_message_size_metrics
historic_message_metrics
]
}
].
Expand Down Expand Up @@ -309,20 +309,25 @@ consumer_metrics(Config) ->
dead_pid() ->
spawn(fun() -> ok end).

historic_message_size_metrics(Config) ->
historic_message_metrics(Config) ->
A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, A),


% Need to delete the tested table since node does not restart between tests
true = rabbit_ct_broker_helpers:rpc(Config, A, ets, delete_all_objects, [historic_message_sizes_metrics]),
true = rabbit_ct_broker_helpers:rpc(Config, A, ets, delete_all_objects, [historic_message_metrics]),

CQName = <<"classic_queue">>,
QQName = <<"quorum_queue">>,
SName = <<"stream_queue">>,

CQResource = q(CQName),
QQResource = q(QQName),
SQResource = q(SName),

[] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),

[] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_metrics]),

amqp_channel:call(Ch, #'queue.declare'{
queue = CQName
}),
Expand All @@ -336,14 +341,14 @@ historic_message_size_metrics(Config) ->
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]
}),

[] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),


InsertFun = fun(QueueNames, PayloadSize) ->
Payload = crypto:strong_rand_bytes(PayloadSize),
AmqpMsg = #amqp_msg{payload = Payload},

lists:foreach(
fun(QName) ->
ok = amqp_channel:call(
Expand All @@ -355,47 +360,83 @@ historic_message_size_metrics(Config) ->
AmqpMsg)
end,
QueueNames),
timer:sleep(500)
timer:sleep(150)
end,

% First insert
InsertFun([CQName, QQName, SName], 1024),

Res0 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
Metrics0 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_metrics]),
{CQResource, 1024} = lists:keyfind(CQResource, 1, Metrics0),
{QQResource, 1024} = lists:keyfind(QQResource, 1, Metrics0),
{SQResource, 1024} = lists:keyfind(SQResource, 1, Metrics0),
{amqp091, 1024} = lists:keyfind(amqp091, 1, Metrics0),
Sizes0 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
{CQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(CQResource, 1, Res0),
lists:keyfind(CQResource, 1, Sizes0),
{QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(QQResource, 1, Res0),
lists:keyfind(QQResource, 1, Sizes0),
{SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(SQResource, 1, Res0),
lists:keyfind(SQResource, 1, Sizes0),
{amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} =
lists:keyfind(amqp091, 1, Res0),

InsertFun([CQName, QQName, SName], 8 * 1024 * 1024),
Res1 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
{CQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} =
lists:keyfind(CQResource, 1, Res1),
{QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} =
lists:keyfind(QQResource, 1, Res1),
{SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} =
lists:keyfind(SQResource, 1, Res1),
{amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0} =
lists:keyfind(amqp091, 1, Res1),

InsertFun([CQName, QQName, SName], 16 * 1024 * 1024),
lists:keyfind(amqp091, 1, Sizes0),

% Second insert
InsertFun([CQName, QQName, SName], 1 * 1024 * 1024),
Metrics1 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_metrics]),
{CQResource, 1 * 1024 * 1024} = lists:keyfind(CQResource, 1, Metrics1),
{QQResource, 1 * 1024 * 1024} = lists:keyfind(QQResource, 1, Metrics1),
{SQResource, 1 * 1024 * 1024} = lists:keyfind(SQResource, 1, Metrics1),
{amqp091, 1 * 1024 * 1024} = lists:keyfind(amqp091, 1, Metrics1),
Sizes1 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
{CQResource, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} =
lists:keyfind(CQResource, 1, Sizes1),
{QQResource, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} =
lists:keyfind(QQResource, 1, Sizes1),
{SQResource, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0} =
lists:keyfind(SQResource, 1, Sizes1),
{amqp091, 0, 3, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0} =
lists:keyfind(amqp091, 1, Sizes1),

% Third insert, queue delete and gc
InsertFun([CQName, QQName, SName], 4 * 1024 * 1024),
amqp_channel:call(Ch, #'queue.delete'{queue = CQName}),
timer:sleep(500),
rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]),
timer:sleep(150),

Metrics2 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_metrics]),
false = lists:keyfind(CQResource, 1, Metrics2),
{QQResource, 4 * 1024 * 1024} = lists:keyfind(QQResource, 1, Metrics2),
{SQResource, 4 * 1024 * 1024} = lists:keyfind(SQResource, 1, Metrics2),
{amqp091, 4 * 1024 * 1024} = lists:keyfind(amqp091, 1, Metrics2),
Sizes2 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
false =
lists:keyfind(CQResource, 1, Sizes2),
{QQResource, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0} =
lists:keyfind(QQResource, 1, Sizes2),
{SQResource, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0} =
lists:keyfind(SQResource, 1, Sizes2),
{amqp091, 0, 3, 0, 0, 0, 0, 3, 3, 0, 0, 0, 0, 0} =
lists:keyfind(amqp091, 1, Sizes2),

% Fourth insert. Just on one queue.
InsertFun([QQName], 16 * 1024 * 1024),
timer:sleep(150),
rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]),

Res2 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
Metrics3 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_metrics]),
false = lists:keyfind(CQResource, 1, Metrics2),
{QQResource, 16 * 1024 * 1024} = lists:keyfind(QQResource, 1, Metrics3),
{SQResource, 4 * 1024 * 1024} = lists:keyfind(SQResource, 1, Metrics3),
{amqp091, 16 * 1024 * 1024} = lists:keyfind(amqp091, 1, Metrics3),
Sizes3 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]),
false =
lists:keyfind(CQResource, 1, Res2),
{QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0} =
lists:keyfind(QQResource, 1, Res2),
{SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0} =
lists:keyfind(SQResource, 1, Res2),
{amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 3, 3, 0, 0, 0} =
lists:keyfind(amqp091, 1, Res2),
lists:keyfind(CQResource, 1, Sizes3),
{QQResource, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0} =
lists:keyfind(QQResource, 1, Sizes3),
{SQResource, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0} =
lists:keyfind(SQResource, 1, Sizes3),
{amqp091, 0, 3, 0, 0, 0, 0, 3, 3, 1, 0, 0, 0, 0} =
lists:keyfind(amqp091, 1, Sizes3),

ok.

Expand Down

0 comments on commit 0c4e62f

Please sign in to comment.