From d98d5e239629aa42d94e5c50984ea7dd90b590a0 Mon Sep 17 00:00:00 2001
From: Albin Suresh <albin.suresh@softwareag.com>
Date: Tue, 17 Oct 2023 15:52:12 +0530
Subject: [PATCH] MQTT test helpers to ease MqttMessage assertions (#2342)

* MQTT test helpers to ease MqttMessage assertions
---
 .../c8y_mapper_ext/src/converter.rs           |  80 ++++++----
 .../tedge_mqtt_ext/src/test_helpers.rs        | 149 ++++++++++++++----
 2 files changed, 165 insertions(+), 64 deletions(-)

diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs
index fed3a1a99e9..c265a9f7787 100644
--- a/crates/extensions/c8y_mapper_ext/src/converter.rs
+++ b/crates/extensions/c8y_mapper_ext/src/converter.rs
@@ -1465,8 +1465,11 @@ pub fn check_tedge_agent_status(message: &Message) -> Result<bool, ConversionErr
 
 #[cfg(test)]
 mod tests {
+    use super::CumulocityConverter;
     use crate::actor::IdDownloadRequest;
     use crate::actor::IdDownloadResult;
+    use crate::config::C8yMapperConfig;
+    use crate::error::ConversionError;
     use crate::Capabilities;
     use anyhow::Result;
     use assert_json_diff::assert_json_eq;
@@ -1492,6 +1495,7 @@ mod tests {
     use tedge_api::entity_store::InvalidExternalIdError;
     use tedge_api::mqtt_topics::EntityTopicId;
     use tedge_api::mqtt_topics::MqttSchema;
+    use tedge_mqtt_ext::test_helpers::assert_messages_matching;
     use tedge_mqtt_ext::Message;
     use tedge_mqtt_ext::MqttMessage;
     use tedge_mqtt_ext::Topic;
@@ -1499,11 +1503,6 @@ mod tests {
     use tedge_utils::size_threshold::SizeThresholdExceededError;
     use test_case::test_case;
 
-    use crate::config::C8yMapperConfig;
-    use crate::error::ConversionError;
-
-    use super::CumulocityConverter;
-
     const OPERATIONS: &[&str] = &[
         "c8y_DownloadConfigFile",
         "c8y_LogfileRequest",
@@ -1653,37 +1652,52 @@ mod tests {
         let tmp_dir = TempTedgeDir::new();
         let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await;
 
-        let in_topic = "te/device/child1///m/";
-        let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
-        let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);
-
-        let expected_smart_rest_message = Message::new(
-            &Topic::new_unchecked("c8y/s/us"),
-            "101,test-device:device:child1,child1,thin-edge.io-child",
-        );
-        let expected_c8y_json_message = Message::new(
-            &Topic::new_unchecked("c8y/measurement/measurements/create"),
-            r#"{"externalSource":{"externalId":"test-device:device:child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"ThinEdgeMeasurement"}"#,
+        let in_message = Message::new(
+            &Topic::new_unchecked("te/device/child1///m/"),
+            json!({
+                "temp": 1,
+                "time": "2021-11-16T17:45:40.571760714+01:00"
+            })
+            .to_string(),
         );
 
-        // Test the first output messages contains SmartREST and C8Y JSON.
-        let out_first_messages: Vec<_> = converter
-            .convert(&in_message)
-            .await
-            .into_iter()
-            .filter(|m| m.topic.name.starts_with("c8y"))
-            .collect();
-        assert_eq!(
-            out_first_messages,
-            vec![
-                expected_smart_rest_message,
-                expected_c8y_json_message.clone()
-            ]
+        let messages = converter.convert(&in_message).await;
+
+        assert_messages_matching(
+            &messages,
+            [
+                (
+                    "te/device/child1//",
+                    json!({
+                        "@type":"child-device",
+                        "@id":"test-device:device:child1",
+                        "name":"child1"
+                    })
+                    .into(),
+                ),
+                (
+                    "c8y/s/us",
+                    "101,test-device:device:child1,child1,thin-edge.io-child".into(),
+                ),
+                (
+                    "c8y/measurement/measurements/create",
+                    json!({
+                        "externalSource":{
+                            "externalId":"test-device:device:child1",
+                            "type":"c8y_Serial"
+                        },
+                        "temp":{
+                            "temp":{
+                                "value":1.0
+                            }
+                        },
+                        "time":"2021-11-16T17:45:40.571760714+01:00",
+                        "type":"ThinEdgeMeasurement"
+                    })
+                    .into(),
+                ),
+            ],
         );
-
-        // Test the second output messages doesn't contain SmartREST child device creation.
-        let out_second_messages = converter.convert(&in_message).await;
-        assert_eq!(out_second_messages, vec![expected_c8y_json_message]);
     }
 
     #[tokio::test]
diff --git a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
index c2e5f702a4a..86964eea8e7 100644
--- a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
+++ b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
@@ -1,17 +1,15 @@
-use std::fmt::Debug;
-
+use crate::MqttMessage;
 use assert_json_diff::assert_json_include;
+use mqtt_channel::Message;
 use mqtt_channel::Topic;
+use std::fmt::Debug;
 use tedge_actors::MessageReceiver;
 
-use crate::MqttMessage;
-
-pub async fn assert_received_contains_str<I, S>(
+pub async fn assert_received_contains_str<'a, I>(
     messages: &mut dyn MessageReceiver<MqttMessage>,
     expected: I,
 ) where
-    I: IntoIterator<Item = (S, S)>,
-    S: AsRef<str> + Debug,
+    I: IntoIterator<Item = (&'a str, &'a str)>,
 {
     for expected_msg in expected.into_iter() {
         let message = messages.recv().await;
@@ -21,21 +19,7 @@ pub async fn assert_received_contains_str<I, S>(
             expected_msg
         );
         let message = message.unwrap();
-        let expected_topic = expected_msg.0.as_ref();
-        let expected_payload = expected_msg.1.as_ref();
-        assert_eq!(
-            message.topic,
-            Topic::new_unchecked(expected_topic),
-            "\nReceived unexpected message: {:?}",
-            message
-        );
-        let payload = message.payload_str().expect("non UTF-8 payload");
-        assert!(
-            payload.contains(expected_payload),
-            "Payload assertion failed.\n Actual: {} \n Expected: {}",
-            payload,
-            expected_payload
-        )
+        assert_message_contains_str(&message, expected_msg);
     }
 }
 
@@ -48,14 +32,117 @@ pub async fn assert_received_includes_json<I, S>(
 {
     for expected_msg in expected.into_iter() {
         let message = messages.recv().await.expect("MQTT channel closed");
-        assert_eq!(message.topic, Topic::new_unchecked(expected_msg.0.as_ref()));
-        let payload = serde_json::from_str::<serde_json::Value>(
-            message.payload_str().expect("non UTF-8 payload"),
-        )
-        .expect("non JSON payload");
-        assert_json_include!(
-            actual: payload,
-            expected: expected_msg.1
-        );
+        assert_message_includes_json(&message, expected_msg);
+    }
+}
+
+pub fn assert_message_contains_str(message: &Message, expected: (&str, &str)) {
+    let expected_topic = expected.0;
+    let expected_payload = expected.1;
+    assert_eq!(
+        message.topic,
+        Topic::new_unchecked(expected_topic),
+        "\nReceived unexpected message: {:?}",
+        message
+    );
+    let payload = message.payload_str().expect("non UTF-8 payload");
+    assert!(
+        payload.contains(expected_payload),
+        "Payload assertion failed.\n Actual: {} \n Expected: {}",
+        payload,
+        expected_payload
+    )
+}
+
+pub fn assert_message_includes_json<S>(message: &Message, expected: (S, serde_json::Value))
+where
+    S: AsRef<str>,
+{
+    assert_eq!(message.topic, Topic::new_unchecked(expected.0.as_ref()));
+    let payload = serde_json::from_str::<serde_json::Value>(
+        message.payload_str().expect("non UTF-8 payload"),
+    )
+    .expect("non JSON payload");
+    assert_json_include!(
+        actual: payload,
+        expected: expected.1
+    );
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum MessagePayloadMatcher {
+    StringMessage(&'static str),
+    JsonMessage(serde_json::Value),
+    Empty,
+    Skip,
+}
+
+impl From<&'static str> for MessagePayloadMatcher {
+    fn from(value: &'static str) -> Self {
+        MessagePayloadMatcher::StringMessage(value)
+    }
+}
+
+impl From<serde_json::Value> for MessagePayloadMatcher {
+    fn from(value: serde_json::Value) -> Self {
+        MessagePayloadMatcher::JsonMessage(value)
+    }
+}
+
+pub fn assert_messages_matching<'a, M, I>(messages: M, expected: I)
+where
+    M: IntoIterator<Item = &'a Message>,
+    I: IntoIterator<Item = (&'static str, MessagePayloadMatcher)>,
+{
+    let mut messages_iter = messages.into_iter();
+    let mut expected_iter = expected.into_iter();
+    loop {
+        match (messages_iter.next(), expected_iter.next()) {
+            (Some(message), Some(expected_msg)) => {
+                let message_topic = &message.topic.name;
+                let expected_topic = expected_msg.0;
+                match expected_msg.1 {
+                    MessagePayloadMatcher::StringMessage(str_payload) => {
+                        assert_message_contains_str(message, (expected_topic, str_payload))
+                    }
+                    MessagePayloadMatcher::JsonMessage(json_payload) => {
+                        assert_message_includes_json(message, (expected_topic, json_payload))
+                    }
+                    MessagePayloadMatcher::Empty => {
+                        assert_eq!(
+                            message_topic, expected_topic,
+                            "Received message on topic: {} instead of {}",
+                            message_topic, expected_topic
+                        );
+                        assert!(
+                            message.payload_bytes().is_empty(),
+                            "Received non-empty payload while expecting empty payload on {}",
+                            message_topic
+                        )
+                    }
+                    MessagePayloadMatcher::Skip => {
+                        assert_eq!(
+                            message_topic, expected_topic,
+                            "Received message on topic: {} instead of {}",
+                            message_topic, expected_topic
+                        );
+                        // Skipping payload validation
+                    }
+                }
+            }
+            (None, Some(expected_msg)) => {
+                panic!(
+                    "Input messages exhausted while expecting message on topic: {:?}",
+                    expected_msg.0
+                )
+            }
+            (Some(message), None) => {
+                panic!(
+                    "Additional message received than expected on topic: {:?}",
+                    message.topic.name
+                )
+            }
+            _ => return,
+        }
     }
 }