Skip to content

Commit

Permalink
Merge pull request #2964 from didier-wenzek/fix/too-many-software-lis…
Browse files Browse the repository at this point in the history
…t-requests

fix: too many software list requests
  • Loading branch information
didier-wenzek authored Jul 1, 2024
2 parents e783f97 + 141b931 commit a264433
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions crates/common/mqtt_channel/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use serde::Deserializer;
use serde::Serialize;
use serde::Serializer;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::fmt::Write;

Expand All @@ -20,6 +21,21 @@ pub struct MqttMessage {
pub retain: bool,
}

impl Display for MqttMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_char('[')?;
f.write_str(&self.topic.name)?;
f.write_str(" qos=")?;
f.write_char(match self.qos {
QoS::AtMostOnce => '0',
QoS::AtLeastOnce => '1',
QoS::ExactlyOnce => '2',
})?;
f.write_str(if self.retain { " retained] " } else { "] " })?;
Display::fmt(&self.payload, f)
}
}

fn serialize_qos<S>(qos: &QoS, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
Expand Down Expand Up @@ -119,6 +135,15 @@ impl<'de> Deserialize<'de> for DebugPayload {
}
}

impl Display for DebugPayload {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.as_str() {
Ok(str) => f.write_str(str),
Err(_) => f.write_str(&format!("non UTF-8 payload of {} bytes", self.0.len())),
}
}
}

impl DebugPayload {
/// The payload string (unless this payload is not UTF8)
pub fn as_str(&self) -> Result<&str, MqttError> {
Expand Down
72 changes: 72 additions & 0 deletions crates/common/mqtt_channel/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,41 @@ impl TopicFilter {
self.accept_topic(&msg.topic)
}

/// Simplify the list of patterns, removing overlaps.
///
/// Return the patterns that have been removed.
pub fn remove_overlapping_patterns(&mut self) -> Vec<String> {
let mut patterns = vec![];
let mut removed = vec![];
patterns.append(&mut self.patterns);

for pattern in patterns {
if self.include_topic(&pattern) {
removed.push(pattern)
} else {
let mut sub_patterns = vec![];
sub_patterns.append(&mut self.patterns);
for sub_pattern in sub_patterns {
if rumqttc::matches(&sub_pattern, &pattern) {
removed.push(sub_pattern);
} else {
self.patterns.push(sub_pattern);
}
}
self.patterns.push(pattern)
}
}

removed
}

/// Check if the given pattern is already matched by this filter pattern.
fn include_topic(&self, sub_pattern: &str) -> bool {
self.patterns
.iter()
.any(|pattern| rumqttc::matches(sub_pattern, pattern))
}

/// A clone topic filter with the given QoS
pub fn with_qos(self, qos: QoS) -> Self {
Self { qos, ..self }
Expand All @@ -135,6 +170,10 @@ impl TopicFilter {
})
.collect()
}

pub fn patterns(&self) -> &Vec<String> {
&self.patterns
}
}

impl TryInto<Topic> for &str {
Expand Down Expand Up @@ -235,4 +274,37 @@ mod tests {
assert!(TopicFilter::new("/a/#/b").is_err());
assert!(TopicFilter::new("/a/#/+").is_err());
}

#[test]
fn check_removing_overlapping_patterns() {
let mut topics = TopicFilter::empty();
assert!(topics.remove_overlapping_patterns().is_empty());

// One can adds several patterns, as long as non overlapping
topics.add_unchecked("te/+/+/+/+/cmd/+/+");
topics.add_unchecked("te/+/+/+/+/m/+");
topics.add_unchecked("te/device/main///e/+");
topics.add_unchecked("te/device/child///e/+");
assert!(topics.remove_overlapping_patterns().is_empty());

// If a sub pattern is added, the overlapping is detected
topics.add_unchecked("te/device/main///m/+");
let removed = topics.remove_overlapping_patterns();
assert_eq!(removed.len(), 1);
assert!(removed.contains(&"te/device/main///m/+".to_string()));

// If a super pattern is added, the sub patterns are removed
topics.add_unchecked("te/+/+/+/+/e/+");
let removed = topics.remove_overlapping_patterns();
assert_eq!(removed.len(), 2);
assert!(removed.contains(&"te/device/main///e/+".to_string()));
assert!(removed.contains(&"te/device/child///e/+".to_string()));

// Unfortunately, some overlaps are not detected
// In the following case a message published on `te/xxx/xxx` might be received twice
topics.add_unchecked("te/xxx/+");
topics.add_unchecked("te/+/xxx");
let removed = topics.remove_overlapping_patterns();
assert!(removed.is_empty());
}
}
9 changes: 7 additions & 2 deletions crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ impl<Input: Debug> LoggingReceiver<Input> {

/// Close the input so no new messages can be sent to this receiver
pub fn close_input(&mut self) {
self.receiver.input_receiver.close();
self.receiver.signal_receiver.close();
self.receiver.close_input();
}
}

