Skip to content

Commit

Permalink
Add Rust regression test for #2326
Browse files Browse the repository at this point in the history
"tedge-mapper stops mapping MQTT messages after high volume (e.g. 1000 single messages within ~210 milliseconds) without any warning. The tedge-mapper-c8y needs to be restarted in order to become responsive to mapping measurements again."

The test attempts to reproduce the deadlock that was occurring to cause the bug, and reliably fails against the previous code.

Signed-off-by: James Rhodes <[email protected]>
  • Loading branch information
jarhodes314 committed Oct 13, 2023
1 parent 002ca09 commit cc1aa32
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/extensions/tedge_mqtt_ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ tedge_utils = { workspace = true }
tokio = { workspace = true, default_features = false, features = ["macros"] }

[dev-dependencies]
futures = { workspace = true }
mqtt_tests = { path = "../../tests/mqtt_tests" }
58 changes: 58 additions & 0 deletions crates/extensions/tedge_mqtt_ext/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::*;
use mqtt_channel::Topic;
use std::time::Duration;
use tedge_actors::Builder;
use tedge_actors::ServiceConsumer;
use tedge_actors::SimpleMessageBox;
Expand All @@ -19,6 +20,17 @@ impl MqttClientBuilder {
box_builder: SimpleMessageBoxBuilder::new(name, 16),
}
}

fn new_with_capacity(
name: &str,
subscriptions: &(impl Clone + Into<TopicFilter>),
capacity: usize,
) -> Self {
MqttClientBuilder {
subscriptions: subscriptions.clone().into(),
box_builder: SimpleMessageBoxBuilder::new(name, capacity),
}
}
}

impl ServiceConsumer<MqttMessage, MqttMessage, TopicFilter> for MqttClientBuilder {
Expand Down Expand Up @@ -47,6 +59,52 @@ impl Builder<MqttClient> for MqttClientBuilder {
}
}

#[tokio::test]
async fn mqtt_actor_can_reliably_forward_messages_under_load() {
let broker = mqtt_tests::test_mqtt_broker();
let mqtt_config = MqttConfig::default().with_port(broker.port);
let mut mqtt = MqttActorBuilder::new(mqtt_config);

let mut child_device = MqttClientBuilder::new("Child device", &TopicFilter::empty())
.with_connection(&mut mqtt)
.build();

let c8y_mapper_topic = Topic::new_unchecked("some/tedge/topic");

// Simulate a mapper with no message buffer, which will make the problem
// occur more quickly if it exists
let mut c8y_mapper = MqttClientBuilder::new_with_capacity("C8y mapper", &c8y_mapper_topic, 0)
.with_connection(&mut mqtt)
.build();

// Assume Cumulocity just accepts all messages, so don't bother attaching a client
let c8y_topic = Topic::new_unchecked("c8y/s/us");

// Simulate the c8y mapper, forwarding messages from the child device to Cumulocity
tokio::spawn(async move {
while let Some(mut msg) = c8y_mapper.recv().await {
tokio::time::sleep(Duration::from_secs(5)).await;
msg.topic = c8y_topic.clone();
// If the actor doesn't process incoming/outgoing MQTT messages concurrently,
// this will cause a deadlock
c8y_mapper.send(msg).await.unwrap();
}
});

tokio::spawn(mqtt_actor(mqtt));

for _ in 1..100 {
// This timeout should only be triggered if the actor isn't progressing
tokio::time::timeout(
Duration::from_millis(50),
child_device.send(MqttMessage::new(&c8y_mapper_topic, "Hi Bob")),
)
.await
.expect("messages should be forwarded (is the MQTT actor processing incoming/outgoing messages concurrently?)")
.expect("send should succeed");
}
}

#[tokio::test]
async fn communicate_over_mqtt() {
let broker = mqtt_tests::test_mqtt_broker();
Expand Down

0 comments on commit cc1aa32

Please sign in to comment.