Skip to content

Commit

Permalink
Avoid overlapping subscriptions to MQTT topics
Browse files Browse the repository at this point in the history
When a message matches several subscriptions of a connection,
the very same message might be delivered several time to the connection.
This is notably the case with NanoMQ (while Mosquitto delivers the
message just once).

The proposed overlap-detection mechanism doesn't remove all the
overlaps, but only those for which holds a sub-pattern/super-pattern
relationship (e.g. te/+/+/+/+/m/+ includes te/device/main///m/+).

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Jul 1, 2024
1 parent 700d52e commit 141b931
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
68 changes: 68 additions & 0 deletions crates/common/mqtt_channel/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,41 @@ impl TopicFilter {
self.accept_topic(&msg.topic)
}

/// Simplify the list of patterns, removing overlaps.
///
/// Return the patterns that have been removed.
pub fn remove_overlapping_patterns(&mut self) -> Vec<String> {
let mut patterns = vec![];
let mut removed = vec![];
patterns.append(&mut self.patterns);

for pattern in patterns {
if self.include_topic(&pattern) {
removed.push(pattern)
} else {
let mut sub_patterns = vec![];
sub_patterns.append(&mut self.patterns);
for sub_pattern in sub_patterns {
if rumqttc::matches(&sub_pattern, &pattern) {
removed.push(sub_pattern);
} else {
self.patterns.push(sub_pattern);
}
}
self.patterns.push(pattern)
}
}

removed
}

/// Check if the given pattern is already matched by this filter pattern.
fn include_topic(&self, sub_pattern: &str) -> bool {
self.patterns
.iter()
.any(|pattern| rumqttc::matches(sub_pattern, pattern))
}

/// A clone topic filter with the given QoS
pub fn with_qos(self, qos: QoS) -> Self {
Self { qos, ..self }
Expand Down Expand Up @@ -239,4 +274,37 @@ mod tests {
assert!(TopicFilter::new("/a/#/b").is_err());
assert!(TopicFilter::new("/a/#/+").is_err());
}

#[test]
fn check_removing_overlapping_patterns() {
let mut topics = TopicFilter::empty();
assert!(topics.remove_overlapping_patterns().is_empty());

// One can adds several patterns, as long as non overlapping
topics.add_unchecked("te/+/+/+/+/cmd/+/+");
topics.add_unchecked("te/+/+/+/+/m/+");
topics.add_unchecked("te/device/main///e/+");
topics.add_unchecked("te/device/child///e/+");
assert!(topics.remove_overlapping_patterns().is_empty());

// If a sub pattern is added, the overlapping is detected
topics.add_unchecked("te/device/main///m/+");
let removed = topics.remove_overlapping_patterns();
assert_eq!(removed.len(), 1);
assert!(removed.contains(&"te/device/main///m/+".to_string()));

// If a super pattern is added, the sub patterns are removed
topics.add_unchecked("te/+/+/+/+/e/+");
let removed = topics.remove_overlapping_patterns();
assert_eq!(removed.len(), 2);
assert!(removed.contains(&"te/device/main///e/+".to_string()));
assert!(removed.contains(&"te/device/child///e/+".to_string()));

// Unfortunately, some overlaps are not detected
// In the following case a message published on `te/xxx/xxx` might be received twice
topics.add_unchecked("te/xxx/+");
topics.add_unchecked("te/+/xxx");
let removed = topics.remove_overlapping_patterns();
assert!(removed.is_empty());
}
}
13 changes: 9 additions & 4 deletions crates/extensions/tedge_mqtt_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,18 @@ impl MqttActorBuilder {
pub(crate) fn build_actor(self) -> MqttActor {
let mut combined_topic_filter = TopicFilter::empty();
for (topic_filter, _) in self.subscriber_addresses.iter() {
for pattern in topic_filter.patterns() {
tracing::info!(target: "MQTT sub", "{pattern}");
}
combined_topic_filter.add_all(topic_filter.to_owned());
}
let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter);

let removed = combined_topic_filter.remove_overlapping_patterns();
for pattern in combined_topic_filter.patterns() {
tracing::info!(target: "MQTT sub", "{pattern}");
}
for pattern in removed {
tracing::warn!(target: "MQTT sub", "ignoring overlapping subscription to {pattern}");
}

let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter);
MqttActor::new(mqtt_config, self.input_receiver, self.subscriber_addresses)
}
}
Expand Down

0 comments on commit 141b931

Please sign in to comment.