Expand Down Expand Up @@ -364,6 +363,12 @@ impl<Input> CombinedReceiver<Input> {
signal_receiver,
}
}

/// Close the input so no new messages can be sent to this receiver
pub fn close_input(&mut self) {
self.input_receiver.close();
self.signal_receiver.close();
}
}

#[async_trait]
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/tedge_mqtt_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde_json = { workspace = true }
tedge_actors = { workspace = true }
tedge_utils = { workspace = true }
tokio = { workspace = true, default_features = false, features = ["macros"] }
tracing = { workspace = true }

[dev-dependencies]
futures = { workspace = true }
Expand Down
31 changes: 20 additions & 11 deletions crates/extensions/tedge_mqtt_ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use tedge_actors::futures::channel::mpsc;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::ChannelError;
use tedge_actors::CombinedReceiver;
use tedge_actors::DynSender;
use tedge_actors::LoggingReceiver;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
Expand All @@ -32,17 +31,17 @@ pub use mqtt_channel::TopicFilter;

pub struct MqttActorBuilder {
mqtt_config: mqtt_channel::Config,
input_receiver: LoggingReceiver<MqttMessage>,
input_receiver: CombinedReceiver<MqttMessage>,
publish_sender: mpsc::Sender<MqttMessage>,
pub subscriber_addresses: Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
pub subscriber_addresses: Vec<(TopicFilter, DynSender<MqttMessage>)>,
signal_sender: mpsc::Sender<RuntimeRequest>,
}

impl MqttActorBuilder {
pub fn new(config: mqtt_channel::Config) -> Self {
let (publish_sender, publish_receiver) = mpsc::channel(10);
let (signal_sender, signal_receiver) = mpsc::channel(10);
let input_receiver = LoggingReceiver::new("MQTT".into(), publish_receiver, signal_receiver);
let input_receiver = CombinedReceiver::new(publish_receiver, signal_receiver);

MqttActorBuilder {
mqtt_config: config,
Expand All @@ -58,8 +57,16 @@ impl MqttActorBuilder {
for (topic_filter, _) in self.subscriber_addresses.iter() {
combined_topic_filter.add_all(topic_filter.to_owned());
}
let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter);

let removed = combined_topic_filter.remove_overlapping_patterns();
for pattern in combined_topic_filter.patterns() {
tracing::info!(target: "MQTT sub", "{pattern}");
}
for pattern in removed {
tracing::warn!(target: "MQTT sub", "ignoring overlapping subscription to {pattern}");
}

let mqtt_config = self.mqtt_config.with_subscriptions(combined_topic_filter);
MqttActor::new(mqtt_config, self.input_receiver, self.subscriber_addresses)
}
}
Expand All @@ -72,7 +79,7 @@ impl AsMut<MqttConfig> for MqttActorBuilder {

impl MessageSource<MqttMessage, TopicFilter> for MqttActorBuilder {
fn connect_sink(&mut self, subscriptions: TopicFilter, peer: &impl MessageSink<MqttMessage>) {
let sender = LoggingSender::new("MQTT".into(), peer.get_sender());
let sender = peer.get_sender();
self.subscriber_addresses.push((subscriptions, sender));
}
}
Expand Down Expand Up @@ -102,11 +109,11 @@ impl Builder<MqttActor> for MqttActorBuilder {
}

pub struct FromPeers {
input_receiver: LoggingReceiver<MqttMessage>,
input_receiver: CombinedReceiver<MqttMessage>,
}

pub struct ToPeers {
peer_senders: Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
peer_senders: Vec<(TopicFilter, DynSender<MqttMessage>)>,
}

impl FromPeers {
Expand All @@ -115,6 +122,7 @@ impl FromPeers {
outgoing_mqtt: &mut mpsc::UnboundedSender<MqttMessage>,
) -> Result<(), RuntimeError> {
while let Ok(Some(message)) = self.try_recv().await {
tracing::debug!(target: "MQTT pub", "{message}");
SinkExt::send(outgoing_mqtt, message)
.await
.map_err(Box::new)?;
Expand All @@ -139,6 +147,7 @@ impl ToPeers {
incoming_mqtt: &mut mpsc::UnboundedReceiver<MqttMessage>,
) -> Result<(), RuntimeError> {
while let Some(message) = incoming_mqtt.next().await {
tracing::debug!(target: "MQTT recv", "{message}");
self.send(message).await?;
}
Ok(())
Expand Down Expand Up @@ -178,8 +187,8 @@ pub struct MqttActor {
impl MqttActor {
fn new(
mqtt_config: mqtt_channel::Config,
input_receiver: LoggingReceiver<MqttMessage>,
peer_senders: Vec<(TopicFilter, LoggingSender<MqttMessage>)>,
input_receiver: CombinedReceiver<MqttMessage>,
peer_senders: Vec<(TopicFilter, DynSender<MqttMessage>)>,
) -> Self {
MqttActor {
mqtt_config,
Expand Down

0 comments on commit a264433

Please sign in to comment.