Skip to content

Commit

Permalink
MQTT test helpers to ease MqttMessage assertions (#2342)
Browse files Browse the repository at this point in the history
* MQTT test helpers to ease MqttMessage assertions
  • Loading branch information
albinsuresh authored Oct 17, 2023
1 parent 4ef2233 commit d98d5e2
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 64 deletions.
80 changes: 47 additions & 33 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1465,8 +1465,11 @@ pub fn check_tedge_agent_status(message: &Message) -> Result<bool, ConversionErr

#[cfg(test)]
mod tests {
use super::CumulocityConverter;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::config::C8yMapperConfig;
use crate::error::ConversionError;
use crate::Capabilities;
use anyhow::Result;
use assert_json_diff::assert_json_eq;
Expand All @@ -1492,18 +1495,14 @@ mod tests {
use tedge_api::entity_store::InvalidExternalIdError;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_mqtt_ext::test_helpers::assert_messages_matching;
use tedge_mqtt_ext::Message;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;
use tedge_test_utils::fs::TempTedgeDir;
use tedge_utils::size_threshold::SizeThresholdExceededError;
use test_case::test_case;

use crate::config::C8yMapperConfig;
use crate::error::ConversionError;

use super::CumulocityConverter;

const OPERATIONS: &[&str] = &[
"c8y_DownloadConfigFile",
"c8y_LogfileRequest",
Expand Down Expand Up @@ -1653,37 +1652,52 @@ mod tests {
let tmp_dir = TempTedgeDir::new();
let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;

let in_topic = "te/device/child1///m/";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);

let expected_smart_rest_message = Message::new(
&Topic::new_unchecked("c8y/s/us"),
"101,test-device:device:child1,child1,thin-edge.io-child",
);
let expected_c8y_json_message = Message::new(
&Topic::new_unchecked("c8y/measurement/measurements/create"),
r#"{"externalSource":{"externalId":"test-device:device:child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"ThinEdgeMeasurement"}"#,
let in_message = Message::new(
&Topic::new_unchecked("te/device/child1///m/"),
json!({
"temp": 1,
"time": "2021-11-16T17:45:40.571760714+01:00"
})
.to_string(),
);

// Test the first output messages contains SmartREST and C8Y JSON.
let out_first_messages: Vec<_> = converter
.convert(&in_message)
.await
.into_iter()
.filter(|m| m.topic.name.starts_with("c8y"))
.collect();
assert_eq!(
out_first_messages,
vec![
expected_smart_rest_message,
expected_c8y_json_message.clone()
]
let messages = converter.convert(&in_message).await;

assert_messages_matching(
&messages,
[
(
"te/device/child1//",
json!({
"@type":"child-device",
"@id":"test-device:device:child1",
"name":"child1"
})
.into(),
),
(
"c8y/s/us",
"101,test-device:device:child1,child1,thin-edge.io-child".into(),
),
(
"c8y/measurement/measurements/create",
json!({
"externalSource":{
"externalId":"test-device:device:child1",
"type":"c8y_Serial"
},
"temp":{
"temp":{
"value":1.0
}
},
"time":"2021-11-16T17:45:40.571760714+01:00",
"type":"ThinEdgeMeasurement"
})
.into(),
),
],
);

// Test the second output messages doesn't contain SmartREST child device creation.
let out_second_messages = converter.convert(&in_message).await;
assert_eq!(out_second_messages, vec![expected_c8y_json_message]);
}

#[tokio::test]
Expand Down
149 changes: 118 additions & 31 deletions crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::fmt::Debug;

use crate::MqttMessage;
use assert_json_diff::assert_json_include;
use mqtt_channel::Message;
use mqtt_channel::Topic;
use std::fmt::Debug;
use tedge_actors::MessageReceiver;

use crate::MqttMessage;

pub async fn assert_received_contains_str<I, S>(
pub async fn assert_received_contains_str<'a, I>(
messages: &mut dyn MessageReceiver<MqttMessage>,
expected: I,
) where
I: IntoIterator<Item = (S, S)>,
S: AsRef<str> + Debug,
I: IntoIterator<Item = (&'a str, &'a str)>,
{
for expected_msg in expected.into_iter() {
let message = messages.recv().await;
Expand All @@ -21,21 +19,7 @@ pub async fn assert_received_contains_str<I, S>(
expected_msg
);
let message = message.unwrap();
let expected_topic = expected_msg.0.as_ref();
let expected_payload = expected_msg.1.as_ref();
assert_eq!(
message.topic,
Topic::new_unchecked(expected_topic),
"\nReceived unexpected message: {:?}",
message
);
let payload = message.payload_str().expect("non UTF-8 payload");
assert!(
payload.contains(expected_payload),
"Payload assertion failed.\n Actual: {} \n Expected: {}",
payload,
expected_payload
)
assert_message_contains_str(&message, expected_msg);
}
}

Expand All @@ -48,14 +32,117 @@ pub async fn assert_received_includes_json<I, S>(
{
for expected_msg in expected.into_iter() {
let message = messages.recv().await.expect("MQTT channel closed");
assert_eq!(message.topic, Topic::new_unchecked(expected_msg.0.as_ref()));
let payload = serde_json::from_str::<serde_json::Value>(
message.payload_str().expect("non UTF-8 payload"),
)
.expect("non JSON payload");
assert_json_include!(
actual: payload,
expected: expected_msg.1
);
assert_message_includes_json(&message, expected_msg);
}
}

pub fn assert_message_contains_str(message: &Message, expected: (&str, &str)) {
let expected_topic = expected.0;
let expected_payload = expected.1;
assert_eq!(
message.topic,
Topic::new_unchecked(expected_topic),
"\nReceived unexpected message: {:?}",
message
);
let payload = message.payload_str().expect("non UTF-8 payload");
assert!(
payload.contains(expected_payload),
"Payload assertion failed.\n Actual: {} \n Expected: {}",
payload,
expected_payload
)
}

pub fn assert_message_includes_json<S>(message: &Message, expected: (S, serde_json::Value))
where
S: AsRef<str>,
{
assert_eq!(message.topic, Topic::new_unchecked(expected.0.as_ref()));
let payload = serde_json::from_str::<serde_json::Value>(
message.payload_str().expect("non UTF-8 payload"),
)
.expect("non JSON payload");
assert_json_include!(
actual: payload,
expected: expected.1
);
}

#[derive(Debug, PartialEq, Eq)]
pub enum MessagePayloadMatcher {
StringMessage(&'static str),
JsonMessage(serde_json::Value),
Empty,
Skip,
}

impl From<&'static str> for MessagePayloadMatcher {
fn from(value: &'static str) -> Self {
MessagePayloadMatcher::StringMessage(value)
}
}

impl From<serde_json::Value> for MessagePayloadMatcher {
fn from(value: serde_json::Value) -> Self {
MessagePayloadMatcher::JsonMessage(value)
}
}

pub fn assert_messages_matching<'a, M, I>(messages: M, expected: I)
where
M: IntoIterator<Item = &'a Message>,
I: IntoIterator<Item = (&'static str, MessagePayloadMatcher)>,
{
let mut messages_iter = messages.into_iter();
let mut expected_iter = expected.into_iter();
loop {
match (messages_iter.next(), expected_iter.next()) {
(Some(message), Some(expected_msg)) => {
let message_topic = &message.topic.name;
let expected_topic = expected_msg.0;
match expected_msg.1 {
MessagePayloadMatcher::StringMessage(str_payload) => {
assert_message_contains_str(message, (expected_topic, str_payload))
}
MessagePayloadMatcher::JsonMessage(json_payload) => {
assert_message_includes_json(message, (expected_topic, json_payload))
}
MessagePayloadMatcher::Empty => {
assert_eq!(
message_topic, expected_topic,
"Received message on topic: {} instead of {}",
message_topic, expected_topic
);
assert!(
message.payload_bytes().is_empty(),
"Received non-empty payload while expecting empty payload on {}",
message_topic
)
}
MessagePayloadMatcher::Skip => {
assert_eq!(
message_topic, expected_topic,
"Received message on topic: {} instead of {}",
message_topic, expected_topic
);
// Skipping payload validation
}
}
}
(None, Some(expected_msg)) => {
panic!(
"Input messages exhausted while expecting message on topic: {:?}",
expected_msg.0
)
}
(Some(message), None) => {
panic!(
"Additional message received than expected on topic: {:?}",
message.topic.name
)
}
_ => return,
}
}
}

1 comment on commit d98d5e2

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
317 2 3 319 99.37 1h7m55.901s

Failed Tests

Name Message ⏱️ Duration Suite
Publish measurements varying period benchmark.py run --count 1000 ... returned an unexpected exit code stdout: { "ok": false, "iterations": 5, "passed": 4, "failed": 1, "results": [ { "worker": 0, "messages": 1000, "dropped_percent": 0.0, "dropped_messages": 0, "total": 0.257939, "total_non_idle": 0.25794, "idle": 0.0, "parameters": { "count": 1000, "beats": 100, "beats_delay": 0, [ Message content over the limit has been removed. ] 2023-10-17 10:44:51,434 - root - WARNING - Burst time is exceeding the period. Skipping delay. diff=-0.000 2023-10-17 10:44:51,924 - root - INFO - Waiting for last message to be published 2023-10-17 10:44:53,941 - root - INFO - Stopping mqtt client 2023-10-17 10:44:54,018 - root - WARNING - Detected dropped messages 2023-10-17 10:44:55,024 - root - INFO - Starting benchmark: count=1000, beats=100, beats_delay=0ms, period=75ms 2023-10-17 10:44:55,070 - root - INFO - Subscribing to cloud topic 2023-10-17 10:44:55,986 - root - INFO - Waiting for last message to be published 2023-10-17 10:44:57,170 - root - INFO - Stopping mqtt client 2023-10-17 10:44:58,193 - root - INFO - Starting benchmark: count=1000, beats=100, beats_delay=0ms, period=100ms 2023-10-17 10:44:58,204 - root - INFO - Subscribing to cloud topic 2023-10-17 10:44:59,334 - root - INFO - Waiting for last message to be published 2023-10-17 10:44:59,947 - root - INFO - Stopping mqtt client 2023-10-17 10:45:00,952 - root - INFO - Finished benchmark Benchmark failed 53.712 s Benchmarks
Successful log operation Device is missing some fragments. wanted=['c8y_SupportedOperations'], got=['type', 'name', 'owner', 'c8y_SupportedLogs'] 38.092 s Log Operation Child

Please sign in to comment.