diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs index e34d0027a43..ab87a104a88 100644 --- a/crates/common/mqtt_channel/src/topics.rs +++ b/crates/common/mqtt_channel/src/topics.rs @@ -119,6 +119,41 @@ impl TopicFilter { self.accept_topic(&msg.topic) } + /// Simplify the list of patterns, removing overlaps. + /// + /// Return the patterns that have been removed. + pub fn remove_overlapping_patterns(&mut self) -> Vec { + let mut patterns = vec![]; + let mut removed = vec![]; + patterns.append(&mut self.patterns); + + for pattern in patterns { + if self.include_topic(&pattern) { + removed.push(pattern) + } else { + let mut sub_patterns = vec![]; + sub_patterns.append(&mut self.patterns); + for sub_pattern in sub_patterns { + if rumqttc::matches(&sub_pattern, &pattern) { + removed.push(sub_pattern); + } else { + self.patterns.push(sub_pattern); + } + } + self.patterns.push(pattern) + } + } + + removed + } + + /// Check if the given pattern is already matched by this filter pattern. + fn include_topic(&self, sub_pattern: &str) -> bool { + self.patterns + .iter() + .any(|pattern| rumqttc::matches(sub_pattern, pattern)) + } + /// A clone topic filter with the given QoS pub fn with_qos(self, qos: QoS) -> Self { Self { qos, ..self } @@ -239,4 +274,37 @@ mod tests { assert!(TopicFilter::new("/a/#/b").is_err()); assert!(TopicFilter::new("/a/#/+").is_err()); } + + #[test] + fn check_removing_overlapping_patterns() { + let mut topics = TopicFilter::empty(); + assert!(topics.remove_overlapping_patterns().is_empty()); + + // One can adds several patterns, as long as non overlapping + topics.add_unchecked("te/+/+/+/+/cmd/+/+"); + topics.add_unchecked("te/+/+/+/+/m/+"); + topics.add_unchecked("te/device/main///e/+"); + topics.add_unchecked("te/device/child///e/+"); + assert!(topics.remove_overlapping_patterns().is_empty()); + + // If a sub pattern is added, the overlapping is detected + topics.add_unchecked("te/device/main///m/+"); + let removed = topics.remove_overlapping_patterns(); + assert_eq!(removed.len(), 1); + assert!(removed.contains(&"te/device/main///m/+".to_string())); + + // If a super pattern is added, the sub patterns are removed + topics.add_unchecked("te/+/+/+/+/e/+"); + let removed = topics.remove_overlapping_patterns(); + assert_eq!(removed.len(), 2); + assert!(removed.contains(&"te/device/main///e/+".to_string())); + assert!(removed.contains(&"te/device/child///e/+".to_string())); + + // Unfortunately, some overlaps are not detected + // In the following case a message published on `te/xxx/xxx` might be received twice + topics.add_unchecked("te/xxx/+"); + topics.add_unchecked("te/+/xxx"); + let removed = topics.remove_overlapping_patterns(); + assert!(removed.is_empty()); + } } diff --git a/crates/extensions/tedge_mqtt_ext/src/lib.rs b/crates/extensions/tedge_mqtt_ext/src/lib.rs index 23e81de3580..59615c67112 100644 --- a/crates/extensions/tedge_mqtt_ext/src/lib.rs +++ b/crates/extensions/tedge_mqtt_ext/src/lib.rs @@ -55,13 +55,18 @@ impl MqttActorBuilder { pub(crate) fn build_actor(self) -> MqttActor { let mut combined_topic_filter = TopicFilter::empty(); for (topic_filter, _) in self.subscriber_addresses.iter() { - for pattern in topic_filter.patterns() { - tracing::info!(target: "MQTT sub", "{pattern}"); - } combined_topic_filter.add_all(topic_filter.to_owned()); } - let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter); + let removed = combined_topic_filter.remove_overlapping_patterns(); + for pattern in combined_topic_filter.patterns() { + tracing::info!(target: "MQTT sub", "{pattern}"); + } + for pattern in removed { + tracing::warn!(target: "MQTT sub", "ignoring overlapping subscription to {pattern}"); + } + + let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter); MqttActor::new(mqtt_config, self.input_receiver, self.subscriber_addresses) } }