diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 670fe44481a..6cd24d9892d 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -450,6 +450,10 @@ define_tedge_config! { /// Enable auto registration feature #[tedge_config(example = "true", default(value = true))] auto_register: bool, + + /// Determines if entity state on tedge should be sent to cloud on startup + #[tedge_config(example = "true", default(value = true))] + clean_start: bool, }, }, diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 3dd96f08135..ab654a7ca05 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -719,6 +719,7 @@ mod tests { dummy_external_id_validator, 5, &temp_dir, + true, ) .unwrap(); @@ -756,6 +757,7 @@ mod tests { dummy_external_id_validator, 5, &temp_dir, + true, ) .unwrap(); diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 9a6e7c57bf2..b9b2476287e 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -135,7 +135,8 @@ type ExternalIdValidatorFn = /// |tid, xid| tid.to_string().into(), /// |xid| Ok(xid.into()), /// 5, -/// "/tmp" +/// "/tmp", +/// true /// ); /// ``` pub struct EntityStore { @@ -153,6 +154,7 @@ pub struct EntityStore { } impl EntityStore { + #[allow(clippy::too_many_arguments)] pub fn with_main_device_and_default_service_type( mqtt_schema: MqttSchema, main_device: EntityRegistrationMessage, @@ -161,6 +163,7 @@ impl EntityStore { external_id_validator_fn: SF, telemetry_cache_size: usize, log_dir: P, + clean_start: bool, ) -> Result where MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, @@ -187,7 +190,7 @@ impl EntityStore { twin_data: Map::new(), }; - let message_log = MessageLogWriter::new(log_dir.as_ref())?; + let message_log = MessageLogWriter::new(log_dir.as_ref(), clean_start)?; let mut entity_store = EntityStore { mqtt_schema: mqtt_schema.clone(), @@ -201,7 +204,9 @@ impl EntityStore { message_log, }; - entity_store.load_from_message_log(log_dir.as_ref()); + if !clean_start { + entity_store.load_from_message_log(log_dir.as_ref()); + } Ok(entity_store) } @@ -1068,7 +1073,7 @@ mod tests { #[test] fn registers_main_device() { let temp_dir = tempfile::tempdir().unwrap(); - let store = new_entity_store(&temp_dir); + let store = new_entity_store(&temp_dir, true); assert_eq!(store.main_device(), &EntityTopicId::default_main_device()); assert!(store.get(&EntityTopicId::default_main_device()).is_some()); @@ -1077,7 +1082,7 @@ mod tests { #[test] fn lists_child_devices() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // If the @parent info is not provided, it is assumed to be an immediate // child of the main device. @@ -1115,7 +1120,7 @@ mod tests { #[test] fn lists_services() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Services are namespaced under devices, so `parent` is not necessary let updated_entities = store @@ -1157,7 +1162,7 @@ mod tests { #[test] fn list_ancestors() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Assert no ancestors of main device assert!(store @@ -1264,7 +1269,7 @@ mod tests { #[test] fn list_ancestors_external_ids() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Assert ancestor external ids of main device assert!(store @@ -1375,7 +1380,7 @@ mod tests { #[test] fn auto_register_service() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let service_topic_id = EntityTopicId::default_child_service("child1", "service1").unwrap(); let res = store.auto_register_entity(&service_topic_id).unwrap(); @@ -1406,7 +1411,7 @@ mod tests { #[test] fn auto_register_child_device() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let child_topic_id = EntityTopicId::default_child_device("child2").unwrap(); let res = store.auto_register_entity(&child_topic_id).unwrap(); @@ -1426,7 +1431,7 @@ mod tests { #[test] fn auto_register_custom_topic_scheme_not_supported() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); assert_matches!( store.auto_register_entity(&EntityTopicId::from_str("custom/child2//").unwrap()), Err(Error::NonDefaultTopicScheme(_)) @@ -1436,7 +1441,7 @@ mod tests { #[test] fn register_main_device_custom_scheme() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Register main device with custom topic scheme let main_topic_id = EntityTopicId::from_str("custom/main//").unwrap(); @@ -1504,7 +1509,7 @@ mod tests { #[test] fn external_id_validation() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let res = store.update(EntityRegistrationMessage { @@ -1522,7 +1527,7 @@ mod tests { #[test] fn update_twin_data_new_fragment() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let topic_id = EntityTopicId::default_main_device(); let updated = store @@ -1547,7 +1552,7 @@ mod tests { #[test] fn update_twin_data_update_existing_fragment() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let topic_id = EntityTopicId::default_main_device(); let _ = store @@ -1580,7 +1585,7 @@ mod tests { #[test] fn update_twin_data_remove_fragment() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let topic_id = EntityTopicId::default_main_device(); @@ -1630,6 +1635,7 @@ mod tests { dummy_external_id_sanitizer, 5, &temp_dir, + true, ) .unwrap(); @@ -1677,7 +1683,7 @@ mod tests { #[test] fn duplicate_registration_message_ignored() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let reg_message = EntityRegistrationMessage { topic_id: entity_topic_id.clone(), @@ -1694,7 +1700,7 @@ mod tests { assert!(affected_entities.0.is_empty()); // Duplicate registration ignore even after the entity store is restored from the disk - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let affected_entities = store.update(reg_message).unwrap(); assert!(affected_entities.0.is_empty()); } @@ -1702,7 +1708,7 @@ mod tests { #[test] fn duplicate_registration_message_ignored_after_twin_update() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let reg_message = EntityRegistrationMessage { topic_id: entity_topic_id.clone(), @@ -1729,7 +1735,7 @@ mod tests { assert!(affected_entities.0.is_empty()); // Duplicate registration ignore even after the entity store is restored from the disk - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let affected_entities = store.update(reg_message).unwrap(); assert!(affected_entities.0.is_empty()); } @@ -1737,7 +1743,7 @@ mod tests { #[test] fn early_child_device_registrations_processed_only_after_parent_registration() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let child0_topic_id = EntityTopicId::default_child_device("child0").unwrap(); let child000_topic_id = EntityTopicId::default_child_device("child000").unwrap(); @@ -1773,7 +1779,7 @@ mod tests { assert!(affected_entities.0.is_empty()); // Reload the entity store from the persistent log - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Assert that duplicate registrations are still ignored let affected_entities = store.update(child000_reg_message).unwrap(); @@ -1791,7 +1797,7 @@ mod tests { let twin_fragment_value = json!("bar"); { - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); store .update( EntityRegistrationMessage::new_custom( @@ -1822,7 +1828,7 @@ mod tests { { // Reload the entity store using the same persistent file - let store = new_entity_store(&temp_dir); + let store = new_entity_store(&temp_dir, false); let mut expected_entity_metadata = EntityMetadata::child_device("child1".into()).unwrap(); expected_entity_metadata @@ -1843,7 +1849,7 @@ mod tests { } } - fn new_entity_store(temp_dir: &TempDir) -> EntityStore { + fn new_entity_store(temp_dir: &TempDir, clean_start: bool) -> EntityStore { EntityStore::with_main_device_and_default_service_type( MqttSchema::default(), EntityRegistrationMessage { @@ -1858,6 +1864,7 @@ mod tests { dummy_external_id_sanitizer, 5, temp_dir, + clean_start, ) .unwrap() } diff --git a/crates/core/tedge_api/src/message_log.rs b/crates/core/tedge_api/src/message_log.rs index 7eeaf58d359..c6e9b21ad4c 100644 --- a/crates/core/tedge_api/src/message_log.rs +++ b/crates/core/tedge_api/src/message_log.rs @@ -68,15 +68,22 @@ pub struct MessageLogWriter { } impl MessageLogWriter { - pub fn new

(log_dir: P) -> Result + pub fn new

(log_dir: P, truncate: bool) -> Result where P: AsRef, { - let file = OpenOptions::new() - .create(true) - .append(true) - .open(log_dir.as_ref().join(LOG_FILE_NAME))?; - + let file = if truncate { + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))? + } else { + OpenOptions::new() + .create(true) + .append(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))? + }; // If the file is empty append the version information as a header let metadata = file.metadata()?; let file_is_empty = metadata.len() == 0; @@ -127,7 +134,7 @@ mod tests { // Populate the log { - let mut message_log = MessageLogWriter::new(&temp_dir).unwrap(); + let mut message_log = MessageLogWriter::new(&temp_dir, true).unwrap(); let mut message_log_reader = MessageLogReader::new(&temp_dir).unwrap(); assert_eq!(message_log_reader.next_message().unwrap(), None); diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index befeb6f87ce..caa5ba1ee20 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -48,6 +48,7 @@ pub struct C8yMapperConfig { pub auth_proxy_protocol: Protocol, pub mqtt_schema: MqttSchema, pub enable_auto_register: bool, + pub clean_start: bool, } impl C8yMapperConfig { @@ -70,6 +71,7 @@ impl C8yMapperConfig { auth_proxy_protocol: Protocol, mqtt_schema: MqttSchema, enable_auto_register: bool, + clean_start: bool, ) -> Self { let ops_dir = config_dir.join("operations").join("c8y"); let state_dir = config_dir.join(STATE_DIR_NAME); @@ -94,6 +96,7 @@ impl C8yMapperConfig { auth_proxy_protocol, mqtt_schema, enable_auto_register, + clean_start, } } @@ -135,6 +138,7 @@ impl C8yMapperConfig { let mut topics = Self::default_internal_topic_filter(&config_dir)?; let enable_auto_register = tedge_config.c8y.entity_store.auto_register; + let clean_start = tedge_config.c8y.entity_store.clean_start; // Add feature topic filters for cmd in [ @@ -190,6 +194,7 @@ impl C8yMapperConfig { auth_proxy_protocol, mqtt_schema, enable_auto_register, + clean_start, )) } diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index e6b053c52ab..9cfe45d0f29 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -267,6 +267,7 @@ impl CumulocityConverter { Self::validate_external_id, EARLY_MESSAGE_BUFFER_SIZE, config.state_dir.clone(), + config.clean_start, ) .unwrap(); @@ -3412,6 +3413,7 @@ pub(crate) mod tests { auth_proxy_protocol, MqttSchema::default(), true, + true, ) } fn create_c8y_converter_from_config( diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 4c6642ec95b..53b2ef6b517 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -157,6 +157,7 @@ mod tests { crate::converter::CumulocityConverter::validate_external_id, 5, &temp_dir, + true, ) .unwrap(); diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 34a9fddf2a2..a24afec3661 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2623,6 +2623,7 @@ pub(crate) async fn spawn_c8y_mapper_actor( Protocol::Http, MqttSchema::default(), true, + true, ); let mut mqtt_builder: SimpleMessageBoxBuilder = diff --git a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot index 80ede8373c3..06a7e0ec108 100644 --- a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot +++ b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot @@ -157,6 +157,11 @@ Early data messages cached and processed Entities persisted and restored + [Teardown] Re-enable Clean start + Execute Command sudo tedge config set c8y.entity_store.clean_start false + Restart Service tedge-mapper-c8y + Service Health Status Should Be Up tedge-mapper-c8y + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/' '{"@type":"child-device","@id":"plc1"}' Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/' '{"@type":"child-device","@id":"plc2"}' Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor1' '{"@type":"child-device","@id":"plc1-sensor1","@parent":"factory/shop/plc1/"}' @@ -193,6 +198,47 @@ Entities persisted and restored Should Have MQTT Messages c8y/s/us/plc2 message_contains=102 date_from=${timestamp} minimum=0 maximum=0 END +Entities send to cloud on restart + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/' '{"@type":"child-device","@id":"plc1"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/' '{"@type":"child-device","@id":"plc2"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor1' '{"@type":"child-device","@id":"plc1-sensor1","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor2' '{"@type":"child-device","@id":"plc1-sensor2","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/sensor1' '{"@type":"child-device","@id":"plc2-sensor1","@parent":"factory/shop/plc2/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/metrics' '{"@type":"service","@id":"plc1-metrics","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/metrics' '{"@type":"service","@id":"plc2-metrics","@parent":"factory/shop/plc2/"}' + + External Identity Should Exist plc1 + External Identity Should Exist plc2 + External Identity Should Exist plc1-sensor1 + External Identity Should Exist plc1-sensor2 + External Identity Should Exist plc2-sensor1 + External Identity Should Exist plc1-metrics + External Identity Should Exist plc2-metrics + + Execute Command cat /etc/tedge/.tedge-mapper-c8y/entity_store.jsonl + ${original_last_modified_time}= Execute Command date -r /etc/tedge/.tedge-mapper-c8y/entity_store.jsonl + ${original_last_size}= Execute Command stat -c %s /etc/tedge/.tedge-mapper-c8y/entity_store.jsonl + + FOR ${counter} IN RANGE 0 5 + ${timestamp}= Get Unix Timestamp + Restart Service tedge-mapper-c8y + Service Health Status Should Be Up tedge-mapper-c8y + + # Assert that the file contents changed on restart + ${last_modified_time}= Execute Command date -r /etc/tedge/.tedge-mapper-c8y/entity_store.jsonl + Should Not Be Equal ${last_modified_time} ${original_last_modified_time} + + # Assert that the file was not appended on restart + ${last_size}= Execute Command stat -c %s /etc/tedge/.tedge-mapper-c8y/entity_store.jsonl + Should Be Equal ${last_size} ${original_last_size} + + # Assert that the restored entities are converted again + Should Have MQTT Messages c8y/s/us message_contains=101 date_from=${timestamp} minimum=1 maximum=12 + Should Have MQTT Messages c8y/s/us/plc1 message_contains=101 date_from=${timestamp} minimum=1 maximum=12 + Should Have MQTT Messages c8y/s/us/plc2 message_contains=101 date_from=${timestamp} minimum=1 maximum=12 + Should Have MQTT Messages c8y/s/us/plc1 message_contains=102 date_from=${timestamp} minimum=1 maximum=12 + Should Have MQTT Messages c8y/s/us/plc2 message_contains=102 date_from=${timestamp} minimum=1 maximum=12 + END *** Keywords *** @@ -200,6 +246,10 @@ Re-enable Auto-registration Execute Command sudo tedge config unset c8y.entity_store.auto_register Restart Service tedge-mapper-c8y +Re-enable Clean start + Execute Command sudo tedge config unset c8y.entity_store.clean_start + Restart Service tedge-mapper-c8y + Check Child Device [Arguments] ${parent_sn} ${child_sn} ${child_name} ${child_type} ${child_mo}= Device Should Exist ${child_sn}