Skip to content

Commit

Permalink
refactor: Simplify SmartREST publish topics
Browse files Browse the repository at this point in the history
While publishing to a nested child device, publish directly to the SmartREST topic
of that device (`c8y/s/us/<xid>`), without specifying all of its ancestors in the topic.
  • Loading branch information
albinsuresh authored and Bravo555 committed Jan 8, 2025
1 parent 378c798 commit 157e8d3
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 72 deletions.
12 changes: 7 additions & 5 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// smartrest messages are sent. There should be one comprehensive API for
// generating them.

use crate::smartrest::topic::publish_topic_from_ancestors;
use crate::smartrest::topic::publish_topic_from_parent;
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
use std::time::Duration;
Expand All @@ -29,7 +29,8 @@ pub fn child_device_creation_message(
child_id: &str,
device_name: Option<&str>,
device_type: Option<&str>,
ancestors: &[String],
parent: Option<&str>,
main_device_id: &str,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
if child_id.is_empty() {
Expand Down Expand Up @@ -60,7 +61,7 @@ pub fn child_device_creation_message(
.expect("child_id, device_name, device_type should not increase payload size over the limit");

Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, main_device_id, prefix),
payload.into_inner(),
))
}
Expand All @@ -73,11 +74,12 @@ pub fn service_creation_message(
service_name: &str,
service_type: &str,
service_status: &str,
ancestors: &[String],
parent: Option<&str>,
main_device_id: &str,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, main_device_id, prefix),
service_creation_message_payload(service_id, service_name, service_type, service_status)?
.into_inner(),
))
Expand Down
58 changes: 30 additions & 28 deletions crates/core/c8y_api/src/smartrest/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::json_c8y::C8yAlarm;
use mqtt_channel::MqttError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityType;
use tedge_config::TopicPrefix;

