From 32e836af57f12214c1afc20226a33ac2f17a09df Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Mon, 8 Jul 2024 14:58:44 +0100 Subject: [PATCH 1/8] Add debug logs for built-in bridge to run on OSADL device Signed-off-by: James Rhodes --- crates/extensions/tedge_mqtt_bridge/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index fd62a04c668..4a20e3f7f2d 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -36,6 +36,7 @@ use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; use tracing::info; +use tracing::log::debug; pub type MqttConfig = mqtt_channel::Config; @@ -329,6 +330,7 @@ async fn half_bridge( continue; } }; + debug!("Received notification ({name}) {notification:?}"); match notification { Event::Incoming(Incoming::ConnAck(_)) => { @@ -342,6 +344,7 @@ async fn half_bridge( // Forward messages from event loop to target Event::Incoming(Incoming::Publish(publish)) => { + dbg!(name, publish.pkid); if let Some(publish) = loop_breaker.ensure_not_looped(publish).await { if let Some(topic) = transformer.convert_topic(&publish.topic) { target @@ -378,6 +381,7 @@ async fn half_bridge( Some(Some((topic, msg))) => { loop_breaker.forward_on_topic(topic, &msg); forward_pkid_to_received_msg.insert(pkid, msg); + dbg!(name, &forward_pkid_to_received_msg); } // A healthcheck message was published, ignore this packet id From c0b4595c5815f3cdd426766496b2dcb216b7a5a1 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Tue, 16 Jul 2024 14:02:45 +0100 Subject: [PATCH 2/8] Add test to reproduce error after disconnection --- .../tedge_mqtt_bridge/tests/bridge.rs | 75 ++++++++++++++++++- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index 4196375fad4..c23248e8ee3 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -28,7 +28,7 @@ use tokio::time::timeout; use tracing::info; use tracing::warn; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(3); fn new_broker_and_client(name: &str, port: u16) -> (AsyncClient, EventLoop) { let mut broker = Broker::new(get_rumqttd_config(port)); @@ -64,7 +64,7 @@ const HEALTH: &str = "te/device/main/#"; // TODO acknowledgement with lost connection bridge, check we acknowledge the correct message #[tokio::test] async fn bridge_many_messages() { - std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info"); + std::env::set_var("RUST_LOG", "rumqttd=debug,tedge_mqtt_bridge=info"); let _ = env_logger::try_init(); let local_broker_port = free_port().await; let cloud_broker_port = free_port().await; @@ -116,6 +116,75 @@ async fn bridge_many_messages() { next_received_message(&mut local).await.unwrap(); } +#[tokio::test] +async fn bridge_disconnect_while_sending() { + std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info"); + let _ = env_logger::try_init(); + let local_broker_port = free_port().await; + let cloud_broker_port = free_port().await; + let (local, mut ev_local) = new_broker_and_client("local", local_broker_port); + let (cloud, mut ev_cloud) = new_broker_and_client("cloud", cloud_broker_port); + + // We can't easily restart rumqttd, so instead, we'll connect via a proxy + // that we can interrupt the connection of + let cloud_proxy = Proxy::start(cloud_broker_port).await; + + let mut rules = BridgeConfig::new(); + rules.forward_from_local("s/us", "c8y/", "").unwrap(); + rules.forward_from_remote("s/ds", "c8y/", "").unwrap(); + + start_mqtt_bridge(local_broker_port, cloud_proxy.port, rules).await; + + local.subscribe(HEALTH, QoS::AtLeastOnce).await.unwrap(); + + wait_until_health_status_is("up", &mut ev_local) + .await + .unwrap(); + + local.unsubscribe(HEALTH).await.unwrap(); + local.subscribe("c8y/s/ds", QoS::AtLeastOnce).await.unwrap(); + await_subscription(&mut ev_local).await; + cloud.subscribe("s/us", QoS::AtLeastOnce).await.unwrap(); + await_subscription(&mut ev_cloud).await; + + let poll_local = EventPoller::run_in_bg(ev_local); + + // Verify messages are forwarded from cloud to local + for i in 1..10000 { + local + .publish( + "c8y/s/us", + QoS::AtMostOnce, + false, + format!("a,fake,smartrest,message{i}"), + ) + .await + .unwrap(); + } + cloud_proxy.interrupt_connections(); + let _ev_cloud = EventPoller::run_in_bg(ev_cloud); + for _ in 1..10000 { + local + .publish( + "c8y/s/us", + QoS::AtMostOnce, + false, + "a,fake,smartrest,message", + ) + .await + .unwrap(); + } + + // tokio::time::sleep(Duration::from_secs(5)).await; + let mut local = poll_local.stop_polling().await; + cloud + .publish("s/ds", QoS::AtLeastOnce, true, "test") + .await + .unwrap(); + + next_received_message(&mut local).await.unwrap(); +} + #[tokio::test] async fn bridge_reconnects_successfully_after_cloud_connection_interrupted() { std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info"); @@ -398,7 +467,7 @@ impl Proxy { }; write_socket.shutdown().await.unwrap(); - write_conn.shutdown().await.unwrap(); + let _ = write_conn.shutdown().await; //.await.unwrap(); }); } } From 6597b4f423dd2ceae1e852f228f112ad17621e05 Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 17 Jul 2024 09:12:38 +0200 Subject: [PATCH 3/8] Ignore subsequent attempts to forward a message A given MQTT message might be published more than once, notably after a reconnect. For each attempt the rumqttc crate notifies an `Outgoing::Publish(pkid)` event. The first time such an event is received for a given `pkid`, the built-in bridge has to map this `pkid` to the forwarded message (so it will be able to properly acknowledge it later). However, when an acknoledgement is already expected for that `pkid` it means that the `Outgoing::Publish(pkid)` event must be ignored. Failing to do so introduces a shift in the mapping of in and out pkids, and, in the worse case, this blocks the built-in bridge as there is no pending message to associate to. Signed-off-by: Didier Wenzek --- .../extensions/tedge_mqtt_bridge/src/lib.rs | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index 4a20e3f7f2d..cd49d1657d5 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -22,6 +22,7 @@ use rumqttc::Publish; use rumqttc::SubscribeFilter; use rumqttc::Transport; use std::borrow::Cow; +use std::collections::hash_map; use std::collections::HashMap; use std::collections::VecDeque; use std::convert::Infallible; @@ -376,20 +377,26 @@ async fn half_bridge( } // Keep track of packet IDs so we can acknowledge messages - Event::Outgoing(Outgoing::Publish(pkid)) => match companion_bridge_half.recv().await { - // A message was forwarded by the other bridge half, note the packet id - Some(Some((topic, msg))) => { - loop_breaker.forward_on_topic(topic, &msg); - forward_pkid_to_received_msg.insert(pkid, msg); - dbg!(name, &forward_pkid_to_received_msg); + Event::Outgoing(Outgoing::Publish(pkid)) => { + if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { + match companion_bridge_half.recv().await { + // A message was forwarded by the other bridge half, note the packet id + Some(Some((topic, msg))) => { + loop_breaker.forward_on_topic(topic, &msg); + e.insert(msg); + dbg!(name, &forward_pkid_to_received_msg); + } + + // A healthcheck message was published, ignore this packet id + Some(None) => {} + + // The other bridge half has disconnected, break the loop and shut down the bridge + None => break, + } + } else { + info!("Bridge cloud connection {name} ignoring already known pkid={pkid}"); } - - // A healthcheck message was published, ignore this packet id - Some(None) => {} - - // The other bridge half has disconnected, break the loop and shut down the bridge - None => break, - }, + } _ => {} } } From 8e5b7962a9f58455082076051bf6d4bd25866360 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Wed, 17 Jul 2024 15:07:43 +0100 Subject: [PATCH 4/8] Remove redundant log messages These messages were not very informative, the first gives us the same information as the `debug` level notification log and the other spits out a large map, but it's very difficult to mentally piece together into anything useful, and makes it very hard to identify the specific messages in play as the payload isn't included in the output. Signed-off-by: James Rhodes --- crates/extensions/tedge_mqtt_bridge/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index cd49d1657d5..e9686cc4044 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -345,7 +345,6 @@ async fn half_bridge( // Forward messages from event loop to target Event::Incoming(Incoming::Publish(publish)) => { - dbg!(name, publish.pkid); if let Some(publish) = loop_breaker.ensure_not_looped(publish).await { if let Some(topic) = transformer.convert_topic(&publish.topic) { target @@ -384,7 +383,6 @@ async fn half_bridge( Some(Some((topic, msg))) => { loop_breaker.forward_on_topic(topic, &msg); e.insert(msg); - dbg!(name, &forward_pkid_to_received_msg); } // A healthcheck message was published, ignore this packet id From 3e0f4fc99e145cc960ade2603ffec1b9381fda6c Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 26 Jul 2024 17:02:01 +0100 Subject: [PATCH 5/8] Address review comments Signed-off-by: James Rhodes --- crates/extensions/tedge_mqtt_bridge/src/lib.rs | 2 +- crates/extensions/tedge_mqtt_bridge/tests/bridge.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index e9686cc4044..538d7485823 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -37,7 +37,7 @@ use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; use tracing::info; -use tracing::log::debug; +use tracing::debug; pub type MqttConfig = mqtt_channel::Config; diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index c23248e8ee3..36070f02b6a 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -175,7 +175,6 @@ async fn bridge_disconnect_while_sending() { .unwrap(); } - // tokio::time::sleep(Duration::from_secs(5)).await; let mut local = poll_local.stop_polling().await; cloud .publish("s/ds", QoS::AtLeastOnce, true, "test") @@ -467,7 +466,7 @@ impl Proxy { }; write_socket.shutdown().await.unwrap(); - let _ = write_conn.shutdown().await; //.await.unwrap(); + let _ = write_conn.shutdown().await; }); } } From 7d7883ab80711d3ba741710ab20a17c6eb8b6073 Mon Sep 17 00:00:00 2001 From: James Rhodes Date: Fri, 26 Jul 2024 17:07:12 +0100 Subject: [PATCH 6/8] Run formatter Signed-off-by: James Rhodes --- crates/extensions/tedge_mqtt_bridge/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index 538d7485823..20e5e81381e 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -36,8 +36,8 @@ use tedge_actors::NullSender; use tedge_actors::RuntimeError; use tedge_actors::RuntimeRequest; use tedge_actors::RuntimeRequestSink; -use tracing::info; use tracing::debug; +use tracing::info; pub type MqttConfig = mqtt_channel::Config; From 3ef5a5344b6a83876f6daadac0d1cc79605936c2 Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Mon, 29 Jul 2024 17:33:24 +0200 Subject: [PATCH 7/8] improve debugging during assertion of local client Signed-off-by: Reuben Miller --- .../tedge_mqtt_bridge/tests/bridge.rs | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index 36070f02b6a..b0fe13f0faf 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -250,7 +250,7 @@ async fn bridge_reconnects_successfully_after_cloud_connection_interrupted() { #[tokio::test] async fn bridge_reconnects_successfully_after_local_connection_interrupted() { - std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info"); + std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info,bridge=info"); let _ = env_logger::try_init(); let local_broker_port = free_port().await; let cloud_broker_port = free_port().await; @@ -551,12 +551,57 @@ async fn await_subscription(event_loop: &mut EventLoop) { async fn next_received_message(event_loop: &mut EventLoop) -> anyhow::Result { loop { - if let Ok(Event::Incoming(Incoming::Publish(publish))) = - timeout(DEFAULT_TIMEOUT, event_loop.poll()) - .await - .context("timed-out waiting for received message")? - { - break Ok(publish); + let response = timeout(DEFAULT_TIMEOUT, event_loop.poll()) + .await + .context("timed-out waiting for received message")?; + + match response { + // Incoming messages + Ok(Event::Incoming(Incoming::Publish(publish))) => break Ok(publish), + Ok(Event::Incoming(Incoming::ConnAck(v))) => { + info!("Incoming::ConnAck: ({:?}, {})", v.code, v.session_present) + } + Ok(Event::Incoming(Incoming::Connect(v))) => info!( + "Incoming::Connect: client_id={}, clean_session={}", + v.client_id, v.clean_session + ), + Ok(Event::Incoming(Incoming::Disconnect)) => info!("Incoming::Disconnect"), + Ok(Event::Incoming(Incoming::PingReq)) => info!("Incoming::PingReq"), + Ok(Event::Incoming(Incoming::PingResp)) => info!("Incoming::PingResp"), + Ok(Event::Incoming(Incoming::PubAck(v))) => info!("Incoming::PubAck: pkid={}", v.pkid), + Ok(Event::Incoming(Incoming::PubComp(v))) => { + info!("Incoming::PubComp: pkid={}", v.pkid) + } + Ok(Event::Incoming(Incoming::PubRec(v))) => info!("Incoming::PubRec: pkid={}", v.pkid), + Ok(Event::Incoming(Incoming::PubRel(v))) => info!("Incoming::PubRel: pkid={}", v.pkid), + Ok(Event::Incoming(Incoming::SubAck(v))) => info!("Incoming::SubAck: pkid={}", v.pkid), + Ok(Event::Incoming(Incoming::Subscribe(v))) => { + info!("Incoming::Subscribe: pkid={}", v.pkid) + } + Ok(Event::Incoming(Incoming::UnsubAck(v))) => { + info!("Incoming::UnsubAck: pkid={}", v.pkid) + } + Ok(Event::Incoming(Incoming::Unsubscribe(v))) => { + info!("Incoming::Unsubscribe: pkid={}", v.pkid) + } + + // Outgoing messages + Ok(Event::Outgoing(Outgoing::PingReq)) => info!("Outgoing::PingReq"), + Ok(Event::Outgoing(Outgoing::PingResp)) => info!("Outgoing::PingResp"), + Ok(Event::Outgoing(Outgoing::Publish(v))) => info!("Outgoing::Publish: pkid={v}"), + Ok(Event::Outgoing(Outgoing::Subscribe(v))) => info!("Outgoing::Subscribe: pkid={v}"), + Ok(Event::Outgoing(Outgoing::Unsubscribe(v))) => { + info!("outgoing Unsubscribe: pkid={v}") + } + Ok(Event::Outgoing(Outgoing::PubAck(v))) => info!("Outgoing::PubAck: pkid={v}"), + Ok(Event::Outgoing(Outgoing::PubRec(v))) => info!("Outgoing::PubRec: pkid={v}"), + Ok(Event::Outgoing(Outgoing::PubRel(v))) => info!("Outgoing::PubRel: pkid={v}"), + Ok(Event::Outgoing(Outgoing::PubComp(v))) => info!("Outgoing::PubComp: pkid={v}"), + Ok(Event::Outgoing(Outgoing::Disconnect)) => info!("Outgoing::Disconnect"), + Ok(Event::Outgoing(Outgoing::AwaitAck(v))) => info!("Outgoing::AwaitAck: pkid={v}"), + Err(err) => { + info!("Connection error (ignoring). {err}"); + } } } } From 1cab1a7fe203ed7a86660b4ee966946cb952bb46 Mon Sep 17 00:00:00 2001 From: Reuben Miller Date: Mon, 29 Jul 2024 17:36:14 +0200 Subject: [PATCH 8/8] add workaround for a flaky test Signed-off-by: Reuben Miller --- crates/extensions/tedge_mqtt_bridge/tests/bridge.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index b0fe13f0faf..eb5ce6dfe56 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -24,6 +24,7 @@ use tedge_test_utils::fs::TempTedgeDir; use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::sync::oneshot; +use tokio::time::sleep; use tokio::time::timeout; use tracing::info; use tracing::warn; @@ -272,6 +273,13 @@ async fn bridge_reconnects_successfully_after_local_connection_interrupted() { wait_until_health_status_is("up", &mut ev_local) .await .unwrap(); + + // TODO (flaky): Investigate why adding this sleep makes the test more reliable. + // Current theory: If a sub-ack is not received, then the subscription + // is not remembered by the client and not resubscribed after + // a connection outage + sleep(Duration::from_millis(100)).await; + local_proxy.interrupt_connections(); wait_until_health_status_is("up", &mut ev_local) .await