Skip to content

Commit

Permalink
Merge pull request #2276 from Bravo555/services-new-health-topics
Browse files Browse the repository at this point in the history
Move c8y-mapper to new health topics
  • Loading branch information
Bravo555 authored Oct 18, 2023
2 parents ca4b74e + 2a8cccc commit f811fad
Show file tree
Hide file tree
Showing 39 changed files with 1,070 additions and 442 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/c8y-device-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ clap = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
tedge_actors = { workspace = true }
tedge_api = { workspace = true }
tedge_config = { workspace = true }
tedge_downloader_ext = { workspace = true }
tedge_file_system_ext = { workspace = true }
Expand Down
31 changes: 30 additions & 1 deletion crates/bin/c8y-device-management/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Context;
use c8y_config_manager::ConfigManagerBuilder;
use c8y_config_manager::ConfigManagerConfig;
use c8y_firmware_manager::FirmwareManagerBuilder;
Expand All @@ -9,6 +10,10 @@ use c8y_log_manager::LogManagerConfig;
use clap::Parser;
use std::path::PathBuf;
use tedge_actors::Runtime;
use tedge_api::mqtt_topics::DeviceTopicId;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::Service;
use tedge_config::TEdgeConfigLocation;
use tedge_config::TEdgeConfigRepository;
use tedge_config::DEFAULT_TEDGE_CONFIG_PATH;
Expand Down Expand Up @@ -92,7 +97,31 @@ async fn main() -> anyhow::Result<()> {
)?;

// Instantiate health monitor actor
let health_actor = HealthMonitorBuilder::new(PLUGIN_NAME, &mut mqtt_actor);
// TODO: take a user-configurable service topic id
let mqtt_device_topic_id = &tedge_config
.mqtt
.device_topic_id
.parse::<EntityTopicId>()
.unwrap();

let service_topic_id = mqtt_device_topic_id
.to_default_service_topic_id(PLUGIN_NAME)
.with_context(|| {
format!(
"Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'",
)
})?;
let service = Service {
service_topic_id,
device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()),
};
let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.to_string());
let health_actor = HealthMonitorBuilder::from_service_topic_id(
service,
&mut mqtt_actor,
&mqtt_schema,
tedge_config.service.ty.clone(),
);

