From a1c8bda330e297d95717f6ad517b10a47824b16a Mon Sep 17 00:00:00 2001 From: Krzysztof Piotrowski Date: Tue, 30 Jan 2024 14:01:35 +0000 Subject: [PATCH] Bypass duplicate message filtering on mapper startup with config flag Signed-off-by: Krzysztof Piotrowski --- .../src/tedge_config_cli/tedge_config.rs | 4 ++ crates/core/c8y_api/src/json_c8y.rs | 2 + crates/core/tedge_api/src/entity_store.rs | 69 +++++++++++-------- crates/core/tedge_api/src/message_log.rs | 13 ++++ .../extensions/c8y_mapper_ext/src/config.rs | 5 ++ .../c8y_mapper_ext/src/converter.rs | 2 + .../c8y_mapper_ext/src/service_monitor.rs | 1 + crates/extensions/c8y_mapper_ext/src/tests.rs | 1 + .../registration/device_registration.robot | 41 +++++++++++ 9 files changed, 110 insertions(+), 28 deletions(-) diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index 670fe44481a..642cee5cb53 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -450,6 +450,10 @@ define_tedge_config! { /// Enable auto registration feature #[tedge_config(example = "true", default(value = true))] auto_register: bool, + + /// On a clean start, the whole state of the device, services and child-devices is resent to the cloud + #[tedge_config(example = "true", default(value = true))] + clean_start: bool, }, }, diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index a5e8ba3571f..09073ee506a 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -719,6 +719,7 @@ mod tests { dummy_external_id_validator, 5, &temp_dir, + true, ) .unwrap(); @@ -756,6 +757,7 @@ mod tests { dummy_external_id_validator, 5, &temp_dir, + true, ) .unwrap(); diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index ae490657815..1445f96a704 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -135,7 +135,8 @@ type ExternalIdValidatorFn = /// |tid, xid| tid.to_string().into(), /// |xid| Ok(xid.into()), /// 5, -/// "/tmp" +/// "/tmp", +/// true /// ); /// ``` pub struct EntityStore { @@ -153,6 +154,7 @@ pub struct EntityStore { } impl EntityStore { + #[allow(clippy::too_many_arguments)] pub fn with_main_device_and_default_service_type( mqtt_schema: MqttSchema, main_device: EntityRegistrationMessage, @@ -161,6 +163,7 @@ impl EntityStore { external_id_validator_fn: SF, telemetry_cache_size: usize, log_dir: P, + clean_start: bool, ) -> Result where MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, @@ -187,11 +190,19 @@ impl EntityStore { twin_data: Map::new(), }; - let message_log = MessageLogWriter::new(log_dir.as_ref()).map_err(|err| { - InitError::Custom(format!( - "Loading the entity store log for writes failed with {err}", - )) - })?; + let message_log = if clean_start { + MessageLogWriter::new_truncated(log_dir.as_ref()).map_err(|err| { + InitError::Custom(format!( + "Loading the entity store log for writes failed with {err}", + )) + })? + } else { + MessageLogWriter::new(log_dir.as_ref()).map_err(|err| { + InitError::Custom(format!( + "Loading the entity store log for writes failed with {err}", + )) + })? + }; let mut entity_store = EntityStore { mqtt_schema: mqtt_schema.clone(), @@ -1072,7 +1083,7 @@ mod tests { #[test] fn registers_main_device() { let temp_dir = tempfile::tempdir().unwrap(); - let store = new_entity_store(&temp_dir); + let store = new_entity_store(&temp_dir, true); assert_eq!(store.main_device(), &EntityTopicId::default_main_device()); assert!(store.get(&EntityTopicId::default_main_device()).is_some()); @@ -1081,7 +1092,7 @@ mod tests { #[test] fn lists_child_devices() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // If the @parent info is not provided, it is assumed to be an immediate // child of the main device. @@ -1119,7 +1130,7 @@ mod tests { #[test] fn lists_services() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Services are namespaced under devices, so `parent` is not necessary let updated_entities = store @@ -1161,7 +1172,7 @@ mod tests { #[test] fn list_ancestors() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Assert no ancestors of main device assert!(store @@ -1268,7 +1279,7 @@ mod tests { #[test] fn list_ancestors_external_ids() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Assert ancestor external ids of main device assert!(store @@ -1379,7 +1390,7 @@ mod tests { #[test] fn auto_register_service() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let service_topic_id = EntityTopicId::default_child_service("child1", "service1").unwrap(); let res = store.auto_register_entity(&service_topic_id).unwrap(); @@ -1410,7 +1421,7 @@ mod tests { #[test] fn auto_register_child_device() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let child_topic_id = EntityTopicId::default_child_device("child2").unwrap(); let res = store.auto_register_entity(&child_topic_id).unwrap(); @@ -1430,7 +1441,7 @@ mod tests { #[test] fn auto_register_custom_topic_scheme_not_supported() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); assert_matches!( store.auto_register_entity(&EntityTopicId::from_str("custom/child2//").unwrap()), Err(Error::NonDefaultTopicScheme(_)) @@ -1440,7 +1451,7 @@ mod tests { #[test] fn register_main_device_custom_scheme() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Register main device with custom topic scheme let main_topic_id = EntityTopicId::from_str("custom/main//").unwrap(); @@ -1508,7 +1519,7 @@ mod tests { #[test] fn external_id_validation() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let res = store.update(EntityRegistrationMessage { @@ -1526,7 +1537,7 @@ mod tests { #[test] fn update_twin_data_new_fragment() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let topic_id = EntityTopicId::default_main_device(); let updated = store @@ -1551,7 +1562,7 @@ mod tests { #[test] fn update_twin_data_update_existing_fragment() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let topic_id = EntityTopicId::default_main_device(); let _ = store @@ -1584,7 +1595,7 @@ mod tests { #[test] fn update_twin_data_remove_fragment() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let topic_id = EntityTopicId::default_main_device(); @@ -1634,6 +1645,7 @@ mod tests { dummy_external_id_sanitizer, 5, &temp_dir, + true, ) .unwrap(); @@ -1681,7 +1693,7 @@ mod tests { #[test] fn duplicate_registration_message_ignored() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let reg_message = EntityRegistrationMessage { topic_id: entity_topic_id.clone(), @@ -1698,7 +1710,7 @@ mod tests { assert!(affected_entities.0.is_empty()); // Duplicate registration ignore even after the entity store is restored from the disk - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let affected_entities = store.update(reg_message).unwrap(); assert!(affected_entities.0.is_empty()); } @@ -1706,7 +1718,7 @@ mod tests { #[test] fn duplicate_registration_message_ignored_after_twin_update() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let entity_topic_id = EntityTopicId::default_child_device("child1").unwrap(); let reg_message = EntityRegistrationMessage { topic_id: entity_topic_id.clone(), @@ -1733,7 +1745,7 @@ mod tests { assert!(affected_entities.0.is_empty()); // Duplicate registration ignore even after the entity store is restored from the disk - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); let affected_entities = store.update(reg_message).unwrap(); assert!(affected_entities.0.is_empty()); } @@ -1741,7 +1753,7 @@ mod tests { #[test] fn early_child_device_registrations_processed_only_after_parent_registration() { let temp_dir = tempfile::tempdir().unwrap(); - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); let child0_topic_id = EntityTopicId::default_child_device("child0").unwrap(); let child000_topic_id = EntityTopicId::default_child_device("child000").unwrap(); @@ -1777,7 +1789,7 @@ mod tests { assert!(affected_entities.0.is_empty()); // Reload the entity store from the persistent log - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, true); // Assert that duplicate registrations are still ignored let affected_entities = store.update(child000_reg_message).unwrap(); @@ -1795,7 +1807,7 @@ mod tests { let twin_fragment_value = json!("bar"); { - let mut store = new_entity_store(&temp_dir); + let mut store = new_entity_store(&temp_dir, false); store .update( EntityRegistrationMessage::new_custom( @@ -1826,7 +1838,7 @@ mod tests { { // Reload the entity store using the same persistent file - let store = new_entity_store(&temp_dir); + let store = new_entity_store(&temp_dir, false); let mut expected_entity_metadata = EntityMetadata::child_device("child1".into()).unwrap(); expected_entity_metadata @@ -1847,7 +1859,7 @@ mod tests { } } - fn new_entity_store(temp_dir: &TempDir) -> EntityStore { + fn new_entity_store(temp_dir: &TempDir, clean_start: bool) -> EntityStore { EntityStore::with_main_device_and_default_service_type( MqttSchema::default(), EntityRegistrationMessage { @@ -1862,6 +1874,7 @@ mod tests { dummy_external_id_sanitizer, 5, temp_dir, + clean_start, ) .unwrap() } diff --git a/crates/core/tedge_api/src/message_log.rs b/crates/core/tedge_api/src/message_log.rs index 2d5fe9d1584..d20b0dc4611 100644 --- a/crates/core/tedge_api/src/message_log.rs +++ b/crates/core/tedge_api/src/message_log.rs @@ -91,6 +91,19 @@ impl MessageLogWriter { Ok(MessageLogWriter { writer }) } + pub fn new_truncated

(log_dir: P) -> Result + where + P: AsRef, + { + let _ = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(log_dir.as_ref().join(LOG_FILE_NAME))?; + + MessageLogWriter::new(log_dir) + } + /// Append the JSON representation of the given message to the log. /// Each message is appended on a new line. pub fn append_message(&mut self, message: &MqttMessage) -> Result<(), std::io::Error> { diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 08532df6ec6..e8598bf6b9d 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -50,6 +50,7 @@ pub struct C8yMapperConfig { pub auth_proxy_protocol: Protocol, pub mqtt_schema: MqttSchema, pub enable_auto_register: bool, + pub clean_start: bool, } impl C8yMapperConfig { @@ -72,6 +73,7 @@ impl C8yMapperConfig { auth_proxy_protocol: Protocol, mqtt_schema: MqttSchema, enable_auto_register: bool, + clean_start: bool, ) -> Self { let ops_dir = config_dir .join(SUPPORTED_OPERATIONS_DIRECTORY) @@ -98,6 +100,7 @@ impl C8yMapperConfig { auth_proxy_protocol, mqtt_schema, enable_auto_register, + clean_start, } } @@ -139,6 +142,7 @@ impl C8yMapperConfig { let mut topics = Self::default_internal_topic_filter(&config_dir)?; let enable_auto_register = tedge_config.c8y.entity_store.auto_register; + let clean_start = tedge_config.c8y.entity_store.clean_start; // Add feature topic filters for cmd in [ @@ -194,6 +198,7 @@ impl C8yMapperConfig { auth_proxy_protocol, mqtt_schema, enable_auto_register, + clean_start, )) } diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 54e3b1f890e..d3817216380 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -262,6 +262,7 @@ impl CumulocityConverter { Self::validate_external_id, EARLY_MESSAGE_BUFFER_SIZE, config.state_dir.clone(), + config.clean_start, ) .unwrap(); @@ -3162,6 +3163,7 @@ pub(crate) mod tests { auth_proxy_protocol, MqttSchema::default(), true, + true, ) } fn create_c8y_converter_from_config( diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 4c6642ec95b..53b2ef6b517 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -157,6 +157,7 @@ mod tests { crate::converter::CumulocityConverter::validate_external_id, 5, &temp_dir, + true, ) .unwrap(); diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index b79f12799f5..e04960fe595 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2414,6 +2414,7 @@ pub(crate) async fn spawn_c8y_mapper_actor( Protocol::Http, MqttSchema::default(), true, + true, ); let mut mqtt_builder: SimpleMessageBoxBuilder = diff --git a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot index 0da8c53b721..f7a9739cd3c 100644 --- a/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot +++ b/tests/RobotFramework/tests/cumulocity/registration/device_registration.robot @@ -157,7 +157,13 @@ Early data messages cached and processed Entities persisted and restored + [Teardown] Enable clean start + Execute Command sudo tedge config set c8y.entity_store.clean_start false + Restart Service tedge-mapper-c8y + Service Health Status Should Be Up tedge-mapper-c8y + ${prefix}= Get Random Name + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/' '{"@type":"child-device","@id":"${prefix}plc1"}' Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/' '{"@type":"child-device","@id":"${prefix}plc2"}' Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor1' '{"@type":"child-device","@id":"${prefix}plc1-sensor1","@parent":"factory/shop/plc1/"}' @@ -194,6 +200,37 @@ Entities persisted and restored Should Have MQTT Messages c8y/s/us/${prefix}plc2 message_contains=102 date_from=${timestamp} minimum=0 maximum=0 END +Entities send to cloud on restart + ${prefix}= Get Random Name + + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/' '{"@type":"child-device","@id":"${prefix}plc1"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/' '{"@type":"child-device","@id":"${prefix}plc2"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor1' '{"@type":"child-device","@id":"${prefix}plc1-sensor1","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/sensor2' '{"@type":"child-device","@id":"${prefix}plc1-sensor2","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/sensor1' '{"@type":"child-device","@id":"${prefix}plc2-sensor1","@parent":"factory/shop/plc2/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc1/metrics' '{"@type":"service","@id":"${prefix}plc1-metrics","@parent":"factory/shop/plc1/"}' + Execute Command tedge mqtt pub --retain 'te/factory/shop/plc2/metrics' '{"@type":"service","@id":"${prefix}plc2-metrics","@parent":"factory/shop/plc2/"}' + + External Identity Should Exist ${prefix}plc1 + External Identity Should Exist ${prefix}plc2 + External Identity Should Exist ${prefix}plc1-sensor1 + External Identity Should Exist ${prefix}plc1-sensor2 + External Identity Should Exist ${prefix}plc2-sensor1 + External Identity Should Exist ${prefix}plc1-metrics + External Identity Should Exist ${prefix}plc2-metrics + + ${timestamp}= Get Unix Timestamp + Restart Service tedge-mapper-c8y + Service Health Status Should Be Up tedge-mapper-c8y + + # Assert that entities are sent to cloud again + Should Have MQTT Messages c8y/s/us message_contains=101,${prefix}plc1 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us message_contains=101,${prefix}plc2 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/${prefix}plc1 message_contains=101,${prefix}plc1-sensor1 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/${prefix}plc1 message_contains=101,${prefix}plc1-sensor2 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/${prefix}plc2 message_contains=101,${prefix}plc2-sensor1 date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/${prefix}plc1 message_contains=102,${prefix}plc1-metrics date_from=${timestamp} minimum=1 maximum=1 + Should Have MQTT Messages c8y/s/us/${prefix}plc2 message_contains=102,${prefix}plc2-metrics date_from=${timestamp} minimum=1 maximum=1 *** Keywords *** @@ -201,6 +238,10 @@ Re-enable Auto-registration Execute Command sudo tedge config unset c8y.entity_store.auto_register Restart Service tedge-mapper-c8y +Enable clean start + Execute Command sudo tedge config set c8y.entity_store.clean_start true + Restart Service tedge-mapper-c8y + Check Child Device [Arguments] ${parent_sn} ${child_sn} ${child_name} ${child_type} ${child_mo}= Device Should Exist ${child_sn}