From 4b8213a8665b237f5f28ac40119af2e4c9ddea4f Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 12 Dec 2024 07:14:31 +0000 Subject: [PATCH] refactor: Simplify SmartREST publish topics While publishing to a nested child device, publish directly to the SmartREST topic of that device (`c8y/s/us/`), without specifying all of its ancestors in the topic. --- .../core/c8y_api/src/smartrest/inventory.rs | 12 ++-- crates/core/c8y_api/src/smartrest/topic.rs | 58 ++++++++++--------- crates/core/tedge_api/src/entity_store.rs | 19 ++++++ crates/core/tedge_mapper/src/c8y/mapper.rs | 3 +- .../c8y_mapper_ext/src/converter.rs | 47 +++++++-------- .../c8y_mapper_ext/src/service_monitor.rs | 23 +++++--- crates/extensions/c8y_mapper_ext/src/tests.rs | 4 +- .../tedge_mqtt_ext/src/test_helpers.rs | 4 +- 8 files changed, 98 insertions(+), 72 deletions(-) diff --git a/crates/core/c8y_api/src/smartrest/inventory.rs b/crates/core/c8y_api/src/smartrest/inventory.rs index e48a2c46bd8..d391cd868ac 100644 --- a/crates/core/c8y_api/src/smartrest/inventory.rs +++ b/crates/core/c8y_api/src/smartrest/inventory.rs @@ -9,7 +9,7 @@ // smartrest messages are sent. There should be one comprehensive API for // generating them. -use crate::smartrest::topic::publish_topic_from_ancestors; +use crate::smartrest::topic::publish_topic_from_parent; use crate::smartrest::topic::C8yTopic; use mqtt_channel::MqttMessage; use std::time::Duration; @@ -29,7 +29,8 @@ pub fn child_device_creation_message( child_id: &str, device_name: Option<&str>, device_type: Option<&str>, - ancestors: &[String], + parent: Option<&str>, + main_device_id: &str, prefix: &TopicPrefix, ) -> Result { if child_id.is_empty() { @@ -60,7 +61,7 @@ pub fn child_device_creation_message( .expect("child_id, device_name, device_type should not increase payload size over the limit"); Ok(MqttMessage::new( - &publish_topic_from_ancestors(ancestors, prefix), + &publish_topic_from_parent(parent, main_device_id, prefix), payload.into_inner(), )) } @@ -73,11 +74,12 @@ pub fn service_creation_message( service_name: &str, service_type: &str, service_status: &str, - ancestors: &[String], + parent: Option<&str>, + main_device_id: &str, prefix: &TopicPrefix, ) -> Result { Ok(MqttMessage::new( - &publish_topic_from_ancestors(ancestors, prefix), + &publish_topic_from_parent(parent, main_device_id, prefix), service_creation_message_payload(service_id, service_name, service_type, service_status)? .into_inner(), )) diff --git a/crates/core/c8y_api/src/smartrest/topic.rs b/crates/core/c8y_api/src/smartrest/topic.rs index 786411294cb..e005c9bb74d 100644 --- a/crates/core/c8y_api/src/smartrest/topic.rs +++ b/crates/core/c8y_api/src/smartrest/topic.rs @@ -2,7 +2,7 @@ use crate::json_c8y::C8yAlarm; use mqtt_channel::MqttError; use mqtt_channel::Topic; use mqtt_channel::TopicFilter; -use tedge_api::entity_store::EntityMetadata; +use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityType; use tedge_config::TopicPrefix; @@ -19,13 +19,14 @@ pub enum C8yTopic { impl C8yTopic { /// Return the c8y SmartRest response topic for the given entity pub fn smartrest_response_topic( - entity: &EntityMetadata, + external_id: &EntityExternalId, + entity_type: &EntityType, prefix: &TopicPrefix, ) -> Option { - match entity.r#type { + match entity_type { EntityType::MainDevice => Some(C8yTopic::upstream_topic(prefix)), EntityType::ChildDevice | EntityType::Service => { - Self::ChildSmartRestResponse(entity.external_id.clone().into()) + Self::ChildSmartRestResponse(external_id.clone().into()) .to_topic(prefix) .ok() } @@ -77,28 +78,30 @@ impl From<&C8yAlarm> for C8yTopic { } } -/// Generates the SmartREST topic to publish to, for a given managed object -/// from the list of external IDs of itself and all its parents. -/// -/// The parents are appended in the reverse order, -/// starting from the main device at the end of the list. -/// The main device itself is represented by the root topic c8y/s/us, -/// with the rest of the children appended to it at each topic level. +/// Generates the SmartREST topic to publish to, from the external ID of its parent. +/// If the parent is the main device, the topic would be `/s/us`. +/// For all other parent devices, the target topic would be `/s/us/`. +/// For the main device with no parent, and the topic would be `/s/us` in that case as well. /// /// # Examples /// -/// - `["main"]` -> `c8y/s/us` -/// - `["child1", "main"]` -> `c8y/s/us/child1` -/// - `["child2", "child1", "main"]` -> `c8y/s/us/child1/child2` -pub fn publish_topic_from_ancestors(ancestors: &[impl AsRef], prefix: &TopicPrefix) -> Topic { - let mut target_topic = format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}"); - for ancestor in ancestors.iter().rev().skip(1) { - // Skipping the last ancestor as it is the main device represented by the root topic itself - target_topic.push('/'); - target_topic.push_str(ancestor.as_ref()); +/// - `(Some("main"), "main", "c8y")` -> `c8y/s/us` +/// - `[Some("child1"), "main", "c8y"]` -> `c8y/s/us/child1` +/// - `[Some("service1"), "main", "c8y"]` -> `c8y/s/us/service1` +/// - `(None, "main", "c8y")` -> `c8y/s/us` +pub fn publish_topic_from_parent( + parent_xid: Option<&str>, + main_device_xid: &str, + prefix: &TopicPrefix, +) -> Topic { + if let Some(parent) = parent_xid { + if parent != main_device_xid { + return C8yTopic::ChildSmartRestResponse(parent.to_string()) + .to_topic(prefix) + .unwrap(); + } } - - Topic::new_unchecked(&target_topic) + C8yTopic::upstream_topic(prefix) } #[cfg(test)] @@ -135,13 +138,12 @@ mod tests { ) } - #[test_case(& ["main"], "c8y2/s/us")] - #[test_case(& ["foo"], "c8y2/s/us")] - #[test_case(& ["child1", "main"], "c8y2/s/us/child1")] - #[test_case(& ["child3", "child2", "child1", "main"], "c8y2/s/us/child1/child2/child3")] - fn topic_from_ancestors(ancestors: &[&str], topic: &str) { + #[test_case(None, "main-device", "c8y2/s/us")] + #[test_case(Some("child01"), "main-device", "c8y2/s/us/child01")] + #[test_case(Some("main-device"), "main-device", "c8y2/s/us")] + fn topic_from_parent(parent_xid: Option<&str>, main_device_xid: &str, topic: &str) { let nested_child_topic = - publish_topic_from_ancestors(ancestors, &"c8y2".try_into().unwrap()); + publish_topic_from_parent(parent_xid, main_device_xid, &"c8y2".try_into().unwrap()); assert_eq!(nested_child_topic, Topic::new_unchecked(topic)); } } diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 0f6b894a169..a32873f3895 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -354,6 +354,25 @@ impl EntityStore { self.get(&self.main_device).unwrap().external_id.clone() } + /// Returns the external id of the parent of the given entity. + /// Returns None for the main device, that doesn't have any parents. + pub fn parent_external_id( + &self, + entity_tid: &EntityTopicId, + ) -> Result, Error> { + let entity = self.try_get(entity_tid)?; + let parent_xid = entity.parent.as_ref().map(|tid| { + &self + .try_get(tid) + .expect( + "for every registered entity, its parent is also guaranteed to be registered", + ) + .external_id + }); + + Ok(parent_xid) + } + /// Returns an ordered list of ancestors of the given entity /// starting from the immediate parent all the way till the root main device. /// The last parent in the list for any entity would always be the main device. diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index b0b33e6df2b..baafe5b2813 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -336,7 +336,8 @@ pub fn service_monitor_client_config( c8y_mapper_name, service_type.as_str(), "down", - &[], + None, + main_device_xid.as_ref(), prefix, )?; diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index d8daa339cab..79b22517f0d 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -43,7 +43,6 @@ use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_id; use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name; use c8y_api::smartrest::smartrest_serializer::EmbeddedCsv; use c8y_api::smartrest::smartrest_serializer::TextOrCsv; -use c8y_api::smartrest::topic::publish_topic_from_ancestors; use c8y_api::smartrest::topic::C8yTopic; use c8y_http_proxy::handle::C8YHttpProxy; use c8y_http_proxy::messages::CreateEvent; @@ -372,31 +371,29 @@ impl CumulocityConverter { let display_type = input.other.get("type").and_then(|v| v.as_str()); let entity_topic_id = &input.topic_id; - let external_id = self - .entity_store - .try_get(entity_topic_id) - .map(|e| &e.external_id)?; + let entity = self.entity_store.try_get(entity_topic_id)?; + let external_id = &entity.external_id; match input.r#type { EntityType::MainDevice => { self.entity_store.update(input.clone())?; Ok(vec![]) } EntityType::ChildDevice => { - let ancestors_external_ids = - self.entity_store.ancestors_external_ids(entity_topic_id)?; + let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?; + let child_creation_message = child_device_creation_message( external_id.as_ref(), display_name, display_type, - &ancestors_external_ids, + parent_xid.map(|xid| xid.as_ref()), + &self.device_name, &self.config.bridge_config.c8y_prefix, ) .context("Could not create device creation message")?; Ok(vec![child_creation_message]) } EntityType::Service => { - let ancestors_external_ids = - self.entity_store.ancestors_external_ids(entity_topic_id)?; + let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?; let service_creation_message = service_creation_message( external_id.as_ref(), @@ -407,7 +404,8 @@ impl CumulocityConverter { }), display_type.unwrap_or(&self.service_type), "up", - &ancestors_external_ids, + parent_xid.map(|xid| xid.as_ref()), + &self.device_name, &self.config.bridge_config.c8y_prefix, ) .context("Could not create service creation message")?; @@ -423,14 +421,13 @@ impl CumulocityConverter { entity_topic_id: &EntityTopicId, ) -> Result { let entity = self.entity_store.try_get(entity_topic_id)?; - - let mut ancestors_external_ids = - self.entity_store.ancestors_external_ids(entity_topic_id)?; - ancestors_external_ids.insert(0, entity.external_id.as_ref().into()); - Ok(publish_topic_from_ancestors( - &ancestors_external_ids, + let topic = C8yTopic::smartrest_response_topic( + &entity.external_id, + &entity.r#type, &self.config.bridge_config.c8y_prefix, - )) + ) + .expect("Topic must have been valid as the external id is pre-validated"); + Ok(topic) } /// Generates external ID of the given entity. @@ -610,19 +607,17 @@ impl CumulocityConverter { pub async fn process_health_status_message( &mut self, - entity: &EntityTopicId, + entity_tid: &EntityTopicId, message: &MqttMessage, ) -> Result, ConversionError> { - let entity_metadata = self - .entity_store - .get(entity) - .expect("entity was registered"); + let entity = self.entity_store.try_get(entity_tid)?; + let parent_xid = self.entity_store.parent_external_id(entity_tid)?; - let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?; Ok(convert_health_status_message( &self.config.mqtt_schema, - entity_metadata, - &ancestors_external_ids, + entity, + parent_xid, + &self.entity_store.main_device_external_id(), message, &self.config.bridge_config.c8y_prefix, )) diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 048194e93be..fcb69e98305 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -1,4 +1,5 @@ use c8y_api::smartrest; +use tedge_api::entity_store::EntityExternalId; use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityType; use tedge_api::mqtt_topics::MqttSchema; @@ -26,7 +27,8 @@ pub fn is_c8y_bridge_established( pub fn convert_health_status_message( mqtt_schema: &MqttSchema, entity: &EntityMetadata, - ancestors_external_ids: &[String], + parent_xid: Option<&EntityExternalId>, + main_device_xid: &EntityExternalId, message: &MqttMessage, prefix: &TopicPrefix, ) -> Vec { @@ -56,7 +58,8 @@ pub fn convert_health_status_message( display_name, display_type, &status.to_string(), - ancestors_external_ids, + parent_xid.map(|v| v.as_ref()), + main_device_xid.as_ref(), prefix, ) else { error!("Can't create 102 for service status update"); @@ -174,7 +177,7 @@ mod tests { "service-monitoring-mosquitto-bridge-unknown-status" )] fn translate_health_status_to_c8y_service_monitoring_message( - device_name: &str, + main_device_id: &str, health_topic: &str, health_payload: &str, c8y_monitor_topic: &str, @@ -193,7 +196,7 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let main_device_registration = - EntityRegistrationMessage::main_device(device_name.to_string()); + EntityRegistrationMessage::main_device(main_device_id.to_string()); let mut entity_store = EntityStore::with_main_device_and_default_service_type( MqttSchema::default(), main_device_registration, @@ -220,14 +223,18 @@ mod tests { entity_store.update(entity_registration).unwrap(); let entity = entity_store.get(&entity_topic_id).unwrap(); - let ancestors_external_ids = entity_store - .ancestors_external_ids(&entity_topic_id) - .unwrap(); + let parent = entity + .parent + .as_ref() + .filter(|tid| *tid != "device/main//") + .map(|tid| &entity_store.try_get(tid).unwrap().external_id); + dbg!(&parent); let msg = convert_health_status_message( &mqtt_schema, entity, - &ancestors_external_ids, + parent, + &main_device_id.into(), &health_message, &"c8y".try_into().unwrap(), ); diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index a944ae8b689..bf9fb95bf5f 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -176,7 +176,7 @@ async fn child_device_registration_mapping() { assert_received_contains_str( &mut mqtt, [( - "c8y/s/us/test-device:device:child1/test-device:device:child2", + "c8y/s/us/test-device:device:child2", "101,child3,child3,thin-edge.io-child", )], ) @@ -326,7 +326,7 @@ async fn service_registration_mapping() { assert_received_contains_str( &mut mqtt, [( - "c8y/s/us/test-device:device:child1/test-device:device:child2", + "c8y/s/us/test-device:device:child2", "102,test-device:device:child2:service:collectd,systemd,Collectd,up", )], ) diff --git a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs index 482aac48433..44870f5b1f9 100644 --- a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs +++ b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs @@ -42,13 +42,13 @@ pub fn assert_message_contains_str(message: &MqttMessage, expected: (&str, &str) let expected_payload = expected.1; assert!( TopicFilter::new_unchecked(expected_topic).accept(message), - "\nReceived unexpected message: {:?} \nExpected: {expected_payload:?}", + "\nReceived unexpected message: {:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}", message ); let payload = message.payload_str().expect("non UTF-8 payload"); assert!( payload.contains(expected_payload), - "Payload assertion failed.\n Actual: {payload:?} \nExpected: {expected_payload:?}", + "Payload assertion failed.\n Actual: {payload:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}", ) }