From 93c0cda4dc87cd9f7d714996e46b0a57e3fa3c70 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 14 Dec 2023 11:03:11 +0000 Subject: [PATCH] Make entity store persistent #2428 Persist the entity store as a JSON lines file. Every registration message and twin data message is persisted as JSON lines. On startup the in-memory entity store is rebuilt by replaying these messages. --- Cargo.lock | 2 + crates/common/mqtt_channel/Cargo.toml | 1 + crates/common/mqtt_channel/src/messages.rs | 95 +++- crates/core/c8y_api/src/json_c8y.rs | 15 +- crates/core/tedge_api/Cargo.toml | 1 + crates/core/tedge_api/src/entity_store.rs | 532 +++++++++++++++--- crates/core/tedge_api/src/lib.rs | 1 + crates/core/tedge_api/src/message_log.rs | 152 +++++ crates/extensions/c8y_mapper_ext/src/actor.rs | 2 + .../extensions/c8y_mapper_ext/src/config.rs | 4 + .../c8y_mapper_ext/src/converter.rs | 7 +- .../c8y_mapper_ext/src/inventory.rs | 13 +- .../c8y_mapper_ext/src/service_monitor.rs | 7 +- crates/extensions/c8y_mapper_ext/src/tests.rs | 1 + crates/extensions/tedge_mqtt_ext/src/lib.rs | 1 + .../registration/device_registration.robot | 42 +- 16 files changed, 789 insertions(+), 87 deletions(-) create mode 100644 crates/core/tedge_api/src/message_log.rs diff --git a/Cargo.lock b/Cargo.lock index ae3c0a0bd9d..2d5f27c707a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2193,6 +2193,7 @@ dependencies = [ "mqtt_tests", "rumqttc", "serde", + "serde_json", "serial_test", "thiserror", "tokio", @@ -3859,6 +3860,7 @@ dependencies = [ "serde_json", "shell-words", "tedge_utils", + "tempfile", "test-case", "thiserror", "time", diff --git a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml index 3fee030dbf5..bf0be1f7dfe 100644 --- a/crates/common/mqtt_channel/Cargo.toml +++ b/crates/common/mqtt_channel/Cargo.toml @@ -23,4 +23,5 @@ zeroize = { workspace = true } [dev-dependencies] anyhow = { workspace = true } mqtt_tests = { workspace = true } +serde_json = { workspace = true } serial_test = { workspace = true } diff --git a/crates/common/mqtt_channel/src/messages.rs b/crates/common/mqtt_channel/src/messages.rs index b9dde62d7fa..fca31cd0acd 100644 --- a/crates/common/mqtt_channel/src/messages.rs +++ b/crates/common/mqtt_channel/src/messages.rs @@ -2,19 +2,44 @@ use crate::errors::MqttError; use crate::topics::Topic; use rumqttc::Publish; use rumqttc::QoS; +use serde::Deserialize; +use serde::Deserializer; +use serde::Serialize; +use serde::Serializer; use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Write; /// A message to be sent to or received from MQTT. -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct Message { pub topic: Topic, pub payload: DebugPayload, + #[serde(serialize_with = "serialize_qos", deserialize_with = "deserialize_qos")] pub qos: QoS, pub retain: bool, } +fn serialize_qos(qos: &QoS, serializer: S) -> Result +where + S: serde::Serializer, +{ + (*qos as u8).serialize(serializer) +} + +fn deserialize_qos<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + let value = u8::deserialize(deserializer)?; + match value { + 0 => Ok(QoS::AtMostOnce), + 1 => Ok(QoS::AtLeastOnce), + 2 => Ok(QoS::ExactlyOnce), + _ => Err(serde::de::Error::custom("Invalid QoS value")), + } +} + #[derive(Clone, Eq, PartialEq)] pub struct DebugPayload(Payload); @@ -43,6 +68,57 @@ impl AsRef for DebugPayload { } } +impl Serialize for DebugPayload { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match std::str::from_utf8(&self.0) { + Ok(payload_str) => { + // Serialize as a string if all characters are valid UTF-8 + serializer.serialize_str(payload_str) + } + Err(_) => { + // Serialize as a byte array otherwise + serializer.serialize_bytes(&self.0) + } + } + } +} + +impl<'de> Deserialize<'de> for DebugPayload { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct DebugPayloadVisitor; + + impl<'de> serde::de::Visitor<'de> for DebugPayloadVisitor { + type Value = DebugPayload; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a string or a sequence of bytes") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + Ok(DebugPayload(value.as_bytes().to_vec())) + } + + fn visit_bytes(self, value: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(DebugPayload(value.to_vec())) + } + } + + deserializer.deserialize_any(DebugPayloadVisitor) + } +} + impl DebugPayload { /// The payload string (unless this payload is not UTF8) pub fn as_str(&self) -> Result<&str, MqttError> { @@ -138,6 +214,8 @@ where #[cfg(test)] mod tests { + use serde_json::json; + use super::*; #[test] @@ -188,4 +266,19 @@ mod tests { "Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 0: ..." ); } + + #[test] + fn message_serialize_deserialize() { + let message = Message { + topic: Topic::new("test").unwrap(), + payload: DebugPayload("test-payload".as_bytes().to_vec()), + qos: QoS::AtMostOnce, + retain: true, + }; + + let json = serde_json::to_value(&message).expect("Serialization failed"); + assert_eq!(json.get("payload").unwrap(), &json!("test-payload")); + let deserialized: Message = serde_json::from_value(json).expect("Deserialization failed"); + assert_eq!(deserialized, message); + } } diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 8c748f7abdd..afa2f4cde59 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -404,6 +404,7 @@ mod tests { use tedge_api::event::ThinEdgeEventData; use tedge_api::messages::SoftwareListCommandPayload; use tedge_api::mqtt_topics::EntityTopicId; + use tedge_api::mqtt_topics::MqttSchema; use test_case::test_case; use time::macros::datetime; @@ -768,11 +769,16 @@ mod tests { ;"convert to clear alarm" )] fn check_alarm_translation(tedge_alarm: ThinEdgeAlarm, expected_c8y_alarm: C8yAlarm) { + let temp_dir = tempfile::tempdir().unwrap(); let main_device = EntityRegistrationMessage::main_device("test-main".into()); - let mut entity_store = EntityStore::with_main_device( + let mut entity_store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), main_device, + "service".into(), dummy_external_id_mapper, dummy_external_id_validator, + 5, + &temp_dir, ) .unwrap(); @@ -800,11 +806,16 @@ mod tests { }), }; + let temp_dir = tempfile::tempdir().unwrap(); let main_device = EntityRegistrationMessage::main_device("test-main".into()); - let entity_store = EntityStore::with_main_device( + let entity_store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), main_device, + "service".into(), dummy_external_id_mapper, dummy_external_id_validator, + 5, + &temp_dir, ) .unwrap(); diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml index ab25085a6a9..5050fd87969 100644 --- a/crates/core/tedge_api/Cargo.toml +++ b/crates/core/tedge_api/Cargo.toml @@ -36,6 +36,7 @@ clock = { workspace = true } maplit = { workspace = true } mockall = { workspace = true } regex = { workspace = true } +tempfile = { workspace = true } test-case = { workspace = true } time = { workspace = true, features = ["macros"] } toml = { workspace = true } diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 8de93ce2281..5ecc8e2d1ca 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -8,6 +8,8 @@ // TODO: move entity business logic to its own module use crate::entity_store; +use crate::message_log::MessageLogReader; +use crate::message_log::MessageLogWriter; use crate::mqtt_topics::Channel; use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; @@ -15,6 +17,9 @@ use crate::mqtt_topics::TopicIdError; use crate::pending_entity_store::PendingEntityData; use crate::pending_entity_store::PendingEntityStore; use log::debug; +use log::error; +use log::info; +use log::warn; use mqtt_channel::Message; use serde_json::json; use serde_json::Map; @@ -22,6 +27,7 @@ use serde_json::Value as JsonValue; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::Display; +use std::path::Path; use thiserror::Error; /// Represents an "Entity topic identifier" portion of the MQTT topic @@ -114,6 +120,7 @@ type ExternalIdValidatorFn = /// /// ``` /// # use mqtt_channel::{Message, Topic}; +/// # use tedge_api::mqtt_topics::MqttSchema; /// # use tedge_api::entity_store::{EntityStore, EntityRegistrationMessage}; /// let mqtt_message = Message::new( /// &Topic::new("te/device/main//").unwrap(), @@ -121,13 +128,18 @@ type ExternalIdValidatorFn = /// ); /// let registration_message = EntityRegistrationMessage::try_from(&mqtt_message).unwrap(); /// -/// let mut entity_store = EntityStore::with_main_device( +/// let mut entity_store = EntityStore::with_main_device_and_default_service_type( +/// MqttSchema::default(), /// registration_message, +/// "service".into(), /// |tid, xid| tid.to_string().into(), /// |xid| Ok(xid.into()), +/// 5, +/// "/tmp" /// ); /// ``` pub struct EntityStore { + mqtt_schema: MqttSchema, main_device: EntityTopicId, entities: HashMap, entity_id_index: HashMap, @@ -136,52 +148,36 @@ pub struct EntityStore { // TODO: this is a c8y cloud specific concern and it'd be better to put it somewhere else. default_service_type: String, pending_entity_store: PendingEntityStore, + // The persistent message log to persist entity registrations and twin data messages + message_log: MessageLogWriter, } impl EntityStore { - /// Creates a new entity store with a given main device. - #[must_use] - pub fn with_main_device( - main_device: EntityRegistrationMessage, - external_id_mapper_fn: MF, - external_id_validator_fn: SF, - ) -> Option - where - MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, - MF: 'static + Send + Sync, - SF: Fn(&str) -> Result, - SF: 'static + Send + Sync, - { - Self::with_main_device_and_default_service_type( - MqttSchema::default(), - main_device, - "service".to_string(), - external_id_mapper_fn, - external_id_validator_fn, - 100, - ) - } - - #[must_use] - pub fn with_main_device_and_default_service_type( + pub fn with_main_device_and_default_service_type( mqtt_schema: MqttSchema, main_device: EntityRegistrationMessage, default_service_type: String, external_id_mapper_fn: MF, external_id_validator_fn: SF, telemetry_cache_size: usize, - ) -> Option + log_dir: P, + ) -> Result where MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, MF: 'static + Send + Sync, SF: Fn(&str) -> Result, SF: 'static + Send + Sync, + P: AsRef, { if main_device.r#type != EntityType::MainDevice { - return None; + return Err(InitError::Custom( + "Provided main device is not of type main-device".into(), + )); } - let entity_id: EntityExternalId = main_device.external_id?; + let entity_id: EntityExternalId = main_device.external_id.ok_or_else(|| { + InitError::Custom("External id for the main device not provided".into()) + })?; let metadata = EntityMetadata { topic_id: main_device.topic_id.clone(), external_id: entity_id.clone(), @@ -191,7 +187,10 @@ impl EntityStore { twin_data: Map::new(), }; - Some(EntityStore { + let message_log = MessageLogWriter::new(log_dir.as_ref())?; + + let mut entity_store = EntityStore { + mqtt_schema: mqtt_schema.clone(), main_device: main_device.topic_id.clone(), entities: HashMap::from([(main_device.topic_id.clone(), metadata)]), entity_id_index: HashMap::from([(entity_id, main_device.topic_id)]), @@ -199,7 +198,99 @@ impl EntityStore { external_id_validator_fn: Box::new(external_id_validator_fn), default_service_type, pending_entity_store: PendingEntityStore::new(mqtt_schema, telemetry_cache_size), - }) + message_log, + }; + + entity_store.load_from_message_log(log_dir.as_ref()); + + Ok(entity_store) + } + + pub fn load_from_message_log

