From 5a14baac2f4bc1aa0ff6da30b923f6cfa33469ce Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 12 Oct 2023 19:15:04 +0000 Subject: [PATCH] C8y inventory updates via twin/ topic channel --- .../src/tedge_config_cli/tedge_config.rs | 2 +- crates/core/tedge_api/src/mqtt_topics.rs | 8 +- .../extensions/c8y_mapper_ext/src/config.rs | 1 + .../c8y_mapper_ext/src/converter.rs | 10 +- .../c8y_mapper_ext/src/inventory.rs | 150 ++++++++++++++++++ crates/extensions/c8y_mapper_ext/src/lib.rs | 1 + .../tedge_mqtt_ext/src/test_helpers.rs | 44 +++++ .../thin-edge_device_telemetry.robot | 4 +- 8 files changed, 213 insertions(+), 7 deletions(-) create mode 100644 crates/extensions/c8y_mapper_ext/src/inventory.rs diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 5b482397f26..875117e8a89 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -369,7 +369,7 @@ define_tedge_config! { /// Set of MQTT topics the Cumulocity mapper should subscribe to #[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")] - #[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,tedge/health/+,tedge/health/+/+"))] + #[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,tedge/health/+,tedge/health/+/+"))] topics: TemplatesSet, enable: { diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 12541fde39e..9110dda7dcc 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -339,6 +339,9 @@ pub enum TopicIdError { #[derive(Debug, Clone, PartialEq, Eq)] pub enum Channel { EntityMetadata, + EntityTwinData { + fragment_key: String, + }, Measurement { measurement_type: String, }, @@ -372,7 +375,9 @@ impl FromStr for Channel { fn from_str(channel: &str) -> Result { match channel.split('/').collect::>()[..] { [""] => Ok(Channel::EntityMetadata), - + ["twin", fragment_key] => Ok(Channel::EntityTwinData { + fragment_key: fragment_key.to_string(), + }), ["m", measurement_type] => Ok(Channel::Measurement { measurement_type: measurement_type.to_string(), }), @@ -411,6 +416,7 @@ impl Display for Channel { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Channel::EntityMetadata => Ok(()), + Channel::EntityTwinData { fragment_key } => write!(f, "twin/{fragment_key}"), Channel::Measurement { measurement_type } => write!(f, "m/{measurement_type}"), Channel::MeasurementMetadata { measurement_type } => { diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index fbf94f5e537..95f08572d04 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -166,6 +166,7 @@ impl C8yMapperConfig { pub fn default_external_topic_filter() -> TopicFilter { vec![ "te/+/+/+/+", + "te/+/+/+/+/twin/+", "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index fed3a1a99e9..75b01e39e86 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -102,7 +102,7 @@ use tracing::log::error; const C8Y_CLOUD: &str = "c8y"; const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations"; -const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; +pub const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; @@ -899,6 +899,10 @@ impl CumulocityConverter { } let mut messages = match &channel { + Channel::EntityTwinData { fragment_key } => { + self.try_convert_entity_twin_data(&source, message, fragment_key)? + } + Channel::Measurement { measurement_type } => { self.try_convert_measurement(&source, message, measurement_type)? } @@ -1464,7 +1468,7 @@ pub fn check_tedge_agent_status(message: &Message) -> Result ( CumulocityConverter, diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs new file mode 100644 index 00000000000..2bc2efa8fb0 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -0,0 +1,150 @@ +use crate::converter::INVENTORY_MANAGED_OBJECTS_TOPIC; +use serde_json::json; +use serde_json::Value as JsonValue; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::Topic; +use tracing::warn; + +use crate::{converter::CumulocityConverter, error::ConversionError}; + +impl CumulocityConverter { + pub fn try_convert_entity_twin_data( + &mut self, + source: &EntityTopicId, + message: &Message, + fragment_key: &str, + ) -> Result, ConversionError> { + let target_entity = self.entity_store.try_get(source)?; + let entity_external_id = target_entity.external_id.as_ref(); + let payload = serde_json::from_slice::(message.payload_bytes())?; + + let sanitized_payload = if let JsonValue::Object(mut properties) = payload { + if properties.contains_key("name") { + warn!( + "Updating the entity `name` field via the twin/ topic channel is not supported" + ); + properties.remove("name"); + } + if properties.contains_key("type") { + warn!( + "Updating the entity `type` field via the twin/ topic channel is not supported" + ); + properties.remove("name"); + } + JsonValue::Object(properties) + } else { + payload + }; + + let mapped_json = json!({ fragment_key: sanitized_payload }); + + let topic = Topic::new_unchecked(&format!( + "{INVENTORY_MANAGED_OBJECTS_TOPIC}/{entity_external_id}" + )); + Ok(vec![Message::new(&topic, mapped_json.to_string())]) + } +} + +#[cfg(test)] +mod tests { + use crate::converter::tests::create_c8y_converter; + use serde_json::json; + use tedge_mqtt_ext::test_helpers::assert_messages_includes_json; + use tedge_mqtt_ext::{Message, Topic}; + use tedge_test_utils::fs::TempTedgeDir; + + #[tokio::test] + async fn convert_entity_twin_data_json_object() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_topic = "te/device/main///twin/device_os"; + let twin_payload = json!({ + "family": "Debian", + "version": "11" + }); + let twin_message = + Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_includes_json( + inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "device_os": { + "family": "Debian", + "version": "11" + } + }), + )], + ); + } + + #[tokio::test] + async fn convert_entity_twin_data_string_value() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/foo"), + r#""bar""#, + ); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_includes_json( + inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "foo": "bar" + }), + )], + ); + } + + #[tokio::test] + async fn convert_entity_twin_data_numeric_value() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/foo"), + r#"5.6789"#, + ); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_includes_json( + inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "foo": 5.6789 + }), + )], + ); + } + + #[tokio::test] + async fn convert_entity_twin_data_boolean_value() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/enabled"), + r#"false"#, + ); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_includes_json( + inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "enabled": false + }), + )], + ); + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 83493fc484a..d26525f08de 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -6,6 +6,7 @@ pub mod converter; pub mod dynamic_discovery; pub mod error; mod fragments; +mod inventory; pub mod json; mod log_upload; mod serializer; diff --git a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs index c2e5f702a4a..26e2facde6a 100644 --- a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs +++ b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs @@ -59,3 +59,47 @@ pub async fn assert_received_includes_json( ); } } + +pub fn assert_messages_contains_str(messages: M, expected: I) +where + M: IntoIterator, + I: IntoIterator, + S: AsRef + Debug, +{ + for (message, expected_msg) in messages.into_iter().zip(expected) { + let expected_topic = expected_msg.0.as_ref(); + let expected_payload = expected_msg.1.as_ref(); + assert_eq!( + message.topic, + Topic::new_unchecked(expected_topic), + "\nReceived unexpected message: {:?}", + message + ); + let payload = message.payload_str().expect("non UTF-8 payload"); + assert!( + payload.contains(expected_payload), + "Payload assertion failed.\n Actual: {} \n Expected: {}", + payload, + expected_payload + ) + } +} + +pub fn assert_messages_includes_json(messages: M, expected: I) +where + M: IntoIterator, + I: IntoIterator, + S: AsRef, +{ + for (message, expected_msg) in messages.into_iter().zip(expected) { + assert_eq!(message.topic, Topic::new_unchecked(expected_msg.0.as_ref())); + let payload = serde_json::from_str::( + message.payload_str().expect("non UTF-8 payload"), + ) + .expect("non JSON payload"); + assert_json_include!( + actual: payload, + expected: expected_msg.1 + ); + } +} diff --git a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot index e218c67238c..d5deca03d47 100644 --- a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot +++ b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot @@ -144,8 +144,8 @@ Thin-edge device support sending inventory data via tedge topic Thin-edge device supports sending inventory data via tedge topic to root fragments - Execute Command tedge mqtt pub "te/device/main///twin/subtype" "LinuxDeviceA" - Execute Command tedge mqtt pub "te/device/main///twin/type" "ShouldBeIgnored" + Execute Command tedge mqtt pub "te/device/main///twin/subtype" "\"LinuxDeviceA\"" + Execute Command tedge mqtt pub "te/device/main///twin/type" "\"ShouldBeIgnored\"" Cumulocity.Set Device ${DEVICE_SN} ${mo}= Device Should Have Fragments subtype Should Be Equal ${mo["subtype"]} LinuxDeviceA