diff --git a/Cargo.lock b/Cargo.lock index b8c6a4d4cc0..2a0e607610b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3904,6 +3904,7 @@ version = "0.12.0" dependencies = [ "assert-json-diff", "async-trait", + "futures", "mqtt_channel", "mqtt_tests", "serde_json", diff --git a/crates/extensions/tedge_mqtt_ext/Cargo.toml b/crates/extensions/tedge_mqtt_ext/Cargo.toml index 7255fa9a089..4b003e5f77d 100644 --- a/crates/extensions/tedge_mqtt_ext/Cargo.toml +++ b/crates/extensions/tedge_mqtt_ext/Cargo.toml @@ -24,4 +24,5 @@ tedge_utils = { workspace = true } tokio = { workspace = true, default_features = false, features = ["macros"] } [dev-dependencies] +futures = { workspace = true } mqtt_tests = { path = "../../tests/mqtt_tests" } diff --git a/crates/extensions/tedge_mqtt_ext/src/tests.rs b/crates/extensions/tedge_mqtt_ext/src/tests.rs index f1400302328..b553cdd1c47 100644 --- a/crates/extensions/tedge_mqtt_ext/src/tests.rs +++ b/crates/extensions/tedge_mqtt_ext/src/tests.rs @@ -1,5 +1,6 @@ use crate::*; use mqtt_channel::Topic; +use std::time::Duration; use tedge_actors::Builder; use tedge_actors::ServiceConsumer; use tedge_actors::SimpleMessageBox; @@ -19,6 +20,17 @@ impl MqttClientBuilder { box_builder: SimpleMessageBoxBuilder::new(name, 16), } } + + fn new_with_capacity( + name: &str, + subscriptions: &(impl Clone + Into), + capacity: usize, + ) -> Self { + MqttClientBuilder { + subscriptions: subscriptions.clone().into(), + box_builder: SimpleMessageBoxBuilder::new(name, capacity), + } + } } impl ServiceConsumer for MqttClientBuilder { @@ -47,6 +59,52 @@ impl Builder for MqttClientBuilder { } } +#[tokio::test] +async fn mqtt_actor_can_reliably_forward_messages_under_load() { + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = MqttConfig::default().with_port(broker.port); + let mut mqtt = MqttActorBuilder::new(mqtt_config); + + let mut child_device = MqttClientBuilder::new("Child device", &TopicFilter::empty()) + .with_connection(&mut mqtt) + .build(); + + let c8y_mapper_topic = Topic::new_unchecked("some/tedge/topic"); + + // Simulate a mapper with no message buffer, which will make the problem + // occur more quickly if it exists + let mut c8y_mapper = MqttClientBuilder::new_with_capacity("C8y mapper", &c8y_mapper_topic, 0) + .with_connection(&mut mqtt) + .build(); + + // Assume Cumulocity just accepts all messages, so don't bother attaching a client + let c8y_topic = Topic::new_unchecked("c8y/s/us"); + + // Simulate the c8y mapper, forwarding messages from the child device to Cumulocity + tokio::spawn(async move { + while let Some(mut msg) = c8y_mapper.recv().await { + tokio::time::sleep(Duration::from_secs(5)).await; + msg.topic = c8y_topic.clone(); + // If the actor doesn't process incoming/outgoing MQTT messages concurrently, + // this will cause a deadlock + c8y_mapper.send(msg).await.unwrap(); + } + }); + + tokio::spawn(mqtt_actor(mqtt)); + + for _ in 1..100 { + // This timeout should only be triggered if the actor isn't progressing + tokio::time::timeout( + Duration::from_millis(50), + child_device.send(MqttMessage::new(&c8y_mapper_topic, "Hi Bob")), + ) + .await + .expect("messages should be forwarded (is the MQTT actor processing incoming/outgoing messages concurrently?)") + .expect("send should succeed"); + } +} + #[tokio::test] async fn communicate_over_mqtt() { let broker = mqtt_tests::test_mqtt_broker();