Skip to content

Commit

Permalink
Add get message feat on stream queues UI
Browse files Browse the repository at this point in the history
  • Loading branch information
LoisSotoLopez committed Apr 17, 2024
1 parent 9f80341 commit 608900c
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 28 deletions.
6 changes: 5 additions & 1 deletion deps/rabbitmq_management/priv/www/js/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,11 @@ function get_msgs(params) {
with_req('POST', path, JSON.stringify(params), function(resp) {
var msgs = JSON.parse(resp.responseText);
if (msgs.length == 0) {
show_popup('info', 'Queue is empty');
if ("offset" in params) {
show_popup('info', 'No messages in stream at given offset');
} else {
show_popup('info', 'Queue is empty');
}
} else {
$('#msg-wrapper').slideUp(200);
replace_content('msg-wrapper', format('messages', {'msgs': msgs}));
Expand Down
12 changes: 10 additions & 2 deletions deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -309,19 +309,21 @@

<%= format('publish', {'mode': 'queue', 'queue': queue}) %>

<% if (!is_stream(queue)) { %>
<div class="section-hidden">
<h2>Get messages</h2>
<div class="hider">
<% if (!is_stream(queue)) { %>
<p>
Warning: getting messages from a queue is a destructive action.
<span class="help" id="message-get-requeue"></span>
</p>
<% } %>
<form action="#/queues/get" method="post">
<input type="hidden" name="vhost" value="<%= fmt_string(queue.vhost) %>"/>
<input type="hidden" name="name" value="<%= fmt_string(queue.name) %>"/>
<input type="hidden" name="truncate" value="50000"/>
<table class="form">
<% if (!is_stream(queue)) { %>
<tr>
<th><label>Ack Mode:</label></th>
<td>
Expand All @@ -333,6 +335,7 @@
</select>
</td>
</tr>
<% } %>
<tr>
<th><label>Encoding:</label></th>
<td>
Expand All @@ -347,13 +350,18 @@
<th><label>Messages:</label></th>
<td><input type="text" name="count" value="1"/></td>
</tr>
<% if (is_stream(queue)) { %>
<tr>
<th><label>Offset:</label></th>
<td><input type="text" name="offset" value="0"/></td>
</tr>
<% } %>
</table>
<input type="submit" value="Get Message(s)" />
</form>
<div id="msg-wrapper"></div>
</div>
</div>
<% } %>

<% if (is_user_policymaker) { %>
<div class="section-hidden">
Expand Down
141 changes: 116 additions & 25 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_queue_get.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

-module(rabbit_mgmt_wm_queue_get).

-include_lib("kernel/include/logger.hrl").
-export([init/2, resource_exists/2, is_authorized/2, allow_missing_post/2,
allowed_methods/2, accept_content/2, content_types_provided/2,
content_types_accepted/2]).
Expand Down Expand Up @@ -47,34 +48,124 @@ accept_content(ReqData, Context) ->
do_it(ReqData0, Context) ->
VHost = rabbit_mgmt_util:vhost(ReqData0),
Q = rabbit_mgmt_util:id(queue, ReqData0),
rabbit_mgmt_util:with_decode(
[ackmode, count, encoding], ReqData0, Context,
fun([AckModeBin, CountBin, EncBin], Body, ReqData) ->
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
AckMode = list_to_atom(binary_to_list(AckModeBin)),
Count = rabbit_mgmt_util:parse_int(CountBin),
Enc = case EncBin of
<<"auto">> -> auto;
<<"base64">> -> base64;
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
end,
Trunc = case maps:get(truncate, Body, undefined) of
undefined -> none;
TruncBin -> rabbit_mgmt_util:parse_int(
TruncBin)
end,

Reply = basic_gets(Count, Ch, Q, AckMode, Enc, Trunc),
maybe_return(Reply, Ch, AckMode),
rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
ReqData, Context)
end)
end).
Resource = rabbit_misc:r(<<"/">>, queue, Q),
{ok, Queue} = rabbit_amqqueue:lookup(Resource),
case amqqueue:get_type(Queue) of
rabbit_stream_queue ->
rabbit_mgmt_util:with_decode(
[count, encoding, offset], ReqData0, Context,
fun([CountBin, EncBin, OffsetBin], Body, ReqData) ->
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
Count = rabbit_mgmt_util:parse_int(CountBin),
Enc = case EncBin of
<<"auto">> -> auto;
<<"base64">> -> base64;
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
end,
Offset = rabbit_mgmt_util:parse_int(OffsetBin),
Trunc = case maps:get(truncate, Body, undefined) of
undefined -> none;
TruncBin -> rabbit_mgmt_util:parse_int(
TruncBin)
end,
CTag = <<"ctag">>,
Reply = start_subscription_gets(
Count, Ch, Q, CTag, Offset, Enc, Trunc),
rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
ReqData, Context)
end)
end);
_ ->
rabbit_mgmt_util:with_decode(
[ackmode, count, encoding], ReqData0, Context,
fun([AckModeBin, CountBin, EncBin], Body, ReqData) ->
rabbit_mgmt_util:with_channel(
VHost, ReqData, Context,
fun (Ch) ->
AckMode = list_to_atom(binary_to_list(AckModeBin)),
Count = rabbit_mgmt_util:parse_int(CountBin),
Enc = case EncBin of
<<"auto">> -> auto;
<<"base64">> -> base64;
_ -> throw({error, <<"Unsupported encoding. Please use auto or base64.">>})
end,
Trunc = case maps:get(truncate, Body, undefined) of
undefined -> none;
TruncBin -> rabbit_mgmt_util:parse_int(
TruncBin)
end,
Reply = basic_gets(Count, Ch, Q, AckMode, Enc,
Trunc),
maybe_return(Reply, Ch, AckMode),
rabbit_mgmt_util:reply(remove_delivery_tag(Reply),
ReqData, Context)
end)
end)
end.

start_subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) ->
qos(Ch, Count),
subscribe(Ch, Queue, false, Offset, CTag),
Replies = subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc),
cancel_subscription(Ch, CTag),
Replies.

