Skip to content

Commit

Permalink
refactor: Simplify SmartREST publish topics
Browse files Browse the repository at this point in the history
While publishing to a nested child device, publish directly to the SmartREST topic
of that device (`c8y/s/us/<xid>`), without specifying all of all its ancestors.
  • Loading branch information
albinsuresh committed Dec 12, 2024
1 parent 7a5dd83 commit aa48c0c
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 74 deletions.
10 changes: 5 additions & 5 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// smartrest messages are sent. There should be one comprehensive API for
// generating them.

use crate::smartrest::topic::publish_topic_from_ancestors;
use crate::smartrest::topic::publish_topic_from_parent;
use crate::smartrest::topic::C8yTopic;
use mqtt_channel::MqttMessage;
use std::time::Duration;
Expand All @@ -29,7 +29,7 @@ pub fn child_device_creation_message(
child_id: &str,
device_name: Option<&str>,
device_type: Option<&str>,
ancestors: &[String],
parent: Option<&str>,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
if child_id.is_empty() {
Expand Down Expand Up @@ -60,7 +60,7 @@ pub fn child_device_creation_message(
.expect("child_id, device_name, device_type should not increase payload size over the limit");

Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, prefix),
payload.into_inner(),
))
}
Expand All @@ -73,11 +73,11 @@ pub fn service_creation_message(
service_name: &str,
service_type: &str,
service_status: &str,
ancestors: &[String],
parent: Option<&str>,
prefix: &TopicPrefix,
) -> Result<MqttMessage, InvalidValueError> {
Ok(MqttMessage::new(
&publish_topic_from_ancestors(ancestors, prefix),
&publish_topic_from_parent(parent, prefix),
service_creation_message_payload(service_id, service_name, service_type, service_status)?
.into_inner(),
))
Expand Down
48 changes: 16 additions & 32 deletions crates/core/c8y_api/src/smartrest/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::json_c8y::C8yAlarm;
use mqtt_channel::MqttError;
use mqtt_channel::Topic;
use mqtt_channel::TopicFilter;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityType;
use tedge_config::TopicPrefix;

Expand All @@ -19,13 +19,14 @@ pub enum C8yTopic {
impl C8yTopic {
/// Return the c8y SmartRest response topic for the given entity
pub fn smartrest_response_topic(
entity: &EntityMetadata,
external_id: &EntityExternalId,
entity_type: &EntityType,
prefix: &TopicPrefix,
) -> Option<Topic> {
match entity.r#type {
match entity_type {
EntityType::MainDevice => Some(C8yTopic::upstream_topic(prefix)),
EntityType::ChildDevice | EntityType::Service => {
Self::ChildSmartRestResponse(entity.external_id.clone().into())
Self::ChildSmartRestResponse(external_id.clone().into())
.to_topic(prefix)
.ok()
}
Expand Down Expand Up @@ -77,28 +78,14 @@ impl From<&C8yAlarm> for C8yTopic {
}
}

/// Generates the SmartREST topic to publish to, for a given managed object
/// from the list of external IDs of itself and all its parents.
///
/// The parents are appended in the reverse order,
/// starting from the main device at the end of the list.
/// The main device itself is represented by the root topic c8y/s/us,
/// with the rest of the children appended to it at each topic level.
///
/// # Examples
///
/// - `["main"]` -> `c8y/s/us`
/// - `["child1", "main"]` -> `c8y/s/us/child1`
/// - `["child2", "child1", "main"]` -> `c8y/s/us/child1/child2`
pub fn publish_topic_from_ancestors(ancestors: &[impl AsRef<str>], prefix: &TopicPrefix) -> Topic {
let mut target_topic = format!("{prefix}/{SMARTREST_PUBLISH_TOPIC}");
for ancestor in ancestors.iter().rev().skip(1) {
// Skipping the last ancestor as it is the main device represented by the root topic itself
target_topic.push('/');
target_topic.push_str(ancestor.as_ref());
pub fn publish_topic_from_parent(parent: Option<&str>, prefix: &TopicPrefix) -> Topic {
if let Some(parent) = parent {
C8yTopic::ChildSmartRestResponse(parent.to_string())
.to_topic(prefix)
.unwrap()
} else {
C8yTopic::upstream_topic(prefix)
}

Topic::new_unchecked(&target_topic)
}

#[cfg(test)]
Expand Down Expand Up @@ -135,13 +122,10 @@ mod tests {
)
}

#[test_case(& ["main"], "c8y2/s/us")]
#[test_case(& ["foo"], "c8y2/s/us")]
#[test_case(& ["child1", "main"], "c8y2/s/us/child1")]
#[test_case(& ["child3", "child2", "child1", "main"], "c8y2/s/us/child1/child2/child3")]
fn topic_from_ancestors(ancestors: &[&str], topic: &str) {
let nested_child_topic =
publish_topic_from_ancestors(ancestors, &"c8y2".try_into().unwrap());
#[test_case(None, "c8y2/s/us")]
#[test_case(Some("child01"), "c8y2/s/us/child01")]
fn topic_from_parent(parent: Option<&str>, topic: &str) {
let nested_child_topic = publish_topic_from_parent(parent, &"c8y2".try_into().unwrap());
assert_eq!(nested_child_topic, Topic::new_unchecked(topic));
}
}
2 changes: 1 addition & 1 deletion crates/core/tedge_mapper/src/c8y/mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub fn service_monitor_client_config(
c8y_mapper_name,
service_type.as_str(),
"down",
&[],
None,
prefix,
)?;

Expand Down
58 changes: 32 additions & 26 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_id;
use c8y_api::smartrest::smartrest_serializer::succeed_operation_with_name;
use c8y_api::smartrest::smartrest_serializer::EmbeddedCsv;
use c8y_api::smartrest::smartrest_serializer::TextOrCsv;
use c8y_api::smartrest::topic::publish_topic_from_ancestors;
use c8y_api::smartrest::topic::C8yTopic;
use c8y_http_proxy::handle::C8YHttpProxy;
use c8y_http_proxy::messages::CreateEvent;
Expand Down Expand Up @@ -372,31 +371,28 @@ impl CumulocityConverter {
let display_type = input.other.get("type").and_then(|v| v.as_str());

let entity_topic_id = &input.topic_id;
let external_id = self
.entity_store
.try_get(entity_topic_id)
.map(|e| &e.external_id)?;
let entity = self.entity_store.try_get(entity_topic_id)?;
let external_id = &entity.external_id;
match input.r#type {
EntityType::MainDevice => {
self.entity_store.update(input.clone())?;
Ok(vec![])
}
EntityType::ChildDevice => {
let ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
let parent_xid = self.parent_xid_if_not_main_device(entity_topic_id)?;

let child_creation_message = child_device_creation_message(
external_id.as_ref(),
display_name,
display_type,
&ancestors_external_ids,
parent_xid.map(|xid| xid.as_ref()),
&self.config.bridge_config.c8y_prefix,
)
.context("Could not create device creation message")?;
Ok(vec![child_creation_message])
}
EntityType::Service => {
let ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
let parent_xid = self.parent_xid_if_not_main_device(entity_topic_id)?;

let service_creation_message = service_creation_message(
external_id.as_ref(),
Expand All @@ -407,7 +403,7 @@ impl CumulocityConverter {
}),
display_type.unwrap_or(&self.service_type),
"up",
&ancestors_external_ids,
parent_xid.map(|xid| xid.as_ref()),
&self.config.bridge_config.c8y_prefix,
)
.context("Could not create service creation message")?;
Expand All @@ -423,14 +419,27 @@ impl CumulocityConverter {
entity_topic_id: &EntityTopicId,
) -> Result<Topic, ConversionError> {
let entity = self.entity_store.try_get(entity_topic_id)?;

let mut ancestors_external_ids =
self.entity_store.ancestors_external_ids(entity_topic_id)?;
ancestors_external_ids.insert(0, entity.external_id.as_ref().into());
Ok(publish_topic_from_ancestors(
&ancestors_external_ids,
let topic = C8yTopic::smartrest_response_topic(
&entity.external_id,
&entity.r#type,
&self.config.bridge_config.c8y_prefix,
))
)
.expect("Topic must have been valid as the external id is pre-validated");
Ok(topic)
}

pub fn parent_xid_if_not_main_device(
&self,
entity_tid: &EntityTopicId,
) -> Result<Option<&EntityExternalId>, ConversionError> {
let entity = self.entity_store.try_get(entity_tid)?;
let parent_xid = entity
.parent
.as_ref()
.filter(|tid| *tid != &self.device_topic_id)
.map(|tid| &self.entity_store.try_get(tid).unwrap().external_id);

Ok(parent_xid)
}

/// Generates external ID of the given entity.
Expand Down Expand Up @@ -610,19 +619,16 @@ impl CumulocityConverter {

pub async fn process_health_status_message(
&mut self,
entity: &EntityTopicId,
entity_tid: &EntityTopicId,
message: &MqttMessage,
) -> Result<Vec<MqttMessage>, ConversionError> {
let entity_metadata = self
.entity_store
.get(entity)
.expect("entity was registered");
let entity = self.entity_store.try_get(entity_tid)?;
let parent_xid = self.parent_xid_if_not_main_device(entity_tid)?;

let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?;
Ok(convert_health_status_message(
&self.config.mqtt_schema,
entity_metadata,
&ancestors_external_ids,
entity,
parent_xid,
message,
&self.config.bridge_config.c8y_prefix,
))
Expand Down
16 changes: 10 additions & 6 deletions crates/extensions/c8y_mapper_ext/src/service_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use c8y_api::smartrest;
use tedge_api::entity_store::EntityExternalId;
use tedge_api::entity_store::EntityMetadata;
use tedge_api::entity_store::EntityType;
use tedge_api::mqtt_topics::MqttSchema;
Expand All @@ -23,7 +24,7 @@ pub fn is_c8y_bridge_established(
pub fn convert_health_status_message(
mqtt_schema: &MqttSchema,
entity: &EntityMetadata,
ancestors_external_ids: &[String],
parent_xid: Option<&EntityExternalId>,
message: &MqttMessage,
prefix: &TopicPrefix,
) -> Vec<MqttMessage> {
Expand Down Expand Up @@ -53,7 +54,7 @@ pub fn convert_health_status_message(
display_name,
display_type,
&status.to_string(),
ancestors_external_ids,
parent_xid.map(|v| v.as_ref()),
prefix,
) else {
error!("Can't create 102 for service status update");
Expand Down Expand Up @@ -208,14 +209,17 @@ mod tests {
entity_store.update(entity_registration).unwrap();

let entity = entity_store.get(&entity_topic_id).unwrap();
let ancestors_external_ids = entity_store
.ancestors_external_ids(&entity_topic_id)
.unwrap();
let parent = entity
.parent
.as_ref()
.filter(|tid| *tid != "device/main//")
.map(|tid| &entity_store.try_get(tid).unwrap().external_id);
dbg!(&parent);

let msg = convert_health_status_message(
&mqtt_schema,
entity,
&ancestors_external_ids,
parent,
&health_message,
&"c8y".try_into().unwrap(),
);
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ async fn child_device_registration_mapping() {
assert_received_contains_str(
&mut mqtt,
[(
"c8y/s/us/test-device:device:child1/test-device:device:child2",
"c8y/s/us/test-device:device:child2",
"101,child3,child3,thin-edge.io-child",
)],
)
Expand Down Expand Up @@ -326,7 +326,7 @@ async fn service_registration_mapping() {
assert_received_contains_str(
&mut mqtt,
[(
"c8y/s/us/test-device:device:child1/test-device:device:child2",
"c8y/s/us/test-device:device:child2",
"102,test-device:device:child2:service:collectd,systemd,Collectd,up",
)],
)
Expand Down
4 changes: 2 additions & 2 deletions crates/extensions/tedge_mqtt_ext/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ pub fn assert_message_contains_str(message: &MqttMessage, expected: (&str, &str)
let expected_payload = expected.1;
assert!(
TopicFilter::new_unchecked(expected_topic).accept(message),
"\nReceived unexpected message: {:?} \nExpected: {expected_payload:?}",
"\nReceived unexpected message: {:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
message
);
let payload = message.payload_str().expect("non UTF-8 payload");
assert!(
payload.contains(expected_payload),
"Payload assertion failed.\n Actual: {payload:?} \nExpected: {expected_payload:?}",
"Payload assertion failed.\n Actual: {payload:?} \nExpected message with topic: {expected_topic}, payload: {expected_payload}",
)
}

Expand Down

0 comments on commit aa48c0c

Please sign in to comment.