Skip to content

Commit

Permalink
Provide per-exchange/queue metrics w/out channelID
Browse files Browse the repository at this point in the history
  • Loading branch information
LoisSotoLopez committed Jun 26, 2024
1 parent 34d3f94 commit dbc5592
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 20 deletions.
8 changes: 8 additions & 0 deletions deps/rabbit_common/include/rabbit_core_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@
{auth_attempt_metrics, set},
{auth_attempt_detailed_metrics, set}]).

% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the
% same info as some of the channel_queue_metrics, channel_exchange_metrics and
% channel_queue_exchange_metrics but without including the channel ID in the
% key.
-define(CORE_NON_CHANNEL_TABLES, [{queue_counter_metrics, set},
{exchange_metrics, set},
{queue_exchange_metrics, set}]).

-define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}).

%% connection_created :: {connection_id, proplist}
Expand Down
44 changes: 29 additions & 15 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ create_table({Table, Type}) ->
{read_concurrency, true}]).

init() ->
_ = [create_table({Table, Type})
|| {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
_ = [create_table({Table, Type})
|| {Table, Type} <- Tables],
ok.

terminate() ->
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
[ets:delete(Table)
|| {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
|| {Table, _Type} <- Tables],
ok.

connection_created(Pid, Infos) ->
Expand Down Expand Up @@ -166,53 +168,65 @@ channel_stats(reductions, Id, Value) ->
ets:insert(channel_process_metrics, {Id, Value}),
ok.

channel_stats(exchange_stats, publish, Id, Value) ->
channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, confirm, Id, Value) ->
channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, return_unroutable, Id, Value) ->
channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, drop_unroutable, Id, Value) ->
channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_exchange_stats, publish, Id, Value) ->
channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}),
_ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}),
ok;
channel_stats(queue_stats, get, Id, Value) ->
channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_no_ack, Id, Value) ->
channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver, Id, Value) ->
channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver_no_ack, Id, Value) ->
channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, redeliver, Id, Value) ->
channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, ack, Id, Value) ->
channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_empty, Id, Value) ->
channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok.

delete(Table, Key) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,15 @@
{2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes},
{2, undefined, stream_segments, counter, "Total number of stream segment files", segments}
]},

{queue_counter_metrics, [
{2, undefined, queue_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"},
{3, undefined, queue_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"},
{4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"},
{5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"},
{6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"},
{7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers"},
{8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message"}
]},
%%% Metrics that contain reference to a channel. Some of them also have
%%% a queue name, but in this case filtering on it doesn't make any
%%% sense, as the queue is not an object of interest here.
Expand All @@ -176,6 +184,13 @@
{2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count}
]},

{exchange_metrics, [
{2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
{3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
{4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
{5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
]},

{channel_exchange_metrics, [
{2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
{3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
Expand Down Expand Up @@ -210,6 +225,10 @@
{2, undefined, connection_channels, gauge, "Channels on a connection", channels}
]},

{queue_exchange_metrics, [
{2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published to queues"}
]},

{channel_queue_exchange_metrics, [
{2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
]}
Expand Down Expand Up @@ -544,15 +563,20 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
{disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}];
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == queue_counter_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Table == exchange_metrics;
Table == queue_exchange_metrics;
Table == channel_queue_exchange_metrics;
Table == ra_metrics;
Table == channel_process_metrics ->
Result = ets:foldl(fun
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
Expand All @@ -579,6 +603,36 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
_ ->
[Result]
end;
get_data(exchange_metrics = Table, true, VHostsFilter) ->
ets:foldl(fun
({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_counter_metrics = Table, true, VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_exchange_metrics = Table, true, VHostsFilter) ->
ets:foldl(fun
({{
#resource{kind = queue, virtual_host = VHost},
#resource{kind = exchange, virtual_host = VHost}
}, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
Expand Down Expand Up @@ -671,15 +725,15 @@ division(A, B) ->
accumulate_count_and_sum(Value, {Count, Sum}) ->
{Count + 1, Sum + Value}.

empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
{T, 0, 0, 0, 0};
empty(T) when T == ra_metrics ->
{T, 0, 0, 0, 0, 0, {0, 0}};
empty(T) when T == channel_queue_metrics; T == channel_metrics ->
empty(T) when T == channel_queue_metrics; T == queue_counter_metrics; T == channel_metrics ->
{T, 0, 0, 0, 0, 0, 0, 0};
empty(queue_metrics = T) ->
{T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.
Expand Down
94 changes: 93 additions & 1 deletion deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ groups() ->
{config_path, [], generic_tests()},
{global_labels, [], generic_tests()},
{aggregated_metrics, [], [
aggregated_metrics_test,
aggregated_metrics_test,
specific_erlang_metrics_present_test,
global_metrics_present_test,
global_metrics_single_metric_family_test
Expand All @@ -57,6 +57,8 @@ groups() ->
queue_consumer_count_single_vhost_per_object_test,
queue_consumer_count_all_vhosts_per_object_test,
queue_coarse_metrics_per_object_test,
queue_counter_metrics_per_object_test,
queue_exchange_metrics_per_object_test,
queue_metrics_per_object_test,
queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
vhost_status_metric,
Expand Down Expand Up @@ -523,6 +525,96 @@ queue_coarse_metrics_per_object_test(Config) ->
map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))),
ok.

queue_counter_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]},

{_, Body1} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=queue_counter_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body1))),

{_, Body2} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-2&family=queue_counter_metrics",
[], 200),
Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11]},

?assertEqual(
Expected2,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body2))),

%% Maybe missing, tests for the queue_exchange_metrics
ok.


queue_exchange_metrics_per_object_test(Config) ->
Expected1 = #{
#{
queue => "vhost-1-queue-with-messages",
vhost => "vhost-1",
exchange => ""
} => [7],
#{
exchange => "",
queue => "vhost-1-queue-with-consumer",
vhost => "vhost-1"
} => [7]
},

{_, Body1} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=queue_exchange_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_exchange_messages_published_total,
parse_response(Body1))),


{_, Body2} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-2&family=queue_exchange_metrics",
[], 200),


Expected2 = #{
#{
queue => "vhost-2-queue-with-messages",
vhost => "vhost-2",
exchange => ""
} => [11],
#{
exchange => "",
queue => "vhost-2-queue-with-consumer",
vhost => "vhost-2"
} => [11]
},

?assertEqual(
Expected2,
map_get(
rabbitmq_detailed_queue_exchange_messages_published_total,
parse_response(Body2))),

ok.

exchange_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]},

{_, Body} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=exchange_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body))),
ok.

queue_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7],
#{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [1]},
Expand Down

0 comments on commit dbc5592

Please sign in to comment.