(&mut self, log_dir: P) + where + P: AsRef, + { + info!("Loading the entity store from the log"); + match MessageLogReader::new(log_dir) { + Err(err) => { + error!( + "Failed to read the entity store log due to {err}. Ignoring and proceeding..." + ) + } + Ok(mut message_log_reader) => { + loop { + match message_log_reader.next_message() { + Err(err) => { + error!("Parsing log entry failed with {err}"); + continue; + } + Ok(None) => { + info!("Finished loading the entity store from the log"); + return; + } + Ok(Some(message)) => { + if let Ok((source, channel)) = + self.mqtt_schema.entity_channel_of(&message.topic) + { + match channel { + Channel::EntityMetadata => { + if let Ok(register_message) = + EntityRegistrationMessage::try_from(&message) + { + if let Err(err) = self.register_entity(register_message) + { + error!("Failed to re-register {source} from the persistent entity store due to {err}"); + continue; + } + } + } + Channel::EntityTwinData { fragment_key } => { + let fragment_value = if message.payload_bytes().is_empty() { + JsonValue::Null + } else { + match serde_json::from_slice::( + message.payload_bytes(), + ) { + Ok(json_value) => json_value, + Err(err) => { + error!("Failed to parse twin fragment value of {fragment_key} of {source} from the persistent entity store due to {err}"); + continue; + } + } + }; + + let twin_data = EntityTwinMessage::new( + source.clone(), + fragment_key, + fragment_value, + ); + if let Err(err) = self.register_twin_data(twin_data.clone()) + { + error!("Failed to restore twin fragment: {twin_data:?} from the persistent entity store due to {err}"); + continue; + } + } + Channel::CommandMetadata { .. } => { + // Do nothing for now as supported operations are not part of the entity store + } + channel => { + warn!( + "Restoring messages on channel: {:?} not supported", + channel + ) + } + } + } else { + warn!( + "Ignoring unsupported message retrieved from entity store: {:?}", + message + ); + } + } + } + } + } + } } /// Returns information about an entity under a given MQTT entity topic identifier. @@ -339,7 +430,7 @@ impl EntityStore { &mut self, message: EntityRegistrationMessage, ) -> Result<(Vec, Vec), Error> { - match self.register_entity(message.clone()) { + match self.register_and_persist_entity(message.clone()) { Ok(affected_entities) => { if affected_entities.is_empty() { Ok((vec![], vec![])) @@ -354,7 +445,7 @@ impl EntityStore { .take_cached_child_entities_data(&topic_id); for pending_child in pending_children { let child_reg_message = pending_child.reg_message.clone(); - self.register_entity(child_reg_message)?; + self.register_and_persist_entity(child_reg_message.clone())?; pending_entities.push(pending_child); } @@ -437,18 +528,20 @@ impl EntityStore { match previous { Entry::Occupied(mut occupied) => { // if there is no change, no entities were affected - let mut existing_entity = occupied.get().clone(); - if existing_entity == entity_metadata { - return Ok(vec![]); - } + let existing_entity = occupied.get().clone(); - existing_entity.other.extend(entity_metadata.other.clone()); + let mut merged_other = existing_entity.other.clone(); + merged_other.extend(entity_metadata.other.clone()); let merged_entity = EntityMetadata { - twin_data: existing_entity.twin_data, - other: existing_entity.other, + twin_data: existing_entity.twin_data.clone(), + other: merged_other, ..entity_metadata }; + if existing_entity == merged_entity { + return Ok(vec![]); + } + occupied.insert(merged_entity); affected_entities.push(topic_id); } @@ -463,6 +556,19 @@ impl EntityStore { Ok(affected_entities) } + fn register_and_persist_entity( + &mut self, + message: EntityRegistrationMessage, + ) -> Result, Error> { + let affected_entities = self.register_entity(message.clone())?; + if !affected_entities.is_empty() { + self.message_log + .append_message(&message.to_mqtt_message(&self.mqtt_schema))?; + } + + Ok(affected_entities) + } + /// An iterator over all registered entities. pub fn iter(&self) -> impl Iterator { self.entities.iter() @@ -541,11 +647,18 @@ impl EntityStore { /// If the provided fragment already existed, `false` is returned. pub fn update_twin_data( &mut self, - entity_topic_id: &EntityTopicId, - fragment_key: String, - fragment_value: JsonValue, + twin_message: EntityTwinMessage, + ) -> Result { + self.register_and_persist_twin_data(twin_message.clone()) + } + + pub fn register_twin_data( + &mut self, + twin_message: EntityTwinMessage, ) -> Result { - let entity = self.try_get_mut(entity_topic_id)?; + let fragment_key = twin_message.fragment_key.clone(); + let fragment_value = twin_message.fragment_value.clone(); + let entity = self.try_get_mut(&twin_message.topic_id)?; if fragment_value.is_null() { let existing = entity.twin_data.remove(&fragment_key); if existing.is_none() { @@ -563,6 +676,19 @@ impl EntityStore { Ok(true) } + pub fn register_and_persist_twin_data( + &mut self, + twin_message: EntityTwinMessage, + ) -> Result { + let updated = self.register_twin_data(twin_message.clone())?; + if updated { + self.message_log + .append_message(&twin_message.to_mqtt_message(&self.mqtt_schema))?; + } + + Ok(updated) + } + pub fn cache_early_data_message(&mut self, message: Message) { self.pending_entity_store.cache_early_data_message(message) } @@ -625,7 +751,7 @@ impl EntityMetadata { } /// Represents an error encountered while updating the store. -#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +#[derive(thiserror::Error, Debug)] pub enum Error { #[error("Specified parent {0:?} does not exist in the store")] NoParent(Box), @@ -649,6 +775,27 @@ pub enum Error { // TODO: remove this error variant when `EntityRegistrationMessage` is changed #[error("`EntityRegistrationMessage::other` field needs to be a Map")] EntityRegistrationOtherNotMap, + + #[error(transparent)] + FromStdIoError(#[from] std::io::Error), + + #[error(transparent)] + FromSerdeJson(#[from] serde_json::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum InitError { + #[error(transparent)] + FromError(#[from] Error), + + #[error(transparent)] + FromStdIoError(#[from] std::io::Error), + + #[error(transparent)] + FromSerdeJson(#[from] serde_json::Error), + + #[error("Initialization failed with: {0}")] + Custom(String), } /// An object representing a valid entity registration message. @@ -825,6 +972,33 @@ fn parse_entity_register_payload(payload: &[u8]) -> Option { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EntityTwinMessage { + topic_id: EntityTopicId, + fragment_key: String, + fragment_value: JsonValue, +} + +impl EntityTwinMessage { + pub fn new(topic_id: EntityTopicId, fragment_key: String, fragment_value: JsonValue) -> Self { + EntityTwinMessage { + topic_id, + fragment_key, + fragment_value, + } + } + + pub fn to_mqtt_message(self, mqtt_schema: &MqttSchema) -> Message { + let message_topic = mqtt_schema.topic_for( + &self.topic_id, + &Channel::EntityTwinData { + fragment_key: self.fragment_key, + }, + ); + Message::new(&message_topic, self.fragment_value.to_string()).with_retain() + } +} + #[cfg(test)] mod tests { use super::*; @@ -833,6 +1007,7 @@ mod tests { use serde_json::json; use std::collections::HashSet; use std::str::FromStr; + use tempfile::TempDir; fn dummy_external_id_mapper( entity_topic_id: &EntityTopicId, @@ -886,7 +1061,8 @@ mod tests { #[test] fn registers_main_device() { - let store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let store = new_entity_store(&temp_dir); assert_eq!(store.main_device(), &EntityTopicId::default_main_device()); assert!(store.get(&EntityTopicId::default_main_device()).is_some()); @@ -894,7 +1070,8 @@ mod tests { #[test] fn lists_child_devices() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // If the @parent info is not provided, it is assumed to be an immediate // child of the main device. @@ -931,7 +1108,8 @@ mod tests { #[test] fn lists_services() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Services are namespaced under devices, so `parent` is not necessary let updated_entities = store @@ -972,7 +1150,8 @@ mod tests { #[test] fn list_ancestors() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Assert no ancestors of main device assert!(store @@ -1078,7 +1257,8 @@ mod tests { #[test] fn list_ancestors_external_ids() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Assert ancestor external ids of main device assert!(store @@ -1188,7 +1368,8 @@ mod tests { #[test] fn auto_register_service() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let service_topic_id = EntityTopicId::default_child_service("child1", "service1").unwrap(); let res = store.auto_register_entity(&service_topic_id).unwrap(); @@ -1218,7 +1399,8 @@ mod tests { #[test] fn auto_register_child_device() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let child_topic_id = EntityTopicId::default_child_device("child2").unwrap(); let res = store.auto_register_entity(&child_topic_id).unwrap(); @@ -1237,7 +1419,8 @@ mod tests { #[test] fn auto_register_custom_topic_scheme_not_supported() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); assert_matches!( store.auto_register_entity(&EntityTopicId::from_str("custom/child2//").unwrap()), Err(Error::NonDefaultTopicScheme(_)) @@ -1246,7 +1429,8 @@ mod tests { #[test] fn register_main_device_custom_scheme() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); // Register main device with custom topic scheme let main_topic_id = EntityTopicId::from_str("custom/main//").unwrap(); @@ -1313,7 +1497,8 @@ mod tests { #[test] fn external_id_validation() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let res = store.update(EntityRegistrationMessage { @@ -1325,22 +1510,21 @@ mod tests { }); // Assert service registered under main device with custom topic scheme - assert_eq!( - res, - Err(Error::InvalidExternalIdError(InvalidExternalIdError { - external_id: "bad+id".into(), - invalid_char: '+' - })) - ); + assert_matches!(res, Err(Error::InvalidExternalIdError(_))); } #[test] fn update_twin_data_new_fragment() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let topic_id = EntityTopicId::default_main_device(); let updated = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 5 })) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "hardware".into(), + json!({ "version": 5 }), + )) .unwrap(); assert!( updated, @@ -1356,15 +1540,24 @@ mod tests { #[test] fn update_twin_data_update_existing_fragment() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let topic_id = EntityTopicId::default_main_device(); let _ = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 5 })) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "hardware".into(), + json!({ "version": 5 }), + )) .unwrap(); let updated = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 6 })) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "hardware".into(), + json!({ "version": 6 }), + )) .unwrap(); assert!( updated, @@ -1380,16 +1573,25 @@ mod tests { #[test] fn update_twin_data_remove_fragment() { - let mut store = new_entity_store(); + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); let topic_id = EntityTopicId::default_main_device(); let _ = store - .update_twin_data(&topic_id, "foo".into(), json!("bar")) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "foo".into(), + json!("bar"), + )) .unwrap(); let updated = store - .update_twin_data(&topic_id, "foo".into(), json!(null)) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "foo".into(), + json!(null), + )) .unwrap(); assert!( updated, @@ -1402,9 +1604,11 @@ mod tests { #[test] fn updated_registration_message_after_twin_updates() { + let temp_dir = tempfile::tempdir().unwrap(); // Create an entity store with main device having an explicit `name` fragment let topic_id = EntityTopicId::default_main_device(); - let mut store = EntityStore::with_main_device( + let mut store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), EntityRegistrationMessage { topic_id: topic_id.clone(), external_id: Some("test-device".into()), @@ -1415,14 +1619,21 @@ mod tests { .unwrap() .to_owned(), }, + "service".into(), dummy_external_id_mapper, dummy_external_id_sanitizer, + 5, + &temp_dir, ) .unwrap(); // Add some additional fragments to the device twin data let _ = store - .update_twin_data(&topic_id, "hardware".into(), json!({ "version": 5 })) + .update_twin_data(EntityTwinMessage::new( + topic_id.clone(), + "hardware".into(), + json!({ "version": 5 }), + )) .unwrap(); // Update the name of the device with @@ -1457,8 +1668,178 @@ mod tests { ); } - fn new_entity_store() -> EntityStore { - EntityStore::with_main_device( + #[test] + fn duplicate_registration_message_ignored() { + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); + let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); + let reg_message = EntityRegistrationMessage { + topic_id: entity_topic_id.clone(), + r#type: EntityType::ChildDevice, + external_id: Some("child1".into()), + parent: None, + other: Map::new(), + }; + + let affected_entities = store.update(reg_message.clone()).unwrap(); + assert!(!affected_entities.0.is_empty()); + + let affected_entities = store.update(reg_message.clone()).unwrap(); + assert!(affected_entities.0.is_empty()); + + // Duplicate registration ignore even after the entity store is restored from the disk + let mut store = new_entity_store(&temp_dir); + let affected_entities = store.update(reg_message).unwrap(); + assert!(affected_entities.0.is_empty()); + } + + #[test] + fn duplicate_registration_message_ignored_after_twin_update() { + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); + let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); + let reg_message = EntityRegistrationMessage { + topic_id: entity_topic_id.clone(), + r#type: EntityType::ChildDevice, + external_id: Some("child1".into()), + parent: None, + other: Map::new(), + }; + + let affected_entities = store.update(reg_message.clone()).unwrap(); + assert!(!affected_entities.0.is_empty()); + + // Update the entity twin data + store + .update_twin_data(EntityTwinMessage::new( + entity_topic_id.clone(), + "foo".into(), + json!("bar"), + )) + .unwrap(); + + // Assert that the duplicate registration message is still ignored + let affected_entities = store.update(reg_message.clone()).unwrap(); + assert!(affected_entities.0.is_empty()); + + // Duplicate registration ignore even after the entity store is restored from the disk + let mut store = new_entity_store(&temp_dir); + let affected_entities = store.update(reg_message).unwrap(); + assert!(affected_entities.0.is_empty()); + } + + #[test] + fn early_child_device_registrations_processed_only_after_parent_registration() { + let temp_dir = tempfile::tempdir().unwrap(); + let mut store = new_entity_store(&temp_dir); + + let child0_topic_id = EntityTopicId::default_child_device("child0").unwrap(); + let child000_topic_id = EntityTopicId::default_child_device("child000").unwrap(); + let child00_topic_id = EntityTopicId::default_child_device("child00").unwrap(); + + // Register great-grand-child before grand-child and child + let child000_reg_message = EntityRegistrationMessage::new_custom( + child000_topic_id.clone(), + EntityType::ChildDevice, + ) + .with_parent(child00_topic_id.clone()); + let affected_entities = store.update(child000_reg_message.clone()).unwrap(); + assert!(affected_entities.0.is_empty()); + + // Register grand-child before child + let child00_reg_message = EntityRegistrationMessage::new_custom( + child00_topic_id.clone(), + EntityType::ChildDevice, + ) + .with_parent(child0_topic_id.clone()); + let affected_entities = store.update(child00_reg_message).unwrap(); + assert!(affected_entities.0.is_empty()); + + // Register the immediate child device which will trigger the registration of its children as well + let child0_reg_message = + EntityRegistrationMessage::new_custom(child0_topic_id.clone(), EntityType::ChildDevice); + let affected_entities = store.update(child0_reg_message).unwrap(); + + // Assert that the affected entities include all the children + assert!(!affected_entities.0.is_empty()); + + let affected_entities = store.update(child000_reg_message.clone()).unwrap(); + assert!(affected_entities.0.is_empty()); + + // Reload the entity store from the persistent log + let mut store = new_entity_store(&temp_dir); + + // Assert that duplicate registrations are still ignored + let affected_entities = store.update(child000_reg_message).unwrap(); + assert!(affected_entities.0.is_empty()); + } + + #[test] + fn entities_persisted_and_restored() { + let temp_dir = tempfile::tempdir().unwrap(); + + let child1_topic_id = EntityTopicId::default_child_device("child1").unwrap(); + let child2_topic_id = EntityTopicId::default_child_device("child2").unwrap(); + + let twin_fragment_key = "foo".to_string(); + let twin_fragment_value = json!("bar"); + + { + let mut store = new_entity_store(&temp_dir); + store + .update( + EntityRegistrationMessage::new_custom( + child1_topic_id.clone(), + EntityType::ChildDevice, + ) + .with_external_id("child1".into()), + ) + .unwrap(); + store + .update_twin_data(EntityTwinMessage::new( + child1_topic_id.clone(), + twin_fragment_key.clone(), + twin_fragment_value.clone(), + )) + .unwrap(); + + store + .update( + EntityRegistrationMessage::new_custom( + child2_topic_id.clone(), + EntityType::ChildDevice, + ) + .with_external_id("child2".into()), + ) + .unwrap(); + } + + { + // Reload the entity store using the same persistent file + let store = new_entity_store(&temp_dir); + let mut expected_entity_metadata = + EntityMetadata::child_device("child1".into()).unwrap(); + expected_entity_metadata + .twin_data + .insert(twin_fragment_key.clone(), twin_fragment_value.clone()); + + let entity_metadata = store.get(&child1_topic_id).unwrap(); + assert_eq!(entity_metadata, &expected_entity_metadata); + assert_eq!( + entity_metadata.twin_data.get(&twin_fragment_key).unwrap(), + &twin_fragment_value + ); + + assert_eq!( + store.get(&child2_topic_id).unwrap(), + &EntityMetadata::child_device("child2".into()).unwrap() + ); + } + } + + fn new_entity_store(temp_dir: &TempDir) -> EntityStore { + EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), EntityRegistrationMessage { topic_id: EntityTopicId::default_main_device(), external_id: Some("test-device".into()), @@ -1466,8 +1847,11 @@ mod tests { parent: None, other: Map::new(), }, + "service".into(), dummy_external_id_mapper, dummy_external_id_sanitizer, + 5, + temp_dir, ) .unwrap() } diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index bdf350eefe2..71788d40d17 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -7,6 +7,7 @@ pub mod event; pub mod group; pub mod health; pub mod measurement; +pub mod message_log; pub mod messages; pub mod mqtt_topics; pub mod parser; diff --git a/crates/core/tedge_api/src/message_log.rs b/crates/core/tedge_api/src/message_log.rs new file mode 100644 index 00000000000..7eeaf58d359 --- /dev/null +++ b/crates/core/tedge_api/src/message_log.rs @@ -0,0 +1,152 @@ +//! The message log is a persistent append-only log of MQTT messages. +//! Each line is the JSON representation of that MQTT message. +//! The underlying file is a JSON lines file. +use mqtt_channel::Message as MqttMessage; +use serde_json::json; +use std::fs::File; +use std::fs::OpenOptions; +use std::io::BufRead; +use std::io::BufReader; +use std::io::BufWriter; +use std::io::Write; +use std::path::Path; + +const LOG_FILE_NAME: &str = "entity_store.jsonl"; +const LOG_FORMAT_VERSION: &str = "1.0"; + +#[derive(thiserror::Error, Debug)] +pub enum LogEntryError { + #[error(transparent)] + FromStdIo(std::io::Error), + + #[error("Deserialization failed with {0} while parsing {1}")] + FromSerdeJson(#[source] serde_json::Error, String), +} + +/// A reader to read the log file entries line by line +pub struct MessageLogReader { + reader: BufReader, +} + +impl MessageLogReader { + pub fn new

(log_dir: P) -> Result + where + P: AsRef, + { + let file = OpenOptions::new() + .read(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))?; + let mut reader = BufReader::new(file); + + let mut version_info = String::new(); + reader.read_line(&mut version_info)?; + // TODO: Validate if the read version is supported + + Ok(MessageLogReader { reader }) + } + + /// Return the next MQTT message from the log + /// The reads start from the beginning of the file + /// and each read advances the file pointer to the next line + pub fn next_message(&mut self) -> Result, LogEntryError> { + let mut buffer = String::new(); + match self.reader.read_line(&mut buffer) { + Ok(bytes_read) if bytes_read > 0 => { + let message: MqttMessage = serde_json::from_str(&buffer) + .map_err(|err| LogEntryError::FromSerdeJson(err, buffer))?; + Ok(Some(message)) + } + Ok(_) => Ok(None), // EOF + Err(err) => Err(LogEntryError::FromStdIo(err)), + } + } +} + +/// A writer to append new MQTT messages to the end of the log +pub struct MessageLogWriter { + writer: BufWriter, +} + +impl MessageLogWriter { + pub fn new

(log_dir: P) -> Result + where + P: AsRef, + { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))?; + + // If the file is empty append the version information as a header + let metadata = file.metadata()?; + let file_is_empty = metadata.len() == 0; + + let mut writer = BufWriter::new(file); + + if file_is_empty { + let version_info = json!({"version": LOG_FORMAT_VERSION}).to_string(); + writeln!(writer, "{}", version_info)?; + } + + Ok(MessageLogWriter { writer }) + } + + /// Append the JSON representation of the given message to the log. + /// Each message is appended on a new line. + pub fn append_message(&mut self, message: &MqttMessage) -> Result<(), std::io::Error> { + let json_line = serde_json::to_string(message)?; + writeln!(self.writer, "{}", json_line)?; + self.writer.flush()?; + self.writer.get_ref().sync_all()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::message_log::MessageLogReader; + + use super::MessageLogWriter; + use mqtt_channel::Message; + use mqtt_channel::Topic; + use tempfile::tempdir; + + #[test] + fn test_append_and_retrieve() { + let temp_dir = tempdir().unwrap(); + + // Prepare some dummy messages + let mut messages = vec![]; + for i in 1..5 { + let message = Message::new( + &Topic::new(&format!("topic{i}")).unwrap(), + format!("payload{i}"), + ); + messages.push(message); + } + + // Populate the log + { + let mut message_log = MessageLogWriter::new(&temp_dir).unwrap(); + let mut message_log_reader = MessageLogReader::new(&temp_dir).unwrap(); + + assert_eq!(message_log_reader.next_message().unwrap(), None); + + for message in messages.clone() { + message_log.append_message(&message).unwrap(); + } + } + + // Read from the log + { + // Reload the message log + let mut message_log_reader = MessageLogReader::new(&temp_dir).unwrap(); + + for message in messages { + assert_eq!(message_log_reader.next_message().unwrap(), Some(message)); + } + // EOF -> None + assert_eq!(message_log_reader.next_message().unwrap(), None); + } + } +} diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index dfbb573889c..f315185f0e8 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -378,6 +378,8 @@ impl C8yMapperBuilder { create_directory_with_defaults(config.ops_dir.clone())?; // Create directory for device custom fragments create_directory_with_defaults(config.config_dir.join("device"))?; + // Create directory for persistent entity store + create_directory_with_defaults(&config.state_dir)?; Ok(()) } } diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 309188d20d5..befeb6f87ce 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -26,6 +26,7 @@ use tedge_mqtt_ext::TopicFilter; use tracing::log::warn; pub const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184; +const STATE_DIR_NAME: &str = ".tedge-mapper-c8y"; pub struct C8yMapperConfig { pub config_dir: PathBuf, @@ -36,6 +37,7 @@ pub struct C8yMapperConfig { pub device_type: String, pub service: TEdgeConfigReaderService, pub ops_dir: PathBuf, + pub state_dir: PathBuf, pub tmp_dir: Arc, pub c8y_host: String, pub tedge_http_host: Arc, @@ -70,6 +72,7 @@ impl C8yMapperConfig { enable_auto_register: bool, ) -> Self { let ops_dir = config_dir.join("operations").join("c8y"); + let state_dir = config_dir.join(STATE_DIR_NAME); Self { config_dir, @@ -80,6 +83,7 @@ impl C8yMapperConfig { device_type, service, ops_dir, + state_dir, tmp_dir, c8y_host, tedge_http_host, diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 2925434afaa..454dde38594 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -93,6 +93,7 @@ use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_utils::file::create_directory_with_defaults; use tedge_utils::file::create_file_with_defaults; +use tedge_utils::file::FileError; use tedge_utils::size_threshold::SizeThreshold; use thiserror::Error; use tokio::time::Duration; @@ -266,6 +267,7 @@ impl CumulocityConverter { Self::map_to_c8y_external_id, Self::validate_external_id, EARLY_MESSAGE_BUFFER_SIZE, + config.state_dir.clone(), ) .unwrap(); @@ -998,6 +1000,9 @@ pub enum CumulocityConverterBuildError { #[error(transparent)] OperationLogsError(#[from] OperationLogsError), + + #[error(transparent)] + FileError(#[from] FileError), } impl CumulocityConverter { @@ -3329,7 +3334,6 @@ pub(crate) mod tests { .to_string(), ); let messages = converter.convert(®_message).await; - dbg!(&messages); // Assert that the registration message, the twin updates and the cached measurement messages are converted assert_messages_matching( @@ -3361,6 +3365,7 @@ pub(crate) mod tests { fn c8y_converter_config(tmp_dir: &TempTedgeDir) -> C8yMapperConfig { tmp_dir.dir("operations").dir("c8y"); tmp_dir.dir("tedge").dir("agent"); + tmp_dir.dir(".tedge-mapper-c8y"); let device_id = "test-device".into(); let device_topic_id = EntityTopicId::default_main_device(); diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs index 6d1f8574d38..d59c6cee556 100644 --- a/crates/extensions/c8y_mapper_ext/src/inventory.rs +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -8,6 +8,7 @@ use serde_json::Value as JsonValue; use std::fs::File; use std::io::Read; use std::path::Path; +use tedge_api::entity_store::EntityTwinMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; use tedge_mqtt_ext::Message; @@ -42,11 +43,11 @@ impl CumulocityConverter { if let JsonValue::Object(map) = inventory_base { for (key, value) in map { let main_device_tid = self.entity_store.main_device().clone(); - let _ = self.entity_store.update_twin_data( - &main_device_tid, + let _ = self.entity_store.update_twin_data(EntityTwinMessage::new( + main_device_tid.clone(), key.clone(), value.clone(), - )?; + ))?; let mapped_message = self.entity_twin_data_message(&main_device_tid, key.clone(), value.clone()); messages.push(mapped_message); @@ -92,11 +93,11 @@ impl CumulocityConverter { serde_json::from_slice::(message.payload_bytes())? }; - let updated = self.entity_store.update_twin_data( - source, + let updated = self.entity_store.update_twin_data(EntityTwinMessage::new( + source.clone(), fragment_key.into(), fragment_value.clone(), - )?; + ))?; if !updated { return Ok(vec![]); } diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 63ac92b3b95..1dfb6e7f143 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -136,12 +136,17 @@ mod tests { c8y_monitor_payload.as_bytes(), ); + let temp_dir = tempfile::tempdir().unwrap(); let main_device_registration = EntityRegistrationMessage::main_device(device_name.to_string()); - let mut entity_store = EntityStore::with_main_device( + let mut entity_store = EntityStore::with_main_device_and_default_service_type( + MqttSchema::default(), main_device_registration, + "service".into(), crate::converter::CumulocityConverter::map_to_c8y_external_id, crate::converter::CumulocityConverter::validate_external_id, + 5, + &temp_dir, ) .unwrap(); diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index f59b625b70b..34a9fddf2a2 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2583,6 +2583,7 @@ pub(crate) async fn spawn_c8y_mapper_actor( ) { if init { config_dir.dir("operations").dir("c8y"); + config_dir.dir(".tedge-mapper-c8y"); } let device_name = "test-device".into(); diff --git a/crates/extensions/tedge_mqtt_ext/src/lib.rs b/crates/extensions/tedge_mqtt_ext/src/lib.rs index 9b6ca1c3ddb..277a0bf005c 100644 --- a/crates/extensions/tedge_mqtt_ext/src/lib.rs +++ b/crates/extensions/tedge_mqtt_ext/src/lib.rs @@ -27,6 +27,7 @@ use tedge_actors::WrappedInput; pub type MqttConfig = mqtt_channel::Config; pub type MqttMessage = mqtt_channel::Message; +pub use mqtt_channel::DebugPayload; pub use mqtt_channel::Message; pub use mqtt_channel::MqttError; pub use mqtt_channel::QoS; diff --git a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot index da6a69a6195..594aba4ad98 100644 --- a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot +++ b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot @@ -127,9 +127,10 @@ Register tedge-agent when tedge-mapper-c8y is not running #2389 Should Have MQTT Messages te/device/offlinechild1///cmd/restart/+ Early data messages cached and processed + [Teardown] Re-enable Auto-registration ${timestamp}= Get Unix Timestamp Execute Command sudo tedge config set c8y.entity_store.auto_register false - Restart Service tedge-mapper-c8y + Restart Service tedge-mapper-c8y Service Health Status Should Be Up tedge-mapper-c8y ${children}= Create List child0 child00 child01 child000 child0000 child00000 @@ -151,11 +152,48 @@ Early data messages cached and processed Device Should Have Fragments maintenance_mode END - Execute Command sudo tedge config unset c8y.entity_store.auto_register Restart Service tedge-mapper-c8y + Service Health Status Should Be Up tedge-mapper-c8y + + +Entities persisted and restored + ${timestamp}= Get Unix Timestamp + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/' '{"@type":"child-device","@id":"plc1"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/' '{"@type":"child-device","@id":"plc2"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor1' '{"@type":"child-device","@id":"plc1-sensor1","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor2' '{"@type":"child-device","@id":"plc1-sensor2","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/sensor1' '{"@type":"child-device","@id":"plc2-sensor1","@parent":"factory/shop/plc2/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/metrics' '{"@type":"service","@id":"plc1-metrics","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/metrics' '{"@type":"service","@id":"plc2-metrics","@parent":"factory/shop/plc2/"}' + + Should Have MQTT Messages c8y/s/us message_contains=101,plc1 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us message_contains=101,plc2 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/plc1 message_contains=101,plc1-sensor1 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/plc1 message_contains=101,plc1-sensor2 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/plc2 message_contains=101,plc2-sensor1 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/plc1 message_contains=102,plc1-metrics date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/plc2 message_contains=102,plc2-metrics date_from=${timestamp} minimum=1 maximum=1 + + FOR ${counter} IN RANGE 0 5 + ${timestamp}= Get Unix Timestamp + Restart Service tedge-mapper-c8y + Service Health Status Should Be Up tedge-mapper-c8y + + # Assert that the restored entities are not converted again + Should Have MQTT Messages c8y/s/us message_contains=101 date_from=${timestamp} minimum=0 maximum=0 + Should Have MQTT Messages c8y/s/us/plc1 message_contains=101 date_from=${timestamp} minimum=0 maximum=0 + Should Have MQTT Messages c8y/s/us/plc2 message_contains=101 date_from=${timestamp} minimum=0 maximum=0 + Should Have MQTT Messages c8y/s/us/plc1 message_contains=102 date_from=${timestamp} minimum=0 maximum=0 + Should Have MQTT Messages c8y/s/us/plc2 message_contains=102 date_from=${timestamp} minimum=0 maximum=0 + END + *** Keywords *** +Re-enable Auto-registration + Execute Command sudo tedge config unset c8y.entity_store.auto_register + Restart Service tedge-mapper-c8y + Check Child Device [Arguments] ${parent_sn} ${child_sn} ${child_name} ${child_type} ${child_mo}= Device Should Exist ${child_sn}