Skip to content

Commit

Permalink
Ignore subsequent attempts to forward a message
Browse files Browse the repository at this point in the history
A given MQTT message might be published more than once, notably after a reconnect.
For each attempt the rumqttc crate notifies an `Outgoing::Publish(pkid)` event.
The first time such an event is received for a given `pkid`, the built-in bridge
has to map this `pkid` to the forwarded message (so it will be able to properly acknowledge it later).
However, when an acknoledgement is already expected for that `pkid` it
means that the `Outgoing::Publish(pkid)` event must be ignored.
Failing to do so introduces a shift in the mapping of in and out pkids,
and, in the worse case, this blocks the built-in bridge as there is no
pending message to associate to.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Jul 17, 2024
1 parent d5e3c37 commit fc44a98
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions crates/extensions/tedge_mqtt_bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use rumqttc::Publish;
use rumqttc::SubscribeFilter;
use rumqttc::Transport;
use std::borrow::Cow;
use std::collections::hash_map;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::convert::Infallible;
Expand Down Expand Up @@ -376,20 +377,26 @@ async fn half_bridge(
}

// Keep track of packet IDs so we can acknowledge messages
Event::Outgoing(Outgoing::Publish(pkid)) => match companion_bridge_half.recv().await {
// A message was forwarded by the other bridge half, note the packet id
Some(Some((topic, msg))) => {
loop_breaker.forward_on_topic(topic, &msg);
forward_pkid_to_received_msg.insert(pkid, msg);
dbg!(name, &forward_pkid_to_received_msg);
Event::Outgoing(Outgoing::Publish(pkid)) => {
if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) {
match companion_bridge_half.recv().await {
// A message was forwarded by the other bridge half, note the packet id
Some(Some((topic, msg))) => {
loop_breaker.forward_on_topic(topic, &msg);
e.insert(msg);
dbg!(name, &forward_pkid_to_received_msg);
}

// A healthcheck message was published, ignore this packet id
Some(None) => {}

// The other bridge half has disconnected, break the loop and shut down the bridge
None => break,
}
} else {
info!("Bridge cloud connection {name} ignoring already known pkid={pkid}");
}

// A healthcheck message was published, ignore this packet id
Some(None) => {}

// The other bridge half has disconnected, break the loop and shut down the bridge
None => break,
},
}
_ => {}
}
}
Expand Down

0 comments on commit fc44a98

Please sign in to comment.