diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 3e4edd8ce6b..25c4b252e80 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -369,7 +369,7 @@ define_tedge_config! { /// Set of MQTT topics the Cumulocity mapper should subscribe to #[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")] - #[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))] + #[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/twin/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))] topics: TemplatesSet, enable: { diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 84207c3edd4..c5bb68fde8d 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -454,6 +454,9 @@ pub enum TopicIdError { #[derive(Debug, Clone, PartialEq, Eq)] pub enum Channel { EntityMetadata, + EntityTwinData { + fragment_key: String, + }, Measurement { measurement_type: String, }, @@ -488,7 +491,9 @@ impl FromStr for Channel { fn from_str(channel: &str) -> Result { match channel.split('/').collect::>()[..] { [""] => Ok(Channel::EntityMetadata), - + ["twin", fragment_key] => Ok(Channel::EntityTwinData { + fragment_key: fragment_key.to_string(), + }), ["m", measurement_type] => Ok(Channel::Measurement { measurement_type: measurement_type.to_string(), }), @@ -528,6 +533,7 @@ impl Display for Channel { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Channel::EntityMetadata => Ok(()), + Channel::EntityTwinData { fragment_key } => write!(f, "twin/{fragment_key}"), Channel::Measurement { measurement_type } => write!(f, "m/{measurement_type}"), Channel::MeasurementMetadata { measurement_type } => { diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 3da9e99aed1..1994d9a4853 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -6,11 +6,14 @@ use camino::Utf8PathBuf; use std::net::IpAddr; use std::path::Path; use std::path::PathBuf; +use std::str::FromStr; use tedge_api::mqtt_topics::ChannelFilter::Command; use tedge_api::mqtt_topics::ChannelFilter::CommandMetadata; use tedge_api::mqtt_topics::EntityFilter::AnyEntity; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; +use tedge_api::mqtt_topics::TopicIdError; use tedge_api::path::DataDir; use tedge_api::topic::ResponseTopic; use tedge_config::ConfigNotSet; @@ -26,6 +29,7 @@ pub struct C8yMapperConfig { pub logs_path: Utf8PathBuf, pub data_dir: DataDir, pub device_id: String, + pub device_topic_id: EntityTopicId, pub device_type: String, pub service_type: String, pub ops_dir: PathBuf, @@ -44,6 +48,7 @@ impl C8yMapperConfig { logs_path: Utf8PathBuf, data_dir: DataDir, device_id: String, + device_topic_id: EntityTopicId, device_type: String, service_type: String, c8y_host: String, @@ -60,6 +65,7 @@ impl C8yMapperConfig { logs_path, data_dir, device_id, + device_topic_id, device_type, service_type, ops_dir, @@ -82,6 +88,7 @@ impl C8yMapperConfig { let data_dir: DataDir = tedge_config.data.path.clone().into(); let device_id = tedge_config.device.id.try_read(tedge_config)?.to_string(); let device_type = tedge_config.device.ty.clone(); + let device_topic_id = EntityTopicId::from_str(&tedge_config.mqtt.device_topic_id)?; let service_type = tedge_config.service.ty.clone(); let c8y_host = tedge_config.c8y.http.or_config_not_set()?.to_string(); let tedge_http_address = tedge_config.http.bind.address; @@ -129,6 +136,7 @@ impl C8yMapperConfig { logs_path, data_dir, device_id, + device_topic_id, device_type, service_type, c8y_host, @@ -166,6 +174,7 @@ impl C8yMapperConfig { pub fn default_external_topic_filter() -> TopicFilter { vec![ "te/+/+/+/+", + "te/+/+/+/+/twin/+", "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", @@ -186,6 +195,9 @@ pub enum C8yMapperConfigBuildError { #[error(transparent)] FromC8yMapperConfigError(#[from] C8yMapperConfigError), + + #[error(transparent)] + FromTopicIdError(#[from] TopicIdError), } #[derive(thiserror::Error, Debug)] diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 7737168826b..af9ac6c0e33 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -2,8 +2,6 @@ use super::alarm_converter::AlarmConverter; use super::config::C8yMapperConfig; use super::config::MQTT_MESSAGE_SIZE_THRESHOLD; use super::error::CumulocityMapperError; -use super::fragments::C8yAgentFragment; -use super::fragments::C8yDeviceDataFragment; use super::service_monitor; use crate::actor::CmdId; use crate::actor::IdDownloadRequest; @@ -55,8 +53,6 @@ use service_monitor::convert_health_status_message; use std::collections::HashMap; use std::collections::HashSet; use std::fs; -use std::fs::File; -use std::io::Read; use std::path::Path; use std::path::PathBuf; use tedge_actors::LoggingSender; @@ -95,14 +91,11 @@ use thiserror::Error; use time::format_description::well_known::Rfc3339; use tokio::time::Duration; use tracing::debug; -use tracing::info; use tracing::log::error; use tracing::trace; const C8Y_CLOUD: &str = "c8y"; -const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations"; -const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; @@ -171,7 +164,8 @@ pub struct CumulocityConverter { pub config: C8yMapperConfig, pub(crate) mapper_config: MapperConfig, pub device_name: String, - device_type: String, + pub(crate) device_topic_id: EntityTopicId, + pub(crate) device_type: String, alarm_converter: AlarmConverter, pub operations: Operations, operation_logs: OperationLogs, @@ -187,6 +181,7 @@ pub struct CumulocityConverter { pub auth_proxy: ProxyUrlGenerator, pub downloader_sender: LoggingSender, pub pending_operations: HashMap, + pub inventory_model: Value, // Holds a live view of aggregated inventory, derived from various twin data } impl CumulocityConverter { @@ -198,6 +193,7 @@ impl CumulocityConverter { downloader_sender: LoggingSender, ) -> Result { let device_id = config.device_id.clone(); + let device_topic_id = config.device_topic_id.clone(); let device_type = config.device_type.clone(); let service_type = config.service_type.clone(); let c8y_host = config.c8y_host.clone(); @@ -230,11 +226,17 @@ impl CumulocityConverter { ) .unwrap(); + let inventory_model = json!({ + "name": device_id.clone(), + "type": device_type.clone(), + }); + Ok(CumulocityConverter { size_threshold, config, mapper_config, device_name: device_id, + device_topic_id, device_type, alarm_converter, operations, @@ -251,6 +253,7 @@ impl CumulocityConverter { auth_proxy, downloader_sender, pending_operations: HashMap::new(), + inventory_model, }) } @@ -893,6 +896,10 @@ impl CumulocityConverter { } let mut messages = match &channel { + Channel::EntityTwinData { fragment_key } => { + self.try_convert_entity_twin_data(&source, message, fragment_key)? + } + Channel::Measurement { measurement_type } => { self.try_convert_measurement(&source, message, measurement_type)? } @@ -1060,31 +1067,24 @@ impl CumulocityConverter { } fn try_init_messages(&mut self) -> Result, ConversionError> { - let inventory_fragments_message = self.wrap_error(create_inventory_fragments_message( - &self.device_name, - &self.cfg_dir, - )); + let mut messages = self.parse_base_inventory_file()?; - let supported_operations_message = self.wrap_error( - self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y")), - ); + let supported_operations_message = + self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y"))?; - let cloud_child_devices_message = create_request_for_cloud_child_devices(); + let device_data_message = self.inventory_device_type_update_message()?; - let device_data_message = self.wrap_error(create_device_data_fragments( - &self.device_name, - &self.device_type, - )); + let pending_operations_message = create_get_pending_operations_message()?; - let pending_operations_message = self.wrap_error(create_get_pending_operations_message()); + let cloud_child_devices_message = create_request_for_cloud_child_devices(); - Ok(vec![ - inventory_fragments_message, + messages.append(&mut vec![ supported_operations_message, device_data_message, pending_operations_message, cloud_child_devices_message, - ]) + ]); + Ok(messages) } fn create_supported_operations(&self, path: &Path) -> Result { @@ -1159,17 +1159,6 @@ fn get_child_id(dir_path: &PathBuf) -> Result { } } -fn create_device_data_fragments( - device_name: &str, - device_type: &str, -) -> Result { - let device_data = C8yDeviceDataFragment::from_type(device_type)?; - let ops_msg = device_data.to_json()?; - - let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}",)); - Ok(Message::new(&topic, ops_msg.to_string())) -} - fn create_get_software_list_message() -> Result { let request = SoftwareListRequest::default(); let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?; @@ -1202,17 +1191,6 @@ fn create_request_for_cloud_child_devices() -> Message { Message::new(&Topic::new_unchecked("c8y/s/us"), "105") } -fn create_inventory_fragments_message( - device_name: &str, - cfg_dir: &Path, -) -> Result { - let inventory_file_path = format!("{}/{INVENTORY_FRAGMENTS_FILE_LOCATION}", cfg_dir.display()); - let ops_msg = get_inventory_fragments(&inventory_file_path)?; - - let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}")); - Ok(Message::new(&topic, ops_msg.to_string())) -} - impl CumulocityConverter { async fn register_restart_operation( &self, @@ -1395,48 +1373,6 @@ pub fn get_local_child_devices_list( .collect::>()) } -/// reads a json file to serde_json::Value -fn read_json_from_file(file_path: &str) -> Result { - let mut file = File::open(Path::new(file_path))?; - let mut data = String::new(); - file.read_to_string(&mut data)?; - let json: serde_json::Value = serde_json::from_str(&data)?; - info!("Read the fragments from {file_path} file"); - Ok(json) -} - -/// gets a serde_json::Value of inventory -fn get_inventory_fragments( - inventory_file_path: &str, -) -> Result { - let agent_fragment = C8yAgentFragment::new()?; - let json_fragment = agent_fragment.to_json()?; - - match read_json_from_file(inventory_file_path) { - Ok(mut json) => { - json.as_object_mut() - .ok_or(ConversionError::FromOptionError)? - .insert( - "c8y_Agent".to_string(), - json_fragment - .get("c8y_Agent") - .ok_or(ConversionError::FromOptionError)? - .to_owned(), - ); - Ok(json) - } - Err(ConversionError::FromStdIo(_)) => { - info!("Could not read inventory fragments from file {inventory_file_path}"); - Ok(json_fragment) - } - Err(ConversionError::FromSerdeJson(e)) => { - info!("Could not parse the {inventory_file_path} file due to: {e}"); - Ok(json_fragment) - } - Err(_) => Ok(json_fragment), - } -} - async fn create_tedge_agent_supported_ops(ops_dir: &Path) -> Result<(), ConversionError> { create_file_with_defaults(ops_dir.join("c8y_SoftwareUpdate"), None)?; @@ -1451,7 +1387,7 @@ pub struct HealthStatus { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use super::CumulocityConverter; use crate::actor::IdDownloadRequest; use crate::actor::IdDownloadResult; @@ -2458,7 +2394,7 @@ mod tests { assert!(!second_registration_message_mapped); } - async fn create_c8y_converter( + pub(crate) async fn create_c8y_converter( tmp_dir: &TempTedgeDir, ) -> ( CumulocityConverter, @@ -2468,6 +2404,7 @@ mod tests { tmp_dir.dir("tedge").dir("agent"); let device_id = "test-device".into(); + let device_topic_id = EntityTopicId::default_main_device(); let device_type = "test-device-type".into(); let service_type = "service".into(); let c8y_host = "test.c8y.io".into(); @@ -2485,6 +2422,7 @@ mod tests { tmp_dir.utf8_path_buf(), tmp_dir.utf8_path_buf().into(), device_id, + device_topic_id, device_type, service_type, c8y_host, diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs new file mode 100644 index 00000000000..7f2c8c96226 --- /dev/null +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -0,0 +1,382 @@ +//! This module provides converter functions to update Cumulocity inventory with entity twin data. +use crate::converter::CumulocityConverter; +use crate::error::ConversionError; +use crate::fragments::C8yAgentFragment; +use crate::fragments::C8yDeviceDataFragment; +use serde_json::json; +use serde_json::Value as JsonValue; +use std::fs::File; +use std::io::Read; +use std::path::Path; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::Topic; +use tracing::info; +use tracing::warn; + +const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json"; +const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; + +impl CumulocityConverter { + /// Update the live `inventory_model` of this converter with the provided fragment data + fn update_inventory_model(&mut self, key: String, value: JsonValue) { + if let JsonValue::Object(map) = &mut self.inventory_model { + map.insert(key, value); + } else { + panic!("This can't happen as the inventory_model is always a map"); + } + } + + /// Creates the inventory update message with fragments from inventory.json file + /// while also updating the live `inventory_model` of this converter + pub(crate) fn parse_base_inventory_file(&mut self) -> Result, ConversionError> { + let mut messages = vec![]; + let inventory_file_path = self.cfg_dir.join(INVENTORY_FRAGMENTS_FILE_LOCATION); + let mut inventory_base = Self::get_inventory_fragments(&inventory_file_path)?; + + if let Some(map) = inventory_base.as_object_mut() { + if map.remove("name").is_some() { + warn!("Ignoring `name` fragment key from inventory.json as updating the same using this file is not supported"); + } + if map.remove("type").is_some() { + warn!("Ignoring `type` fragment key from inventory.json as updating the same using this file is not supported"); + } + } + + let message = + self.inventory_update_message(&self.device_topic_id, inventory_base.clone())?; + messages.push(message); + + if let JsonValue::Object(map) = inventory_base { + for (key, value) in map { + self.update_inventory_model(key.clone(), value.clone()); + let mapped_message = self.register_twin_data_message( + &EntityTopicId::default_main_device(), + key.clone(), + value.clone(), + ); + messages.push(mapped_message); + } + } + + Ok(messages) + } + + /// Create an entity twin data message with the provided fragment + fn register_twin_data_message( + &self, + entity: &EntityTopicId, + fragment_key: String, + fragment_value: JsonValue, + ) -> Message { + let twin_channel = Channel::EntityTwinData { fragment_key }; + let topic = self.mqtt_schema.topic_for(entity, &twin_channel); + Message::new(&topic, fragment_value.to_string()).with_retain() + } + + /// Convert a twin metadata message into Cumulocity inventory update messages. + /// Updating the `name` and `type` fragments are not supported. + /// Empty payload is mapped to a clear inventory fragment message in Cumulocity. + pub(crate) fn try_convert_entity_twin_data( + &mut self, + source: &EntityTopicId, + message: &Message, + fragment_key: &str, + ) -> Result, ConversionError> { + if fragment_key == "name" || fragment_key == "type" { + warn!("Updating the entity `name` and `type` fields via the twin/ topic channel is not supported"); + return Ok(vec![]); + } + + let payload = if message.payload_bytes().is_empty() { + JsonValue::Null + } else { + let payload = serde_json::from_slice::(message.payload_bytes())?; + let existing = self.inventory_model.get(fragment_key); + if existing.is_some_and(|v| payload.eq(v)) { + return Ok(vec![]); + } + + self.update_inventory_model(fragment_key.into(), payload.clone()); + payload + }; + + let mapped_json = json!({ fragment_key: payload }); + let mapped_message = self.inventory_update_message(source, mapped_json)?; + Ok(vec![mapped_message]) + } + + /// Create a Cumulocity inventory update message from a JSON fragment + fn inventory_update_message( + &self, + source: &EntityTopicId, + fragment_value: JsonValue, + ) -> Result { + let entity_external_id = self.entity_store.try_get(source)?.external_id.as_ref(); + let inventory_update_topic = Topic::new_unchecked(&format!( + "{INVENTORY_MANAGED_OBJECTS_TOPIC}/{entity_external_id}" + )); + + Ok(Message::new( + &inventory_update_topic, + fragment_value.to_string(), + )) + } + + /// Create the inventory update message to update the `type` of the main device + pub(crate) fn inventory_device_type_update_message(&self) -> Result { + let device_data = C8yDeviceDataFragment::from_type(&self.device_type)?; + let device_type_fragment = device_data.to_json()?; + + self.inventory_update_message(&self.device_topic_id, device_type_fragment) + } + + /// Return the contents of inventory.json file as a `JsonValue` + fn get_inventory_fragments(inventory_file_path: &Path) -> Result { + let agent_fragment = C8yAgentFragment::new()?; + let json_fragment = agent_fragment.to_json()?; + + match Self::read_json_from_file(inventory_file_path) { + Ok(mut json) => { + json.as_object_mut() + .ok_or(ConversionError::FromOptionError)? + .insert( + "c8y_Agent".to_string(), + json_fragment + .get("c8y_Agent") + .ok_or(ConversionError::FromOptionError)? + .to_owned(), + ); + Ok(json) + } + Err(ConversionError::FromStdIo(_)) => { + info!("Could not read inventory fragments from file {inventory_file_path:?}"); + Ok(json_fragment) + } + Err(ConversionError::FromSerdeJson(e)) => { + info!("Could not parse the {inventory_file_path:?} file due to: {e}"); + Ok(json_fragment) + } + Err(_) => Ok(json_fragment), + } + } + + /// reads a json file to serde_json::Value + fn read_json_from_file(file_path: &Path) -> Result { + let mut file = File::open(Path::new(file_path))?; + let mut data = String::new(); + file.read_to_string(&mut data)?; + let json: serde_json::Value = serde_json::from_str(&data)?; + info!("Read the fragments from {file_path:?} file"); + Ok(json) + } +} + +#[cfg(test)] +mod tests { + use crate::converter::tests::create_c8y_converter; + use serde_json::json; + use tedge_mqtt_ext::test_helpers::assert_messages_matching; + use tedge_mqtt_ext::Message; + use tedge_mqtt_ext::Topic; + use tedge_test_utils::fs::TempTedgeDir; + + #[tokio::test] + async fn convert_entity_twin_data_json_object() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_topic = "te/device/main///twin/device_os"; + let twin_payload = json!({ + "family": "Debian", + "version": "11" + }); + let twin_message = + Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_matching( + &inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "device_os": { + "family": "Debian", + "version": "11" + } + }) + .into(), + )], + ); + } + + #[tokio::test] + async fn convert_entity_twin_data_string_value() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/foo"), + r#""bar""#, // String values must be quoted to be valid JSON string values + ); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_matching( + &inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "foo": "bar" + }) + .into(), + )], + ); + } + + #[tokio::test] + async fn unquoted_string_value_invalid() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/foo"), + "unquoted value", + ); + let messages = converter.convert(&twin_message).await; + assert_messages_matching(&messages, [("tedge/errors", "expected value".into())]) + } + + #[tokio::test] + async fn convert_entity_twin_data_numeric_value() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/foo"), + r#"5.6789"#, + ); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_matching( + &inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "foo": 5.6789 + }) + .into(), + )], + ); + } + + #[tokio::test] + async fn convert_entity_twin_data_boolean_value() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/enabled"), + r#"false"#, + ); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_matching( + &inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "enabled": false + }) + .into(), + )], + ); + } + + #[tokio::test] + async fn forbidden_fragment_keys() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new( + &Topic::new_unchecked("te/device/main///twin/name"), + r#"New Name"#, + ); + let inventory_messages = converter.convert(&twin_message).await; + println!("{:?}", inventory_messages); + assert!( + inventory_messages.is_empty(), + "Expected no converted messages, but received {:?}", + &inventory_messages + ); + } + + #[tokio::test] + async fn clear_inventory_fragment() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_message = Message::new(&Topic::new_unchecked("te/device/main///twin/foo"), ""); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_matching( + &inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "foo": null + }) + .into(), + )], + ); + } + + #[tokio::test] + async fn convert_entity_twin_data_ignores_duplicate_fragment() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_topic = "te/device/main///twin/device_os"; + let twin_payload = json!({ + "family": "Debian", + "version": "11" + }); + let twin_message = + Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_matching( + &inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "device_os": { + "family": "Debian", + "version": "11" + } + }) + .into(), + )], + ); + + // Assert duplicated payload not converted + let inventory_messages = converter.convert(&twin_message).await; + assert!( + inventory_messages.is_empty(), + "Expected no converted messages, but received {:?}", + &inventory_messages + ); + + // Assert that the same payload with different key order is also ignored + let twin_message = Message::new( + &Topic::new_unchecked(twin_topic), + r#"{"version": "11","family": "Debian"}"#, + ); + let inventory_messages = converter.convert(&twin_message).await; + assert!( + inventory_messages.is_empty(), + "Expected no converted messages, but received {:?}", + &inventory_messages + ); + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/lib.rs b/crates/extensions/c8y_mapper_ext/src/lib.rs index 197ab904c8c..838e398838e 100644 --- a/crates/extensions/c8y_mapper_ext/src/lib.rs +++ b/crates/extensions/c8y_mapper_ext/src/lib.rs @@ -6,6 +6,7 @@ pub mod converter; pub mod dynamic_discovery; pub mod error; mod fragments; +mod inventory; pub mod json; mod log_upload; mod serializer; diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 8f62dc5375c..ca0bb9cbc70 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -27,6 +27,7 @@ use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; use tedge_actors::WrappedInput; use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; use tedge_api::SoftwareUpdateResponse; @@ -66,6 +67,15 @@ async fn mapper_publishes_init_messages_on_startup() { "c8y/inventory/managedObjects/update/test-device", default_fragment_content.as_str(), ), + ( + "te/device/main///twin/c8y_Agent", + &json!({ + "name": "thin-edge.io", + "url": "https://thin-edge.io", + "version": version + }) + .to_string(), + ), ("c8y/s/us", "114"), ( "c8y/inventory/managedObjects/update/test-device", @@ -1022,7 +1032,7 @@ async fn mapper_publishes_supported_operations() { let (mqtt, _http, _fs, _timer, _dl) = spawn_c8y_mapper_actor(&cfg_dir, false).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); - mqtt.skip(1).await; + mqtt.skip(2).await; // Expect smartrest message on `c8y/s/us` with expected payload "114,c8y_TestOp1,c8y_TestOp2" assert_received_contains_str(&mut mqtt, [("c8y/s/us", "114,c8y_TestOp1,c8y_TestOp2")]).await; @@ -1417,7 +1427,10 @@ async fn mapper_updating_the_inventory_fragments_from_file() { let cfg_dir = TempTedgeDir::new(); let version = env!("CARGO_PKG_VERSION"); - let custom_fragment_content = &json!({ + let custom_fragment_content = json!({ + "boolean_key": true, + "numeric_key": 10, + "string_key": "value", "c8y_Agent": { "name": "thin-edge.io", "url": "https://thin-edge.io", @@ -1428,19 +1441,95 @@ async fn mapper_updating_the_inventory_fragments_from_file() { "url": "31aab9856861b1a587e2094690c2f6e272712cb1", "version": "1.20140107-1" } - }) - .to_string(); - create_inventroy_json_file_with_content(&cfg_dir, custom_fragment_content); + }); + create_inventroy_json_file_with_content(&cfg_dir, &custom_fragment_content.to_string()); let (mqtt, _http, _fs, _timer, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); - assert_received_contains_str( + assert_received_includes_json( &mut mqtt, - [( - "c8y/inventory/managedObjects/update/test-device", - custom_fragment_content.as_str(), - )], + [ + ( + "c8y/inventory/managedObjects/update/test-device", + custom_fragment_content, + ), + ("te/device/main///twin/boolean_key", json!(true)), + ( + "te/device/main///twin/c8y_Agent", + json!({ + "name": "thin-edge.io", + "url": "https://thin-edge.io", + "version": version + }), + ), + ( + "te/device/main///twin/c8y_Firmware", + json!({ + "name": "raspberrypi-bootloader", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1", + "version": "1.20140107-1" + }), + ), + ("te/device/main///twin/numeric_key", json!(10)), + ("te/device/main///twin/string_key", json!("value")), + ], + ) + .await; +} + +#[tokio::test] +async fn forbidden_keys_in_inventory_fragments_file_ignored() { + // The test Creates an inventory file in (Temp_base_Dir)/device/inventory.json + // The tedge-mapper parses the inventory fragment file and publishes on c8y/inventory/managedObjects/update/test-device + // Verify the fragment message that is published + let cfg_dir = TempTedgeDir::new(); + + let version = env!("CARGO_PKG_VERSION"); + let custom_fragment_content = json!({ + "name": "new-name", + "type": "new-name", + "c8y_Firmware": { + "name": "raspberrypi-bootloader", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1", + "version": "1.20140107-1" + } + }); + create_inventroy_json_file_with_content(&cfg_dir, &custom_fragment_content.to_string()); + + let (mqtt, _http, _fs, _timer, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; + let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); + + assert_received_includes_json( + &mut mqtt, + [ + ( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "c8y_Firmware": { + "name": "raspberrypi-bootloader", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1", + "version": "1.20140107-1" + } + }), + ), + ( + "te/device/main///twin/c8y_Agent", + json!({ + "name": "thin-edge.io", + "url": "https://thin-edge.io", + "version": version + }), + ), + ( + "te/device/main///twin/c8y_Firmware", + json!({ + "name": "raspberrypi-bootloader", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1", + "version": "1.20140107-1" + }), + ), + ], ) .await; } @@ -2285,6 +2374,7 @@ pub(crate) async fn spawn_c8y_mapper_actor( } let device_name = "test-device".into(); + let device_topic_id = EntityTopicId::default_main_device(); let device_type = "test-device-type".into(); let service_type = "service".into(); let c8y_host = "test.c8y.io".into(); @@ -2307,6 +2397,7 @@ pub(crate) async fn spawn_c8y_mapper_actor( config_dir.utf8_path_buf(), config_dir.utf8_path_buf().into(), device_name, + device_topic_id, device_type, service_type, c8y_host, @@ -2356,6 +2447,7 @@ pub(crate) async fn skip_init_messages(mqtt: &mut impl MessageReceiver +## Twin + +The `twin` metadata is mapped to [inventory data](https://cumulocity.com/guides/concepts/domain-model/#inventory) in Cumulocity. + +#### Twin - Main device + +A device's digital twin model can be updated by publishing to a specific topic. + +The type part of the topic is used to group the data so it is easier for components to subscribe to relevant parts. + +
+ +**Thin Edge (input)** + +```text title="Topic (retain=true)" +te/device/main///twin/device_OS +``` + +```json5 title="Payload" +{ + "family": "Debian", + "version": "11" +} +``` + +
+ +
+ +**Cumulocity IoT (output)** + +```text title="Topic" +c8y/inventory/managedObjects/update/ +``` + +```json5 title="Payload" +{ + "device_OS": { + "family": "Debian", + "version": "11" + } +} +``` + +
+ + +#### Twin - Child Device + +
+ +**Thin Edge (input)** + +```text title="Topic (retain=true)" +te/device/child01///twin/device_OS +``` + +```json5 title="Payload" +{ + "family": "Debian", + "version": "11" +} +``` + +
+ +
+ +**Cumulocity IoT (output)** + +```text title="Topic" +c8y/inventory/managedObjects/update/:device:child01 +``` + +```json5 title="Payload" +{ + "device_OS": { + "family": "Debian", + "version": "11" + } +} +``` + +
+ +#### Twin - Service on Main Device + +
+ +**Thin Edge (input)** + +```text title="Topic (retain=true)" +te/device/main/service/tedge-agent/twin/runtime_stats +``` + +```json5 title="Payload" +{ + "memory": 3024, + "uptime": 86400 +} +``` + +
+ +
+ +**Cumulocity IoT (output)** + +```text title="Topic" +c8y/inventory/managedObjects/update/:device:main:service:tedge-agent +``` + +```json5 title="Payload" +{ + "runtime_stats": { + "memory": 3.3, + "uptime": 86400 + } +} +``` + +
+ + +#### Twin - Service on Child Device + +
+ +**Thin Edge (input)** + +```text title="Topic (retain=true)" +te/device/child01/service/tedge-agent/twin/runtime_stats +``` + +```json5 title="Payload" +{ + "memory": 3.3, + "uptime": 86400 +} +``` + +
+ +
+ +**Cumulocity IoT (output)** + +```text title="Topic" +c8y/inventory/managedObjects/update/:device:child01:service:tedge-agent +``` + +```json5 title="Payload" +{ + "runtime_stats": { + "memory": 3.3, + "uptime": 86400 + } +} +``` + +
+ + +### Twin data - Root fragments + +Data can be added on the root level of the twin by publishing the values directly to the topic with the key used as type. +The payload can be any valid JSON value other than a JSON object. +JSON objects must be published to their typed topics directly. + +
+ +**Thin Edge (input)** + +```text title="Topic (retain=true)" +te/device/main///twin/subtype +``` + +```json5 title="Payload" +"my-custom-type" +``` + +
+ +
+ +**Cumulocity IoT (output)** + +```text title="Topic" +c8y/inventory/managedObjects/update/ +``` + +```json5 title="Payload" +{ + "subtype": "my-custom-type" +} +``` + +
+ +:::warning +Updating the following properties via the `twin` channel is not supported + +* `name` +* `type` + +as they are included in the entity registration message and can only be updated with another registration message. +::: + + +### Twin - Deleting a fragment + +
+ +**Thin Edge (input)** + +```text title="Topic (retain=true)" +te/device/child01/service/tedge-agent/twin/runtime_stats +``` + +```json5 title="Payload" +<> +``` + +
+ +
+ +**Cumulocity IoT (output)** + +```text title="Topic" +c8y/inventory/managedObjects/update/:device:child01:service:tedge-agent +``` + +```json5 title="Payload" +{ + "runtime_stats": null +} +``` + +
+ +### Base inventory model + +The contents of `{tedge_config_dir}/device/inventory.json` are used to populate the initial inventory fragments +of the the main thin-edge device in Cumulocity. +For example, if the `inventory.json` contains the following fragments: + +```json title="inventory.json" +{ + "c8y_Firmware": { + "name": "raspberrypi-bootloader", + "version": "1.20140107-1", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1" + }, + "c8y_Hardware": { + "model": "BCM2708", + "revision": "000e", + "serialNumber": "00000000e2f5ad4d" + } +} +``` + +It is mapped to the following Cumulocity message: + +```text title="Topic" +c8y/inventory/managedObjects/update +``` + +```json5 title="Payload" +{ + "c8y_Agent": { + "name": "thin-edge.io", + "url": "https://thin-edge.io", + "version": "x.x.x" + }, + "c8y_Firmware": { + "name": "raspberrypi-bootloader", + "version": "1.20140107-1", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1" + }, + "c8y_Hardware": { + "model": "BCM2708", + "revision": "000e", + "serialNumber": "00000000e2f5ad4d" + } +} +``` + +Where the `c8y_Agent` fragment is auto-generated by thin-edge and appended to the contents of the file before it is published. + +The fragments in this file are also published to the `te/device/main///twin/` topics so that +the local twin metadata on the broker is also up-to-date and other components can also consume it. +For example, the above `inventory.json` would result in the following `twin` messages: + +```text title="Topic" +te/device/main///twin/c8y_Agent +``` + +```json5 title="Payload" +{ + "name": "thin-edge.io", + "url": "https://thin-edge.io", + "version": "x.x.x" +} +``` + +```text title="Topic" +te/device/main///twin/c8y_Firmware +``` + +```json5 title="Payload" +{ + "name": "raspberrypi-bootloader", + "version": "1.20140107-1", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1" +} +``` + +```text title="Topic" +te/device/main///twin/c8y_Hardware +``` + +```json5 title="Payload" +{ + "model": "BCM2708", + "revision": "000e", + "serialNumber": "00000000e2f5ad4d" +} +``` + +:::warning +The following keys in the `inventory.json` file are also ignored: + +* `name` +* `type` + +as they are included in the entity registration message and can only be updated with another registration message. +::: + +### Updating entity type in inventory + +After updating the inventory with `inventory.json` file contents, +the device `type` of the main device, set using the `device.type` tedge config key, +is also updated in the inventory with the following message: + +```text title="Topic" +c8y/inventory/managedObjects/update +``` + +```json5 title="Payload" +{ + "type": "configured-device-type" +} +``` + + ## Operations/Commands Operations from Cumulocity are mapped to their equivalent commands in Thin Edge format. diff --git a/docs/src/references/mqtt-api.md b/docs/src/references/mqtt-api.md index 335232481e3..420ebcf9942 100644 --- a/docs/src/references/mqtt-api.md +++ b/docs/src/references/mqtt-api.md @@ -248,6 +248,7 @@ The following is an overview of the channel categories which are available. |e|Events| |a|Alarms| |cmd|Commands| +|twin|Entity twin metadata| ## Entity registration @@ -435,6 +436,7 @@ so that it can process them. |Measurements|`te//m/`| |Events|`te//e/`| |Alarms|`te//a/`| +|Twin|`te//twin/`| ### Examples: With default device/service topic semantics @@ -515,6 +517,23 @@ tedge mqtt pub -r te/device/main///m/battery_reading/meta '{ The metadata fields supported by each data type will be defined in detail later. +## Twin metadata + +The `twin` metadata type can be used to store additional information about entities (devices and services). +Such information could included: operation system name/version, communication statistics, device status, +or any other information that is not suited to be measurements, events or alarms. + +```text title="Topic (retain=true)" +te/device/main///twin/device_OS +``` + +```json5 title="Payload" +{ + "family": "Debian", + "version": "11" +} +``` + ## Commands The topic scheme for commands can be visualized using the diagram below. diff --git a/tests/RobotFramework/tests/cumulocity/telemetry/child_device_telemetry.robot b/tests/RobotFramework/tests/cumulocity/telemetry/child_device_telemetry.robot index 7e35d2c4d94..b8f7d7e9bb7 100644 --- a/tests/RobotFramework/tests/cumulocity/telemetry/child_device_telemetry.robot +++ b/tests/RobotFramework/tests/cumulocity/telemetry/child_device_telemetry.robot @@ -70,6 +70,30 @@ Child devices support sending inventory data via c8y topic Should Be Equal ${mo["custom"]["fragment"]} yes +Child devices support sending inventory data via tedge topic with type + Execute Command tedge mqtt pub "te/device/${CHILD_SN}///twin/device_OS" '{"family":"Debian","version":11,"complex":[1,"2",3],"object":{"foo":"bar"}}' + Cumulocity.Set Device ${CHILD_SN} + ${mo}= Device Should Have Fragments device_OS + Should Be Equal ${mo["device_OS"]["family"]} Debian + Should Be Equal As Integers ${mo["device_OS"]["version"]} 11 + + Should Be Equal As Integers ${mo["device_OS"]["complex"][0]} 1 + Should Be Equal As Strings ${mo["device_OS"]["complex"][1]} 2 + Should Be Equal As Integers ${mo["device_OS"]["complex"][2]} 3 + Should Be Equal ${mo["device_OS"]["object"]["foo"]} bar + + +Child devices supports sending inventory data via tedge topic to root fragments + Execute Command tedge mqtt pub "te/device/${CHILD_SN}///twin/subtype" '"LinuxDeviceA"' + Execute Command tedge mqtt pub "te/device/${CHILD_SN}///twin/type" '"ShouldBeIgnored"' + Execute Command tedge mqtt pub "te/device/${CHILD_SN}///twin/name" '"ShouldBeIgnored"' + Cumulocity.Set Device ${CHILD_SN} + ${mo}= Device Should Have Fragments subtype + Should Be Equal ${mo["subtype"]} LinuxDeviceA + Should Be Equal ${mo["type"]} thin-edge.io-child + Should Be Equal ${mo["name"]} ${CHILD_SN} + + Child device supports sending custom child device measurements directly to c8y Execute Command tedge mqtt pub "c8y/measurement/measurements/create" '{"time":"2023-03-20T08:03:56.940907Z","externalSource":{"externalId":"${CHILD_SN}","type":"c8y_Serial"},"environment":{"temperature":{"value":29.9,"unit":"°C"}},"type":"10min_average","meta":{"sensorLocation":"Brisbane, Australia"}}' Cumulocity.Set Device ${CHILD_SN} @@ -78,6 +102,24 @@ Child device supports sending custom child device measurements directly to c8y Should Be Equal ${measurements[0]["meta"]["sensorLocation"]} Brisbane, Australia Should Be Equal ${measurements[0]["type"]} 10min_average +Nested child devices support sending inventory data via tedge topic + ${nested_child}= Get Random Name + Execute Command tedge mqtt pub --retain 'te/device/${nested_child}//' '{"@type":"child-device","@parent":"device/${CHILD_SN}//","@id":"${nested_child}"}' + + Execute Command tedge mqtt pub "te/device/${nested_child}///twin/device_OS" '{"family":"Debian","version":11}' + Execute Command tedge mqtt pub "te/device/${nested_child}///twin/subtype" '"LinuxDeviceB"' + Execute Command tedge mqtt pub "te/device/${nested_child}///twin/type" '"ShouldBeIgnored"' + Execute Command tedge mqtt pub "te/device/${nested_child}///twin/name" '"ShouldBeIgnored"' + + Cumulocity.Set Device ${nested_child} + ${mo}= Device Should Have Fragments device_OS + Should Be Equal ${mo["device_OS"]["family"]} Debian + Should Be Equal As Integers ${mo["device_OS"]["version"]} 11 + ${mo}= Device Should Have Fragments subtype + Should Be Equal ${mo["subtype"]} LinuxDeviceB + Should Be Equal ${mo["type"]} thin-edge.io-child + Should Be Equal ${mo["name"]} ${nested_child} + *** Keywords *** diff --git a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot index 3f336e52db0..4360e743d76 100644 --- a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot +++ b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry.robot @@ -130,6 +130,29 @@ Thin-edge device support sending inventory data via c8y topic Should Be Equal ${mo["subType"]} customType +Thin-edge device support sending inventory data via tedge topic + Execute Command tedge mqtt pub "te/device/main///twin/device_OS" '{"family":"Debian","version":11,"complex":[1,"2",3],"object":{"foo":"bar"}}' + Cumulocity.Set Device ${DEVICE_SN} + ${mo}= Device Should Have Fragments device_OS + Should Be Equal ${mo["device_OS"]["family"]} Debian + Should Be Equal As Integers ${mo["device_OS"]["version"]} 11 + + Should Be Equal As Integers ${mo["device_OS"]["complex"][0]} 1 + Should Be Equal As Strings ${mo["device_OS"]["complex"][1]} 2 + Should Be Equal As Integers ${mo["device_OS"]["complex"][2]} 3 + Should Be Equal ${mo["device_OS"]["object"]["foo"]} bar + + +Thin-edge device supports sending inventory data via tedge topic to root fragments + Execute Command tedge mqtt pub "te/device/main///twin/subtype" '"LinuxDeviceA"' + Execute Command tedge mqtt pub "te/device/main///twin/type" '"ShouldBeIgnored"' + Execute Command tedge mqtt pub "te/device/main///twin/name" '"ShouldBeIgnored"' + Cumulocity.Set Device ${DEVICE_SN} + ${mo}= Device Should Have Fragments subtype + Should Be Equal ${mo["subtype"]} LinuxDeviceA + Should Be Equal ${mo["type"]} thin-edge.io + Should Be Equal ${mo["name"]} ${DEVICE_SN} + *** Keywords *** Custom Setup diff --git a/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot b/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot index 9235c20099a..ffe5a5fa372 100644 --- a/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot +++ b/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot @@ -105,7 +105,7 @@ set/unset c8y.topics ${unset} Execute Command tedge config list Should Contain ... ${unset} - ... c8y.topics=["te/+/+/+/+", "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", "te/+/+/+/+/status/health"] + ... c8y.topics=["te/+/+/+/+", "te/+/+/+/+/twin/+", "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", "te/+/+/+/+/status/health"] set/unset az.root_cert_path Execute Command sudo tedge config set az.root_cert_path /etc/ssl/certs1 # Changing az.root_cert_path