Expand All @@ -19,13 +19,14 @@ pub enum C8yTopic {
impl C8yTopic {
/// Return the c8y SmartRest response topic for the given entity
pub fn smartrest_response_topic(
entity: &EntityMetadata,
external_id: &EntityExternalId,
entity_type: &EntityType,
prefix: &TopicPrefix,
) -> Option<Topic> {
match entity.r#type {
match entity_type {
EntityType::MainDevice => Some(C8yTopic::upstream_topic(prefix)),
EntityType::ChildDevice | EntityType::Service => {
Self::ChildSmartRestResponse(entity.external_id.clone().into())
Self::ChildSmartRestResponse(external_id.clone().into())
.to_topic(prefix)
.ok()
}
Expand Down Expand Up @@ -77,28 +78,30 @@ impl From<&C8yAlarm> for C8yTopic {
}
}

/// Generates the SmartREST topic to publish to, for a given managed object
/// from the list of external IDs of itself and all its parents.
///
/// The parents are appended in the reverse order,
/// starting from the main device at the end of the list.
/// The main device itself is represented by the root topic c8y/s/us,
/// with the rest of the children appended to it at each topic level.
/// Generates the SmartREST topic to publish to, from the external ID of its parent.
/// If the parent is the main device, the topic would be `<prefix>/s/us`.
/// For all other parent devices, the target topic would be `<prefix>/s/us/<parent-xid>`.
/// For the main device with no parent, and the topic would be `<prefix>/s/us` in that case as well.
///
/// # Examples
///
/// - `["main"]` -> `c8y/s/us`
/// - `["child1", "main"]` -> `c8y/s/us/child1`
/// - `["child2", "child1", "main"]` -> `c8y/s/us/child1/child2`
pub fn publish_topic_from_ancestors(ancestors: &[impl AsRef<str>], prefix: &TopicPrefix) -> Topic {
let mut target_topic = format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}");
for ancestor in ancestors.iter().rev().skip(1) {
// Skipping the last ancestor as it is the main device represented by the root topic itself
target_topic.push('/');
target_topic.push_str(ancestor.as_ref());
/// - `(Some("main"), "main", "c8y")` -> `c8y/s/us`
/// - `[Some("child1"), "main", "c8y"]` -> `c8y/s/us/child1`
/// - `[Some("service1"), "main", "c8y"]` -> `c8y/s/us/service1`
/// - `(None, "main", "c8y")` -> `c8y/s/us`
pub fn publish_topic_from_parent(
parent_xid: Option<&str>,
main_device_xid: &str,
prefix: &TopicPrefix,
) -> Topic {
if let Some(parent) = parent_xid {
if parent != main_device_xid {
return C8yTopic::ChildSmartRestResponse(parent.to_string())
.to_topic(prefix)
.unwrap();
}
}

Topic::new_unchecked(&target_topic)
C8yTopic::upstream_topic(prefix)
}

#[cfg(test)]
Expand Down Expand Up @@ -135,13 +138,12 @@ mod tests {
)
}

#[test_case(& ["main"], "c8y2/s/us")]
#[test_case(& ["foo"], "c8y2/s/us")]
#[test_case(& ["child1", "main"], "c8y2/s/us/child1")]
#[test_case(& ["child3", "child2", "child1", "main"], "c8y2/s/us/child1/child2/child3")]
fn topic_from_ancestors(ancestors: &[&str], topic: &str) {
#[test_case(None, "main-device", "c8y2/s/us")]
#[test_case(Some("child01"), "main-device", "c8y2/s/us/child01")]
#[test_case(Some("main-device"), "main-device", "c8y2/s/us")]
fn topic_from_parent(parent_xid: Option<&str>, main_device_xid: &str, topic: &str) {
let nested_child_topic =
publish_topic_from_ancestors(ancestors, &"c8y2".try_into().unwrap());
publish_topic_from_parent(parent_xid, main_device_xid, &"c8y2".try_into().unwrap());
assert_eq!(nested_child_topic, Topic::new_unchecked(topic));
}
}
19 changes: 19 additions & 0 deletions crates/core/tedge_api/src/entity_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,25 @@ impl EntityStore {
self.get(&self.main_device).unwrap().external_id.clone()
}

/// Returns the external id of the parent of the given entity.
/// Returns None for the main device, that doesn't have any parents.
pub fn parent_external_id(
&self,
entity_tid: &EntityTopicId,
) -> Result<Option<&EntityExternalId>, Error> {
let entity = self.try_get(entity_tid)?;
let parent_xid = entity.parent.as_ref().map(|tid| {
&self
.try_get(tid)
.expect(
"for every registered entity, its parent is also guaranteed to be registered",
)
.external_id
});

Ok(parent_xid)
}

/// Returns an ordered list of ancestors of the given entity
/// starting from the immediate parent all the way till the root main device.
/// The last parent in the list for any entity would always be the main device.
Expand Down
3 changes: 2 additions & 1 deletion crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ pub fn service_monitor_client_config(
c8y_mapper_name,
service_type.as_str(),
"down",
&[],
None,
main_device_xid.as_ref(),
prefix,
)?;

Expand Down
47 changes: 21 additions & 26 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_id;
use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name;
use c8y_api::smartrest::smartrest_serializer::EmbeddedCsv;
use c8y_api::smartrest::smartrest_serializer::TextOrCsv;
use c8y_api::smartrest::topic::publish_topic_from_ancestors;
use c8y_api::smartrest::topic::C8yTopic;
use c8y_http_proxy::handle::C8YHttpProxy;
use c8y_http_proxy::messages::CreateEvent;
Expand Down Expand Up @@ -372,31 +371,29 @@ impl CumulocityConverter {
let display_type = input.other.get("type").and_then(|v| v.as_str());

let entity_topic_id = &input.topic_id;
let external_id = self
.entity_store
.try_get(entity_topic_id)
.map(|e| &e.external_id)?;
let entity = self.entity_store.try_get(entity_topic_id)?;
let external_id = &entity.external_id;
match input.r#type {
EntityType::MainDevice => {
self.entity_store.update(input.clone())?;
Ok(vec![])
}
EntityType::ChildDevice => {
let ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?;

let child_creation_message = child_device_creation_message(
external_id.as_ref(),
display_name,
display_type,
&ancestors_external_ids,
parent_xid.map(|xid| xid.as_ref()),
&self.device_name,
&self.config.bridge_config.c8y_prefix,
)
.context("Could not create device creation message")?;
Ok(vec![child_creation_message])
}
EntityType::Service => {
let ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
let parent_xid = self.entity_store.parent_external_id(entity_topic_id)?;

let service_creation_message = service_creation_message(
external_id.as_ref(),
Expand All @@ -407,7 +404,8 @@ impl CumulocityConverter {
}),
display_type.unwrap_or(&self.service_type),
"up",
&ancestors_external_ids,
parent_xid.map(|xid| xid.as_ref()),
&self.device_name,
&self.config.bridge_config.c8y_prefix,
)
.context("Could not create service creation message")?;
Expand All @@ -423,14 +421,13 @@ impl CumulocityConverter {
entity_topic_id: &EntityTopicId,
) -> Result<Topic, ConversionError> {
let entity = self.entity_store.try_get(entity_topic_id)?;

let mut ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
ancestors_external_ids.insert(0, entity.external_id.as_ref().into());
Ok(publish_topic_from_ancestors(
&ancestors_external_ids,
let topic = C8yTopic::smartrest_response_topic(
&entity.external_id,
&entity.r#type,
&self.config.bridge_config.c8y_prefix,
))
)
.expect("Topic must have been valid as the external id is pre-validated");
Ok(topic)
}

/// Generates external ID of the given entity.
Expand Down Expand Up @@ -610,19 +607,17 @@ impl CumulocityConverter {

pub async fn process_health_status_message(
&mut self,
entity: &EntityTopicId,
entity_tid: &EntityTopicId,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let entity_metadata = self
.entity_store
.get(entity)
.expect("entity was registered");
let entity = self.entity_store.try_get(entity_tid)?;
let parent_xid = self.entity_store.parent_external_id(entity_tid)?;

let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?;
Ok(convert_health_status_message(
&self.config.mqtt_schema,
entity_metadata,
&ancestors_external_ids,
entity,
parent_xid,
&self.entity_store.main_device_external_id(),
message,
&self.config.bridge_config.c8y_prefix,
))
Expand Down
23 changes: 15 additions & 8 deletions crates/extensions/c8y_mapper_ext/src/service_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use c8y_api::smartrest;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityType;
use tedge_api::mqtt_topics::MqttSchema;
Expand Down Expand Up @@ -26,7 +27,8 @@ pub fn is_c8y_bridge_established(
pub fn convert_health_status_message(
mqtt_schema: &MqttSchema,
entity: &EntityMetadata,
ancestors_external_ids: &[String],
parent_xid: Option<&EntityExternalId>,
main_device_xid: &EntityExternalId,
message: &MqttMessage,
prefix: &TopicPrefix,
) -> Vec<MqttMessage> {
Expand Down Expand Up @@ -56,7 +58,8 @@ pub fn convert_health_status_message(
display_name,
display_type,
&status.to_string(),
ancestors_external_ids,
parent_xid.map(|v| v.as_ref()),
main_device_xid.as_ref(),
prefix,
) else {
error!("Can't create 102 for service status update");
Expand Down Expand Up @@ -174,7 +177,7 @@ mod tests {
"service-monitoring-mosquitto-bridge-unknown-status"
)]
fn translate_health_status_to_c8y_service_monitoring_message(
device_name: &str,
main_device_id: &str,
health_topic: &str,
health_payload: &str,
c8y_monitor_topic: &str,
Expand All @@ -193,7 +196,7 @@ mod tests {

let temp_dir = tempfile::tempdir().unwrap();
let main_device_registration =
EntityRegistrationMessage::main_device(device_name.to_string());
EntityRegistrationMessage::main_device(main_device_id.to_string());
let mut entity_store = EntityStore::with_main_device_and_default_service_type(
MqttSchema::default(),
main_device_registration,
Expand All @@ -220,14 +223,18 @@ mod tests {
entity_store.update(entity_registration).unwrap();

let entity = entity_store.get(&entity_topic_id).unwrap();
let ancestors_external_ids = entity_store
.ancestors_external_ids(&entity_topic_id)
.unwrap();
let parent = entity
.parent
.as_ref()
.filter(|tid| *tid != "device/main//")
.map(|tid| &entity_store.try_get(tid).unwrap().external_id);
dbg!(&parent);

let msg = convert_health_status_message(
&mqtt_schema,
entity,
&ancestors_external_ids,
parent,
&main_device_id.into(),
&health_message,
&"c8y".try_into().unwrap(),
);
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn child_device_registration_mapping() {
assert_received_contains_str(
&mut mqtt,
[(
"c8y/s/us/test-device:device:child1/test-device:device:child2",
"c8y/s/us/test-device:device:child2",
"101,child3,child3,thin-edge.io-child",
)],
)
Expand Down Expand Up @@ -326,7 +326,7 @@ async fn service_registration_mapping() {
assert_received_contains_str(
&mut mqtt,
[(
"c8y/s/us/test-device:device:child1/test-device:device:child2",
"c8y/s/us/test-device:device:child2",
"102,test-device:device:child2:service:collectd,systemd,Collectd,up",
)],
)
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ pub fn assert_message_contains_str(message: &MqttMessage, expected: (&str, &str)
let expected_payload = expected.1;
assert!(
TopicFilter::new_unchecked(expected_topic).accept(message),
"\nReceived unexpected message: {:?} \nExpected: {expected_payload:?}",
"\nReceived unexpected message: {:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
message
);
let payload = message.payload_str().expect("non UTF-8 payload");
assert!(
payload.contains(expected_payload),
"Payload assertion failed.\n Actual: {payload:?} \nExpected: {expected_payload:?}",
"Payload assertion failed.\n Actual: {payload:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
)
}

Expand Down

0 comments on commit 157e8d3

Please sign in to comment.