From 494c1b82099a40bc721054edd478bfa202501e91 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Sun, 25 Aug 2024 23:30:33 +0200 Subject: [PATCH 1/4] mqtt: handle connection shutdown `{shutdown, Reason}` must be handled into handle_call and not handle_info `rabbitmqctl close_all_user_connections` calls rabbit_reader which does a call into the process, the same as rabbitmq_management --- deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index c37a6e0ef64e..e0eaf69ee3d1 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -109,6 +109,11 @@ init(Ref) -> handle_call({info, InfoItems}, _From, State) -> {reply, infos(InfoItems, State), State, ?HIBERNATE_AFTER}; +handle_call({shutdown, Explanation} = Reason, _From, State = #state{conn_name = ConnName}) -> + %% rabbit_networking:close_all_user_connections -> rabbit_reader:shutdow + ?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]), + {stop, Reason, ok, State}; + handle_call(Msg, From, State) -> {stop, {mqtt_unexpected_call, Msg, From}, State}. From ea6ef17cc05e0edd579e71d625f524f5a2d11185 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 26 Aug 2024 13:05:59 +0200 Subject: [PATCH 2/4] Mqtt: test close connection --- .../rabbit/src/rabbit_connection_tracking.erl | 4 ++ deps/rabbit/src/rabbit_networking.erl | 5 +-- deps/rabbitmq_mqtt/test/shared_SUITE.erl | 42 ++++++++++++++++++- 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl index da906fa41144..05f866db2ac7 100644 --- a/deps/rabbit/src/rabbit_connection_tracking.erl +++ b/deps/rabbit/src/rabbit_connection_tracking.erl @@ -427,6 +427,10 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> %% Do an RPC call to the node running the direct client. Node = node(Pid), rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]); +close_connection(#tracked_connection{pid = Pid, + protocol = {'Web MQTT', _}}, Message) -> + % this will work for connections to web mqtt plugin + Pid ! {shutdown, Message}; close_connection(#tracked_connection{pid = Pid}, Message) -> % best effort, this will work for connections to the stream plugin Node = node(Pid), diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 508e0a0e2b9f..82371ec9c2cd 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -531,9 +531,8 @@ close_connections(Pids, Explanation) -> -spec close_all_user_connections(rabbit_types:username(), string()) -> 'ok'. close_all_user_connections(Username, Explanation) -> - Pids = [Pid || #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list_of_user(Username)], - [close_connection(Pid, Explanation) || Pid <- Pids], - ok. + Tracked = rabbit_connection_tracking:list_of_user(Username), + rabbit_connection_tracking:close_connections(Tracked, Explanation, 0). %% Meant to be used by tests only -spec close_all_connections(string()) -> 'ok'. diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index a401b664df6a..9f3df8bc64e0 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -129,6 +129,8 @@ cluster_size_1_tests() -> ,retained_message_conversion ,bind_exchange_to_exchange ,bind_exchange_to_exchange_single_message + ,cli_close_all_connections + ,cli_close_all_user_connections ]. cluster_size_3_tests() -> @@ -141,6 +143,8 @@ cluster_size_3_tests() -> rabbit_mqtt_qos0_queue, rabbit_mqtt_qos0_queue_kill_node, cli_list_queues, + cli_close_all_connections, + cli_close_all_user_connections, delete_create_queue, session_reconnect, session_takeover, @@ -207,7 +211,9 @@ end_per_group(_, Config) -> init_per_testcase(T, Config) when T =:= management_plugin_connection; - T =:= management_plugin_enable -> + T =:= management_plugin_enable; + T =:= cli_close_all_user_connections; + T =:= cli_close_all_connections -> inets:start(), init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> @@ -220,7 +226,9 @@ init_per_testcase0(Testcase, Config) -> end_per_testcase(T, Config) when T =:= management_plugin_connection; - T =:= management_plugin_enable -> + T =:= management_plugin_enable; + T =:= cli_close_all_user_connections; + T =:= cli_close_all_connections -> ok = inets:stop(), end_per_testcase0(T, Config); end_per_testcase(Testcase, Config) -> @@ -1208,6 +1216,36 @@ management_plugin_enable(Config) -> ok = emqtt:disconnect(C). +cli_close_all_connections(Config) -> + KeepaliveSecs = 99, + ClientId = atom_to_binary(?FUNCTION_NAME), + + _ = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]), + eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), + + process_flag(trap_exit, true), + {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["close_all_connections", "bye"]), + ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), + + process_flag(trap_exit, false), + eventually(?_assertEqual([], http_get(Config, "/connections")), + 1000, 10). + +cli_close_all_user_connections(Config) -> + KeepaliveSecs = 99, + ClientId = atom_to_binary(?FUNCTION_NAME), + + _ = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]), + eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), + + process_flag(trap_exit, true), + {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["close_all_user_connections","guest", "bye"]), + ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), + + process_flag(trap_exit, false), + eventually(?_assertEqual([], http_get(Config, "/connections")), + 1000, 10). + %% Test that queues of type rabbit_mqtt_qos0_queue can be listed via rabbitmqctl. cli_list_queues(Config) -> C = connect(?FUNCTION_NAME, Config), From 69d407e6b61906e494d6d9e351c2d584b129eb94 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 28 Aug 2024 12:27:46 +0200 Subject: [PATCH 3/4] Simplify test cases 1. Only run the CLI tests on a single node cluster. The shared_SUITE is already very big. Testing the same CLI commands against node-0 on a 3-node cluster brings no benefit. 2. Move the two new CLI test cases in front of management_plugin_connection because they are similar in that all three tests close the MQTT connection. 3. There is no need to query the HTTP API for the two new CLI test cases. 4. There is no need to set keepalive in the two new CLI test cases. --- deps/rabbitmq_mqtt/test/shared_SUITE.erl | 62 +++++++++--------------- 1 file changed, 22 insertions(+), 40 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index 9f3df8bc64e0..9f5bd81edf14 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -91,6 +91,8 @@ cluster_size_1_tests() -> ,block_only_publisher ,many_qos1_messages ,session_expiry + ,cli_close_all_connections + ,cli_close_all_user_connections ,management_plugin_connection ,management_plugin_enable ,disconnect @@ -129,8 +131,6 @@ cluster_size_1_tests() -> ,retained_message_conversion ,bind_exchange_to_exchange ,bind_exchange_to_exchange_single_message - ,cli_close_all_connections - ,cli_close_all_user_connections ]. cluster_size_3_tests() -> @@ -143,8 +143,6 @@ cluster_size_3_tests() -> rabbit_mqtt_qos0_queue, rabbit_mqtt_qos0_queue_kill_node, cli_list_queues, - cli_close_all_connections, - cli_close_all_user_connections, delete_create_queue, session_reconnect, session_takeover, @@ -211,9 +209,7 @@ end_per_group(_, Config) -> init_per_testcase(T, Config) when T =:= management_plugin_connection; - T =:= management_plugin_enable; - T =:= cli_close_all_user_connections; - T =:= cli_close_all_connections -> + T =:= management_plugin_enable -> inets:start(), init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> @@ -226,9 +222,7 @@ init_per_testcase0(Testcase, Config) -> end_per_testcase(T, Config) when T =:= management_plugin_connection; - T =:= management_plugin_enable; - T =:= cli_close_all_user_connections; - T =:= cli_close_all_connections -> + T =:= management_plugin_enable -> ok = inets:stop(), end_per_testcase0(T, Config); end_per_testcase(Testcase, Config) -> @@ -1173,6 +1167,24 @@ rabbit_mqtt_qos0_queue_kill_node(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, 1), ?assertEqual([], rpc(Config, rabbit_db_binding, get_all, [])). +cli_close_all_connections(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), + process_flag(trap_exit, true), + {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl( + Config, 0, ["close_all_connections", "bye"]), + ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), + ok = await_exit(C). + +cli_close_all_user_connections(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + C = connect(ClientId, Config), + process_flag(trap_exit, true), + {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl( + Config, 0, ["close_all_user_connections","guest", "bye"]), + ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), + ok = await_exit(C). + %% Test that MQTT connection can be listed and closed via the rabbitmq_management plugin. management_plugin_connection(Config) -> KeepaliveSecs = 99, @@ -1216,36 +1228,6 @@ management_plugin_enable(Config) -> ok = emqtt:disconnect(C). -cli_close_all_connections(Config) -> - KeepaliveSecs = 99, - ClientId = atom_to_binary(?FUNCTION_NAME), - - _ = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]), - eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), - - process_flag(trap_exit, true), - {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["close_all_connections", "bye"]), - ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), - - process_flag(trap_exit, false), - eventually(?_assertEqual([], http_get(Config, "/connections")), - 1000, 10). - -cli_close_all_user_connections(Config) -> - KeepaliveSecs = 99, - ClientId = atom_to_binary(?FUNCTION_NAME), - - _ = connect(ClientId, Config, [{keepalive, KeepaliveSecs}]), - eventually(?_assertEqual(1, length(http_get(Config, "/connections"))), 1000, 10), - - process_flag(trap_exit, true), - {ok, String} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["close_all_user_connections","guest", "bye"]), - ?assertEqual(match, re:run(String, "Closing .* reason: bye", [{capture, none}])), - - process_flag(trap_exit, false), - eventually(?_assertEqual([], http_get(Config, "/connections")), - 1000, 10). - %% Test that queues of type rabbit_mqtt_qos0_queue can be listed via rabbitmqctl. cli_list_queues(Config) -> C = connect(?FUNCTION_NAME, Config), From 8c905b90098a375128f8705e73903f594faec407 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 28 Aug 2024 13:07:31 +0200 Subject: [PATCH 4/4] Avoid crash in stream connection 1. Prior to this commit, closing a stream connection via: ``` ./sbin/rabbitmqctl close_all_user_connections guest enough ``` crashed the stream process as follows: ``` 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> crasher: 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> initial call: rabbit_stream_reader:init/1 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> pid: <0.1098.0> 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> registered_name: [] 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> exception error: no function clause matching 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> rabbit_stream_reader:open({call, 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {<0.1233.0>, 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> #Ref<0.519694519.1387790337.15898>}}, 2024-08-28 13:00:18.969931+02:00 [error] <0.1098.0> {shutdown,<<"enough">>}, ``` This commit fixes this crash. 2. Both CLI commands and management plugin use the same way to close MQTT, Web MQTT, and Stream connections: They all send a message via `Pid ! {shutdown, Reason}` to the connection. 3. This commit avoids making `rabbit` core app to know about 'Web MQTT'. 4 This commit simplifies rabbit_mqtt_reader by avoiding another handle_call clause --- deps/rabbit/src/rabbit_connection_tracking.erl | 10 +++------- deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 7 +------ 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl index 05f866db2ac7..207bcd9fc570 100644 --- a/deps/rabbit/src/rabbit_connection_tracking.erl +++ b/deps/rabbit/src/rabbit_connection_tracking.erl @@ -427,11 +427,7 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> %% Do an RPC call to the node running the direct client. Node = node(Pid), rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]); -close_connection(#tracked_connection{pid = Pid, - protocol = {'Web MQTT', _}}, Message) -> - % this will work for connections to web mqtt plugin - Pid ! {shutdown, Message}; close_connection(#tracked_connection{pid = Pid}, Message) -> - % best effort, this will work for connections to the stream plugin - Node = node(Pid), - rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]). + %% Best effort will work for following plugins: + %% rabbitmq_stream, rabbitmq_mqtt, rabbitmq_web_mqtt + Pid ! {shutdown, Message}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index e0eaf69ee3d1..2ff0a6920611 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -109,11 +109,6 @@ init(Ref) -> handle_call({info, InfoItems}, _From, State) -> {reply, infos(InfoItems, State), State, ?HIBERNATE_AFTER}; -handle_call({shutdown, Explanation} = Reason, _From, State = #state{conn_name = ConnName}) -> - %% rabbit_networking:close_all_user_connections -> rabbit_reader:shutdow - ?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]), - {stop, Reason, ok, State}; - handle_call(Msg, From, State) -> {stop, {mqtt_unexpected_call, Msg, From}, State}. @@ -252,7 +247,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> {noreply, State, ?HIBERNATE_AFTER}; handle_info({shutdown, Explanation} = Reason, State = #state{conn_name = ConnName}) -> - %% rabbitmq_management plugin requests to close connection. + %% rabbitmq_management plugin or CLI command requests to close connection. ?LOG_INFO("MQTT closing connection ~tp: ~p", [ConnName, Explanation]), {stop, Reason, State};