diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index 4a20e3f7f2d..cd49d1657d5 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -22,6 +22,7 @@ use rumqttc::Publish; use rumqttc::SubscribeFilter; use rumqttc::Transport; use std::borrow::Cow; +use std::collections::hash_map; use std::collections::HashMap; use std::collections::VecDeque; use std::convert::Infallible; @@ -376,20 +377,26 @@ async fn half_bridge( } // Keep track of packet IDs so we can acknowledge messages - Event::Outgoing(Outgoing::Publish(pkid)) => match companion_bridge_half.recv().await { - // A message was forwarded by the other bridge half, note the packet id - Some(Some((topic, msg))) => { - loop_breaker.forward_on_topic(topic, &msg); - forward_pkid_to_received_msg.insert(pkid, msg); - dbg!(name, &forward_pkid_to_received_msg); + Event::Outgoing(Outgoing::Publish(pkid)) => { + if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { + match companion_bridge_half.recv().await { + // A message was forwarded by the other bridge half, note the packet id + Some(Some((topic, msg))) => { + loop_breaker.forward_on_topic(topic, &msg); + e.insert(msg); + dbg!(name, &forward_pkid_to_received_msg); + } + + // A healthcheck message was published, ignore this packet id + Some(None) => {} + + // The other bridge half has disconnected, break the loop and shut down the bridge + None => break, + } + } else { + info!("Bridge cloud connection {name} ignoring already known pkid={pkid}"); } - - // A healthcheck message was published, ignore this packet id - Some(None) => {} - - // The other bridge half has disconnected, break the loop and shut down the bridge - None => break, - }, + } _ => {} } }