diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 2898677718eb..a740c64490d2 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -685,6 +685,7 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, backing_queue_state = BQS, dlx = DLX, dlx_routing_key = RK}) -> + rabbit_log:info("### maybe message ~p", [mc:size(Message)]), case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher @@ -723,6 +724,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State = #q{q = Q, backing_queue = BQ}) -> + rabbit_log:info("### got message ~p", [mc:size(Message)]), {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State1), case attempt_delivery(Delivery, Props, Delivered, State1) of @@ -737,8 +739,10 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC, amqqueue:get_name(Q)), State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}; {undelivered, State2 = #q{backing_queue_state = BQS}} -> + rabbit_log:info("### Size: ~p bytes", [mc:size(Message)]), ok = rabbit_core_metrics:messages_stats(amqqueue:get_name(Q), Props#message_properties.size), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), + rabbit_log:info("### got here ~p", [mc:size(Message)]), {Dropped, State3 = #q{backing_queue_state = BQS2}} = maybe_drop_head(State2#q{backing_queue_state = BQS1}), QLen = BQ:len(BQS2), diff --git a/deps/rabbit/src/rabbit_core_metrics_gc.erl b/deps/rabbit/src/rabbit_core_metrics_gc.erl index 792dcb790ab2..5337d5bb6ef9 100644 --- a/deps/rabbit/src/rabbit_core_metrics_gc.erl +++ b/deps/rabbit/src/rabbit_core_metrics_gc.erl @@ -72,6 +72,7 @@ gc_local_queues() -> GbSetDown = gb_sets:from_list(QueuesDown), gc_queue_metrics(GbSet, GbSetDown), gc_entity(queue_coarse_metrics, GbSet), + gc_entity(historic_message_sizes_metrics, GbSet), Followers = gb_sets:from_list([amqqueue:get_name(Q) || Q <- rabbit_amqqueue:list_local_followers() ]), gc_leader_data(Followers). @@ -149,6 +150,7 @@ gc_queue_metrics(GbSet, GbSetDown) -> end, none, Table). gc_entity(Table, GbSet) -> + rabbit_log:info("gcing entity ~p GbSet ~p", [Table, GbSet]), ets:foldl(fun({{_, Id} = Key, _}, none) -> gc_entity(Id, Table, Key, GbSet); ({Id = Key, _}, none) -> @@ -162,6 +164,9 @@ gc_entity(Table, GbSet) -> gc_entity(Id, Table, Key, GbSet); ({Id = Key, _, _, _, _, _, _, _, _}, none) when Table == queue_delivery_metrics -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _, _, _, _, _, _, _, _, _, _}, none) + when Table == historic_message_sizes_metrics -> gc_entity(Id, Table, Key, GbSet) end, none, Table). diff --git a/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl b/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl index 4153a7b0a849..8a337a295335 100644 --- a/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl +++ b/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl @@ -26,7 +26,8 @@ groups() -> channel_metrics, node_metrics, gen_server2_metrics, - consumer_metrics + consumer_metrics, + historic_message_size_metrics ] } ]. @@ -308,6 +309,96 @@ consumer_metrics(Config) -> dead_pid() -> spawn(fun() -> ok end). +historic_message_size_metrics(Config) -> + A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, A), + + CQName = <<"classic_queue">>, + QQName = <<"quorum_queue">>, + SName = <<"stream_queue">>, + + CQResource = q(CQName), + QQResource = q(QQName), + SQResource = q(SName), + + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]), + + amqp_channel:call(Ch, #'queue.declare'{ + queue = CQName + }), + amqp_channel:call(Ch, #'queue.declare'{ + queue = QQName, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}] + }), + amqp_channel:call(Ch, #'queue.declare'{ + queue = SName, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}] + }), + + [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]), + + + InsertFun = fun(QueueNames, PayloadSize) -> + Payload = crypto:strong_rand_bytes(PayloadSize), + AmqpMsg = #amqp_msg{payload = Payload}, + + lists:foreach( + fun(QName) -> + ok = amqp_channel:call( + Ch, + #'basic.publish'{ + exchange = <<"">>, + routing_key = QName + }, + AmqpMsg) + end, + QueueNames), + timer:sleep(500) + end, + + % First insert + InsertFun([CQName, QQName, SName], 1024), + + Res0 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]), + {CQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} = + lists:keyfind(CQResource, 1, Res0), + {QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} = + lists:keyfind(QQResource, 1, Res0), + {SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} = + lists:keyfind(SQResource, 1, Res0), + {amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} = + lists:keyfind(amqp091, 1, Res0), + + InsertFun([CQName, QQName, SName], 8 * 1024 * 1024), + Res1 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]), + {CQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} = + lists:keyfind(CQResource, 1, Res1), + {QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} = + lists:keyfind(QQResource, 1, Res1), + {SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0} = + lists:keyfind(SQResource, 1, Res1), + {amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0} = + lists:keyfind(amqp091, 1, Res1), + + InsertFun([CQName, QQName, SName], 16 * 1024 * 1024), + amqp_channel:call(Ch, #'queue.delete'{queue = CQName}), + timer:sleep(500), + rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]), + + Res2 = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [historic_message_sizes_metrics]), + false = + lists:keyfind(CQResource, 1, Res2), + {QQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0} = + lists:keyfind(QQResource, 1, Res2), + {SQResource, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0} = + lists:keyfind(SQResource, 1, Res2), + {amqp091, 0, 3, 0, 0, 0, 0, 0, 0, 3, 3, 0, 0, 0} = + lists:keyfind(amqp091, 1, Res2), + + ok. + q(Name) -> #resource{ virtual_host = <<"/">>, kind = queue, diff --git a/deps/rabbit_common/include/rabbit_core_metrics.hrl b/deps/rabbit_common/include/rabbit_core_metrics.hrl index 892597859725..3ad082605930 100644 --- a/deps/rabbit_common/include/rabbit_core_metrics.hrl +++ b/deps/rabbit_common/include/rabbit_core_metrics.hrl @@ -64,6 +64,6 @@ % Message metrics include the number of messages ever received for each range of possible message sizes. % Each possible range of sizes is refered to as a "bucket". % First bucket goes from 0 to 256 bytes, second from 257 to 1024 bytes, and so on. --define(MSG_SIZE_BUCKETS_LIMITS, [{2,256}, {3,1024}, {4, 4096}, {5, 16384}, {6,65536}, {7, 262144}, {8, 1048576}, {9, 4194304}, {10, 16777216}, {11, 67108864}, {12, 268435456}, {13, 1073741824}, {14, 4294967296}, {15, infinity}]). --define(MSG_SIZE_BUCKETS_DEFAULT, {'_', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}). +-define(MSG_SIZE_BUCKETS_LIMITS, [{2,256}, {3,1024}, {4, 4096}, {5, 16384}, {6,65536}, {7, 262144}, {8, 1048576}, {9, 4194304}, {10, 16777216}, {11, 67108864}, {12, 134217728}, {13, 536870912}, {14, infinity}]). +-define(MSG_SIZE_BUCKETS_DEFAULT, {'_', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}). diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 71f6a695f6e9..b1ca032e3202 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -378,6 +378,8 @@ build_match_spec_conditions_to_delete_all_queues([Queue|Queues]) -> build_match_spec_conditions_to_delete_all_queues([]) -> true. +messages_stats(_Domain, []) -> + ok; messages_stats(Domain, MessagesSizes) when is_list(MessagesSizes) -> [LargerMessageSize | _] = lists:sort( fun(A, B) -> A > B end, @@ -391,6 +393,7 @@ messages_stats(Domain, MessagesSizes) when is_list(MessagesSizes) -> false -> ok end, + BucketIncrsAsMap = lists:foldl( fun(MessageSize, Acc) -> {value, {Bucket, _MaxSize}} = lists:search(