From 176b0a9489532cb47f5e71760d3c82509333ef79 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 17 Jan 2025 17:44:43 +0000 Subject: [PATCH 01/16] Improve robustness of mqtt test broker startup --- crates/tests/mqtt_tests/src/test_mqtt_server.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs index 7c3ded468f0..3f1b81c090a 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs @@ -10,7 +10,6 @@ use rumqttc::QoS; use rumqttd::Broker; use rumqttd::Config; use rumqttd::ConnectionSettings; -use rumqttd::ConsoleSettings; use rumqttd::ServerSettings; static SERVER: Lazy = Lazy::new(MqttProcessHandler::new); @@ -115,7 +114,11 @@ fn spawn_broker() -> u16 { } match broker_thread.join() { - Ok(Ok(())) => unreachable!("`broker.start()` does not terminate"), + Ok(Ok(())) => { + // I don't know why it happened, but I have observed this once while testing + // So just log the error and retry starting the broker on a new port + eprintln!("MQTT-TEST ERROR: `broker.start()` should not terminate until after `spawn_broker` returns") + }, Ok(Err(err)) => { eprintln!( "MQTT-TEST ERROR: fail to start the test MQTT broker: {:?}", @@ -184,9 +187,6 @@ fn get_rumqttd_config(port: u16) -> Config { connections: connections_settings, }; - let mut console_settings = ConsoleSettings::default(); - console_settings.listen = "localhost:3030".to_string(); - let mut servers = HashMap::new(); servers.insert("1".to_string(), server_config); @@ -194,7 +194,7 @@ fn get_rumqttd_config(port: u16) -> Config { id: 0, router: router_config, cluster: None, - console: Some(console_settings), + console: None, v4: Some(servers), ws: None, v5: None, From 28f09e5bb611f0b2695d7e22de31d751ca9f37dd Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 17 Jan 2025 17:49:21 +0000 Subject: [PATCH 02/16] Ensure mqtt_channel waits 
for messages to publish before closing connection gracefully --- crates/common/mqtt_channel/src/connection.rs | 27 +++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index b1d2cff473b..8e1e99ac4aa 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -16,6 +16,7 @@ use rumqttc::EventLoop; use rumqttc::Incoming; use rumqttc::Outgoing; use rumqttc::Packet; +use std::task::Poll; use std::time::Duration; use tokio::time::sleep; @@ -88,19 +89,22 @@ impl Connection { let (mqtt_client, event_loop) = Connection::open(config, received_sender.clone(), error_sender.clone()).await?; + let (awaiting_acks_sender, awaiting_acks_receiver) = mpsc::unbounded(); tokio::spawn(Connection::receiver_loop( mqtt_client.clone(), config.clone(), event_loop, received_sender, error_sender.clone(), + awaiting_acks_receiver, + pub_done_sender, )); tokio::spawn(Connection::sender_loop( mqtt_client, published_receiver, error_sender, config.last_will_message.clone(), - pub_done_sender, + awaiting_acks_sender, )); Ok(Connection { @@ -200,8 +204,17 @@ impl Connection { mut event_loop: EventLoop, mut message_sender: mpsc::UnboundedSender, mut error_sender: mpsc::UnboundedSender, + awaiting_acks: mpsc::UnboundedReceiver<()>, + done: oneshot::Sender<()>, ) -> Result<(), MqttError> { + let awaiting_acks = awaiting_acks.peekable(); + futures::pin_mut!(awaiting_acks); loop { + if let Poll::Ready(None) = futures::poll!(awaiting_acks.as_mut().peek()) { + // If the channel is dropped, the sender loop has stopped + // and we've received an ack for every message published + mqtt_client.disconnect().await.unwrap(); + } match event_loop.poll().await { Ok(Event::Incoming(Packet::Publish(msg))) => { if msg.payload.len() > config.max_packet_size { @@ -252,6 +265,11 @@ impl Connection { break; } + Ok(Event::Incoming(Incoming::PubAck(_))) 
=> { + // Mark one message as consumed + awaiting_acks.next().await; + } + Err(err) => { error!("MQTT connection error: {err}"); @@ -266,6 +284,7 @@ impl Connection { // No more messages will be forwarded to the client let _ = message_sender.close().await; let _ = error_sender.close().await; + let _ = done.send(()); Ok(()) } @@ -274,7 +293,7 @@ impl Connection { mut messages_receiver: mpsc::UnboundedReceiver, mut error_sender: mpsc::UnboundedSender, last_will: Option, - done: oneshot::Sender<()>, + mut awaiting_acks: mpsc::UnboundedSender<()>, ) { loop { match messages_receiver.next().await { @@ -290,6 +309,8 @@ impl Connection { .await { let _ = error_sender.send(err.into()).await; + } else { + awaiting_acks.send(()).await.unwrap(); } } } @@ -303,8 +324,6 @@ impl Connection { .publish(last_will.topic, last_will.qos, last_will.retain, payload) .await; } - let _ = mqtt_client.disconnect().await; - let _ = done.send(()); } pub(crate) async fn do_pause() { From 97c45501b20ecff4f9394334c1367d019c877b27 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 17 Jan 2025 17:50:52 +0000 Subject: [PATCH 03/16] Fix flakiness from out of order messages --- crates/extensions/c8y_mapper_ext/src/tests.rs | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index bf9fb95bf5f..1d209e87cc1 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2086,27 +2086,37 @@ async fn json_custom_operation_status_multiple_operations_in_one_mqtt_message() ); mqtt.send(input_message).await.expect("Send failed"); - assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,111")]).await; - assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,222")]).await; - assert_received_contains_str(&mut mqtt, [("c8y/s/us", "504,333")]).await; + let mut messages = vec![]; + for _ in 0..6 { + 
messages.push(mqtt.recv().await.unwrap()); + } + let (mut requests, mut responses): (Vec<_>, Vec<_>) = messages + .iter() + .map(|msg| (msg.topic.name.as_str(), msg.payload.as_str().unwrap())) + .partition(|(_topic, payload)| payload.starts_with("504,")); + // The messages might get processed out of order, we don't care about the ordering of the messages + requests.sort(); + responses.sort(); + + assert_eq!( + requests, + [ + ("c8y/s/us", "504,111"), + ("c8y/s/us", "504,222"), + ("c8y/s/us", "504,333"), + ] + ); // escapes: we input JSON over MQTT, but emit Smartrest, thus initial: `do something "1"` becomes `"do something // ""1""\n"` (outer "" for the Smartrest record field, and then inside double quotes escape a single quote) - assert_received_contains_str( - &mut mqtt, - [("c8y/s/us", "506,111,\"do something \"\"1\"\"\n\"")], - ) - .await; - assert_received_contains_str( - &mut mqtt, - [("c8y/s/us", "506,222,\"do something \"\"2\"\"\n\"")], - ) - .await; - assert_received_contains_str( - &mut mqtt, - [("c8y/s/us", "506,333,\"do something \"\"3\"\"\n\"")], - ) - .await; + assert_eq!( + responses, + [ + ("c8y/s/us", "506,111,\"do something \"\"1\"\"\n\""), + ("c8y/s/us", "506,222,\"do something \"\"2\"\"\n\""), + ("c8y/s/us", "506,333,\"do something \"\"3\"\"\n\""), + ] + ); } #[tokio::test] From 31d2bde0d77d0db0af03b1e19df0d32cbde71355 Mon Sep 17 00:00:00 2001 From: James Rhodes <30299230+jarhodes314@users.noreply.github.com> Date: Mon, 20 Jan 2025 10:52:29 +0000 Subject: [PATCH 04/16] Update crates/common/mqtt_channel/src/connection.rs Co-authored-by: Didier Wenzek --- crates/common/mqtt_channel/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index 8e1e99ac4aa..5e17e0e5371 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -265,7 +265,7 @@ impl Connection { break; } - 
Ok(Event::Incoming(Incoming::PubAck(_))) => { + Ok(Event::Incoming(Incoming::PubAck(_))) | Ok(Event::Incoming(Incoming::PubComp(_))) => { // Mark one message as consumed awaiting_acks.next().await; } From 838f5aab34a49dfc93442f2dbf56626dab28a9ac Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Tue, 21 Jan 2025 15:16:53 +0000 Subject: [PATCH 05/16] Handle republishes and track last will publish --- crates/common/mqtt_channel/src/connection.rs | 23 ++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index 5e17e0e5371..15edc9c3c54 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -16,6 +16,9 @@ use rumqttc::EventLoop; use rumqttc::Incoming; use rumqttc::Outgoing; use rumqttc::Packet; +use rumqttc::PubAck; +use rumqttc::PubComp; +use std::collections::HashSet; use std::task::Poll; use std::time::Duration; use tokio::time::sleep; @@ -209,11 +212,16 @@ impl Connection { ) -> Result<(), MqttError> { let awaiting_acks = awaiting_acks.peekable(); futures::pin_mut!(awaiting_acks); + let mut outstanding_msgs = HashSet::new(); + let mut disconnected = false; loop { if let Poll::Ready(None) = futures::poll!(awaiting_acks.as_mut().peek()) { - // If the channel is dropped, the sender loop has stopped + // If the channel is empty, the sender loop has stopped // and we've received an ack for every message published - mqtt_client.disconnect().await.unwrap(); + if !disconnected && outstanding_msgs.is_empty() { + mqtt_client.disconnect().await.unwrap(); + disconnected = true; + } } match event_loop.poll().await { Ok(Event::Incoming(Packet::Publish(msg))) => { @@ -265,9 +273,15 @@ impl Connection { break; } - Ok(Event::Incoming(Incoming::PubAck(_))) | Ok(Event::Incoming(Incoming::PubComp(_))) => { + Ok(Event::Incoming(Incoming::PubAck(PubAck{ pkid}))) | Ok(Event::Incoming(Incoming::PubComp(PubComp { 
pkid }))) => { // Mark one message as consumed - awaiting_acks.next().await; + outstanding_msgs.remove(&pkid); + } + + Ok(Event::Outgoing(Outgoing::Publish(p))) => { + if outstanding_msgs.insert(p) { + awaiting_acks.next().await.unwrap(); + } } Err(err) => { @@ -323,6 +337,7 @@ impl Connection { let _ = mqtt_client .publish(last_will.topic, last_will.qos, last_will.retain, payload) .await; + awaiting_acks.send(()).await.unwrap(); } } From ac286008d29577610cbe453e200d329e6cdbfa23 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Tue, 21 Jan 2025 17:38:41 +0000 Subject: [PATCH 06/16] Avoid blocking on awaiting acks --- crates/common/mqtt_channel/src/connection.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index 15edc9c3c54..7c864f0961d 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -218,7 +218,10 @@ impl Connection { if let Poll::Ready(None) = futures::poll!(awaiting_acks.as_mut().peek()) { // If the channel is empty, the sender loop has stopped // and we've received an ack for every message published - if !disconnected && outstanding_msgs.is_empty() { + if !disconnected + && outstanding_msgs.is_empty() + && event_loop.state.events.is_empty() + { mqtt_client.disconnect().await.unwrap(); disconnected = true; } @@ -273,14 +276,15 @@ impl Connection { break; } - Ok(Event::Incoming(Incoming::PubAck(PubAck{ pkid}))) | Ok(Event::Incoming(Incoming::PubComp(PubComp { pkid }))) => { + Ok(Event::Incoming(Incoming::PubAck(PubAck { pkid }))) + | Ok(Event::Incoming(Incoming::PubComp(PubComp { pkid }))) => { // Mark one message as consumed outstanding_msgs.remove(&pkid); } Ok(Event::Outgoing(Outgoing::Publish(p))) => { if outstanding_msgs.insert(p) { - awaiting_acks.next().await.unwrap(); + // awaiting_acks.next().await.unwrap(); } } @@ -307,7 +311,7 @@ impl Connection { mut 
messages_receiver: mpsc::UnboundedReceiver, mut error_sender: mpsc::UnboundedSender, last_will: Option, - mut awaiting_acks: mpsc::UnboundedSender<()>, + _awaiting_acks: mpsc::UnboundedSender<()>, ) { loop { match messages_receiver.next().await { @@ -323,8 +327,6 @@ impl Connection { .await { let _ = error_sender.send(err.into()).await; - } else { - awaiting_acks.send(()).await.unwrap(); } } } @@ -337,7 +339,6 @@ impl Connection { let _ = mqtt_client .publish(last_will.topic, last_will.qos, last_will.retain, payload) .await; - awaiting_acks.send(()).await.unwrap(); } } From 584daf2b0e81731ff56bdf7e91f80cecfa439128 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Tue, 21 Jan 2025 18:16:35 +0000 Subject: [PATCH 07/16] Don't risk blocking the mqtt-channel task --- crates/common/mqtt_channel/src/tests.rs | 50 ++++++++----------- .../tests/mqtt_tests/src/test_mqtt_server.rs | 2 +- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index 84397b2ea56..698c4e370c8 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -444,35 +444,27 @@ async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), any // An mqtt process publishing messages // must ensure the messages have been sent before process exit. 
- std::thread::spawn(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - let mqtt_config = Config::default().with_port(broker.port); - - let topic = Topic::new_unchecked(topic); - let mut con = Connection::new(&mqtt_config).await.expect("a connection"); - - con.published - .send(MqttMessage::new(&topic, "datum 1")) - .await - .expect("message sent"); - con.published - .send(MqttMessage::new(&topic, "datum 2")) - .await - .expect("message sent"); - con.published - .send(MqttMessage::new(&topic, "datum 3")) - .await - .expect("message sent"); - - // Wait for all the messages to be actually sent - // before the runtime is shutdown dropping the mqtt sender loop. - con.close().await; - }); - }); + let mqtt_config = Config::default().with_port(broker.port); + + let topic = Topic::new_unchecked(topic); + let mut con = Connection::new(&mqtt_config).await.expect("a connection"); + + con.published + .send(MqttMessage::new(&topic, "datum 1")) + .await + .expect("message sent"); + con.published + .send(MqttMessage::new(&topic, "datum 2")) + .await + .expect("message sent"); + con.published + .send(MqttMessage::new(&topic, "datum 3")) + .await + .expect("message sent"); + + // Wait for all the messages to be actually sent + // before the runtime is shutdown dropping the mqtt sender loop. 
+ tokio::time::timeout(Duration::from_secs(5),con.close()).await.expect("MQTT channel should close"); mqtt_tests::assert_received( &mut messages, diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs index 3f1b81c090a..788853cbc5d 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs @@ -118,7 +118,7 @@ fn spawn_broker() -> u16 { // I don't know why it happened, but I have observed this once while testing // So just log the error and retry starting the broker on a new port eprintln!("MQTT-TEST ERROR: `broker.start()` should not terminate until after `spawn_broker` returns") - }, + } Ok(Err(err)) => { eprintln!( "MQTT-TEST ERROR: fail to start the test MQTT broker: {:?}", From 88c40c0382e6d7b2d729b471a994b05893548d7c Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Tue, 21 Jan 2025 18:18:05 +0000 Subject: [PATCH 08/16] Ensure the bridge test proxy is always cleaned up This should fix #3021 --- crates/extensions/tedge_mqtt_bridge/tests/bridge.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index 6bf02e1f235..aef93259b85 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -512,8 +512,8 @@ impl Proxy { let (mut read_conn, mut write_conn) = conn.split(); tokio::select! 
{ - res = tokio::io::copy(&mut read_socket, &mut write_conn) => { res.unwrap(); }, - res = tokio::io::copy(&mut read_conn, &mut write_socket) => { res.unwrap(); }, + _ = tokio::io::copy(&mut read_socket, &mut write_conn) => (), + _ = tokio::io::copy(&mut read_conn, &mut write_socket) => (), _ = stop.changed() => info!("shutting down proxy"), }; @@ -697,9 +697,6 @@ fn get_rumqttd_config(port: u16) -> Config { connections: connections_settings, }; - let mut console_settings = ConsoleSettings::default(); - console_settings.listen = format!("localhost:{}", port + 6); - let mut servers = HashMap::new(); servers.insert("1".to_string(), server_config); @@ -707,7 +704,7 @@ fn get_rumqttd_config(port: u16) -> Config { id: 0, router: router_config, cluster: None, - console: Some(console_settings), + console: None, v4: Some(servers), ws: None, v5: None, From 69ae10a1f048ac7ff0aa99b7b4c435c6a2a50dac Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Wed, 22 Jan 2025 17:40:31 +0000 Subject: [PATCH 09/16] Explicitly close the connection to make sure the connection is actually closed --- crates/common/mqtt_channel/src/tests.rs | 49 +++++++++++++------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index 698c4e370c8..b4995a4c91f 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -214,9 +214,8 @@ async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> { .with_session_name(session_name) .with_subscriptions(topic.try_into()?); { - let _con = Connection::new(&mqtt_config).await?; - - // A connection is disconnected on drop + let con = Connection::new(&mqtt_config).await?; + con.close().await; } // Any messages published on that topic while down ... 
@@ -444,27 +443,29 @@ async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), any // An mqtt process publishing messages // must ensure the messages have been sent before process exit. - let mqtt_config = Config::default().with_port(broker.port); - - let topic = Topic::new_unchecked(topic); - let mut con = Connection::new(&mqtt_config).await.expect("a connection"); - - con.published - .send(MqttMessage::new(&topic, "datum 1")) - .await - .expect("message sent"); - con.published - .send(MqttMessage::new(&topic, "datum 2")) - .await - .expect("message sent"); - con.published - .send(MqttMessage::new(&topic, "datum 3")) - .await - .expect("message sent"); - - // Wait for all the messages to be actually sent - // before the runtime is shutdown dropping the mqtt sender loop. - tokio::time::timeout(Duration::from_secs(5),con.close()).await.expect("MQTT channel should close"); + let mqtt_config = Config::default().with_port(broker.port); + + let topic = Topic::new_unchecked(topic); + let mut con = Connection::new(&mqtt_config).await.expect("a connection"); + + con.published + .send(MqttMessage::new(&topic, "datum 1")) + .await + .expect("message sent"); + con.published + .send(MqttMessage::new(&topic, "datum 2")) + .await + .expect("message sent"); + con.published + .send(MqttMessage::new(&topic, "datum 3")) + .await + .expect("message sent"); + + // Wait for all the messages to be actually sent + // before the runtime is shutdown dropping the mqtt sender loop. 
+ tokio::time::timeout(Duration::from_secs(5), con.close()) + .await + .expect("MQTT channel should close"); mqtt_tests::assert_received( &mut messages, From c67990781093e5ebbcdc53032225bcbd9191c4cf Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Thu, 23 Jan 2025 10:07:21 +0000 Subject: [PATCH 10/16] Replace unneeded channel with simpler abstraction --- crates/common/mqtt_channel/src/connection.rs | 89 ++++++++++++------- .../tedge_mqtt_bridge/tests/bridge.rs | 1 - 2 files changed, 58 insertions(+), 32 deletions(-) diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index 7c864f0961d..2c860058650 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -6,6 +6,7 @@ use crate::PubChannel; use crate::SubChannel; use futures::channel::mpsc; use futures::channel::oneshot; +use futures::poll; use futures::SinkExt; use futures::StreamExt; use log::error; @@ -19,6 +20,7 @@ use rumqttc::Packet; use rumqttc::PubAck; use rumqttc::PubComp; use std::collections::HashSet; +use std::sync::Arc; use std::task::Poll; use std::time::Duration; use tokio::time::sleep; @@ -92,14 +94,14 @@ impl Connection { let (mqtt_client, event_loop) = Connection::open(config, received_sender.clone(), error_sender.clone()).await?; - let (awaiting_acks_sender, awaiting_acks_receiver) = mpsc::unbounded(); + let (guard, guard2) = ConnectionGuard::new_pair(); tokio::spawn(Connection::receiver_loop( mqtt_client.clone(), config.clone(), event_loop, received_sender, error_sender.clone(), - awaiting_acks_receiver, + guard, pub_done_sender, )); tokio::spawn(Connection::sender_loop( @@ -107,7 +109,7 @@ impl Connection { published_receiver, error_sender, config.last_will_message.clone(), - awaiting_acks_sender, + guard2, )); Ok(Connection { @@ -207,26 +209,47 @@ impl Connection { mut event_loop: EventLoop, mut message_sender: mpsc::UnboundedSender, mut error_sender: mpsc::UnboundedSender, - 
awaiting_acks: mpsc::UnboundedReceiver<()>, + guard: ConnectionGuard, done: oneshot::Sender<()>, ) -> Result<(), MqttError> { - let awaiting_acks = awaiting_acks.peekable(); - futures::pin_mut!(awaiting_acks); let mut outstanding_msgs = HashSet::new(); let mut disconnected = false; + + enum PollEvent { + Event(Result), + RequestDisconnect, + } + + async fn next_event( + ev: &mut EventLoop, + guard: &ConnectionGuard, + disconnected: bool, + ) -> PollEvent { + let poll_ev = ev.poll(); + tokio::pin!(poll_ev); + loop { + if let Poll::Ready(r) = poll!(&mut poll_ev) { + break PollEvent::Event(r); + } else if !disconnected && guard.other_half_dropped() { + break PollEvent::RequestDisconnect; + } + tokio::task::yield_now().await; + } + } + loop { - if let Poll::Ready(None) = futures::poll!(awaiting_acks.as_mut().peek()) { - // If the channel is empty, the sender loop has stopped - // and we've received an ack for every message published - if !disconnected - && outstanding_msgs.is_empty() - && event_loop.state.events.is_empty() + let event = match next_event(&mut event_loop, &guard, disconnected).await { + PollEvent::RequestDisconnect + if outstanding_msgs.is_empty() && event_loop.state.events.is_empty() => { mqtt_client.disconnect().await.unwrap(); disconnected = true; + continue; } - } - match event_loop.poll().await { + PollEvent::RequestDisconnect => event_loop.poll().await, + PollEvent::Event(e) => e, + }; + match event { Ok(Event::Incoming(Packet::Publish(msg))) => { if msg.payload.len() > config.max_packet_size { error!("Dropping message received on topic {} with payload size {} that exceeds the maximum packet size of {}", @@ -311,24 +334,15 @@ impl Connection { mut messages_receiver: mpsc::UnboundedReceiver, mut error_sender: mpsc::UnboundedSender, last_will: Option, - _awaiting_acks: mpsc::UnboundedSender<()>, + _guard: ConnectionGuard, ) { - loop { - match messages_receiver.next().await { - None => { - // The sender channel has been closed by the client - // No 
more messages will be published by the client - break; - } - Some(message) => { - let payload = Vec::from(message.payload_bytes()); - if let Err(err) = mqtt_client - .publish(message.topic, message.qos, message.retain, payload) - .await - { - let _ = error_sender.send(err.into()).await; - } - } + while let Some(message) = messages_receiver.next().await { + let payload = Vec::from(message.payload_bytes()); + if let Err(err) = mqtt_client + .publish(message.topic, message.qos, message.retain, payload) + .await + { + let _ = error_sender.send(err.into()).await; } } @@ -356,3 +370,16 @@ impl Connection { .map_err(MqttError::ClientError) } } + +struct ConnectionGuard(Arc<()>); + +impl ConnectionGuard { + fn new_pair() -> (Self, Self) { + let count = Arc::new(()); + (Self(count.clone()), Self(count)) + } + + fn other_half_dropped(&self) -> bool { + Arc::strong_count(&self.0) == 1 + } +} diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index aef93259b85..1e90940b99c 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -11,7 +11,6 @@ use rumqttc::QoS; use rumqttd::Broker; use rumqttd::Config; use rumqttd::ConnectionSettings; -use rumqttd::ConsoleSettings; use rumqttd::ServerSettings; use std::collections::HashMap; use std::str::from_utf8; From 608f20b401de013e69720db03145cbbd0fe0fd9f Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 24 Jan 2025 16:10:05 +0000 Subject: [PATCH 11/16] Replace `ConnectionGuard` with `tokio::Semaphore` to ensure waking happens correctly --- crates/common/mqtt_channel/Cargo.toml | 2 +- crates/common/mqtt_channel/src/connection.rs | 98 +++++++------------- 2 files changed, 32 insertions(+), 68 deletions(-) diff --git a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml index 7ed60aafe64..4e3b0dc51c5 100644 --- a/crates/common/mqtt_channel/Cargo.toml +++ 
b/crates/common/mqtt_channel/Cargo.toml @@ -17,7 +17,7 @@ log = { workspace = true } rumqttc = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true, features = ["rt", "time"] } +tokio = { workspace = true, features = ["rt", "time", "rt-multi-thread"] } zeroize = { workspace = true } [dev-dependencies] diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index 2c860058650..1fa79f47a87 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -6,7 +6,7 @@ use crate::PubChannel; use crate::SubChannel; use futures::channel::mpsc; use futures::channel::oneshot; -use futures::poll; +use futures::future::Either; use futures::SinkExt; use futures::StreamExt; use log::error; @@ -17,12 +17,10 @@ use rumqttc::EventLoop; use rumqttc::Incoming; use rumqttc::Outgoing; use rumqttc::Packet; -use rumqttc::PubAck; -use rumqttc::PubComp; -use std::collections::HashSet; use std::sync::Arc; -use std::task::Poll; use std::time::Duration; +use tokio::sync::OwnedSemaphorePermit; +use tokio::sync::Semaphore; use tokio::time::sleep; /// A connection to some MQTT server @@ -94,22 +92,23 @@ impl Connection { let (mqtt_client, event_loop) = Connection::open(config, received_sender.clone(), error_sender.clone()).await?; - let (guard, guard2) = ConnectionGuard::new_pair(); + let permits = Arc::new(Semaphore::new(1)); + let permit = permits.clone().acquire_owned().await.unwrap(); tokio::spawn(Connection::receiver_loop( mqtt_client.clone(), config.clone(), event_loop, received_sender, error_sender.clone(), - guard, pub_done_sender, + permits, )); tokio::spawn(Connection::sender_loop( mqtt_client, published_receiver, error_sender, config.last_will_message.clone(), - guard2, + permit, )); Ok(Connection { @@ -209,46 +208,36 @@ impl Connection { mut event_loop: EventLoop, mut message_sender: mpsc::UnboundedSender, mut error_sender: 
mpsc::UnboundedSender, - guard: ConnectionGuard, done: oneshot::Sender<()>, + permits: Arc, ) -> Result<(), MqttError> { - let mut outstanding_msgs = HashSet::new(); - let mut disconnected = false; - - enum PollEvent { - Event(Result), - RequestDisconnect, - } - - async fn next_event( - ev: &mut EventLoop, - guard: &ConnectionGuard, - disconnected: bool, - ) -> PollEvent { - let poll_ev = ev.poll(); - tokio::pin!(poll_ev); - loop { - if let Poll::Ready(r) = poll!(&mut poll_ev) { - break PollEvent::Event(r); - } else if !disconnected && guard.other_half_dropped() { - break PollEvent::RequestDisconnect; - } - tokio::task::yield_now().await; - } - } + let mut triggered_disconnect = false; + let mut disconnect_permit = None; loop { - let event = match next_event(&mut event_loop, &guard, disconnected).await { - PollEvent::RequestDisconnect - if outstanding_msgs.is_empty() && event_loop.state.events.is_empty() => - { - mqtt_client.disconnect().await.unwrap(); - disconnected = true; + let remaining_events_empty = event_loop.state.inflight() == 0; + if disconnect_permit.is_some() && !triggered_disconnect && remaining_events_empty { + let client = mqtt_client.clone(); + tokio::spawn(async move { client.disconnect().await }); + // tokio::fs::write("/tmp/thing.txt",format!("{:#?}", &events.event_loop.state)).await.unwrap(); + triggered_disconnect = true; + } + let next_event = event_loop.poll(); + let next_permit = permits.clone().acquire_owned(); + tokio::pin!(next_event); + tokio::pin!(next_permit); + let event = futures::future::select(next_event.as_mut(), next_permit.as_mut()).await; + let event = match event { + Either::Left((event, _)) => { + disconnect_permit.take(); + event + } + Either::Right((permit, _)) => { + disconnect_permit = Some(permit.unwrap()); continue; } - PollEvent::RequestDisconnect => event_loop.poll().await, - PollEvent::Event(e) => e, }; + match event { Ok(Event::Incoming(Packet::Publish(msg))) => { if msg.payload.len() > config.max_packet_size { @@ 
-299,18 +288,6 @@ impl Connection { break; } - Ok(Event::Incoming(Incoming::PubAck(PubAck { pkid }))) - | Ok(Event::Incoming(Incoming::PubComp(PubComp { pkid }))) => { - // Mark one message as consumed - outstanding_msgs.remove(&pkid); - } - - Ok(Event::Outgoing(Outgoing::Publish(p))) => { - if outstanding_msgs.insert(p) { - // awaiting_acks.next().await.unwrap(); - } - } - Err(err) => { error!("MQTT connection error: {err}"); @@ -334,7 +311,7 @@ impl Connection { mut messages_receiver: mpsc::UnboundedReceiver, mut error_sender: mpsc::UnboundedSender, last_will: Option, - _guard: ConnectionGuard, + _guard: OwnedSemaphorePermit, ) { while let Some(message) = messages_receiver.next().await { let payload = Vec::from(message.payload_bytes()); @@ -370,16 +347,3 @@ impl Connection { .map_err(MqttError::ClientError) } } - -struct ConnectionGuard(Arc<()>); - -impl ConnectionGuard { - fn new_pair() -> (Self, Self) { - let count = Arc::new(()); - (Self(count.clone()), Self(count)) - } - - fn other_half_dropped(&self) -> bool { - Arc::strong_count(&self.0) == 1 - } -} From bcceb2388176b7655be52edacbf6d8e65500c688 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 24 Jan 2025 16:20:27 +0000 Subject: [PATCH 12/16] Remove `serial_test` from mqtt-channel --- Cargo.lock | 2 +- crates/common/mqtt_channel/Cargo.toml | 1 - crates/common/mqtt_channel/src/tests.rs | 168 ++++++++---------- crates/tests/mqtt_tests/Cargo.toml | 5 +- .../tests/mqtt_tests/src/test_mqtt_client.rs | 2 +- .../tests/mqtt_tests/src/test_mqtt_server.rs | 16 +- 6 files changed, 93 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41b40225bbc..e593ba8b373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2411,7 +2411,6 @@ dependencies = [ "rumqttc", "serde", "serde_json", - "serial_test", "thiserror", "tokio", "zeroize", @@ -2422,6 +2421,7 @@ name = "mqtt_tests" version = "1.4.2" dependencies = [ "anyhow", + "backoff", "fastrand 1.9.0", "futures", "once_cell", diff --git 
a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml index 4e3b0dc51c5..1dd519c0911 100644 --- a/crates/common/mqtt_channel/Cargo.toml +++ b/crates/common/mqtt_channel/Cargo.toml @@ -24,7 +24,6 @@ zeroize = { workspace = true } anyhow = { workspace = true } mqtt_tests = { workspace = true } serde_json = { workspace = true } -serial_test = { workspace = true } [lints] workspace = true diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index b4995a4c91f..927e8d6dca1 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -1,30 +1,38 @@ use crate::*; use futures::SinkExt; use futures::StreamExt; -use serial_test::serial; use std::convert::TryInto; use std::time::Duration; const TIMEOUT: Duration = Duration::from_millis(1000); +/// Prefixes a topic/session name with a module path and line number +/// +/// This allows multiple tests to share an MQTT broker, allowing them to +/// run concurrently within a single test process. +macro_rules! uniquify { + ($name:literal) => { + ::std::concat!(::std::module_path!(), ::std::line!(), "-", $name) + }; +} + #[tokio::test] -#[serial] -async fn subscribing_to_messages() -> Result<(), anyhow::Error> { +async fn subscribing_to_messages() { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); // A client subscribes to a topic on connect - let topic = "a/test/topic"; + let topic = uniquify!("a/test/topic"); let mqtt_config = mqtt_config - .with_session_name("test_client") - .with_subscriptions(topic.try_into()?); - let mut con = Connection::new(&mqtt_config).await?; + .with_session_name(uniquify!("test_client")) + .with_subscriptions(topic.try_into().unwrap()); + let mut con = Connection::new(&mqtt_config).await.unwrap(); // Any messages published on that topic ... 
- broker.publish(topic, "msg 1").await?; - broker.publish(topic, "msg 2").await?; - broker.publish(topic, "msg 3").await?; + broker.publish(topic, "msg 1").await.unwrap(); + broker.publish(topic, "msg 2").await.unwrap(); + broker.publish(topic, "msg 3").await.unwrap(); // ... must be received by the client assert_eq!( @@ -39,8 +47,6 @@ async fn subscribing_to_messages() -> Result<(), anyhow::Error> { MaybeMessage::Next(message(topic, "msg 3")), next_message(&mut con.received).await ); - - Ok(()) } #[derive(Debug, Clone, Eq, PartialEq)] @@ -65,40 +71,39 @@ async fn next_message(received: &mut (impl StreamExt + Unpin } #[tokio::test] -#[serial] -async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> { +async fn subscribing_to_many_topics() { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); // A client can subscribe to many topics let topics = vec![ - "/a/first/topic", - "/a/second/topic", - "/a/+/pattern", // one can use + pattern - "/any/#", // one can use # pattern + "many_topics/a/first/topic", + "many_topics/a/second/topic", + "many_topics/a/+/pattern", // one can use + pattern + "many_topics/any/#", // one can use # pattern ] .try_into() .expect("a list of topic filters"); let mqtt_config = mqtt_config - .with_session_name("client_subscribing_to_many_topics") + .with_session_name(uniquify!("client_subscribing_to_many_topics")) .with_subscriptions(topics); - let con = Connection::new(&mqtt_config).await?; + let con = Connection::new(&mqtt_config).await.unwrap(); // The messages for these topics will all be received on the same message stream let mut messages = con.received; // A message published on any of the subscribed topics must be received for (topic, payload) in vec![ - ("/a/first/topic", "a first message"), - ("/a/second/topic", "a second message"), - ("/a/plus/pattern", "a third message"), - ("/any/sub/topic", "a fourth message"), + ("many_topics/a/first/topic", "a 
first message"), + ("many_topics/a/second/topic", "a second message"), + ("many_topics/a/plus/pattern", "a third message"), + ("many_topics/any/sub/topic", "a fourth message"), ] .into_iter() { - broker.publish(topic, payload).await?; + broker.publish(topic, payload).await.unwrap(); assert_eq!( MaybeMessage::Next(message(topic, payload)), next_message(&mut messages).await @@ -107,41 +112,35 @@ async fn subscribing_to_many_topics() -> Result<(), anyhow::Error> { // No message should be received from un-subscribed topics for (topic, payload) in vec![ - ("/a/third/topic", "unrelated message"), - ("/unrelated/topic", "unrelated message"), + ("many_topics/a/third/topic", "unrelated message"), + ("many_topics/unrelated/topic", "unrelated message"), ] .into_iter() { - broker.publish(topic, payload).await?; + broker.publish(topic, payload).await.unwrap(); assert_eq!(MaybeMessage::Timeout, next_message(&mut messages).await); } - - Ok(()) } #[tokio::test] -#[serial] async fn publishing_messages() -> Result<(), anyhow::Error> { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); - let mut all_messages = broker.messages_published_on("#").await; + let topic = uniquify!("foo/topic"); + let mut all_messages = broker.messages_published_on(topic).await; // A client that wish only publish messages doesn't have to subscribe to any topics - let mqtt_config = mqtt_config.with_session_name("publishing_messages"); + let mqtt_config = mqtt_config.with_session_name(uniquify!("publishing_messages")); let mut con = Connection::new(&mqtt_config).await?; // Then all messages produced on the `con.published` channel + con.published.send(message(topic, "foo payload")).await?; con.published - .send(message("foo/topic", "foo payload")) - .await?; - con.published - .send(message("foo/topic", "again a foo payload")) - .await?; - con.published - .send(message("bar/topic", "bar payload")) + .send(message(topic, "again a foo 
payload")) .await?; + con.published.send(message(topic, "bar payload")).await?; // ... must be actually published mqtt_tests::assert_received( @@ -155,20 +154,19 @@ async fn publishing_messages() -> Result<(), anyhow::Error> { } #[tokio::test] -#[serial] async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); // and an MQTT connection with input and output topics - let in_topic = "mapper/input".try_into().expect("a valid topic filter"); - let out_topic = "mapper/output".try_into().expect("a valid topic name"); - let mut out_messages = broker.messages_published_on("mapper/output").await; + let in_topic = uniquify!("mapper/input"); + let out_topic = uniquify!("mapper/output"); + let mut out_messages = broker.messages_published_on(out_topic).await; let mqtt_config = mqtt_config - .with_session_name("mapper") - .with_subscriptions(in_topic); + .with_session_name(uniquify!("mapper")) + .with_subscriptions(in_topic.try_into().expect("a valid topic filter")); let con = Connection::new(&mqtt_config).await?; // A message mapper can be implemented as @@ -181,7 +179,7 @@ async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { while let MaybeMessage::Next(msg) = next_message(&mut input).await { let req = msg.payload_str().expect("utf8 payload"); let res = req.to_uppercase(); - let msg = MqttMessage::new(&out_topic, res.as_bytes()); + let msg = message(out_topic, &res); if output.send(msg).await.is_err() { // the connection has been closed break; @@ -190,9 +188,9 @@ async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { }); // Any messages published on the input topic ... 
- broker.publish("mapper/input", "msg 1").await?; - broker.publish("mapper/input", "msg 2").await?; - broker.publish("mapper/input", "msg 3").await?; + broker.publish(in_topic, "msg 1").await?; + broker.publish(in_topic, "msg 2").await?; + broker.publish(in_topic, "msg 3").await?; // ... is then transformed and published on the output topic. mqtt_tests::assert_received(&mut out_messages, TIMEOUT, vec!["MSG 1", "MSG 2", "MSG 3"]).await; @@ -201,7 +199,6 @@ async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { } #[tokio::test] -#[serial] async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); @@ -209,7 +206,7 @@ async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> { // A client that connects with a well-known session name, subscribing to some topic. let session_name = "remember_me"; - let topic = "my/nice/topic"; + let topic = uniquify!("my/nice/topic"); let mqtt_config = mqtt_config .with_session_name(session_name) .with_subscriptions(topic.try_into()?); @@ -243,16 +240,15 @@ async fn receiving_messages_while_not_connected() -> Result<(), anyhow::Error> { } #[tokio::test] -#[serial] async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> { + static OUT_TOPIC: &str = uniquify!("out/topic"); + static IN_TOPIC: &str = uniquify!("in/topic"); // Given an mqtt client async fn run(mut input: impl SubChannel, mut output: impl PubChannel) { - let out_topic = Topic::new_unchecked("out/topic"); - while let Some(msg) = input.next().await { let req = msg.payload_str().expect("utf8 payload"); let res = req.to_uppercase(); - let msg = MqttMessage::new(&out_topic, res.as_bytes()); + let msg = message(OUT_TOPIC, &res); if output.send(msg).await.is_err() { break; } @@ -262,14 +258,14 @@ async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> { // This client can be tested without any MQTT broker. 
let input = vec![ - message("in/topic", "a message"), - message("in/topic", "another message"), - message("in/topic", "yet another message"), + message(IN_TOPIC, "a message"), + message(IN_TOPIC, "another message"), + message(IN_TOPIC, "yet another message"), ]; let expected = vec![ - message("out/topic", "A MESSAGE"), - message("out/topic", "ANOTHER MESSAGE"), - message("out/topic", "YET ANOTHER MESSAGE"), + message(OUT_TOPIC, "A MESSAGE"), + message(OUT_TOPIC, "ANOTHER MESSAGE"), + message(OUT_TOPIC, "YET ANOTHER MESSAGE"), ]; let input_stream = mqtt_tests::input_stream(input).await; @@ -280,18 +276,18 @@ async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> { // This very same client can be tested with an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); - let mut out_messages = broker.messages_published_on("out/topic").await; + let mut out_messages = broker.messages_published_on(OUT_TOPIC).await; - let in_topic = "in/topic".try_into().expect("a valid topic filter"); + let in_topic = IN_TOPIC.try_into().expect("a valid topic filter"); let mqtt_config = mqtt_config .with_session_name("mapper under test") .with_subscriptions(in_topic); let con = Connection::new(&mqtt_config).await?; tokio::spawn(async move { run(con.received, con.published).await }); - broker.publish("in/topic", "msg 1, over MQTT").await?; - broker.publish("in/topic", "msg 2, over MQTT").await?; - broker.publish("in/topic", "msg 3, over MQTT").await?; + broker.publish(IN_TOPIC, "msg 1, over MQTT").await?; + broker.publish(IN_TOPIC, "msg 2, over MQTT").await?; + broker.publish(IN_TOPIC, "msg 3, over MQTT").await?; mqtt_tests::assert_received( &mut out_messages, @@ -304,15 +300,14 @@ async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> { } #[tokio::test] -#[serial] async fn creating_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker let broker = 
mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name - let session_name = "my-session-name"; - let topic = "my/topic"; + let session_name = uniquify!("my-session-name"); + let topic = uniquify!("my/topic"); let mqtt_config = mqtt_config .with_session_name(session_name) .with_subscriptions(topic.try_into()?); @@ -351,7 +346,6 @@ async fn creating_a_session() -> Result<(), anyhow::Error> { } #[tokio::test] -#[serial] async fn a_session_must_have_a_name() { let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); @@ -362,12 +356,11 @@ async fn a_session_must_have_a_name() { } #[tokio::test] -#[serial] async fn a_named_session_must_not_set_clean_session() { let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default() .with_port(broker.port) - .with_session_name("useless name") + .with_session_name(uniquify!("useless name")) .with_clean_session(true); let result = init_session(&mqtt_config).await; @@ -376,15 +369,14 @@ async fn a_named_session_must_not_set_clean_session() { } #[tokio::test] -#[serial] async fn cleaning_a_session() -> Result<(), anyhow::Error> { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); // Given an MQTT config with a well-known session name - let session_name = "a-session-name"; - let topic = "a/topic"; + let session_name = uniquify!("a-session-name"); + let topic = uniquify!("a/topic"); let mqtt_config = mqtt_config .with_session_name(session_name) .with_subscriptions(topic.try_into()?); @@ -424,7 +416,6 @@ async fn cleaning_a_session() -> Result<(), anyhow::Error> { } #[tokio::test] -#[serial] async fn to_be_cleared_a_session_must_have_a_name() { let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); @@ -435,10 +426,9 @@ async fn 
to_be_cleared_a_session_must_have_a_name() { } #[tokio::test] -#[serial] async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), anyhow::Error> { let broker = mqtt_tests::test_mqtt_broker(); - let topic = "data/topic"; + let topic = uniquify!("data/topic"); let mut messages = broker.messages_published_on(topic).await; // An mqtt process publishing messages @@ -478,9 +468,8 @@ async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), any } #[tokio::test] -#[serial] async fn ensure_that_last_will_message_is_delivered() -> Result<(), anyhow::Error> { - let topic = "test/lwp"; + let topic = uniquify!("test/lwp"); let broker = mqtt_tests::test_mqtt_broker(); // start a subscriber to capture all the messages let mut messages = broker.messages_published_on(topic).await; @@ -527,13 +516,12 @@ async fn ensure_that_last_will_message_is_delivered() -> Result<(), anyhow::Erro } #[tokio::test] -#[serial] async fn test_retain_message_delivery() -> Result<(), anyhow::Error> { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); let mqtt_config = Config::default().with_port(broker.port); - let topic = "retained/topic"; + let topic = uniquify!("retained/topic"); let mqtt_config = mqtt_config.with_subscriptions(topic.try_into()?); // A client that subsribes to a topic. 
@@ -541,12 +529,7 @@ async fn test_retain_message_delivery() -> Result<(), anyhow::Error> { //Raise retained alarm message broker - .publish_with_opts( - "retained/topic", - "a retained message", - QoS::AtLeastOnce, - true, - ) + .publish_with_opts(topic, "a retained message", QoS::AtLeastOnce, true) .await .unwrap(); @@ -559,7 +542,7 @@ async fn test_retain_message_delivery() -> Result<(), anyhow::Error> { //Clear the last raised retained message broker .publish_with_opts( - "retained/topic", + topic, "", //Empty message indicates clear QoS::AtLeastOnce, true, @@ -586,7 +569,6 @@ async fn test_retain_message_delivery() -> Result<(), anyhow::Error> { } #[tokio::test] -#[serial] async fn test_max_packet_size_validation() -> Result<(), anyhow::Error> { // Given an MQTT broker let broker = mqtt_tests::test_mqtt_broker(); @@ -595,9 +577,9 @@ async fn test_max_packet_size_validation() -> Result<(), anyhow::Error> { .with_max_packet_size(4); // A client subscribes to a topic on connect - let topic = "a/test/topic"; + let topic = uniquify!("a/test/topic"); let mqtt_config = mqtt_config - .with_session_name("test_client") + .with_session_name(uniquify!("test_client")) .with_subscriptions(topic.try_into()?); let mut con = Connection::new(&mqtt_config).await?; diff --git a/crates/tests/mqtt_tests/Cargo.toml b/crates/tests/mqtt_tests/Cargo.toml index f090228c073..6c335c58273 100644 --- a/crates/tests/mqtt_tests/Cargo.toml +++ b/crates/tests/mqtt_tests/Cargo.toml @@ -10,12 +10,15 @@ repository = { workspace = true } [dependencies] anyhow = { workspace = true } +backoff = { workspace = true } fastrand = { workspace = true } futures = { workspace = true } once_cell = { workspace = true } rumqttc = { workspace = true } rumqttd = { workspace = true } -tokio = { workspace = true, default_features = false, features = [] } +tokio = { workspace = true, default_features = false, features = [ + "rt-multi-thread", +] } [lints] workspace = true diff --git 
a/crates/tests/mqtt_tests/src/test_mqtt_client.rs b/crates/tests/mqtt_tests/src/test_mqtt_client.rs index 1b3d237bff6..2fb352637b8 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_client.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_client.rs @@ -189,7 +189,7 @@ impl TestCon { self.client.publish(topic, qos, retain, payload).await?; loop { - if let Event::Incoming(Packet::PubAck(_)) = self.eventloop.poll().await? { + if let Ok(Event::Incoming(Packet::PubAck(_))) = self.eventloop.poll().await { return Ok(()); } } diff --git a/crates/tests/mqtt_tests/src/test_mqtt_server.rs b/crates/tests/mqtt_tests/src/test_mqtt_server.rs index 788853cbc5d..1aa8902aed1 100644 --- a/crates/tests/mqtt_tests/src/test_mqtt_server.rs +++ b/crates/tests/mqtt_tests/src/test_mqtt_server.rs @@ -2,6 +2,9 @@ use std::collections::HashMap; use std::net::TcpListener; use std::time::Duration; +use backoff::backoff::Backoff; +use backoff::exponential::ExponentialBackoff; +use backoff::SystemClock; use futures::channel::mpsc::UnboundedReceiver; use once_cell::sync::Lazy; use rumqttc::Event; @@ -24,8 +27,14 @@ pub struct MqttProcessHandler { impl MqttProcessHandler { pub fn new() -> MqttProcessHandler { - let port = spawn_broker(); - MqttProcessHandler { port } + let mut backoff = ExponentialBackoff::::default(); + loop { + if let Ok(port) = std::panic::catch_unwind(spawn_broker) { + break MqttProcessHandler { port }; + } else { + std::thread::sleep(backoff.next_backoff().unwrap()); + } + } } pub async fn publish(&self, topic: &str, payload: &str) -> Result<(), anyhow::Error> { @@ -144,7 +153,6 @@ fn spawn_broker() -> u16 { loop { let msg = connection.recv(); - eprintln!("{msg:#?}"); if let Ok(Ok(Event::Incoming(Incoming::Publish(publish)))) = msg { let payload = match std::str::from_utf8(publish.payload.as_ref()) { Ok(payload) => format!("{:.110}", payload), @@ -165,7 +173,7 @@ fn get_rumqttd_config(port: u16) -> Config { let router_config = rumqttd::RouterConfig { max_segment_size: 10240, 
max_segment_count: 10, - max_connections: 10, + max_connections: 1000, initialized_filters: None, ..Default::default() }; From ad6572e08748d7ca7d82fdf049a79838a54d7eaf Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 24 Jan 2025 16:21:05 +0000 Subject: [PATCH 13/16] Make tests more portable to allow them to run under `cargo stress` --- crates/common/axum_tls/src/files.rs | 23 +++++++++++++---------- crates/core/plugin_sm/tests/plugin.rs | 8 +------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/crates/common/axum_tls/src/files.rs b/crates/common/axum_tls/src/files.rs index f327551343f..6e3cc308cd3 100644 --- a/crates/common/axum_tls/src/files.rs +++ b/crates/common/axum_tls/src/files.rs @@ -114,6 +114,7 @@ mod tests { use assert_matches::assert_matches; use axum::routing::get; use axum::Router; + use camino::Utf8PathBuf; use std::io::Cursor; mod read_trust_store { @@ -199,27 +200,28 @@ mod tests { } fn copy_test_file_to(test_file: &str, path: impl AsRef) -> io::Result { - std::fs::copy(format!("./test_data/{test_file}"), path) + let dir = env!("CARGO_MANIFEST_DIR"); + std::fs::copy(format!("{dir}/test_data/{test_file}"), path) } } #[test] fn load_pkey_fails_when_given_x509_certificate() { + let dir = env!("CARGO_MANIFEST_DIR"); + let path = Utf8PathBuf::from(format!("{dir}/test_data/ec.crt")); assert_eq!( - load_pkey(Utf8Path::new("./test_data/ec.crt")) - .unwrap_err() - .to_string(), - "expected private key in \"./test_data/ec.crt\", found an X509 certificate" + load_pkey(&path).unwrap_err().to_string(), + format!("expected private key in {path:?}, found an X509 certificate") ); } #[test] fn load_pkey_fails_when_given_certificate_revocation_list() { + let dir = env!("CARGO_MANIFEST_DIR"); + let path = Utf8PathBuf::from(format!("{dir}/test_data/demo.crl")); assert_eq!( - load_pkey(Utf8Path::new("./test_data/demo.crl")) - .unwrap_err() - .to_string(), - "expected private key in \"./test_data/demo.crl\", found a CRL" + 
load_pkey(&path).unwrap_err().to_string(), + format!("expected private key in {path:?}, found a CRL") ); } @@ -288,7 +290,8 @@ mod tests { } fn test_data(file_name: &str) -> String { - std::fs::read_to_string(format!("./test_data/{file_name}")) + let dir = env!("CARGO_MANIFEST_DIR"); + std::fs::read_to_string(format!("{dir}/test_data/{file_name}")) .with_context(|| format!("opening file {file_name} from test_data")) .unwrap() } diff --git a/crates/core/plugin_sm/tests/plugin.rs b/crates/core/plugin_sm/tests/plugin.rs index 1a5936db274..d55700078ac 100644 --- a/crates/core/plugin_sm/tests/plugin.rs +++ b/crates/core/plugin_sm/tests/plugin.rs @@ -1,6 +1,5 @@ #[cfg(test)] mod tests { - use certificate::CloudRootCerts; use plugin_sm::plugin::deserialize_module_info; use plugin_sm::plugin::sm_path; @@ -9,7 +8,6 @@ mod tests { use std::io::Write; use std::path::Path; use std::path::PathBuf; - use std::str::FromStr; use tedge_api::SoftwareError; use tedge_api::SoftwareModule; use tedge_config::SudoCommandBuilder; @@ -194,13 +192,9 @@ mod tests { } fn get_dummy_plugin_path() -> PathBuf { - // Return a path to a dummy plugin in target directory. 
- let package_dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); - // To get the plugin binary path we need to find the `target` directory which is 3 levels above the `Cargo.toml` file of the package // CARGO_MANIFEST_DIR == ./thin-edge.io/crates/core/plugin_sm - let dummy_plugin_path = PathBuf::from_str(package_dir.as_str()) - .unwrap() + let dummy_plugin_path = Path::new(env!("CARGO_MANIFEST_DIR")) .parent() //./thin-edge.io/crates/core/ .unwrap() .parent() // ./thin-edge.io/crates/ From 51408a0aac71d22998b5b6294f28a0134c016d81 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 24 Jan 2025 16:22:25 +0000 Subject: [PATCH 14/16] Make `cargo-nextest` output less verbose --- justfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/justfile b/justfile index 3d6bb554605..d467398adc7 100644 --- a/justfile +++ b/justfile @@ -121,7 +121,7 @@ test *ARGS: # Run unit tests test-unit *ARGS: - cargo nextest run --no-fail-fast --all-features --all-targets {{ARGS}} + cargo nextest run --status-level fail --no-fail-fast --all-features --all-targets {{ARGS}} # Run doc tests test-docs *ARGS: From 529e71b9e1dc91a304ce6b1b7aa26447d357f91f Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Mon, 27 Jan 2025 12:20:06 +0000 Subject: [PATCH 15/16] Improve explanation of shutdown process --- crates/common/mqtt_channel/src/connection.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index 1fa79f47a87..d6e0801fed8 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -215,24 +215,28 @@ impl Connection { let mut disconnect_permit = None; loop { + // Check if we are ready to disconnect. 
Due to ownership of the + // event loop, this needs to be done before we call + // `event_loop.poll()` let remaining_events_empty = event_loop.state.inflight() == 0; if disconnect_permit.is_some() && !triggered_disconnect && remaining_events_empty { + // `sender_loop` is not running and we have no remaining + // publishes to process let client = mqtt_client.clone(); tokio::spawn(async move { client.disconnect().await }); - // tokio::fs::write("/tmp/thing.txt",format!("{:#?}", &events.event_loop.state)).await.unwrap(); triggered_disconnect = true; } + let next_event = event_loop.poll(); let next_permit = permits.clone().acquire_owned(); tokio::pin!(next_event); tokio::pin!(next_permit); + let event = futures::future::select(next_event.as_mut(), next_permit.as_mut()).await; let event = match event { - Either::Left((event, _)) => { - disconnect_permit.take(); - event - } + Either::Left((event, _)) => event, Either::Right((permit, _)) => { + // The `sender_loop` has now concluded disconnect_permit = Some(permit.unwrap()); continue; } @@ -311,7 +315,7 @@ impl Connection { mut messages_receiver: mpsc::UnboundedReceiver, mut error_sender: mpsc::UnboundedSender, last_will: Option, - _guard: OwnedSemaphorePermit, + _disconnect_permit: OwnedSemaphorePermit, ) { while let Some(message) = messages_receiver.next().await { let payload = Vec::from(message.payload_bytes()); @@ -331,6 +335,9 @@ impl Connection { .publish(last_will.topic, last_will.qos, last_will.retain, payload) .await; } + + // At this point, `_disconnect_permit` is dropped + // This allows `receiver_loop` to acquire a permit and commence the shutdown process } pub(crate) async fn do_pause() { From 24838098956ffcb44321cf40b9b35dfad3e6193d Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Mon, 27 Jan 2025 13:28:09 +0000 Subject: [PATCH 16/16] Simplify event polling using `tokio::select` --- crates/common/mqtt_channel/src/connection.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git
a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index d6e0801fed8..72588d0e714 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -6,7 +6,6 @@ use crate::PubChannel; use crate::SubChannel; use futures::channel::mpsc; use futures::channel::oneshot; -use futures::future::Either; use futures::SinkExt; use futures::StreamExt; use log::error; @@ -227,15 +226,15 @@ impl Connection { triggered_disconnect = true; } - let next_event = event_loop.poll(); - let next_permit = permits.clone().acquire_owned(); - tokio::pin!(next_event); - tokio::pin!(next_permit); + let event = tokio::select! { + // If there is an event, we need to process that first + // Otherwise we risk shutting down early + // e.g. a `Publish` request from the sender is not "inflight" + // but will immediately be returned by `event_loop.poll()` + biased; - let event = futures::future::select(next_event.as_mut(), next_permit.as_mut()).await; - let event = match event { - Either::Left((event, _)) => event, - Either::Right((permit, _)) => { + event = event_loop.poll() => event, + permit = permits.clone().acquire_owned() => { // The `sender_loop` has now concluded disconnect_permit = Some(permit.unwrap()); continue;