subscription_gets(0, _Ch, _Queue, _CTag, _Offset, _Enc, _Trunc) ->
[];
subscription_gets(Count, Ch, Queue, CTag, Offset, Enc, Trunc) ->
case subscription_get(Ch, Enc, Trunc) of
none -> [];
Reply -> [Reply | subscription_gets(Count - 1, Ch, Queue, CTag, Offset, Enc, Trunc)]
end.

subscription_get(Ch, Enc, Trunc) ->
receive
{#'basic.deliver'{redelivered = Redelivered,
exchange = Exchange,
routing_key = RoutingKey,
delivery_tag = DeliveryTag,
consumer_tag = ConsumerTag},
#amqp_msg{props = Props, payload = Payload}} ->
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = false}),
[{payload_bytes, size(Payload)},
{redelivered, Redelivered},
{exchange, Exchange},
{routing_key, RoutingKey},
{consumer_tag, ConsumerTag},
{properties, rabbit_mgmt_format:basic_properties(Props)}] ++
payload_part(maybe_truncate(Payload, Trunc), Enc)
after
300 ->
none
end.

subscribe(Ch, Queue, NoAck, Offset, CTag) ->
amqp_channel:subscribe(
Ch,
#'basic.consume'{queue = Queue,
no_ack = NoAck,
consumer_tag = CTag,
arguments = [{<<"x-stream-offset">>, long, Offset}]},
self()),
receive
#'basic.consume_ok'{consumer_tag = CTag} ->
ok
end.

qos(Ch, Prefetch) ->
#'basic.qos_ok'{} = amqp_channel:call(
Ch,
#'basic.qos'{global = false, prefetch_count = Prefetch}).

cancel_subscription(Ch, CTag) ->
amqp_channel:call(
Ch,
#'basic.cancel'{
consumer_tag = CTag,
nowait = false}).

basic_gets(0, _, _, _, _, _) ->
[];
Expand Down

0 comments on commit 608900c

Please sign in to comment.