diff --git a/Cargo.lock b/Cargo.lock index 067c7051719..1e9c150cb99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4118,6 +4118,7 @@ dependencies = [ "tedge_actors", "tedge_utils", "tokio", + "tracing", ] [[package]] diff --git a/crates/common/mqtt_channel/src/messages.rs b/crates/common/mqtt_channel/src/messages.rs index e2ca13c05c5..d631f9ae474 100644 --- a/crates/common/mqtt_channel/src/messages.rs +++ b/crates/common/mqtt_channel/src/messages.rs @@ -7,6 +7,7 @@ use serde::Deserializer; use serde::Serialize; use serde::Serializer; use std::fmt::Debug; +use std::fmt::Display; use std::fmt::Formatter; use std::fmt::Write; @@ -20,6 +21,21 @@ pub struct MqttMessage { pub retain: bool, } +impl Display for MqttMessage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_char('[')?; + f.write_str(&self.topic.name)?; + f.write_str(" qos=")?; + f.write_char(match self.qos { + QoS::AtMostOnce => '0', + QoS::AtLeastOnce => '1', + QoS::ExactlyOnce => '2', + })?; + f.write_str(if self.retain { " retained] " } else { "] " })?; + Display::fmt(&self.payload, f) + } +} + fn serialize_qos(qos: &QoS, serializer: S) -> Result where S: serde::Serializer, @@ -119,6 +135,15 @@ impl<'de> Deserialize<'de> for DebugPayload { } } +impl Display for DebugPayload { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self.as_str() { + Ok(str) => f.write_str(str), + Err(_) => f.write_str(&format!("non UTF-8 payload of {} bytes", self.0.len())), + } + } +} + impl DebugPayload { /// The payload string (unless this payload is not UTF8) pub fn as_str(&self) -> Result<&str, MqttError> { diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs index 9e3f1e8d8b0..ab87a104a88 100644 --- a/crates/common/mqtt_channel/src/topics.rs +++ b/crates/common/mqtt_channel/src/topics.rs @@ -119,6 +119,41 @@ impl TopicFilter { self.accept_topic(&msg.topic) } + /// Simplify the list of patterns, removing overlaps. + /// + /// Return the patterns that have been removed. + pub fn remove_overlapping_patterns(&mut self) -> Vec { + let mut patterns = vec![]; + let mut removed = vec![]; + patterns.append(&mut self.patterns); + + for pattern in patterns { + if self.include_topic(&pattern) { + removed.push(pattern) + } else { + let mut sub_patterns = vec![]; + sub_patterns.append(&mut self.patterns); + for sub_pattern in sub_patterns { + if rumqttc::matches(&sub_pattern, &pattern) { + removed.push(sub_pattern); + } else { + self.patterns.push(sub_pattern); + } + } + self.patterns.push(pattern) + } + } + + removed + } + + /// Check if the given pattern is already matched by this filter pattern. + fn include_topic(&self, sub_pattern: &str) -> bool { + self.patterns + .iter() + .any(|pattern| rumqttc::matches(sub_pattern, pattern)) + } + /// A clone topic filter with the given QoS pub fn with_qos(self, qos: QoS) -> Self { Self { qos, ..self } @@ -135,6 +170,10 @@ impl TopicFilter { }) .collect() } + + pub fn patterns(&self) -> &Vec { + &self.patterns + } } impl TryInto for &str { @@ -235,4 +274,37 @@ mod tests { assert!(TopicFilter::new("/a/#/b").is_err()); assert!(TopicFilter::new("/a/#/+").is_err()); } + + #[test] + fn check_removing_overlapping_patterns() { + let mut topics = TopicFilter::empty(); + assert!(topics.remove_overlapping_patterns().is_empty()); + + // One can adds several patterns, as long as non overlapping + topics.add_unchecked("te/+/+/+/+/cmd/+/+"); + topics.add_unchecked("te/+/+/+/+/m/+"); + topics.add_unchecked("te/device/main///e/+"); + topics.add_unchecked("te/device/child///e/+"); + assert!(topics.remove_overlapping_patterns().is_empty()); + + // If a sub pattern is added, the overlapping is detected + topics.add_unchecked("te/device/main///m/+"); + let removed = topics.remove_overlapping_patterns(); + assert_eq!(removed.len(), 1); + assert!(removed.contains(&"te/device/main///m/+".to_string())); + + // If a super pattern is added, the sub patterns are removed + topics.add_unchecked("te/+/+/+/+/e/+"); + let removed = topics.remove_overlapping_patterns(); + assert_eq!(removed.len(), 2); + assert!(removed.contains(&"te/device/main///e/+".to_string())); + assert!(removed.contains(&"te/device/child///e/+".to_string())); + + // Unfortunately, some overlaps are not detected + // In the following case a message published on `te/xxx/xxx` might be received twice + topics.add_unchecked("te/xxx/+"); + topics.add_unchecked("te/+/xxx"); + let removed = topics.remove_overlapping_patterns(); + assert!(removed.is_empty()); + } } diff --git a/crates/core/tedge_actors/src/message_boxes.rs b/crates/core/tedge_actors/src/message_boxes.rs index 6d20c0884ce..8bd4028a56e 100644 --- a/crates/core/tedge_actors/src/message_boxes.rs +++ b/crates/core/tedge_actors/src/message_boxes.rs @@ -155,8 +155,7 @@ impl LoggingReceiver { /// Close the input so no new messages can be sent to this receiver pub fn close_input(&mut self) { - self.receiver.input_receiver.close(); - self.receiver.signal_receiver.close(); + self.receiver.close_input(); } } @@ -364,6 +363,12 @@ impl CombinedReceiver { signal_receiver, } } + + /// Close the input so no new messages can be sent to this receiver + pub fn close_input(&mut self) { + self.input_receiver.close(); + self.signal_receiver.close(); + } } #[async_trait] diff --git a/crates/extensions/tedge_mqtt_ext/Cargo.toml b/crates/extensions/tedge_mqtt_ext/Cargo.toml index dbf94ce5fbb..68af90403f9 100644 --- a/crates/extensions/tedge_mqtt_ext/Cargo.toml +++ b/crates/extensions/tedge_mqtt_ext/Cargo.toml @@ -22,6 +22,7 @@ serde_json = { workspace = true } tedge_actors = { workspace = true } tedge_utils = { workspace = true } tokio = { workspace = true, default_features = false, features = ["macros"] } +tracing = { workspace = true } [dev-dependencies] futures = { workspace = true } diff --git a/crates/extensions/tedge_mqtt_ext/src/lib.rs b/crates/extensions/tedge_mqtt_ext/src/lib.rs index 3f45453def4..59615c67112 100644 --- a/crates/extensions/tedge_mqtt_ext/src/lib.rs +++ b/crates/extensions/tedge_mqtt_ext/src/lib.rs @@ -11,9 +11,8 @@ use tedge_actors::futures::channel::mpsc; use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::ChannelError; +use tedge_actors::CombinedReceiver; use tedge_actors::DynSender; -use tedge_actors::LoggingReceiver; -use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::MessageSink; use tedge_actors::MessageSource; @@ -32,9 +31,9 @@ pub use mqtt_channel::TopicFilter; pub struct MqttActorBuilder { mqtt_config: mqtt_channel::Config, - input_receiver: LoggingReceiver, + input_receiver: CombinedReceiver, publish_sender: mpsc::Sender, - pub subscriber_addresses: Vec<(TopicFilter, LoggingSender)>, + pub subscriber_addresses: Vec<(TopicFilter, DynSender)>, signal_sender: mpsc::Sender, } @@ -42,7 +41,7 @@ impl MqttActorBuilder { pub fn new(config: mqtt_channel::Config) -> Self { let (publish_sender, publish_receiver) = mpsc::channel(10); let (signal_sender, signal_receiver) = mpsc::channel(10); - let input_receiver = LoggingReceiver::new("MQTT".into(), publish_receiver, signal_receiver); + let input_receiver = CombinedReceiver::new(publish_receiver, signal_receiver); MqttActorBuilder { mqtt_config: config, @@ -58,8 +57,16 @@ impl MqttActorBuilder { for (topic_filter, _) in self.subscriber_addresses.iter() { combined_topic_filter.add_all(topic_filter.to_owned()); } - let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter); + let removed = combined_topic_filter.remove_overlapping_patterns(); + for pattern in combined_topic_filter.patterns() { + tracing::info!(target: "MQTT sub", "{pattern}"); + } + for pattern in removed { + tracing::warn!(target: "MQTT sub", "ignoring overlapping subscription to {pattern}"); + } + + let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter); MqttActor::new(mqtt_config, self.input_receiver, self.subscriber_addresses) } } @@ -72,7 +79,7 @@ impl AsMut for MqttActorBuilder { impl MessageSource for MqttActorBuilder { fn connect_sink(&mut self, subscriptions: TopicFilter, peer: &impl MessageSink) { - let sender = LoggingSender::new("MQTT".into(), peer.get_sender()); + let sender = peer.get_sender(); self.subscriber_addresses.push((subscriptions, sender)); } } @@ -102,11 +109,11 @@ impl Builder for MqttActorBuilder { } pub struct FromPeers { - input_receiver: LoggingReceiver, + input_receiver: CombinedReceiver, } pub struct ToPeers { - peer_senders: Vec<(TopicFilter, LoggingSender)>, + peer_senders: Vec<(TopicFilter, DynSender)>, } impl FromPeers { @@ -115,6 +122,7 @@ impl FromPeers { outgoing_mqtt: &mut mpsc::UnboundedSender, ) -> Result<(), RuntimeError> { while let Ok(Some(message)) = self.try_recv().await { + tracing::debug!(target: "MQTT pub", "{message}"); SinkExt::send(outgoing_mqtt, message) .await .map_err(Box::new)?; @@ -139,6 +147,7 @@ impl ToPeers { incoming_mqtt: &mut mpsc::UnboundedReceiver, ) -> Result<(), RuntimeError> { while let Some(message) = incoming_mqtt.next().await { + tracing::debug!(target: "MQTT recv", "{message}"); self.send(message).await?; } Ok(()) @@ -178,8 +187,8 @@ pub struct MqttActor { impl MqttActor { fn new( mqtt_config: mqtt_channel::Config, - input_receiver: LoggingReceiver, - peer_senders: Vec<(TopicFilter, LoggingSender)>, + input_receiver: CombinedReceiver, + peer_senders: Vec<(TopicFilter, DynSender)>, ) -> Self { MqttActor { mqtt_config,