Skip to content

Commit

Permalink
Merge pull request rabbitmq#12118 from rabbitmq/issue-11985
Browse files Browse the repository at this point in the history
MQTT and Streams: handle connection shutdown via CLI command gracefully
  • Loading branch information
dcorbacho authored Aug 28, 2024
2 parents c8fa044 + 8c905b9 commit afa28cb
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 7 deletions.
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,6 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
Node = node(Pid),
rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]);
close_connection(#tracked_connection{pid = Pid}, Message) ->
% best effort, this will work for connections to the stream plugin
Node = node(Pid),
rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]).
%% Best effort will work for following plugins:
%% rabbitmq_stream, rabbitmq_mqtt, rabbitmq_web_mqtt
Pid ! {shutdown, Message}.
5 changes: 2 additions & 3 deletions deps/rabbit/src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,8 @@ close_connections(Pids, Explanation) ->

-spec close_all_user_connections(rabbit_types:username(), string()) -> 'ok'.
close_all_user_connections(Username, Explanation) ->
Pids = [Pid || #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list_of_user(Username)],
[close_connection(Pid, Explanation) || Pid <- Pids],
ok.
Tracked = rabbit_connection_tracking:list_of_user(Username),
rabbit_connection_tracking:close_connections(Tracked, Explanation, 0).

%% Meant to be used by tests only
-spec close_all_connections(string()) -> 'ok'.
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
{noreply, State, ?HIBERNATE_AFTER};

handle_info({shutdown, Explanation} = Reason, State = #state{conn_name = ConnName}) ->
%% rabbitmq_management plugin requests to close connection.
%% rabbitmq_management plugin or CLI command requests to close connection.
?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]),
{stop, Reason, State};

Expand Down
20 changes: 20 additions & 0 deletions deps/rabbitmq_mqtt/test/shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ cluster_size_1_tests() ->
,block_only_publisher
,many_qos1_messages
,session_expiry
,cli_close_all_connections
,cli_close_all_user_connections
,management_plugin_connection
,management_plugin_enable
,disconnect
Expand Down Expand Up @@ -1165,6 +1167,24 @@ rabbit_mqtt_qos0_queue_kill_node(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])).

cli_close_all_connections(Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
C = connect(ClientId, Config),
process_flag(trap_exit, true),
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
Config, 0, ["close_all_connections", "bye"]),
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
ok = await_exit(C).

cli_close_all_user_connections(Config) ->
ClientId = atom_to_binary(?FUNCTION_NAME),
C = connect(ClientId, Config),
process_flag(trap_exit, true),
{ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(
Config, 0, ["close_all_user_connections","guest", "bye"]),
?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])),
ok = await_exit(C).

%% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin.
management_plugin_connection(Config) ->
KeepaliveSecs = 99,
Expand Down

0 comments on commit afa28cb

Please sign in to comment.