diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index e0776444719..f55bbf92044 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -2,8 +2,6 @@ use super::alarm_converter::AlarmConverter; use super::config::C8yMapperConfig; use super::config::MQTT_MESSAGE_SIZE_THRESHOLD; use super::error::CumulocityMapperError; -use super::fragments::C8yAgentFragment; -use super::fragments::C8yDeviceDataFragment; use super::service_monitor; use crate::actor::CmdId; use crate::actor::IdDownloadRequest; @@ -56,8 +54,6 @@ use service_monitor::convert_health_status_message; use std::collections::HashMap; use std::collections::HashSet; use std::fs; -use std::fs::File; -use std::io::Read; use std::path::Path; use std::path::PathBuf; use tedge_actors::LoggingSender; @@ -96,13 +92,10 @@ use thiserror::Error; use time::format_description::well_known::Rfc3339; use tokio::time::Duration; use tracing::debug; -use tracing::info; use tracing::log::error; const C8Y_CLOUD: &str = "c8y"; -const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "operations"; -pub const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; @@ -172,7 +165,8 @@ pub struct CumulocityConverter { pub config: C8yMapperConfig, pub(crate) mapper_config: MapperConfig, pub device_name: String, - device_type: String, + pub(crate) device_type: String, + pub(crate) device_topic_id: EntityTopicId, alarm_converter: AlarmConverter, pub operations: Operations, operation_logs: OperationLogs, @@ -188,6 +182,7 @@ pub struct CumulocityConverter { pub auth_proxy: ProxyUrlGenerator, pub downloader_sender: LoggingSender, pub pending_operations: HashMap, + pub inventory_model: Value, } impl CumulocityConverter { @@ -230,12 +225,18 @@ impl CumulocityConverter { ) .unwrap(); + let inventory_model = json!({ + "name": device_id.clone(), + "type": device_type.clone(), + }); + Ok(CumulocityConverter { size_threshold, config, mapper_config, device_name: device_id, device_type, + device_topic_id: EntityTopicId::default_main_device(), alarm_converter, operations, operation_logs, @@ -251,6 +252,7 @@ impl CumulocityConverter { auth_proxy, downloader_sender, pending_operations: HashMap::new(), + inventory_model, }) } @@ -1072,31 +1074,26 @@ impl CumulocityConverter { } fn try_init_messages(&mut self) -> Result, ConversionError> { - let inventory_fragments_message = self.wrap_error(create_inventory_fragments_message( - &self.device_name, - &self.cfg_dir, - )); + let mut messages = vec![]; + + let mut inventory_fragments_message = self.create_base_inventory_message()?; + messages.append(&mut inventory_fragments_message); let supported_operations_message = self.wrap_error( self.create_supported_operations(&self.cfg_dir.join("operations").join("c8y")), ); + messages.push(supported_operations_message); - let cloud_child_devices_message = create_request_for_cloud_child_devices(); + let device_data_message = self.inventory_device_type_update_message()?; + messages.push(device_data_message); - let device_data_message = self.wrap_error(create_device_data_fragments( - &self.device_name, - &self.device_type, - )); + let pending_operations_message = create_get_pending_operations_message()?; + messages.push(pending_operations_message); - let pending_operations_message = self.wrap_error(create_get_pending_operations_message()); + let cloud_child_devices_message = create_request_for_cloud_child_devices(); + messages.push(cloud_child_devices_message); - Ok(vec![ - inventory_fragments_message, - supported_operations_message, - device_data_message, - pending_operations_message, - cloud_child_devices_message, - ]) + Ok(messages) } fn create_supported_operations(&self, path: &Path) -> Result { @@ -1171,17 +1168,6 @@ fn get_child_id(dir_path: &PathBuf) -> Result { } } -fn create_device_data_fragments( - device_name: &str, - device_type: &str, -) -> Result { - let device_data = C8yDeviceDataFragment::from_type(device_type)?; - let ops_msg = device_data.to_json()?; - - let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}",)); - Ok(Message::new(&topic, ops_msg.to_string())) -} - fn create_get_software_list_message() -> Result { let request = SoftwareListRequest::default(); let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?; @@ -1227,17 +1213,6 @@ fn add_external_device_registration_message( false } -fn create_inventory_fragments_message( - device_name: &str, - cfg_dir: &Path, -) -> Result { - let inventory_file_path = format!("{}/{INVENTORY_FRAGMENTS_FILE_LOCATION}", cfg_dir.display()); - let ops_msg = get_inventory_fragments(&inventory_file_path)?; - - let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}")); - Ok(Message::new(&topic, ops_msg.to_string())) -} - impl CumulocityConverter { async fn register_restart_operation( &self, @@ -1404,48 +1379,6 @@ pub fn get_local_child_devices_list( .collect::>()) } -/// reads a json file to serde_json::Value -fn read_json_from_file(file_path: &str) -> Result { - let mut file = File::open(Path::new(file_path))?; - let mut data = String::new(); - file.read_to_string(&mut data)?; - let json: serde_json::Value = serde_json::from_str(&data)?; - info!("Read the fragments from {file_path} file"); - Ok(json) -} - -/// gets a serde_json::Value of inventory -fn get_inventory_fragments( - inventory_file_path: &str, -) -> Result { - let agent_fragment = C8yAgentFragment::new()?; - let json_fragment = agent_fragment.to_json()?; - - match read_json_from_file(inventory_file_path) { - Ok(mut json) => { - json.as_object_mut() - .ok_or(ConversionError::FromOptionError)? - .insert( - "c8y_Agent".to_string(), - json_fragment - .get("c8y_Agent") - .ok_or(ConversionError::FromOptionError)? - .to_owned(), - ); - Ok(json) - } - Err(ConversionError::FromStdIo(_)) => { - info!("Could not read inventory fragments from file {inventory_file_path}"); - Ok(json_fragment) - } - Err(ConversionError::FromSerdeJson(e)) => { - info!("Could not parse the {inventory_file_path} file due to: {e}"); - Ok(json_fragment) - } - Err(_) => Ok(json_fragment), - } -} - async fn create_tedge_agent_supported_ops(ops_dir: PathBuf) -> Result<(), ConversionError> { create_file_with_defaults(ops_dir.join("c8y_SoftwareUpdate"), None)?; diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs index 8b572e91ce5..2edc29fa451 100644 --- a/crates/extensions/c8y_mapper_ext/src/inventory.rs +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -1,36 +1,156 @@ use crate::converter::CumulocityConverter; -use crate::converter::INVENTORY_MANAGED_OBJECTS_TOPIC; use crate::error::ConversionError; +use crate::fragments::C8yAgentFragment; +use crate::fragments::C8yDeviceDataFragment; use serde_json::json; use serde_json::Value as JsonValue; +use std::fs::File; +use std::io::Read; +use std::path::Path; +use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_mqtt_ext::Message; use tedge_mqtt_ext::Topic; +use tracing::info; use tracing::warn; +const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json"; +pub const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; + impl CumulocityConverter { - pub fn try_convert_entity_twin_data( + fn update_inventory_model(&mut self, key: String, value: JsonValue) { + if let JsonValue::Object(map) = &mut self.inventory_model { + map.insert(key, value); + } else { + panic!("This can't happen as the inventory_model is always a map"); + } + } + + pub(crate) fn create_base_inventory_message( + &mut self, + ) -> Result, ConversionError> { + let mut messages = vec![]; + let inventory_file_path = self.cfg_dir.join(INVENTORY_FRAGMENTS_FILE_LOCATION); + let inventory_base = Self::get_inventory_fragments(&inventory_file_path)?; + self.inventory_model = inventory_base.clone(); + + let message = + self.inventory_update_message(&self.device_topic_id, inventory_base.clone())?; + messages.push(message); + + if let JsonValue::Object(map) = inventory_base { + for (key, value) in map { + self.update_inventory_model(key.clone(), value.clone()); + let mapped_message = self.register_twin_data_message( + &EntityTopicId::default_main_device(), + key, + value, + ); + messages.push(mapped_message); + } + } + + Ok(messages) + } + + pub(crate) fn register_twin_data_message( + &self, + entity: &EntityTopicId, + fragment_key: String, + fragment_value: JsonValue, + ) -> Message { + let twin_channel = Channel::EntityTwinData { fragment_key }; + let topic = self.mqtt_schema.topic_for(entity, &twin_channel); + Message::new(&topic, fragment_value.to_string()).with_retain() + } + + pub(crate) fn try_convert_entity_twin_data( &mut self, source: &EntityTopicId, message: &Message, fragment_key: &str, ) -> Result, ConversionError> { - let target_entity = self.entity_store.try_get(source)?; - let entity_external_id = target_entity.external_id.as_ref(); - if fragment_key == "name" || fragment_key == "type" { warn!("Updating the entity `name` and `type` fields via the twin/ topic channel is not supported"); return Ok(vec![]); } let payload = serde_json::from_slice::(message.payload_bytes())?; + let existing = self.inventory_model.get(fragment_key); + if existing.is_some_and(|v| payload.eq(v)) { + return Ok(vec![]); + } + + self.update_inventory_model(fragment_key.into(), payload.clone()); let mapped_json = json!({ fragment_key: payload }); + let mapped_message = self.inventory_update_message(source, mapped_json)?; + Ok(vec![mapped_message]) + } - let topic = Topic::new_unchecked(&format!( + pub(crate) fn inventory_update_message( + &self, + source: &EntityTopicId, + fragment_value: JsonValue, + ) -> Result { + let entity_external_id = self.entity_store.try_get(source)?.external_id.as_ref(); + let inventory_update_topic = Topic::new_unchecked(&format!( "{INVENTORY_MANAGED_OBJECTS_TOPIC}/{entity_external_id}" )); - Ok(vec![Message::new(&topic, mapped_json.to_string())]) + + Ok(Message::new( + &inventory_update_topic, + fragment_value.to_string(), + )) + } + + pub(crate) fn inventory_device_type_update_message(&self) -> Result { + let device_data = C8yDeviceDataFragment::from_type(&self.device_type)?; + let device_type_fragment = device_data.to_json()?; + + self.inventory_update_message(&self.device_topic_id, device_type_fragment) + } + + /// gets a serde_json::Value of inventory + pub(crate) fn get_inventory_fragments( + inventory_file_path: &Path, + ) -> Result { + let agent_fragment = C8yAgentFragment::new()?; + let json_fragment = agent_fragment.to_json()?; + + match Self::read_json_from_file(inventory_file_path) { + Ok(mut json) => { + json.as_object_mut() + .ok_or(ConversionError::FromOptionError)? + .insert( + "c8y_Agent".to_string(), + json_fragment + .get("c8y_Agent") + .ok_or(ConversionError::FromOptionError)? + .to_owned(), + ); + Ok(json) + } + Err(ConversionError::FromStdIo(_)) => { + info!("Could not read inventory fragments from file {inventory_file_path:?}"); + Ok(json_fragment) + } + Err(ConversionError::FromSerdeJson(e)) => { + info!("Could not parse the {inventory_file_path:?} file due to: {e}"); + Ok(json_fragment) + } + Err(_) => Ok(json_fragment), + } + } + + /// reads a json file to serde_json::Value + fn read_json_from_file(file_path: &Path) -> Result { + let mut file = File::open(Path::new(file_path))?; + let mut data = String::new(); + file.read_to_string(&mut data)?; + let json: serde_json::Value = serde_json::from_str(&data)?; + info!("Read the fragments from {file_path:?} file"); + Ok(json) } } @@ -165,6 +285,59 @@ mod tests { ); let inventory_messages = converter.convert(&twin_message).await; println!("{:?}", inventory_messages); - assert!(inventory_messages.is_empty(), "No mapped messages expected"); + assert!( + inventory_messages.is_empty(), + "Expected no converted messages, but received {:?}", + &inventory_messages + ); + } + + #[tokio::test] + async fn convert_entity_twin_data_ignores_duplicate_fragment() { + let tmp_dir = TempTedgeDir::new(); + let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; + + let twin_topic = "te/device/main///twin/device_os"; + let twin_payload = json!({ + "family": "Debian", + "version": "11" + }); + let twin_message = + Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); + let inventory_messages = converter.convert(&twin_message).await; + + assert_messages_matching( + &inventory_messages, + [( + "c8y/inventory/managedObjects/update/test-device", + json!({ + "device_os": { + "family": "Debian", + "version": "11" + } + }) + .into(), + )], + ); + + // Assert duplicated payload not converted + let inventory_messages = converter.convert(&twin_message).await; + assert!( + inventory_messages.is_empty(), + "Expected no converted messages, but received {:?}", + &inventory_messages + ); + + // Assert that the same payload with different key order is also ignored + let twin_message = Message::new( + &Topic::new_unchecked(twin_topic), + r#"{"version": "11","family": "Debian"}"#, + ); + let inventory_messages = converter.convert(&twin_message).await; + assert!( + inventory_messages.is_empty(), + "Expected no converted messages, but received {:?}", + &inventory_messages + ); } } diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index e5d9b7cfc23..4a79eb1a39d 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -66,6 +66,15 @@ async fn mapper_publishes_init_messages_on_startup() { "c8y/inventory/managedObjects/update/test-device", default_fragment_content.as_str(), ), + ( + "te/device/main///twin/c8y_Agent", + &json!({ + "name": "thin-edge.io", + "url": "https://thin-edge.io", + "version": version + }) + .to_string(), + ), ("c8y/s/us", "114"), ( "c8y/inventory/managedObjects/update/test-device", @@ -1022,7 +1031,7 @@ async fn mapper_publishes_supported_operations() { let (mqtt, _http, _fs, _timer, _dl) = spawn_c8y_mapper_actor(&cfg_dir, false).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); - mqtt.skip(1).await; + mqtt.skip(2).await; // Expect smartrest message on `c8y/s/us` with expected payload "114,c8y_TestOp1,c8y_TestOp2" assert_received_contains_str(&mut mqtt, [("c8y/s/us", "114,c8y_TestOp1,c8y_TestOp2")]).await; @@ -1416,7 +1425,10 @@ async fn mapper_updating_the_inventory_fragments_from_file() { let cfg_dir = TempTedgeDir::new(); let version = env!("CARGO_PKG_VERSION"); - let custom_fragment_content = &json!({ + let custom_fragment_content = json!({ + "boolean_key": true, + "numeric_key": 10, + "string_key": "value", "c8y_Agent": { "name": "thin-edge.io", "url": "https://thin-edge.io", @@ -1427,19 +1439,39 @@ async fn mapper_updating_the_inventory_fragments_from_file() { "url": "31aab9856861b1a587e2094690c2f6e272712cb1", "version": "1.20140107-1" } - }) - .to_string(); - create_inventroy_json_file_with_content(&cfg_dir, custom_fragment_content); + }); + create_inventroy_json_file_with_content(&cfg_dir, &custom_fragment_content.to_string()); let (mqtt, _http, _fs, _timer, _dl) = spawn_c8y_mapper_actor(&cfg_dir, true).await; let mut mqtt = mqtt.with_timeout(TEST_TIMEOUT_MS); - assert_received_contains_str( + assert_received_includes_json( &mut mqtt, - [( - "c8y/inventory/managedObjects/update/test-device", - custom_fragment_content.as_str(), - )], + [ + ( + "c8y/inventory/managedObjects/update/test-device", + custom_fragment_content, + ), + ("te/device/main///twin/boolean_key", json!(true)), + ( + "te/device/main///twin/c8y_Agent", + json!({ + "name": "thin-edge.io", + "url": "https://thin-edge.io", + "version": version + }), + ), + ( + "te/device/main///twin/c8y_Firmware", + json!({ + "name": "raspberrypi-bootloader", + "url": "31aab9856861b1a587e2094690c2f6e272712cb1", + "version": "1.20140107-1" + }), + ), + ("te/device/main///twin/numeric_key", json!(10)), + ("te/device/main///twin/string_key", json!("value")), + ], ) .await; } @@ -2355,6 +2387,7 @@ pub(crate) async fn skip_init_messages(mqtt: &mut impl MessageReceiver