From fc44a98490c0abe9aeaf7eb055ba5c807098b37c Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Wed, 17 Jul 2024 09:12:38 +0200 Subject: [PATCH] Ignore subsequent attempts to forward a message A given MQTT message might be published more than once, notably after a reconnect. For each attempt the rumqttc crate notifies an `Outgoing::Publish(pkid)` event. The first time such an event is received for a given `pkid`, the built-in bridge has to map this `pkid` to the forwarded message (so it will be able to properly acknowledge it later). However, when an acknoledgement is already expected for that `pkid` it means that the `Outgoing::Publish(pkid)` event must be ignored. Failing to do so introduces a shift in the mapping of in and out pkids, and, in the worse case, this blocks the built-in bridge as there is no pending message to associate to. Signed-off-by: Didier Wenzek --- .../extensions/tedge_mqtt_bridge/src/lib.rs | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index 4a20e3f7f2d..cd49d1657d5 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -22,6 +22,7 @@ use rumqttc::Publish; use rumqttc::SubscribeFilter; use rumqttc::Transport; use std::borrow::Cow; +use std::collections::hash_map; use std::collections::HashMap; use std::collections::VecDeque; use std::convert::Infallible; @@ -376,20 +377,26 @@ async fn half_bridge( } // Keep track of packet IDs so we can acknowledge messages - Event::Outgoing(Outgoing::Publish(pkid)) => match companion_bridge_half.recv().await { - // A message was forwarded by the other bridge half, note the packet id - Some(Some((topic, msg))) => { - loop_breaker.forward_on_topic(topic, &msg); - forward_pkid_to_received_msg.insert(pkid, msg); - dbg!(name, &forward_pkid_to_received_msg); + Event::Outgoing(Outgoing::Publish(pkid)) => { + if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { + match companion_bridge_half.recv().await { + // A message was forwarded by the other bridge half, note the packet id + Some(Some((topic, msg))) => { + loop_breaker.forward_on_topic(topic, &msg); + e.insert(msg); + dbg!(name, &forward_pkid_to_received_msg); + } + + // A healthcheck message was published, ignore this packet id + Some(None) => {} + + // The other bridge half has disconnected, break the loop and shut down the bridge + None => break, + } + } else { + info!("Bridge cloud connection {name} ignoring already known pkid={pkid}"); } - - // A healthcheck message was published, ignore this packet id - Some(None) => {} - - // The other bridge half has disconnected, break the loop and shut down the bridge - None => break, - }, + } _ => {} } }