diff --git a/Cargo.lock b/Cargo.lock index d02f114f661..882a4e36bce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4094,6 +4094,7 @@ dependencies = [ "mockall", "mqtt_channel", "mutants", + "pin-project", "rcgen", "rumqttc", "rumqttd", diff --git a/crates/extensions/tedge_mqtt_bridge/Cargo.toml b/crates/extensions/tedge_mqtt_bridge/Cargo.toml index 415810678c0..549260c5ea1 100644 --- a/crates/extensions/tedge_mqtt_bridge/Cargo.toml +++ b/crates/extensions/tedge_mqtt_bridge/Cargo.toml @@ -21,6 +21,7 @@ certificate = { workspace = true } futures = { workspace = true } mqtt_channel = { workspace = true } mutants = { workspace = true } +pin-project = { workspace = true } rumqttc = { workspace = true } tedge_actors = { workspace = true } tedge_config = { workspace = true } diff --git a/crates/extensions/tedge_mqtt_bridge/src/health.rs b/crates/extensions/tedge_mqtt_bridge/src/health.rs index e24284c0d8e..02f1c7db003 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/health.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/health.rs @@ -71,6 +71,7 @@ type NotificationRes = Result; /// A client for [BridgeHealthMonitor] /// /// This is used by each bridge half to log and notify the monitor of health status updates +#[derive(Debug)] pub struct BridgeHealth { name: &'static str, tx_health: mpsc::Sender<(&'static str, Status)>, @@ -104,7 +105,8 @@ impl BridgeHealth { } self.last_err = err; let status = self.last_err.as_ref().map_or(Status::Up, |_| Status::Down); - self.tx_health.send((name, status)).await.unwrap() + let mut tx_health = self.tx_health.clone(); + tokio::spawn(async move { tx_health.send((name, status)).await.unwrap() }); } } } diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index 41f5059899a..d562b43dab0 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -22,10 +22,14 @@ use rumqttc::Publish; use rumqttc::SubscribeFilter; use rumqttc::Transport; use std::borrow::Cow; -use std::collections::hash_map; use std::collections::HashMap; use std::collections::VecDeque; use std::convert::Infallible; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use std::time::Instant; use tedge_actors::futures::channel::mpsc; @@ -169,12 +173,14 @@ fn bidirectional_channel(buffer: usize) -> [BidirectionalChannelHalf; 2] { ] } +#[derive(Debug)] struct BidirectionalChannelHalf { tx: mpsc::Sender, rx: mpsc::Receiver, } impl<'a, T> BidirectionalChannelHalf { + #[allow(unused)] pub fn send(&'a mut self, item: T) -> futures::sink::Send<'a, mpsc::Sender, T> { self.tx.send(item) } @@ -188,6 +194,483 @@ impl<'a, T> BidirectionalChannelHalf { } } +#[derive(Debug)] +enum MutOrOwned<'a, T> { + Mut(&'a mut T), + Owned(T), +} + +impl<'a, T: Clone> MutOrOwned<'a, T> { + fn into_owned(self) -> T { + match self { + Self::Mut(mut_t) => (*mut_t).clone(), + Self::Owned(owned_t) => owned_t, + } + } +} + +impl<'a, T> std::ops::Deref for MutOrOwned<'a, T> { + type Target = T; + fn deref(&self) -> &T { + match self { + Self::Mut(mmut) => mmut, + Self::Owned(owned) => &owned, + } + } +} +impl<'a, T> std::ops::DerefMut for MutOrOwned<'a, T> { + fn deref_mut(&mut self) -> &mut T { + match self { + Self::Mut(mmut) => mmut, + Self::Owned(ref mut owned) => owned, + } + } +} + +#[derive(Debug)] +struct ForwardMessageState<'a> { + topic: Cow<'a, str>, + publish: Cow<'a, Publish>, + target: Cow<'a, AsyncClient>, + companion_bridge_half: MutOrOwned<'a, mpsc::Sender>>, +} + +impl<'a> ForwardMessageState<'a> { + async fn process(mut self) { + self.target + .publish( + self.topic.clone(), + self.publish.qos, + self.publish.retain, + self.publish.payload.clone(), + ) + .await + .unwrap(); + self.companion_bridge_half + .send(Some((self.topic.into_owned(), self.publish.into_owned()))) + .await + .unwrap(); + } +} + +#[derive(Debug)] +struct SubscribeToState<'a> { + recv_client: Cow<'a, AsyncClient>, + topics: Vec, +} + +impl<'a> SubscribeToState<'a> { + fn to_static(self) -> SubscribeToState<'static> { + SubscribeToState { + recv_client: Cow::Owned(self.recv_client.into_owned()), + topics: self.topics, + } + } + + async fn process(self) { + self.recv_client.subscribe_many(self.topics).await.unwrap(); + } +} + +#[derive(Debug)] +struct AwaitForwardedMessageState<'a> { + companion_bridge_half: &'a mut BidirectionalChannelHalf>, + pkid: u16, +} + +#[derive(Debug)] +enum Published { + Forwarded { + topic: String, + original_message: Publish, + pkid: u16, + }, + Healthcheck(u16), + ChannelClosed, +} + +impl<'a> AwaitForwardedMessageState<'a> { + async fn process(self) -> Published { + match self.companion_bridge_half.recv().await { + Some(Some((topic, original_message))) => Published::Forwarded { + topic, + original_message, + pkid: self.pkid, + }, + Some(None) => Published::Healthcheck(self.pkid), + None => Published::ChannelClosed, + } + } +} + +#[derive(Debug)] +struct ForwardAckState<'a> { + target: Cow<'a, AsyncClient>, + message: Publish, +} + +struct UpdateHealthMessageState<'a> { + bridge_health: &'a mut BridgeHealth, + event_loop_result: Result, +} + +impl<'a> UpdateHealthMessageState<'a> { + async fn process(self) -> Result { + self.bridge_health.update(&self.event_loop_result).await; + self.event_loop_result + } +} + +impl<'a> fmt::Debug for UpdateHealthMessageState<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UpdateHealthMessageState") + .field("bridge_health", &self.bridge_health) + .field("event_loop_result_is_ok", &self.event_loop_result.is_ok()) + .finish() + } +} + +struct EnsureMessageNotLoopedState<'a> { + loop_breaker: &'a mut MessageLoopBreaker, + publish: Publish, +} + +impl<'a> fmt::Debug for EnsureMessageNotLoopedState<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EnsureMessageNotLoopedState") + .field("loop_breaker", &"...") + .field("publish", &self.publish) + .finish() + } +} + +impl<'a> EnsureMessageNotLoopedState<'a> { + async fn process(self) -> Option { + self.loop_breaker.ensure_not_looped(self.publish).await + } +} + +impl<'a> ForwardAckState<'a> { + fn to_static(self) -> ForwardAckState<'static> { + ForwardAckState { + target: Cow::Owned(self.target.into_owned()), + message: self.message, + } + } + + async fn process(self) { + self.target.ack(&self.message).await.unwrap(); + } +} + +impl<'a> ForwardMessageState<'a> { + fn to_static(self) -> ForwardMessageState<'static> { + ForwardMessageState { + target: Cow::Owned(self.target.into_owned()), + topic: Cow::Owned(self.topic.into_owned()), + publish: Cow::Owned(self.publish.into_owned()), + companion_bridge_half: MutOrOwned::Owned(self.companion_bridge_half.into_owned()), + } + } +} + +enum BridgeState<'a> { + PollRumqttc(&'a mut EventLoop), + ForwardMessage(ForwardMessageState<'a>), + AwaitForwardedMessage(AwaitForwardedMessageState<'a>), + ForwardAck(ForwardAckState<'a>), + UpdateHealthMessage(UpdateHealthMessageState<'a>), + Backoff(Duration), + EnsureMessageNotLooped(EnsureMessageNotLoopedState<'a>), + SubscribeTo(SubscribeToState<'a>), +} + +impl<'a> fmt::Debug for BridgeState<'a> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::PollRumqttc(_) => f.write_str("PollRumqttc"), + Self::ForwardMessage(fw) => f.debug_tuple("ForwardMessage").field(&fw).finish(), + Self::AwaitForwardedMessage(fw) => { + f.debug_tuple("AwaitForwardedMessage").field(&fw).finish() + } + Self::ForwardAck(fw) => f.debug_tuple("ForwardAck").field(&fw).finish(), + Self::UpdateHealthMessage(fw) => { + f.debug_tuple("UpdateHealthMessage").field(&fw).finish() + } + Self::Backoff(dur) => f.debug_tuple("Backoff").field(&dur).finish(), + Self::EnsureMessageNotLooped(mnl) => { + f.debug_tuple("EnsureMessageNotLooped").field(&mnl).finish() + } + Self::SubscribeTo(mnl) => f.debug_tuple("SubscribeTo").field(&mnl).finish(), + } + } +} + +#[derive(Debug)] +enum BridgeResponse { + PollRumqttc, + ReceivedEvent(Result), + UpdatedHealth(Result), + ReadyToForward(Publish), + ProcessedPublish(Published), +} + +impl<'a> BridgeState<'a> { + async fn process(self) -> BridgeResponse { + use BridgeResponse as Res; + match self { + Self::PollRumqttc(recv_event_loop) => Res::ReceivedEvent(recv_event_loop.poll().await), + Self::ForwardMessage(forward) => { + tokio::spawn(forward.to_static().process()); + Res::PollRumqttc + } + Self::AwaitForwardedMessage(await_forward) => { + Res::ProcessedPublish(await_forward.process().await) + } + Self::ForwardAck(forward_ack) => { + tokio::spawn(forward_ack.to_static().process()); + Res::PollRumqttc + } + Self::UpdateHealthMessage(update_health) => { + Res::UpdatedHealth(update_health.process().await) + } + Self::Backoff(time) => { + tokio::time::sleep(time).await; + Res::PollRumqttc + } + Self::EnsureMessageNotLooped(ensure) => match ensure.process().await { + Some(publish) => Res::ReadyToForward(publish), + None => Res::PollRumqttc, + }, + Self::SubscribeTo(subscribe) => { + tokio::spawn(subscribe.to_static().process()); + Res::PollRumqttc + } + } + } + + async fn process_with_timeout(self, timeout: Duration) -> BridgeResponse { + match &self { + Self::PollRumqttc(_) | Self::Backoff(_) => self.process().await, + _ => { + let error = format!("Waiting on {self:?}"); + LogLongRunning::new(self.process(), timeout, error).await + } + } + } +} + +#[pin_project::pin_project] +struct LogLongRunning { + #[pin] + fut: F, + #[pin] + sleep: tokio::time::Sleep, + start: Option, + log_after: Duration, + logged: bool, + msg: String, +} + +impl LogLongRunning { + fn new(fut: F, log_after: Duration, msg: String) -> Self { + Self { + fut, + log_after, + sleep: tokio::time::sleep(log_after), + start: None, + logged: false, + msg, + } + } +} + +impl Future for LogLongRunning { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let start = this.start.get_or_insert_with(Instant::now); + if !*this.logged { + let _ = this.sleep.poll(cx); + if !*this.logged && start.elapsed() > *this.log_after { + tracing::warn!( + "A task has been waiting for a long time ({:?}): {}", + this.log_after, + this.msg + ); + *this.logged = true; + } + } + this.fut.poll(cx) + } +} + +struct HalfBridge { + recv_event_loop: EventLoop, + target: AsyncClient, + recv_client: AsyncClient, + transformer: TopicConverter, + companion_bridge_half: BidirectionalChannelHalf>, + name: &'static str, + topics: Vec, + backoff: CustomBackoff<::backoff::SystemClock>, + bridge_health: BridgeHealth, + loop_breaker: MessageLoopBreaker, + forward_pkid_to_received_msg: HashMap>, +} + +impl HalfBridge { + fn new( + recv_event_loop: EventLoop, + target: AsyncClient, + recv_client: AsyncClient, + transformer: TopicConverter, + bidirectional_topic_filters: Vec>, + companion_bridge_half: BidirectionalChannelHalf>, + tx_health: mpsc::Sender<(&'static str, Status)>, + name: &'static str, + topics: Vec, + reconnect_policy: TEdgeConfigReaderMqttBridgeReconnectPolicy, + ) -> Self { + let backoff = CustomBackoff::new( + ::backoff::SystemClock {}, + reconnect_policy.initial_interval.duration(), + reconnect_policy.maximum_interval.duration(), + reconnect_policy.reset_window.duration(), + ); + Self { + recv_event_loop, + target, + loop_breaker: MessageLoopBreaker::new(recv_client.clone(), bidirectional_topic_filters), + recv_client, + transformer, + companion_bridge_half, + name, + topics, + backoff, + forward_pkid_to_received_msg: <_>::default(), + bridge_health: BridgeHealth::new(name, tx_health), + } + } + + async fn run(&mut self) { + let mut input = BridgeResponse::PollRumqttc; + let name = self.name; + loop { + let state = self.process(std::mem::replace(&mut input, BridgeResponse::PollRumqttc)); + debug!("{name} state = {state:?}"); + input = state.process_with_timeout(Duration::from_secs(2)).await; + debug!("{name} input = {input:?}"); + } + } + + fn process(&mut self, message: BridgeResponse) -> BridgeState<'_> { + match message { + BridgeResponse::PollRumqttc => { + return BridgeState::PollRumqttc(&mut self.recv_event_loop) + } + BridgeResponse::ReceivedEvent(event_loop_result) => { + return BridgeState::UpdateHealthMessage(UpdateHealthMessageState { + bridge_health: &mut self.bridge_health, + event_loop_result, + }) + } + BridgeResponse::UpdatedHealth(Ok(notification)) => { + self.backoff.mark_success(); + match notification { + Event::Incoming(Incoming::ConnAck(_)) => { + return BridgeState::SubscribeTo(SubscribeToState { + recv_client: Cow::Borrowed(&self.recv_client), + topics: self.topics.clone(), + }) + } + Event::Incoming(Incoming::Publish(publish)) => { + return BridgeState::EnsureMessageNotLooped(EnsureMessageNotLoopedState { + loop_breaker: &mut self.loop_breaker, + publish, + }) + } + + Event::Incoming( + Incoming::PubAck(PubAck { pkid: ack_pkid }) + | Incoming::PubRec(PubRec { pkid: ack_pkid }), + ) => { + if let Some(Some(msg)) = self.forward_pkid_to_received_msg.remove(&ack_pkid) + { + return BridgeState::ForwardAck(self.forward_ack(msg)); + } else { + debug!("Not forwarding ack for {ack_pkid}"); + } + } + Event::Outgoing(Outgoing::Publish(pkid)) => { + if !self.forward_pkid_to_received_msg.contains_key(&pkid) { + return BridgeState::AwaitForwardedMessage( + AwaitForwardedMessageState { + companion_bridge_half: &mut self.companion_bridge_half, + pkid, + }, + ); + } else { + debug!("Received duplicate message for {pkid}"); + } + } + event => debug!("MQTT bridge ignoring event {event:?}"), + } + } + BridgeResponse::UpdatedHealth(Err(_)) => { + let time = self.backoff.backoff(); + if !time.is_zero() { + info!( + "Waiting {time:?} until attempting reconnection to {} broker", + self.name + ); + return BridgeState::Backoff(time); + } + } + BridgeResponse::ReadyToForward(publish) => { + if let Some(topic) = self.transformer.convert_topic(&publish.topic) { + return BridgeState::ForwardMessage( + self.forward(topic.into_owned(), publish.clone()), + ); + } + } + BridgeResponse::ProcessedPublish(Published::Forwarded { + topic, + original_message, + pkid, + }) => { + self.loop_breaker.forward_on_topic(topic, &original_message); + self.forward_pkid_to_received_msg + .insert(pkid, Some(original_message)); + } + BridgeResponse::ProcessedPublish(Published::Healthcheck(pkid)) => { + self.forward_pkid_to_received_msg.insert(pkid, None); + } + BridgeResponse::ProcessedPublish(Published::ChannelClosed) => { + todo!("Clean shutdown") + } + } + self.process(BridgeResponse::PollRumqttc) + } + + fn forward<'a>(&'a mut self, topic: String, publish: Publish) -> ForwardMessageState<'a> { + ForwardMessageState { + topic: Cow::Owned(topic), + publish: Cow::Owned(publish), + target: Cow::Borrowed(&self.target), + companion_bridge_half: MutOrOwned::Mut(&mut self.companion_bridge_half.tx), + } + } + + fn forward_ack<'a>(&'a self, message: Publish) -> ForwardAckState<'a> { + ForwardAckState { + target: Cow::Borrowed(&self.target), + message, + } + } +} + /// Forward messages received from `recv_event_loop` to `target` /// /// The result of running this function constitutes half the MQTT bridge, hence the name. @@ -230,41 +713,41 @@ impl<'a, T> BidirectionalChannelHalf { /// - The `half_bridge(cloud_event_loop,local_client)` handles the acknowledgements: waiting for messages be acknowledged by the cloud, before sending acks for the original messages. /// /// ```text -/// ┌───────────────┐ ┌───────────────┐ -/// │ (EventLoop) │ │ (client) │ -/// Incoming::PubAck │ ┌──┐ │ │ ┌──┐ │ client.ack -/// ─────────────────┼────►│6.├──────┼───────────────────┬───────────────┼────►│7.├──────┼──────────────► -/// │ └──┘ │ │ │ └──┘ │ -/// │ │ │ │ │ -/// │ │ ┌─────┼────────┐ │ │ -/// │ │ │ │ │ │ │ +/// ┌───────────────┐ ┌───────────────┐ +/// │ (EventLoop) │ │ (client) │ +/// Incoming::PubAck │ ┌──┐ │ │ ┌──┐ │ client.ack +/// ─────────────────┼────►│6.├──────┼───────────────────┬───────────────┼────►│7.├──────┼──────────────► +/// │ └──┘ │ │ │ └──┘ │ +/// │ │ │ │ │ +/// │ │ ┌─────┼────────┐ │ │ +/// │ │ │ │ │ │ │ /// Outgoing::Publish │ ┌──┐ │ │ ┌┴─┐ │ │ │ -/// ─────────────────┼────►│4.├──────┼─────────────┼───►│5.│ │ │ │ -/// │ └─▲┘ │ │ └▲─┘ │ │ │ -/// │ │ │ │ │ │ │ │ -/// │ │ │ │ │ │ │ │ -/// │ │ │ │ │ │ │ │ -/// │ │ │ │ │ │ │ │ +/// ─────────────────┼────►│4.├──────┼─────────────┼───►│5.│ │ │ │ +/// │ └─▲┘ │ │ └▲─┘ │ │ │ +/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ /// │ │ │ │ │ │ │ │ half_bridge(cloud_event_loop,local_client) -/// │ │ │ │ │ │ │ │ -/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ /// xxxxxxxxxxxxxxxxxxxxxxxxxx│xxxxxxxxxxxxxxxxxxxxx│xxxxx│xxxxxxxx│xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx -/// │ │ │ │ │ │ │ │ -/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ /// │ │ │ │ │ │ │ │ half_bridge(local_event_loop,cloud_client) -/// │ │ │ │ │ │ │ │ -/// │ │ │ │ │ │ │ │ -/// │ │ │ │ │ │ │ │ -/// client.publish │ ┌┴─┐ │ │ ┌┴─┐ │ │ ┌──┐ │ Incoming::Publish -/// ◄────────────────┼──────┤3.◄─────┼─────────────┼────┤2.│◄─────┼──────┼─────┤1.│◄─────┼─────────────── -/// │ └──┘ │ │ └──┘ │ │ └──┘ │ -/// │ │ │ │ │ │ -/// │ MQTT │ │ │ │ MQTT │ -/// │ cloud │ └──────────────┘ │ local │ -/// │ connection │ │ connection │ -/// │ │ │ │ -/// │ (client) │ │ (EventLoop) │ -/// └───────────────┘ └───────────────┘ +/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ +/// │ │ │ │ │ │ │ │ +/// client.publish │ ┌┴─┐ │ │ ┌┴─┐ │ │ ┌──┐ │ Incoming::Publish +/// ◄────────────────┼──────┤3.◄─────┼─────────────┼────┤2.│◄─────┼──────┼─────┤1.│◄─────┼─────────────── +/// │ └──┘ │ │ └──┘ │ │ └──┘ │ +/// │ │ │ │ │ │ +/// │ MQTT │ │ │ │ MQTT │ +/// │ cloud │ └──────────────┘ │ local │ +/// │ connection │ │ connection │ +/// │ │ │ │ +/// │ (client) │ │ (EventLoop) │ +/// └───────────────┘ └───────────────┘ /// ``` /// /// 1. A message is received via the local_event_loop. @@ -274,7 +757,7 @@ impl<'a, T> BidirectionalChannelHalf { /// 5. The message cloud `pkid` is joined with the original local message sent step 2. The pair (cloud `pkid`, local message) is cached /// 6. The cloud MQTT end-point acknowledges the message, providing its cloud `pkid`. /// 7. The pair (cloud `pkid`, local message) is extracted from the cache and the local message is finally acknowledged. -/// +/// /// ## Bridging cloud messages to the local broker /// /// The very same two `half-bridge` instances ensure the reverse flow. Their roles are simply swapped: @@ -291,118 +774,131 @@ impl<'a, T> BidirectionalChannelHalf { /// is dropped. #[allow(clippy::too_many_arguments)] async fn half_bridge( - mut recv_event_loop: EventLoop, + recv_event_loop: EventLoop, target: AsyncClient, recv_client: AsyncClient, transformer: TopicConverter, bidirectional_topic_filters: Vec>, - mut companion_bridge_half: BidirectionalChannelHalf>, + companion_bridge_half: BidirectionalChannelHalf>, tx_health: mpsc::Sender<(&'static str, Status)>, name: &'static str, topics: Vec, reconnect_policy: TEdgeConfigReaderMqttBridgeReconnectPolicy, ) { - let mut backoff = CustomBackoff::new( - ::backoff::SystemClock {}, - reconnect_policy.initial_interval.duration(), - reconnect_policy.maximum_interval.duration(), - reconnect_policy.reset_window.duration(), + let mut hb = HalfBridge::new( + recv_event_loop, + target, + recv_client, + transformer, + bidirectional_topic_filters, + companion_bridge_half, + tx_health, + name, + topics, + reconnect_policy, ); - let mut forward_pkid_to_received_msg = HashMap::new(); - let mut bridge_health = BridgeHealth::new(name, tx_health); - let mut loop_breaker = - MessageLoopBreaker::new(recv_client.clone(), bidirectional_topic_filters); - - loop { - let res = recv_event_loop.poll().await; - bridge_health.update(&res).await; - - let notification = match res { - Ok(notification) => { - backoff.mark_success(); - notification - } - Err(_) => { - let time = backoff.backoff(); - if !time.is_zero() { - info!("Waiting {time:?} until attempting reconnection to {name} broker"); - } - tokio::time::sleep(time).await; - continue; - } - }; - debug!("Received notification ({name}) {notification:?}"); - let n = format!("{:?}", notification); - - match notification { - Event::Incoming(Incoming::ConnAck(_)) => { - info!("Bridge cloud connection {name:?} subscribing to {topics:?}"); - let recv_client = recv_client.clone(); - let topics = topics.clone(); - // We have to subscribe to this asynchronously (i.e. in a task) since we might at - // this point have filled our cloud event loop with outgoing messages - tokio::spawn(async move { recv_client.subscribe_many(topics).await.unwrap() }); - } - - // Forward messages from event loop to target - Event::Incoming(Incoming::Publish(publish)) => { - if let Some(publish) = loop_breaker.ensure_not_looped(publish).await { - if let Some(topic) = transformer.convert_topic(&publish.topic) { - target - .publish( - topic.clone(), - publish.qos, - publish.retain, - publish.payload.clone(), - ) - .await - .unwrap(); - companion_bridge_half - .send(Some((topic.into_owned(), publish))) - .await - .unwrap(); - } - } - } - - // Forward acks from event loop to target - Event::Incoming( - Incoming::PubAck(PubAck { pkid: ack_pkid }) - | Incoming::PubRec(PubRec { pkid: ack_pkid }), - ) => { - if let Some(Some(msg)) = forward_pkid_to_received_msg.remove(&ack_pkid) { - let target = target.clone(); - tokio::spawn(async move { target.ack(&msg).await.unwrap() }); - } - } - - // Keep track of packet IDs so we can acknowledge messages - Event::Outgoing(Outgoing::Publish(pkid)) => { - if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { - match companion_bridge_half.recv().await { - // A message was forwarded by the other bridge half, note the packet id - Some(Some((topic, msg))) => { - loop_breaker.forward_on_topic(topic, &msg); - e.insert(Some(msg)); - } - - // A healthcheck message was published, keep track of this packet id in case it's re-published - Some(None) => { - e.insert(None); - } - - // The other bridge half has disconnected, break the loop and shut down the bridge - None => break, - } - } else { - info!("Bridge cloud connection {name} ignoring already known pkid={pkid}"); - } - } - _ => {} - } - - debug!("Processed notification ({name}) {n}"); - } + hb.run().await; + // let mut backoff = CustomBackoff::new( + // ::backoff::SystemClock {}, + // reconnect_policy.initial_interval.duration(), + // reconnect_policy.maximum_interval.duration(), + // reconnect_policy.reset_window.duration(), + // ); + // let mut forward_pkid_to_received_msg = HashMap::new(); + // let mut bridge_health = BridgeHealth::new(name, tx_health); + // let mut loop_breaker = + // MessageLoopBreaker::new(recv_client.clone(), bidirectional_topic_filters); + + // loop { + // let res = recv_event_loop.poll().await; + // bridge_health.update(&res).await; + + // let notification = match res { + // Ok(notification) => { + // backoff.mark_success(); + // notification + // } + // Err(_) => { + // let time = backoff.backoff(); + // if !time.is_zero() { + // info!("Waiting {time:?} until attempting reconnection to {name} broker"); + // } + // tokio::time::sleep(time).await; + // continue; + // } + // }; + // debug!("Received notification ({name}) {notification:?}"); + // let n = format!("{:?}", notification); + + // match notification { + // Event::Incoming(Incoming::ConnAck(_)) => { + // info!("Bridge connection {name:?} subscribing to {topics:?}"); + // let recv_client = recv_client.clone(); + // let topics = topics.clone(); + // // We have to subscribe to this asynchronously (i.e. in a task) since we might at + // // this point have filled our cloud event loop with outgoing messages + // tokio::spawn(async move { recv_client.subscribe_many(topics).await.unwrap() }); + // } + + // // Forward messages from event loop to target + // Event::Incoming(Incoming::Publish(publish)) => { + // if let Some(publish) = loop_breaker.ensure_not_looped(publish).await { + // if let Some(topic) = transformer.convert_topic(&publish.topic) { + // target + // .publish( + // topic.clone(), + // publish.qos, + // publish.retain, + // publish.payload.clone(), + // ) + // .await + // .unwrap(); + // companion_bridge_half + // .send(Some((topic.into_owned(), publish))) + // .await + // .unwrap(); + // } + // } + // } + + // // Forward acks from event loop to target + // Event::Incoming( + // Incoming::PubAck(PubAck { pkid: ack_pkid }) + // | Incoming::PubRec(PubRec { pkid: ack_pkid }), + // ) => { + // if let Some(Some(msg)) = forward_pkid_to_received_msg.remove(&ack_pkid) { + // let target = target.clone(); + // tokio::spawn(async move { target.ack(&msg).await.unwrap() }); + // } + // } + + // // Keep track of packet IDs so we can acknowledge messages + // Event::Outgoing(Outgoing::Publish(pkid)) => { + // if let hash_map::Entry::Vacant(e) = forward_pkid_to_received_msg.entry(pkid) { + // match companion_bridge_half.recv().await { + // // A message was forwarded by the other bridge half, note the packet id + // Some(Some((topic, msg))) => { + // loop_breaker.forward_on_topic(topic, &msg); + // e.insert(Some(msg)); + // } + + // // A healthcheck message was published, keep track of this packet id in case it's re-published + // Some(None) => { + // e.insert(None); + // } + + // // The other bridge half has disconnected, break the loop and shut down the bridge + // None => break, + // } + // } else { + // info!("Bridge cloud connection {name} ignoring already known pkid={pkid}"); + // } + // } + // _ => {} + // } + + // debug!("Processed notification ({name}) {n}"); + // } } #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -480,6 +976,7 @@ impl MessageLoopBreaker { impl MessageLoopBreaker { async fn ensure_not_looped(&mut self, received: Publish) -> Option { + debug!("Ensure not looped: {received:?}"); self.clean_old_messages(); if self .forwarded_messages @@ -495,6 +992,7 @@ impl MessageLoopBreaker { } fn forward_on_topic(&mut self, topic: impl Into + AsRef, publish: &Publish) { + debug!("Forward on topic ({}): {publish:?}", topic.as_ref()); if self.is_bidirectional(topic.as_ref()) { let mut publish_with_topic = Publish::new(topic, publish.qos, publish.payload.clone()); publish_with_topic.retain = publish.retain; diff --git a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs index abf879e1ce8..44ba5353de6 100644 --- a/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs +++ b/crates/extensions/tedge_mqtt_bridge/tests/bridge.rs @@ -252,7 +252,7 @@ async fn bridge_reconnects_successfully_after_cloud_connection_interrupted() { #[tokio::test] async fn bridge_reconnects_successfully_after_local_connection_interrupted() { - std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=info"); + std::env::set_var("RUST_LOG", "tedge_mqtt_bridge=debug"); let _ = env_logger::try_init(); let local_broker_port = free_port().await; let cloud_broker_port = free_port().await; @@ -313,6 +313,8 @@ async fn bridge_reconnects_successfully_after_local_connection_interrupted() { #[tokio::test] async fn bidirectional_forwarding_avoids_infinite_loop() { + std::env::set_var("RUST_LOG", "bridge=info,tedge_mqtt_bridge=debug"); + let _ = env_logger::try_init(); let local_port = free_port().await; let cloud_port = free_port().await; let (local_client, mut local_ev_loop) = new_broker_and_client("local", local_port); @@ -386,6 +388,7 @@ async fn bidirectional_forwarding_avoids_infinite_loop() { .unwrap() { Event::Incoming(Incoming::Publish(publish)) => { + info!("Local broker received {publish:?}"); assert_eq!(publish.topic, "aws/shadow/request"); assert_eq!(from_utf8(&publish.payload).unwrap(), "test message"); local_client @@ -397,13 +400,18 @@ async fn bidirectional_forwarding_avoids_infinite_loop() { ) .await .unwrap(); + info!("Local broker published response"); } Event::Outgoing(Outgoing::Publish(_)) => break, _ => (), } } - timeout(DEFAULT_TIMEOUT, cloud).await.unwrap().unwrap(); + timeout(DEFAULT_TIMEOUT, cloud) + .await + .context("Awaiting response to cloud") + .unwrap() + .unwrap(); } async fn wait_until_health_status_is(