// Shutdown on SIGINT
let signal_actor = SignalActor::builder(&runtime.get_handle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ define_tedge_config! {

/// Set of MQTT topics the Cumulocity mapper should subscribe to
#[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,tedge/health/+,tedge/health/+/+"))]
#[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))]
topics: TemplatesSet,

enable: {
Expand Down
40 changes: 40 additions & 0 deletions crates/core/c8y_api/src/smartrest/inventory.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
//! This module provides some helper functions to create SmartREST messages
//! that can be used to create various managed objects in Cumulocity inventory.
// TODO: Have different SmartREST messages be different types, so we can see
// where these messages are used, not only created.
//
// TODO: both `C8yTopic::smartrest_response_topic(&EntityMetadata)` and
// `publish_topic_from_ancestors(&[String])` produce C8y MQTT topics on which
// smartrest messages are sent. There should be one comprehensive API for
// generating them.

use crate::smartrest::topic::publish_topic_from_ancestors;
use mqtt_channel::Message;

use super::message::sanitize_for_smartrest;

/// Create a SmartREST message for creating a child device under the given ancestors.
/// The provided ancestors list must contain all the parents of the given device
/// starting from its immediate parent device.
Expand All @@ -14,6 +25,7 @@ pub fn child_device_creation_message(
) -> Message {
Message::new(
&publish_topic_from_ancestors(ancestors),
// XXX: if any arguments contain commas, output will be wrong
format!(
"101,{},{},{}",
child_id,
Expand All @@ -35,9 +47,37 @@ pub fn service_creation_message(
) -> Message {
Message::new(
&publish_topic_from_ancestors(ancestors),
// XXX: if any arguments contain commas, output will be wrong
format!(
"102,{},{},{},{}",
service_id, service_type, service_name, service_status
),
)
}

/// Create a SmartREST message for updating service status.
///
/// `service_status` can be any string, but `"up"`, `"down"`, and `"unknown"`
/// have known meanings and are displayed in the UI in different ways.
///
/// `external_ids` differs from what is returned by `ancestors_external_ids` in
/// that it also contains the external ID of the current entity (the one we want
/// to set the status of).
///
/// https://cumulocity.com/guides/reference/smartrest-two/#104
pub fn service_status_update_message(external_ids: &[String], service_status: &str) -> Message {
let topic = publish_topic_from_ancestors(external_ids);

let mut service_status = sanitize_for_smartrest(
service_status.into(),
super::message::MAX_PAYLOAD_LIMIT_IN_BYTES,
);

if service_status.contains(',') {
service_status = format!("\"{service_status}\"");
}

let payload = format!("104,{service_status}");

Message::new(&topic, payload)
}
2 changes: 2 additions & 0 deletions crates/core/c8y_api/src/smartrest/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub fn get_smartrest_template_id(payload: &str) -> String {
/// - Remove all control characters except for `\n`, `\t`, `\r`.
/// - Double quote is escaped as `\"\"`.
/// - Strip the input according to `max_size`.
// TODO: make this return Result
// TODO: make a variant which assumes `max_size = MAX_PAYLOAD_LIMIT_IN_BYTES`
pub fn sanitize_for_smartrest(input: Vec<u8>, max_size: usize) -> String {
String::from_utf8(input)
.unwrap_or_else(|err| {
Expand Down
11 changes: 0 additions & 11 deletions crates/core/c8y_api/src/smartrest/monitor.rs

This file was deleted.

21 changes: 20 additions & 1 deletion crates/core/tedge_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use tedge_actors::ConvertingActorBuilder;
use tedge_actors::MessageSink;
use tedge_actors::MessageSource;
use tedge_actors::Runtime;
use tedge_api::mqtt_topics::DeviceTopicId;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_api::mqtt_topics::Service;
use tedge_api::path::DataDir;
use tedge_health_ext::HealthMonitorBuilder;
use tedge_mqtt_ext::MqttActorBuilder;
Expand All @@ -45,6 +48,7 @@ pub struct AgentConfig {
pub data_dir: DataDir,
pub mqtt_device_topic_id: EntityTopicId,
pub mqtt_topic_root: Arc<str>,
pub service_type: String,
}

impl AgentConfig {
Expand Down Expand Up @@ -111,6 +115,7 @@ impl AgentConfig {
log_dir,
mqtt_topic_root,
mqtt_device_topic_id,
service_type: tedge_config.service.ty.clone(),
})
}
}
Expand Down Expand Up @@ -184,7 +189,20 @@ impl Agent {
let signal_actor_builder = SignalActor::builder(&runtime.get_handle());

// Health actor
let health_actor = HealthMonitorBuilder::new(TEDGE_AGENT, &mut mqtt_actor_builder);
// TODO: take a user-configurable service topic id
let service_topic_id = self.config.mqtt_device_topic_id.to_default_service_topic_id("tedge-agent")
.with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", self.config.mqtt_device_topic_id))?;
let service = Service {
service_topic_id,
device_topic_id: DeviceTopicId::new(self.config.mqtt_device_topic_id.clone()),
};
let mqtt_schema = MqttSchema::with_root(self.config.mqtt_topic_root.to_string());
let health_actor = HealthMonitorBuilder::from_service_topic_id(
service,
&mut mqtt_actor_builder,
&mqtt_schema,
self.config.service_type.clone(),
);

// Tedge to Te topic converter
let tedge_to_te_converter = create_tedge_to_te_converter(&mut mqtt_actor_builder)?;
Expand Down Expand Up @@ -220,6 +238,7 @@ pub fn create_tedge_to_te_converter(
mqtt_actor_builder: &mut MqttActorBuilder,
) -> Result<ConvertingActorBuilder<TedgetoTeConverter, TopicFilter>, anyhow::Error> {
let tedge_to_te_converter = TedgetoTeConverter::new();

let subscriptions: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
Expand Down
20 changes: 0 additions & 20 deletions crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ impl TedgetoTeConverter {
}
topic if topic.name.starts_with("tedge/events") => self.convert_event(message),
topic if topic.name.starts_with("tedge/alarms") => self.convert_alarm(message),
topic if topic.name.starts_with("tedge/health") => {
self.convert_health_status_message(message)
}
_ => vec![],
}
}
Expand Down Expand Up @@ -101,23 +98,6 @@ impl TedgetoTeConverter {
message.topic = topic;
vec![message]
}

// tedge/health/service-name -> te/device/main/service/<service-name>/status/health
// tedge/health/child/service-name -> te/device/child/service/<service-name>/status/health
fn convert_health_status_message(&mut self, mut message: MqttMessage) -> Vec<MqttMessage> {
let topic = match message.topic.name.split('/').collect::<Vec<_>>()[..] {
["tedge", "health", service_name] => Topic::new_unchecked(
format!("te/device/main/service/{service_name}/status/health").as_str(),
),
["tedge", "health", cid, service_name] => Topic::new_unchecked(
format!("te/device/{cid}/service/{service_name}/status/health").as_str(),
),
_ => return vec![],
};
message.topic = topic;
message.retain = true;
vec![message]
}
}
#[cfg(test)]
mod tests {
Expand Down
54 changes: 0 additions & 54 deletions crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,58 +311,6 @@ async fn convert_incoming_child_device_event_topic() -> Result<(), DynError> {
Ok(())
}

// tedge/health/service-name -> te/device/main/service/<service-name>/status/health
// tedge/health/child/service-name -> te/device/child/service/<service-name>/status/health
#[tokio::test]
async fn convert_incoming_main_device_service_health_status() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate health status of main device service MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/health/myservice"),
r#"{""pid":1234,"status":"up","time":1674739912}"#,
)
.with_retain();

let expected_mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/main/service/myservice/status/health"),
r#"{""pid":1234,"status":"up","time":1674739912}"#,
)
.with_retain();

mqtt_box.send(mqtt_message).await?;

// Assert health status message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}

#[tokio::test]
async fn convert_incoming_child_device_service_health_status() -> Result<(), DynError> {
// Spawn incoming mqtt message converter
let mut mqtt_box = spawn_tedge_to_te_converter().await?;

// Simulate child device service health status MQTT message received.
let mqtt_message = MqttMessage::new(
&Topic::new_unchecked("tedge/health/child/myservice"),
r#"{""pid":1234,"status":"up","time":1674739912}"#,
)
.with_retain();

let expected_mqtt_message = MqttMessage::new(
&Topic::new_unchecked("te/device/child/service/myservice/status/health"),
r#"{""pid":1234,"status":"up","time":1674739912}"#,
)
.with_retain();

mqtt_box.send(mqtt_message).await?;

// Assert health status mqtt message
mqtt_box.assert_received([expected_mqtt_message]).await;
Ok(())
}

async fn spawn_tedge_to_te_converter(
) -> Result<TimedMessageBox<SimpleMessageBox<MqttMessage, MqttMessage>>, DynError> {
// Tedge to Te topic converter
Expand All @@ -374,8 +322,6 @@ async fn spawn_tedge_to_te_converter(
"tedge/events/+/+",
"tedge/alarms/+/+",
"tedge/alarms/+/+/+",
"tedge/health/+",
"tedge/health/+/+",
]
.try_into()?;

Expand Down
Loading

1 comment on commit f811fad

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
316 0 3 316 100 53m20.657999999s

Please sign in to comment.