From b698e0189b694a7151b4b6277aa5d8dc3fe9813b Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Wed, 20 Sep 2023 15:59:23 +0000 Subject: [PATCH 1/8] Update c8y mapper to consume new health topics This commit updates tedge-mapper-c8y to subscribe to new health status topics: ``` tedge/health/SERVICE_NAME -> te/device/main/service/SERVICE_NAME/status/health tedge/health/CHILD_ID/SERVICE_NAME -> te/device/CHILD_ID/service/SERVICE_NAME/status/health ``` Producers of health status topics (i.e. different thin-edge services) are not in the scope of this PR, however the tests for tedge-agent were failing because after it terminated, the topic converter could not convert a last will message to the new health status topic. As such, only tedge-agent was updated to publish its health status messages to a new health topic, introducing a new health topic API that will be used by other services, however for now they still publish to old health status topics, and these messages are converted by the tedge-agent's topic converter. Signed-off-by: Marcel Guzik --- .../src/tedge_config_cli/tedge_config.rs | 2 +- crates/core/tedge_agent/src/agent.rs | 9 +- crates/core/tedge_api/src/health.rs | 101 +++++++++++++++++- crates/core/tedge_api/src/mqtt_topics.rs | 83 +++++++++++++- crates/core/tedge_mapper/src/c8y/mapper.rs | 2 +- .../extensions/c8y_mapper_ext/src/config.rs | 3 +- .../c8y_mapper_ext/src/converter.rs | 83 +++++++++----- .../c8y_mapper_ext/src/service_monitor.rs | 88 +++++++-------- crates/extensions/c8y_mapper_ext/src/tests.rs | 2 +- .../extensions/tedge_health_ext/src/actor.rs | 17 +-- crates/extensions/tedge_health_ext/src/lib.rs | 77 +++++++++++-- .../service_monitoring.robot | 2 + ...apper-publishing-agent-supported-ops.robot | 8 +- .../tests/tedge/call_tedge_config_list.robot | 2 +- 14 files changed, 381 insertions(+), 98 deletions(-) diff --git a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs index c0a4d2711f8..3e4edd8ce6b 100644 --- a/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config_cli/tedge_config.rs @@ -369,7 +369,7 @@ define_tedge_config! { /// Set of MQTT topics the Cumulocity mapper should subscribe to #[tedge_config(example = "te/+/+/+/+/a/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+")] - #[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,tedge/health/+,tedge/health/+/+"))] + #[tedge_config(default(value = "te/+/+/+/+,te/+/+/+/+/m/+,te/+/+/+/+/e/+,te/+/+/+/+/a/+,te/+/+/+/+/status/health"))] topics: TemplatesSet, enable: { diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index ec18d79e4bb..9731496d519 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -184,7 +184,13 @@ impl Agent { let signal_actor_builder = SignalActor::builder(&runtime.get_handle()); // Health actor - let health_actor = HealthMonitorBuilder::new(TEDGE_AGENT, &mut mqtt_actor_builder); + let service_topic_id = self.config.mqtt_device_topic_id.to_service_topic_id("tedge-agent") + .with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", self.config.mqtt_device_topic_id))?; + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service_topic_id, + &mut mqtt_actor_builder, + self.config.mqtt_topic_root.clone(), + ); // Tedge to Te topic converter let tedge_to_te_converter = create_tedge_to_te_converter(&mut mqtt_actor_builder)?; @@ -220,6 +226,7 @@ pub fn create_tedge_to_te_converter( mqtt_actor_builder: &mut MqttActorBuilder, ) -> Result, anyhow::Error> { let tedge_to_te_converter = TedgetoTeConverter::new(); + let subscriptions: TopicFilter = vec![ "tedge/measurements", "tedge/measurements/+", diff --git a/crates/core/tedge_api/src/health.rs b/crates/core/tedge_api/src/health.rs index 75894513667..0059bc84a49 100644 --- a/crates/core/tedge_api/src/health.rs +++ b/crates/core/tedge_api/src/health.rs @@ -1,11 +1,110 @@ use clock::Clock; use clock::WallClock; +use std::process; +use std::sync::Arc; + +use crate::mqtt_topics::ServiceTopicId; use mqtt_channel::Message; use mqtt_channel::PubChannel; use mqtt_channel::Topic; use mqtt_channel::TopicFilter; use serde_json::json; -use std::process; + +/// Encodes a valid health topic. +/// +/// Health topics are topics on which messages about health status of services are published. To be +/// able to send health messages, a health topic needs to be constructed for a given entity. +// TODO: replace `Arc` with `ServiceTopicId` after we're done with transition to new topics +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ServiceHealthTopic(Arc); + +impl ServiceHealthTopic { + pub fn new(service: ServiceTopicId) -> Self { + // XXX: hardcoded MQTT root + ServiceHealthTopic(Arc::from(format!("te/{}/status/health", service.as_str()))) + } + + pub fn from_old_topic(topic: String) -> Result { + match topic.split('/').collect::>()[..] { + ["tedge", "health", _service_name] => {} + ["tedge", "health", _child_id, _service_name] => {} + _ => return Err(HealthTopicError), + } + + Ok(Self(Arc::from(topic))) + } + + pub fn is_health_topic(topic: &str) -> bool { + matches!( + topic.split('/').collect::>()[..], + ["te", _, _, _, _, "status", "health"] + ) + } + + pub fn as_str(&self) -> &str { + &self.0 + } + + pub async fn send_health_status(&self, responses: &mut impl PubChannel) { + let response_topic_health = Topic::new_unchecked(self.as_str()); + + let health_status = json!({ + "status": "up", + "pid": process::id(), + }) + .to_string(); + + let health_message = Message::new(&response_topic_health, health_status).with_retain(); + let _ = responses.send(health_message).await; + } + + pub fn down_message(&self) -> Message { + Message { + topic: Topic::new_unchecked(self.as_str()), + payload: json!({ + "status": "down", + "pid": process::id()}) + .to_string() + .into(), + qos: mqtt_channel::QoS::AtLeastOnce, + retain: true, + } + } + + pub fn up_message(&self) -> Message { + let timestamp = WallClock + .now() + .format(&time::format_description::well_known::Rfc3339); + match timestamp { + Ok(timestamp) => { + let health_status = json!({ + "status": "up", + "pid": process::id(), + "time": timestamp + }) + .to_string(); + + let response_topic_health = Topic::new_unchecked(self.as_str()); + + Message::new(&response_topic_health, health_status) + .with_qos(mqtt_channel::QoS::AtLeastOnce) + .with_retain() + } + Err(e) => { + let error_topic = Topic::new_unchecked("tedge/errors"); + let error_msg = format!( + "Health message: Failed to convert timestamp to Rfc3339 format due to: {e}" + ); + Message::new(&error_topic, error_msg).with_qos(mqtt_channel::QoS::AtLeastOnce) + } + } + } +} + +#[derive(Debug)] +pub struct HealthTopicError; + +// TODO: remove below functions once components moved to new health topics pub fn health_check_topics(daemon_name: &str) -> TopicFilter { vec![ diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 12541fde39e..cb4583d1a4c 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -8,6 +8,8 @@ use std::fmt::Display; use std::fmt::Formatter; use std::str::FromStr; +const ENTITY_ID_SEGMENTS: usize = 4; + /// The MQTT topics are represented by three distinct groups: /// - a root prefix, used by all the topics /// - an entity topic identifier of the source or target of the messages @@ -234,8 +236,6 @@ impl FromStr for EntityTopicId { type Err = TopicIdError; fn from_str(entity_id: &str) -> Result { - const ENTITY_ID_SEGMENTS: usize = 4; - let entity_id_segments = entity_id.matches('/').count() + 1; if entity_id_segments > ENTITY_ID_SEGMENTS { return Err(TopicIdError::TooLong); @@ -283,6 +283,11 @@ impl EntityTopicId { .is_some() } + /// Returns `true` if it's the topic identifier of the child device. + pub fn is_default_child_device(&self) -> bool { + matches!(self.segments(), ["device", device_name, "", ""] if device_name != "main" && !device_name.is_empty()) + } + /// Returns the device name when the entity topic identifier is using the `device/+/service/+` pattern. /// /// Returns None otherwise. @@ -322,6 +327,75 @@ impl EntityTopicId { pub fn is_default_main_device(&self) -> bool { self == &Self::default_main_device() } + + /// If `self` is a device topic id, return a service topic id under this + /// device. + /// + /// The device topic id must be in a format: "device/DEVICE_NAME//"; if not, + /// `None` will be returned. + pub fn to_service_topic_id(&self, service_name: &str) -> Option { + if let ["device", device_name, "", ""] = self.0.split('/').collect::>()[..] { + return Some(ServiceTopicId(EntityTopicId(format!( + "device/{device_name}/service/{service_name}" + )))); + } + None + } + + /// Returns an array of all segments of this entity topic. + fn segments(&self) -> [&str; ENTITY_ID_SEGMENTS] { + let mut segments = self.0.split('/'); + let seg1 = segments.next().unwrap(); + let seg2 = segments.next().unwrap(); + let seg3 = segments.next().unwrap(); + let seg4 = segments.next().unwrap(); + [seg1, seg2, seg3, seg4] + } + + pub fn as_str(&self) -> &str { + self.0.as_str() + } +} + +/// Represents an entity topic identifier known to be a service. +/// +/// It's most often in a format `device/DEVICE_NAME/service/SERVICE_NAME`. +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct ServiceTopicId(EntityTopicId); + +impl ServiceTopicId { + pub fn as_str(&self) -> &str { + self.0.as_str() + } + + pub fn entity(&self) -> &EntityTopicId { + &self.0 + } + + /// If in a default MQTT scheme, returns a device topic id of this service. + pub fn to_device_topic_id(&self) -> Option { + if let ["device", device_name, "service", _] = self.0.segments() { + Some(DeviceTopicId(EntityTopicId(format!( + "device/{device_name}//" + )))) + } else { + None + } + } +} + +impl Display for ServiceTopicId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +pub struct DeviceTopicId(EntityTopicId); + +impl DeviceTopicId { + pub fn entity(&self) -> &EntityTopicId { + &self.0 + } } #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] @@ -364,6 +438,7 @@ pub enum Channel { CommandMetadata { operation: OperationType, }, + Health, } impl FromStr for Channel { @@ -401,6 +476,7 @@ impl FromStr for Channel { operation: operation.parse().unwrap(), // Infallible cmd_id: cmd_id.to_string(), }), + ["status", "health"] => Ok(Channel::Health), _ => Err(ChannelError::InvalidCategory(channel.to_string())), } @@ -425,6 +501,7 @@ impl Display for Channel { Channel::Command { operation, cmd_id } => write!(f, "cmd/{operation}/{cmd_id}"), Channel::CommandMetadata { operation } => write!(f, "cmd/{operation}"), + Channel::Health => write!(f, "status/health"), } } } @@ -443,6 +520,7 @@ pub enum OperationType { LogUpload, ConfigSnapshot, ConfigUpdate, + Health, Custom(String), } @@ -471,6 +549,7 @@ impl Display for OperationType { OperationType::LogUpload => write!(f, "log_upload"), OperationType::ConfigSnapshot => write!(f, "config_snapshot"), OperationType::ConfigUpdate => write!(f, "config_update"), + OperationType::Health => write!(f, "health"), OperationType::Custom(operation) => write!(f, "{operation}"), } } diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 713b88cdfec..cf3940bd6de 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -56,7 +56,7 @@ impl TEdgeComponent for CumulocityMapper { // MQTT client dedicated to set service down status on shutdown, using a last-will message // A separate MQTT actor/client is required as the last will message of the main MQTT actor - // is used to send down status to tedge/health topic + // is used to send down status to health topic let service_monitor_actor = MqttActorBuilder::new(service_monitor_client_config(&tedge_config)?); diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index 17e0636ad11..3da9e99aed1 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -169,8 +169,7 @@ impl C8yMapperConfig { "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", - "tedge/health/+", - "tedge/health/+/+", + "te/+/+/+/+/status/health", ] .try_into() .unwrap() diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index c265a9f7787..7da119afffa 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -107,7 +107,8 @@ const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; const CREATE_EVENT_SMARTREST_CODE: u16 = 400; -const TEDGE_AGENT_HEALTH_TOPIC: &str = "tedge/health/tedge-agent"; +/// XXX: hardcoded topic root +const TEDGE_AGENT_HEALTH_TOPIC: &str = "te/device/main/service/tedge-agent/status/health"; const DEFAULT_EVENT_TYPE: &str = "ThinEdgeEvent"; const FORBIDDEN_ID_CHARS: [char; 3] = ['/', '+', '#']; @@ -485,29 +486,33 @@ impl CumulocityConverter { pub async fn process_health_status_message( &mut self, + entity: &EntityTopicId, message: &Message, ) -> Result, ConversionError> { let mut mqtt_messages: Vec = Vec::new(); // Send the init messages - if check_tedge_agent_status(message)? { - create_tedge_agent_supported_ops(self.ops_dir.clone()).await?; + // TODO: use entity store to check service statuses + if is_message_tedge_agent_up(message)? { + create_tedge_agent_supported_ops(&self.ops_dir).await?; mqtt_messages.push(create_get_software_list_message()?); } // When there is some messages to be sent on behalf of a child device, // this child device must be declared first, if not done yet - let topic_split: Vec<&str> = message.topic.name.split('/').collect(); - if topic_split.len() == 4 { - let child_id = topic_split[2]; - add_external_device_registration_message( - child_id.to_string(), - &mut self.children, - &mut mqtt_messages, - ); + if let Some(parent) = entity.default_parent_identifier() { + if parent.is_default_child_device() { + let child_id = parent.default_device_name().unwrap(); + add_external_device_registration_message( + child_id.to_string(), + &mut self.children, + &mut mqtt_messages, + ); + } } let mut message = convert_health_status_message( + entity, message, self.device_name.clone(), self.service_type.clone(), @@ -960,6 +965,7 @@ impl CumulocityConverter { self.handle_config_update_state_change(&source, cmd_id, message) .await? } + Channel::Health => self.process_health_status_message(&source, message).await?, _ => vec![], }; @@ -1034,9 +1040,6 @@ impl CumulocityConverter { self.alarm_converter.process_internal_alarm(message); Ok(vec![]) } - topic if topic.name.starts_with("tedge/health") => { - self.process_health_status_message(message).await - } topic => match topic.clone().try_into() { Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => { debug!("Software list"); @@ -1442,7 +1445,7 @@ fn get_inventory_fragments( } } -async fn create_tedge_agent_supported_ops(ops_dir: PathBuf) -> Result<(), ConversionError> { +async fn create_tedge_agent_supported_ops(ops_dir: &Path) -> Result<(), ConversionError> { create_file_with_defaults(ops_dir.join("c8y_SoftwareUpdate"), None)?; Ok(()) @@ -1455,7 +1458,7 @@ pub struct HealthStatus { pub status: String, } -pub fn check_tedge_agent_status(message: &Message) -> Result { +pub fn is_message_tedge_agent_up(message: &Message) -> Result { if message.topic.name.eq(TEDGE_AGENT_HEALTH_TOPIC) { let status: HealthStatus = serde_json::from_str(message.payload_str()?)?; return Ok(status.status.eq("up")); @@ -1492,6 +1495,7 @@ mod tests { use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; use tedge_api::entity_store::EntityRegistrationMessage; + use tedge_api::entity_store::EntityType; use tedge_api::entity_store::InvalidExternalIdError; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; @@ -2162,10 +2166,13 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let in_topic = "tedge/health/child1/child-service-c8y"; + let in_topic = "te/device/child1/service/child-service-c8y/status/health"; let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let mqtt_schema = MqttSchema::new(); + let (in_entity, _in_channel) = mqtt_schema.entity_channel_of(&in_message.topic).unwrap(); + let expected_child_create_smart_rest_message = Message::new( &Topic::new_unchecked("c8y/s/us"), "101,child1,child1,thin-edge.io-child", @@ -2176,11 +2183,27 @@ mod tests { r#"102,test-device_child1_child-service-c8y,"thin-edge.io",child-service-c8y,"up""#, ); - // Test the first output messages contains SmartREST and C8Y JSON. - let out_first_messages = converter.convert(&in_message).await; + let mut out_messages = converter.convert(&in_message).await.into_iter(); + // child device entity store registration message + let device_registration_message = out_messages.next().unwrap(); + let device_registration_message = + EntityRegistrationMessage::new(&device_registration_message).unwrap(); assert_eq!( - out_first_messages, + device_registration_message.topic_id, + in_entity.default_parent_identifier().unwrap() + ); + assert_eq!(device_registration_message.r#type, EntityType::ChildDevice); + + // service entity store registration message + let service_registration_message = out_messages.next().unwrap(); + let service_registration_message = + EntityRegistrationMessage::new(&service_registration_message).unwrap(); + assert_eq!(service_registration_message.topic_id, in_entity); + assert_eq!(service_registration_message.r#type, EntityType::Service); + + assert_eq!( + out_messages.collect::>(), vec![ expected_child_create_smart_rest_message, expected_service_monitor_smart_rest_message.clone() @@ -2193,21 +2216,33 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let in_topic = "tedge/health/test-tedge-mapper-c8y"; + let in_topic = "te/device/main/service/test-tedge-mapper-c8y/status/health"; let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let mqtt_schema = MqttSchema::new(); + let (in_entity, _in_channel) = mqtt_schema.entity_channel_of(&in_message.topic).unwrap(); + let expected_service_monitor_smart_rest_message = Message::new( &Topic::new_unchecked("c8y/s/us"), r#"102,test-device_test-tedge-mapper-c8y,"thin-edge.io",test-tedge-mapper-c8y,"up""#, ); // Test the output messages contains SmartREST and C8Y JSON. - let out_messages = converter.convert(&in_message).await; + let mut out_messages = converter.convert(&in_message).await.into_iter(); + + // service entity store registration message + let service_registration_message = out_messages.next().unwrap(); + let service_registration_message = + EntityRegistrationMessage::new(&service_registration_message).unwrap(); + assert_eq!(service_registration_message.topic_id, in_entity); + assert_eq!(service_registration_message.r#type, EntityType::Service); + + let service_monitor_message = out_messages.next().unwrap(); assert_eq!( - out_messages, - vec![expected_service_monitor_smart_rest_message] + service_monitor_message, + expected_service_monitor_smart_rest_message ); } diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 3307544817f..6b70913925a 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -3,6 +3,7 @@ use c8y_api::smartrest::message::MAX_PAYLOAD_LIMIT_IN_BYTES; use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC; use serde::Deserialize; use serde::Serialize; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_mqtt_ext::Message; use tedge_mqtt_ext::Topic; @@ -25,47 +26,37 @@ fn default_type() -> String { "".to_string() } -#[derive(Deserialize, Serialize, Debug)] -pub struct TopicInfo { - pub service_name: String, - pub child_id: Option, -} - -impl TopicInfo { - fn parse_topic_info(topic: &str) -> Self { - let topic_split: Vec<&str> = topic.split('/').collect(); - let service_name = if topic_split.len() == 4 { - topic_split[3] - } else { - topic_split[2] - } - .to_string(); - - let child_id = if topic_split.len() == 4 { - Some(topic_split[2].to_owned()) - } else { - None - }; - - Self { - service_name, - child_id, - } - } -} - pub fn convert_health_status_message( + entity: &EntityTopicId, message: &Message, device_name: String, default_service_type: String, ) -> Vec { let mut mqtt_messages: Vec = Vec::new(); - let topic = message.topic.name.to_owned(); - let topic_info = TopicInfo::parse_topic_info(&topic); + + let service_name = entity + .default_service_name() + .expect("EntityTopicId should be in default scheme"); + + let parent = entity + .default_parent_identifier() + .expect("EntityTopicId should be in default scheme"); + + let child_id = if parent.is_default_child_device() { + Some( + parent + .default_device_name() + .expect("EntityTopicId should be in default scheme") + .to_string(), + ) + } else { + None + }; + let default_health_status = format!("\"type\":{default_service_type},\"status\":\"unknown\""); // If not Bridge health status - if !topic_info.service_name.contains("bridge") { + if !service_name.contains("bridge") { let payload_str = message.payload_str().unwrap_or(&default_health_status); let mut health_status = @@ -88,10 +79,10 @@ pub fn convert_health_status_message( let status_message = service_monitor_status_message( &device_name, - &topic_info.service_name, + service_name, &health_status.status, &health_status.service_type, - topic_info.child_id, + child_id, ); mqtt_messages.push(status_message); @@ -134,10 +125,11 @@ pub fn service_monitor_status_message( #[cfg(test)] mod tests { use super::*; + use tedge_api::mqtt_topics::MqttSchema; use test_case::test_case; #[test_case( "test_device", - "tedge/health/tedge-mapper-c8y", + "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"pid":"1234","type":"systemd","status":"up"}"#, "c8y/s/us", r#"102,test_device_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"up""#; @@ -145,7 +137,7 @@ mod tests { )] #[test_case( "test_device", - "tedge/health/child/tedge-mapper-c8y", + "te/device/child/service/tedge-mapper-c8y/status/health", r#"{"pid":"1234","type":"systemd","status":"up"}"#, "c8y/s/us/child", r#"102,test_device_child_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"up""#; @@ -153,7 +145,7 @@ mod tests { )] #[test_case( "test_device", - "tedge/health/tedge-mapper-c8y", + "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"pid":"123456","type":"systemd"}"#, "c8y/s/us", r#"102,test_device_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"unknown""#; @@ -161,7 +153,7 @@ mod tests { )] #[test_case( "test_device", - "tedge/health/tedge-mapper-c8y", + "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"type":"systemd"}"#, "c8y/s/us", r#"102,test_device_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"unknown""#; @@ -169,7 +161,7 @@ mod tests { )] #[test_case( "test_device", - "tedge/health/tedge-mapper-c8y", + "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"type":"", "status":""}"#, "c8y/s/us", r#"102,test_device_tedge-mapper-c8y,"service",tedge-mapper-c8y,"unknown""#; @@ -177,7 +169,7 @@ mod tests { )] #[test_case( "test_device", - "tedge/health/tedge-mapper-c8y", + "te/device/main/service/tedge-mapper-c8y/status/health", "{}", "c8y/s/us", r#"102,test_device_tedge-mapper-c8y,"service",tedge-mapper-c8y,"unknown""#; @@ -185,7 +177,7 @@ mod tests { )] #[test_case( "test_device", - "tedge/health/tedge-mapper-c8y", + "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"type":"thin,edge","status":"up,down"}"#, "c8y/s/us", r#"102,test_device_tedge-mapper-c8y,"thin,edge",tedge-mapper-c8y,"up,down""#; @@ -193,7 +185,7 @@ mod tests { )] #[test_case( "test_device", - "tedge/health/tedge-mapper-c8y", + "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"type":"thin\"\"edge","status":"up\"down"}"#, "c8y/s/us", r#"102,test_device_tedge-mapper-c8y,"thin""""edge",tedge-mapper-c8y,"up""down""#; @@ -207,14 +199,22 @@ mod tests { c8y_monitor_payload: &str, ) { let topic = Topic::new_unchecked(health_topic); + + let mqtt_schema = MqttSchema::new(); + let (entity, _) = mqtt_schema.entity_channel_of(&topic).unwrap(); + let health_message = Message::new(&topic, health_payload.as_bytes().to_owned()); let expected_message = Message::new( &Topic::new_unchecked(c8y_monitor_topic), c8y_monitor_payload.as_bytes(), ); - let msg = - convert_health_status_message(&health_message, device_name.into(), "service".into()); + let msg = convert_health_status_message( + &entity, + &health_message, + device_name.into(), + "service".into(), + ); assert_eq!(msg[0], expected_message); } } diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 1c89efeaeb7..8c8a905b133 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -1168,7 +1168,7 @@ async fn mapper_dynamically_updates_supported_operations_for_tedge_device() { // Simulate tedge-agent health status message mqtt.send( MqttMessage::new( - &Topic::new_unchecked("tedge/health/tedge-agent"), + &Topic::new_unchecked("te/device/main/service/tedge-agent/status/health"), "{\"status\":\"up\"}", ) .with_retain(), diff --git a/crates/extensions/tedge_health_ext/src/actor.rs b/crates/extensions/tedge_health_ext/src/actor.rs index fd1e418cd62..1672aa14f2f 100644 --- a/crates/extensions/tedge_health_ext/src/actor.rs +++ b/crates/extensions/tedge_health_ext/src/actor.rs @@ -4,30 +4,31 @@ use tedge_actors::MessageReceiver; use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; +use tedge_api::health::ServiceHealthTopic; use tedge_mqtt_ext::MqttMessage; -use tedge_api::health::health_status_down_message; -use tedge_api::health::health_status_up_message; - pub struct HealthMonitorActor { - daemon_name: String, + health_topic: ServiceHealthTopic, messages: SimpleMessageBox, } impl HealthMonitorActor { - pub fn new(daemon_name: String, messages: SimpleMessageBox) -> Self { + pub fn new( + health_topic: ServiceHealthTopic, + messages: SimpleMessageBox, + ) -> Self { Self { - daemon_name, + health_topic, messages, } } pub fn up_health_status(&self) -> MqttMessage { - health_status_up_message(&self.daemon_name) + self.health_topic.up_message() } pub fn down_health_status(&self) -> MqttMessage { - health_status_down_message(&self.daemon_name) + self.health_topic.down_message() } } diff --git a/crates/extensions/tedge_health_ext/src/lib.rs b/crates/extensions/tedge_health_ext/src/lib.rs index 41936aa1517..b1f1f1265e6 100644 --- a/crates/extensions/tedge_health_ext/src/lib.rs +++ b/crates/extensions/tedge_health_ext/src/lib.rs @@ -3,6 +3,8 @@ mod actor; #[cfg(test)] mod tests; +use std::sync::Arc; + use actor::HealthMonitorActor; use tedge_actors::Builder; use tedge_actors::DynSender; @@ -13,18 +15,23 @@ use tedge_actors::RuntimeRequestSink; use tedge_actors::ServiceConsumer; use tedge_actors::ServiceProvider; use tedge_actors::SimpleMessageBoxBuilder; -use tedge_api::health::health_status_down_message; -use tedge_api::health::health_status_up_message; +use tedge_api::health::ServiceHealthTopic; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::OperationType; +use tedge_api::mqtt_topics::ServiceTopicId; use tedge_mqtt_ext::MqttConfig; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; pub struct HealthMonitorBuilder { - service_name: String, + service_health_topic: ServiceHealthTopic, box_builder: SimpleMessageBoxBuilder, } impl HealthMonitorBuilder { + /// Creates a HealthMonitorBuilder that creates a HealthMonitorActor with + /// old topic scheme. pub fn new( service_name: &str, mqtt: &mut (impl ServiceProvider + AsMut), @@ -37,12 +44,65 @@ impl HealthMonitorBuilder { .try_into() .expect("Failed to create the HealthMonitorActor topic filter"); + let service_health_topic = + ServiceHealthTopic::from_old_topic(format!("tedge/health/{service_name}")).unwrap(); + let mut box_builder = SimpleMessageBoxBuilder::new(service_name, 16); box_builder .set_request_sender(mqtt.connect_consumer(subscriptions, box_builder.get_sender())); let builder = HealthMonitorBuilder { - service_name: service_name.to_owned(), + service_health_topic, + box_builder, + }; + + // Update the MQTT config + *mqtt.as_mut() = builder.set_init_and_last_will(mqtt.as_mut().clone()); + + builder + } + + /// Creates a HealthMonitorBuilder that creates a HealthMonitorActor with + /// a new topic scheme. + pub fn from_service_topic_id( + service: ServiceTopicId, + mqtt: &mut (impl ServiceProvider + AsMut), + // TODO: pass it less annoying way + mqtt_topic_root: Arc, + ) -> Self { + let health_topic = ServiceHealthTopic::new(service.clone()); + + let mut box_builder = SimpleMessageBoxBuilder::new(service.as_str(), 16); + + // passed service is in default scheme + let device_topic_id = service.to_device_topic_id().unwrap(); + + let mqtt_schema = MqttSchema::with_root(mqtt_topic_root.to_string()); + let subscriptions = vec![ + mqtt_schema.topic_for( + service.clone().entity(), + &Channel::Command { + operation: OperationType::Health, + cmd_id: "check".to_string(), + }, + ), + mqtt_schema.topic_for( + device_topic_id.entity(), + &Channel::Command { + operation: OperationType::Health, + cmd_id: "check".to_string(), + }, + ), + ] + .into_iter() + .map(|t| t.into()) + .collect::(); + + box_builder + .set_request_sender(mqtt.connect_consumer(subscriptions, box_builder.get_sender())); + + let builder = HealthMonitorBuilder { + service_health_topic: health_topic, box_builder, }; @@ -53,10 +113,11 @@ impl HealthMonitorBuilder { } fn set_init_and_last_will(&self, config: MqttConfig) -> MqttConfig { - let name = self.service_name.to_owned(); + let name = self.service_health_topic.to_owned(); + let _name = name.clone(); config - .with_initial_message(move || health_status_up_message(&name)) - .with_last_will_message(health_status_down_message(&self.service_name)) + .with_initial_message(move || _name.up_message()) + .with_last_will_message(name.down_message()) } } @@ -71,7 +132,7 @@ impl Builder for HealthMonitorBuilder { fn try_build(self) -> Result { let message_box = self.box_builder.build(); - let actor = HealthMonitorActor::new(self.service_name, message_box); + let actor = HealthMonitorActor::new(self.service_health_topic, message_box); Ok(actor) } diff --git a/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot b/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot index bddc3c93303..df29f5e031a 100644 --- a/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot +++ b/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot @@ -129,6 +129,8 @@ Custom Test Setup ThinEdgeIO.Start Service mosquitto ThinEdgeIO.Restart Service tedge-mapper-c8y ThinEdgeIO.Service Should Be Running tedge-mapper-c8y + ThinEdgeIO.Restart Service tedge-agent + ThinEdgeIO.Service Should Be Running tedge-agent Custom Test Teardown ThinEdgeIO.Stop Service tedge-mapper-c8y diff --git a/tests/RobotFramework/tests/cumulocity/supported_operations/mapper-publishing-agent-supported-ops.robot b/tests/RobotFramework/tests/cumulocity/supported_operations/mapper-publishing-agent-supported-ops.robot index 9d44a1b8324..6fc4dc363fe 100644 --- a/tests/RobotFramework/tests/cumulocity/supported_operations/mapper-publishing-agent-supported-ops.robot +++ b/tests/RobotFramework/tests/cumulocity/supported_operations/mapper-publishing-agent-supported-ops.robot @@ -20,23 +20,23 @@ Create and publish the tedge agent supported operations on mapper restart ${timestamp}= Get Unix Timestamp # now restart the mapper ThinEdgeIO.start Service tedge-mapper-c8y - Should Have MQTT Messages tedge/health/tedge-mapper-c8y message_contains=up date_from=${timestamp} + Should Have MQTT Messages te/device/main/service/tedge-mapper-c8y/status/health message_contains=up date_from=${timestamp} # After receiving the health status `up` from tege-agent, the mapper creates supported operations and will publish to c8y - Should Have MQTT Messages tedge/health/tedge-agent message_contains=up + Should Have MQTT Messages te/device/main/service/tedge-agent/status/health message_contains=up # Check if the `c8y_SoftwareUpdate` and `c8y_Restart` ops files exists in `/etc/tedge/operations/c8y` directory ThinEdgeIO.File Should Exist /etc/tedge/operations/c8y/c8y_SoftwareUpdate ThinEdgeIO.File Should Exist /etc/tedge/operations/c8y/c8y_Restart # Check if the tedge-agent supported operations exists in c8y cloud - Cumulocity.Should Contain Supported Operations c8y_Restart c8y_SoftwareUpdate + Cumulocity.Should Contain Supported Operations c8y_Restart c8y_SoftwareUpdate Agent gets the software list request once it comes up ${timestamp}= Get Unix Timestamp ThinEdgeIO.restart Service tedge-agent # wait till there is up status on tedge-agent health - Should Have MQTT Messages tedge/health/tedge-agent message_contains=up date_from=${timestamp} + Should Have MQTT Messages te/device/main/service/tedge-agent/status/health message_contains=up date_from=${timestamp} # now there should be a new list request Should Have MQTT Messages tedge/commands/req/software/list message_contains=id date_from=${timestamp} diff --git a/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot b/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot index 6fc7aaac5f5..9235c20099a 100644 --- a/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot +++ b/tests/RobotFramework/tests/tedge/call_tedge_config_list.robot @@ -105,7 +105,7 @@ set/unset c8y.topics ${unset} Execute Command tedge config list Should Contain ... ${unset} - ... c8y.topics=["te/+/+/+/+", "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", "tedge/health/+", "tedge/health/+/+"] + ... c8y.topics=["te/+/+/+/+", "te/+/+/+/+/m/+", "te/+/+/+/+/e/+", "te/+/+/+/+/a/+", "te/+/+/+/+/status/health"] set/unset az.root_cert_path Execute Command sudo tedge config set az.root_cert_path /etc/ssl/certs1 # Changing az.root_cert_path From f37111e83357cd36be73d78cfd4cfb914ad8a7a3 Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Mon, 25 Sep 2023 11:41:44 +0000 Subject: [PATCH 2/8] tedge_api: Create `Service` type A new `Service` type was created, containing topic ids of the service itself and the associated device. Functions which need to take an entity topic id of a service can use this type to ensure that the given argument is indeed a service, as well as to obtain the topic id of the associated device. Signed-off-by: Marcel Guzik --- crates/core/tedge_agent/src/agent.rs | 13 +++++-- crates/core/tedge_api/src/health.rs | 36 +++++++------------ crates/core/tedge_api/src/mqtt_topics.rs | 27 +++++++++++++- crates/extensions/tedge_health_ext/src/lib.rs | 24 ++++++------- 4 files changed, 62 insertions(+), 38 deletions(-) diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 9731496d519..21f2150ea49 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -19,7 +19,10 @@ use tedge_actors::ConvertingActorBuilder; use tedge_actors::MessageSink; use tedge_actors::MessageSource; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::DeviceTopicId; use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_api::path::DataDir; use tedge_health_ext::HealthMonitorBuilder; use tedge_mqtt_ext::MqttActorBuilder; @@ -184,12 +187,18 @@ impl Agent { let signal_actor_builder = SignalActor::builder(&runtime.get_handle()); // Health actor + // TODO: take a user-configurable service topic id let service_topic_id = self.config.mqtt_device_topic_id.to_service_topic_id("tedge-agent") .with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", self.config.mqtt_device_topic_id))?; - let health_actor = HealthMonitorBuilder::from_service_topic_id( + let service = Service { service_topic_id, + device_topic_id: DeviceTopicId::new(self.config.mqtt_device_topic_id.clone()), + }; + let mqtt_schema = MqttSchema::with_root(self.config.mqtt_topic_root.to_string()); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, &mut mqtt_actor_builder, - self.config.mqtt_topic_root.clone(), + &mqtt_schema, ); // Tedge to Te topic converter diff --git a/crates/core/tedge_api/src/health.rs b/crates/core/tedge_api/src/health.rs index 0059bc84a49..85caafbd269 100644 --- a/crates/core/tedge_api/src/health.rs +++ b/crates/core/tedge_api/src/health.rs @@ -3,6 +3,8 @@ use clock::WallClock; use std::process; use std::sync::Arc; +use crate::mqtt_topics::Channel; +use crate::mqtt_topics::MqttSchema; use crate::mqtt_topics::ServiceTopicId; use mqtt_channel::Message; use mqtt_channel::PubChannel; @@ -14,16 +16,24 @@ use serde_json::json; /// /// Health topics are topics on which messages about health status of services are published. To be /// able to send health messages, a health topic needs to be constructed for a given entity. +// Because all the services use the same `HealthMonitorActor`, `ServiceHealthTopic` needs to support +// both old and new topics until all the services are fully moved to the new topic scheme. +// // TODO: replace `Arc` with `ServiceTopicId` after we're done with transition to new topics #[derive(Debug, Clone, PartialEq, Eq)] pub struct ServiceHealthTopic(Arc); impl ServiceHealthTopic { - pub fn new(service: ServiceTopicId) -> Self { - // XXX: hardcoded MQTT root - ServiceHealthTopic(Arc::from(format!("te/{}/status/health", service.as_str()))) + /// Create a new `ServiceHealthTopic` from a topic in a new topic scheme. + pub fn from_new_topic(service_topic_id: &ServiceTopicId, mqtt_schema: &MqttSchema) -> Self { + let health_topic = mqtt_schema.topic_for(service_topic_id.entity(), &Channel::Health); + Self(health_topic.name.into()) } + /// Create a new `ServiceHealthTopic` from a topic in an old topic scheme. + /// + /// The argument has to fit old topic scheme, i.e. contain either "tedge/health/SERVICE_NAME" or + /// "tedge/health/CHILD_ID/SERVICE_NAME" pub fn from_old_topic(topic: String) -> Result { match topic.split('/').collect::>()[..] { ["tedge", "health", _service_name] => {} @@ -34,30 +44,10 @@ impl ServiceHealthTopic { Ok(Self(Arc::from(topic))) } - pub fn is_health_topic(topic: &str) -> bool { - matches!( - topic.split('/').collect::>()[..], - ["te", _, _, _, _, "status", "health"] - ) - } - pub fn as_str(&self) -> &str { &self.0 } - pub async fn send_health_status(&self, responses: &mut impl PubChannel) { - let response_topic_health = Topic::new_unchecked(self.as_str()); - - let health_status = json!({ - "status": "up", - "pid": process::id(), - }) - .to_string(); - - let health_message = Message::new(&response_topic_health, health_status).with_retain(); - let _ = responses.send(health_message).await; - } - pub fn down_message(&self) -> Message { Message { topic: Topic::new_unchecked(self.as_str()), diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index cb4583d1a4c..897484a0e40 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -357,9 +357,21 @@ impl EntityTopicId { } } +/// Contains a topic id of the service itself and the associated device. +pub struct Service { + pub service_topic_id: ServiceTopicId, + pub device_topic_id: DeviceTopicId, +} + /// Represents an entity topic identifier known to be a service. /// -/// It's most often in a format `device/DEVICE_NAME/service/SERVICE_NAME`. +/// It's most often in a format `device/DEVICE_NAME/service/SERVICE_NAME`, but +/// it doesn't have to be. Thus in order to know whether or not a particular +/// [`EntityTopicId`] is a service, one has to check the +/// [`EntityStore`](super::entity_store::EntityStore), but some functions do not +/// have any way to access it. As such, functions can use this type to tell the +/// caller that they expect passed [`EntityTopicId`] to be a service, and that +/// it is the responsibility of the caller to verify it first. #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ServiceTopicId(EntityTopicId); @@ -390,9 +402,22 @@ impl Display for ServiceTopicId { } } +/// Represents an entity topic identifier known to be a device. +/// +/// It's most often in a format `device/DEVICE_NAME//`, but it doesn't have to +/// be. Thus in order to know whether or not a particular [`EntityTopicId`] is a +/// service, one has to check the +/// [`EntityStore`](super::entity_store::EntityStore), but some functions do not +/// have any way to access it. As such, functions can use this type to tell the +/// caller that they expect passed [`EntityTopicId`] to be a device, and that +/// it is the responsibility of the caller to verify it first. pub struct DeviceTopicId(EntityTopicId); impl DeviceTopicId { + pub fn new(device_topic_id: EntityTopicId) -> Self { + Self(device_topic_id) + } + pub fn entity(&self) -> &EntityTopicId { &self.0 } diff --git a/crates/extensions/tedge_health_ext/src/lib.rs b/crates/extensions/tedge_health_ext/src/lib.rs index b1f1f1265e6..357606865fa 100644 --- a/crates/extensions/tedge_health_ext/src/lib.rs +++ b/crates/extensions/tedge_health_ext/src/lib.rs @@ -3,8 +3,6 @@ mod actor; #[cfg(test)] mod tests; -use std::sync::Arc; - use actor::HealthMonitorActor; use tedge_actors::Builder; use tedge_actors::DynSender; @@ -19,7 +17,7 @@ use tedge_api::health::ServiceHealthTopic; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; -use tedge_api::mqtt_topics::ServiceTopicId; +use tedge_api::mqtt_topics::Service; use tedge_mqtt_ext::MqttConfig; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; @@ -65,22 +63,24 @@ impl HealthMonitorBuilder { /// Creates a HealthMonitorBuilder that creates a HealthMonitorActor with /// a new topic scheme. pub fn from_service_topic_id( - service: ServiceTopicId, + service: Service, mqtt: &mut (impl ServiceProvider + AsMut), // TODO: pass it less annoying way - mqtt_topic_root: Arc, + mqtt_schema: &MqttSchema, ) -> Self { - let health_topic = ServiceHealthTopic::new(service.clone()); + let Service { + service_topic_id, + device_topic_id, + } = service; - let mut box_builder = SimpleMessageBoxBuilder::new(service.as_str(), 16); + let service_health_topic = + ServiceHealthTopic::from_new_topic(&service_topic_id, mqtt_schema); - // passed service is in default scheme - let device_topic_id = service.to_device_topic_id().unwrap(); + let mut box_builder = SimpleMessageBoxBuilder::new(service_topic_id.as_str(), 16); - let mqtt_schema = MqttSchema::with_root(mqtt_topic_root.to_string()); let subscriptions = vec![ mqtt_schema.topic_for( - service.clone().entity(), + service_topic_id.entity(), &Channel::Command { operation: OperationType::Health, cmd_id: "check".to_string(), @@ -102,7 +102,7 @@ impl HealthMonitorBuilder { .set_request_sender(mqtt.connect_consumer(subscriptions, box_builder.get_sender())); let builder = HealthMonitorBuilder { - service_health_topic: health_topic, + service_health_topic, box_builder, }; From 22d0d32051364a2f018671eac146ad188607cd68 Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Tue, 26 Sep 2023 13:16:47 +0000 Subject: [PATCH 3/8] Use EntityStore to check if main device agent up Signed-off-by: Marcel Guzik --- .../c8y_mapper_ext/src/converter.rs | 82 ++++++++----------- crates/extensions/c8y_mapper_ext/src/tests.rs | 3 +- 2 files changed, 35 insertions(+), 50 deletions(-) diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 7da119afffa..ea36b1d2521 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -41,7 +41,6 @@ use c8y_api::smartrest::topic::publish_topic_from_ancestors; use c8y_api::smartrest::topic::C8yTopic; use c8y_api::smartrest::topic::MapperSubscribeTopic; use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC; -use c8y_api::utils::child_device::new_child_device_message; use c8y_auth_proxy::url::ProxyUrlGenerator; use c8y_http_proxy::handle::C8YHttpProxy; use logged_command::LoggedCommand; @@ -107,8 +106,6 @@ const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent"; const CREATE_EVENT_SMARTREST_CODE: u16 = 400; -/// XXX: hardcoded topic root -const TEDGE_AGENT_HEALTH_TOPIC: &str = "te/device/main/service/tedge-agent/status/health"; const DEFAULT_EVENT_TYPE: &str = "ThinEdgeEvent"; const FORBIDDEN_ID_CHARS: [char; 3] = ['/', '+', '#']; @@ -492,25 +489,11 @@ impl CumulocityConverter { let mut mqtt_messages: Vec = Vec::new(); // Send the init messages - // TODO: use entity store to check service statuses - if is_message_tedge_agent_up(message)? { + if self.is_message_tedge_agent_up(message)? { create_tedge_agent_supported_ops(&self.ops_dir).await?; mqtt_messages.push(create_get_software_list_message()?); } - // When there is some messages to be sent on behalf of a child device, - // this child device must be declared first, if not done yet - if let Some(parent) = entity.default_parent_identifier() { - if parent.is_default_child_device() { - let child_id = parent.default_device_name().unwrap(); - add_external_device_registration_message( - child_id.to_string(), - &mut self.children, - &mut mqtt_messages, - ); - } - } - let mut message = convert_health_status_message( entity, message, @@ -1213,19 +1196,6 @@ fn create_request_for_cloud_child_devices() -> Message { Message::new(&Topic::new_unchecked("c8y/s/us"), "105") } -fn add_external_device_registration_message( - child_id: String, - children: &mut HashMap, - mqtt_messages: &mut Vec, -) -> bool { - if !children.contains_key(&child_id) { - children.insert(child_id.to_string(), Operations::default()); - mqtt_messages.push(new_child_device_message(&child_id)); - return true; - } - false -} - fn create_inventory_fragments_message( device_name: &str, cfg_dir: &Path, @@ -1328,6 +1298,22 @@ impl CumulocityConverter { } } } + + pub fn is_message_tedge_agent_up(&self, message: &Message) -> Result { + let main_device_topic_id = self.entity_store.main_device(); + let tedge_agent_topic_id = main_device_topic_id + .to_service_topic_id("tedge-agent") + .expect("main device topic needs to fit default MQTT scheme"); + let tedge_agent_health_topic = self + .mqtt_schema + .topic_for(tedge_agent_topic_id.entity(), &Channel::Health); + + if message.topic == tedge_agent_health_topic { + let status: HealthStatus = serde_json::from_str(message.payload_str()?)?; + return Ok(status.status.eq("up")); + } + Ok(false) + } } async fn publish_operation_status( @@ -1458,14 +1444,6 @@ pub struct HealthStatus { pub status: String, } -pub fn is_message_tedge_agent_up(message: &Message) -> Result { - if message.topic.name.eq(TEDGE_AGENT_HEALTH_TOPIC) { - let status: HealthStatus = serde_json::from_str(message.payload_str()?)?; - return Ok(status.status.eq("up")); - } - Ok(false) -} - #[cfg(test)] mod tests { use super::CumulocityConverter; @@ -2175,15 +2153,16 @@ mod tests { let expected_child_create_smart_rest_message = Message::new( &Topic::new_unchecked("c8y/s/us"), - "101,child1,child1,thin-edge.io-child", + "101,test-device:device:child1,child1,thin-edge.io-child", ); let expected_service_monitor_smart_rest_message = Message::new( - &Topic::new_unchecked("c8y/s/us/child1"), - r#"102,test-device_child1_child-service-c8y,"thin-edge.io",child-service-c8y,"up""#, + &Topic::new_unchecked("c8y/s/us/test-device:device:child1"), + r#"102,test-device:device:child1:service:child-service-c8y,systemd,child-service-c8y,up"#, ); - let mut out_messages = converter.convert(&in_message).await.into_iter(); + let out_messages = converter.convert(&in_message).await; + let mut out_messages = out_messages.into_iter(); // child device entity store registration message let device_registration_message = out_messages.next().unwrap(); @@ -2195,6 +2174,12 @@ mod tests { ); assert_eq!(device_registration_message.r#type, EntityType::ChildDevice); + // child device cloud registration message + assert_eq!( + out_messages.next().unwrap(), + expected_child_create_smart_rest_message + ); + // service entity store registration message let service_registration_message = out_messages.next().unwrap(); let service_registration_message = @@ -2202,12 +2187,11 @@ mod tests { assert_eq!(service_registration_message.topic_id, in_entity); assert_eq!(service_registration_message.r#type, EntityType::Service); + // service cloud registration message + assert_eq!( - out_messages.collect::>(), - vec![ - expected_child_create_smart_rest_message, - expected_service_monitor_smart_rest_message.clone() - ] + out_messages.next().unwrap(), + expected_service_monitor_smart_rest_message.clone() ); } @@ -2225,7 +2209,7 @@ mod tests { let expected_service_monitor_smart_rest_message = Message::new( &Topic::new_unchecked("c8y/s/us"), - r#"102,test-device_test-tedge-mapper-c8y,"thin-edge.io",test-tedge-mapper-c8y,"up""#, + r#"102,test-device:device:main:service:test-tedge-mapper-c8y,systemd,test-tedge-mapper-c8y,up"#, ); // Test the output messages contains SmartREST and C8Y JSON. diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 8c8a905b133..d13d45d147b 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -1176,7 +1176,8 @@ async fn mapper_dynamically_updates_supported_operations_for_tedge_device() { .await .expect("Send failed"); - mqtt.skip(2).await; // Skip tedge-agent health status mapping and software list request + // Skip tedge-agent registration, health status mapping, and software list request + mqtt.skip(4).await; // Simulate FsEvent for the creation of a new operation file fs.send(FsWatchEvent::FileCreated( From bedde7637008daf13e386148aff03a18b106bdde Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Tue, 3 Oct 2023 11:39:06 +0000 Subject: [PATCH 4/8] Rewrite health message conversion Signed-off-by: Marcel Guzik --- Cargo.lock | 2 + .../core/c8y_api/src/smartrest/inventory.rs | 2 + crates/core/tedge_agent/src/agent.rs | 2 +- crates/core/tedge_api/src/mqtt_topics.rs | 4 +- crates/core/tedge_mapper/Cargo.toml | 6 +- crates/core/tedge_mapper/src/c8y/mapper.rs | 22 +- .../c8y_mapper_ext/src/converter.rs | 15 +- .../c8y_mapper_ext/src/service_monitor.rs | 226 ++++++++---------- 8 files changed, 131 insertions(+), 148 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f5ac85bd71f..2098733b4ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3651,6 +3651,7 @@ dependencies = [ "aws_mapper_ext", "az_mapper_ext", "batcher", + "c8y_api", "c8y_auth_proxy", "c8y_http_proxy", "c8y_mapper_ext", @@ -3660,6 +3661,7 @@ dependencies = [ "flockfile", "mqtt_channel", "tedge_actors", + "tedge_api", "tedge_config", "tedge_downloader_ext", "tedge_file_system_ext", diff --git a/crates/core/c8y_api/src/smartrest/inventory.rs b/crates/core/c8y_api/src/smartrest/inventory.rs index 5dea653703f..d7bfecc74cf 100644 --- a/crates/core/c8y_api/src/smartrest/inventory.rs +++ b/crates/core/c8y_api/src/smartrest/inventory.rs @@ -6,6 +6,7 @@ use mqtt_channel::Message; /// Create a SmartREST message for creating a child device under the given ancestors. /// The provided ancestors list must contain all the parents of the given device /// starting from its immediate parent device. +// XXX: if any arguments contain commas, output will be wrong pub fn child_device_creation_message( child_id: &str, device_name: Option<&str>, @@ -26,6 +27,7 @@ pub fn child_device_creation_message( /// Create a SmartREST message for creating a service on device. /// The provided ancestors list must contain all the parents of the given service /// starting from its immediate parent device. +// XXX: if any arguments contain commas, output will be wrong pub fn service_creation_message( service_id: &str, service_name: &str, diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 21f2150ea49..9338272b75b 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -188,7 +188,7 @@ impl Agent { // Health actor // TODO: take a user-configurable service topic id - let service_topic_id = self.config.mqtt_device_topic_id.to_service_topic_id("tedge-agent") + let service_topic_id = self.config.mqtt_device_topic_id.to_default_service_topic_id("tedge-agent") .with_context(|| format!("Device topic id {} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", self.config.mqtt_device_topic_id))?; let service = Service { service_topic_id, diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 897484a0e40..933b2727c1d 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -283,7 +283,7 @@ impl EntityTopicId { .is_some() } - /// Returns `true` if it's the topic identifier of the child device. + /// Returns `true` if it's the topic identifier of the child device in default topic scheme. pub fn is_default_child_device(&self) -> bool { matches!(self.segments(), ["device", device_name, "", ""] if device_name != "main" && !device_name.is_empty()) } @@ -333,7 +333,7 @@ impl EntityTopicId { /// /// The device topic id must be in a format: "device/DEVICE_NAME//"; if not, /// `None` will be returned. - pub fn to_service_topic_id(&self, service_name: &str) -> Option { + pub fn to_default_service_topic_id(&self, service_name: &str) -> Option { if let ["device", device_name, "", ""] = self.0.split('/').collect::>()[..] { return Some(ServiceTopicId(EntityTopicId(format!( "device/{device_name}/service/{service_name}" diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index 7d4fdee3886..6e3c24073de 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -15,6 +15,7 @@ async-trait = { workspace = true } aws_mapper_ext = { workspace = true } az_mapper_ext = { workspace = true } batcher = { workspace = true } +c8y_api = { workspace = true } c8y_auth_proxy = { workspace = true } c8y_http_proxy = { workspace = true } c8y_mapper_ext = { workspace = true } @@ -24,6 +25,7 @@ collectd_ext = { workspace = true } flockfile = { workspace = true } mqtt_channel = { workspace = true } tedge_actors = { workspace = true } +tedge_api = { workspace = true } tedge_config = { workspace = true } tedge_downloader_ext = { workspace = true } tedge_file_system_ext = { workspace = true } @@ -32,9 +34,7 @@ tedge_http_ext = { workspace = true } tedge_mqtt_ext = { workspace = true } tedge_signal_ext = { workspace = true } tedge_timer_ext = { workspace = true } -tedge_utils = { workspace = true, features = [ - "logging", -] } +tedge_utils = { workspace = true, features = ["logging"] } tokio = { workspace = true, features = [ "process", "rt", diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index cf3940bd6de..c3d5d5cd837 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -6,7 +6,6 @@ use c8y_http_proxy::credentials::C8YJwtRetriever; use c8y_http_proxy::C8YHttpProxyBuilder; use c8y_mapper_ext::actor::C8yMapperBuilder; use c8y_mapper_ext::config::C8yMapperConfig; -use c8y_mapper_ext::service_monitor::service_monitor_status_message; use mqtt_channel::Config; use std::path::Path; use tedge_config::TEdgeConfig; @@ -80,15 +79,22 @@ pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result Result { let main_device_topic_id = self.entity_store.main_device(); let tedge_agent_topic_id = main_device_topic_id - .to_service_topic_id("tedge-agent") + .to_default_service_topic_id("tedge-agent") .expect("main device topic needs to fit default MQTT scheme"); let tedge_agent_health_topic = self .mqtt_schema diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 6b70913925a..29eafbc71a8 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -1,19 +1,13 @@ -use c8y_api::smartrest::message::sanitize_for_smartrest; -use c8y_api::smartrest::message::MAX_PAYLOAD_LIMIT_IN_BYTES; -use c8y_api::smartrest::topic::SMARTREST_PUBLISH_TOPIC; +use c8y_api::smartrest; use serde::Deserialize; use serde::Serialize; -use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::entity_store::EntityMetadata; +use tedge_api::entity_store::EntityType; +use tedge_api::EntityStore; use tedge_mqtt_ext::Message; -use tedge_mqtt_ext::Topic; -const DEFAULT_SERVICE_TYPE: &str = "service"; - -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Serialize, Debug, Default)] pub struct HealthStatus { - #[serde(rename = "type", default = "default_type")] - pub service_type: String, - #[serde(default = "default_status")] pub status: String, } @@ -22,173 +16,133 @@ fn default_status() -> String { "unknown".to_string() } -fn default_type() -> String { - "".to_string() -} - +// TODO: instead of passing entity store, pass information about parent as part of the entity +// also reduce number of arguments pub fn convert_health_status_message( - entity: &EntityTopicId, + entity_store: &EntityStore, + entity: &EntityMetadata, message: &Message, - device_name: String, - default_service_type: String, ) -> Vec { - let mut mqtt_messages: Vec = Vec::new(); - - let service_name = entity - .default_service_name() - .expect("EntityTopicId should be in default scheme"); - - let parent = entity - .default_parent_identifier() - .expect("EntityTopicId should be in default scheme"); - - let child_id = if parent.is_default_child_device() { - Some( - parent - .default_device_name() - .expect("EntityTopicId should be in default scheme") - .to_string(), - ) - } else { - None - }; + if entity.r#type != EntityType::Service { + return vec![]; + } - let default_health_status = format!("\"type\":{default_service_type},\"status\":\"unknown\""); + let mut mqtt_messages: Vec = Vec::new(); // If not Bridge health status - if !service_name.contains("bridge") { - let payload_str = message.payload_str().unwrap_or(&default_health_status); - - let mut health_status = - serde_json::from_str(payload_str).unwrap_or_else(|_| HealthStatus { - service_type: default_service_type.clone(), - status: "unknown".to_string(), - }); - - if health_status.status.is_empty() { - health_status.status = "unknown".into(); - } - - if health_status.service_type.is_empty() { - health_status.service_type = if default_service_type.is_empty() { - DEFAULT_SERVICE_TYPE.to_string() - } else { - default_service_type - }; - } - - let status_message = service_monitor_status_message( - &device_name, - service_name, - &health_status.status, - &health_status.service_type, - child_id, - ); + if entity.topic_id.as_str().contains("bridge") { + return mqtt_messages; + } - mqtt_messages.push(status_message); + let HealthStatus { + status: mut health_status, + } = serde_json::from_slice(message.payload()).unwrap_or_default(); + + if health_status.is_empty() { + health_status = "unknown".into(); } - mqtt_messages -} + // TODO: make a "smartrest payload" type that contains appropriately escaped and sanitised data + let mut health_status = smartrest::message::sanitize_for_smartrest( + health_status.into_bytes(), + smartrest::message::MAX_PAYLOAD_LIMIT_IN_BYTES, + ); -pub fn service_monitor_status_message( - device_name: &str, - daemon_name: &str, - status: &str, - service_type: &str, - child_id: Option, -) -> Message { - let sanitized_status = sanitize_for_smartrest(status.into(), MAX_PAYLOAD_LIMIT_IN_BYTES); - let sanitized_type = sanitize_for_smartrest(service_type.into(), MAX_PAYLOAD_LIMIT_IN_BYTES); - match child_id { - Some(cid) => Message { - topic: Topic::new_unchecked(&format!("{SMARTREST_PUBLISH_TOPIC}/{cid}")), - payload: format!( - "102,{device_name}_{cid}_{daemon_name},\"{sanitized_type}\",{daemon_name},\"{sanitized_status}\"" - ) - .into(), - qos: tedge_mqtt_ext::QoS::AtLeastOnce, - retain: false, - }, - None => Message { - topic: Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC), - payload: format!( - "102,{device_name}_{daemon_name},\"{sanitized_type}\",{daemon_name},\"{sanitized_status}\"" - ) - .into(), - qos: tedge_mqtt_ext::QoS::AtLeastOnce, - retain: false, - }, + if health_status.contains(',') { + health_status = format!(r#""{health_status}""#); } + + let service_name = entity + .other + .get("name") + .and_then(|n| n.as_str()) + .or(entity.topic_id.default_service_name()) + .unwrap_or(entity.external_id.as_ref()); + + let service_type = entity + .other + .get("type") + .and_then(|t| t.as_str()) + .unwrap_or("service"); + + let ancestors_external_ids = entity_store + .ancestors_external_ids(&entity.topic_id) + .unwrap(); + + let status_message = c8y_api::smartrest::inventory::service_creation_message( + entity.external_id.as_ref(), + service_name, + service_type, + &health_status, + &ancestors_external_ids, + ); + + mqtt_messages.push(status_message); + + mqtt_messages } #[cfg(test)] mod tests { use super::*; + use tedge_api::entity_store::EntityRegistrationMessage; use tedge_api::mqtt_topics::MqttSchema; + use tedge_mqtt_ext::Topic; use test_case::test_case; #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", - r#"{"pid":"1234","type":"systemd","status":"up"}"#, + r#"{"pid":"1234","status":"up"}"#, "c8y/s/us", - r#"102,test_device_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"up""#; + r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#; "service-monitoring-thin-edge-device" )] #[test_case( "test_device", "te/device/child/service/tedge-mapper-c8y/status/health", - r#"{"pid":"1234","type":"systemd","status":"up"}"#, - "c8y/s/us/child", - r#"102,test_device_child_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"up""#; + r#"{"pid":"1234","status":"up"}"#, + "c8y/s/us/test_device:device:child", + r#"102,test_device:device:child:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#; "service-monitoring-thin-edge-child-device" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", - r#"{"pid":"123456","type":"systemd"}"#, + r#"{"pid":"123456"}"#, "c8y/s/us", - r#"102,test_device_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"unknown""#; + r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,unknown"#; "service-monitoring-thin-edge-no-status" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", - r#"{"type":"systemd"}"#, + r#"{"status":""}"#, "c8y/s/us", - r#"102,test_device_tedge-mapper-c8y,"systemd",tedge-mapper-c8y,"unknown""#; - "service-monitoring-thin-edge-no-status-no-pid" - )] - #[test_case( - "test_device", - "te/device/main/service/tedge-mapper-c8y/status/health", - r#"{"type":"", "status":""}"#, - "c8y/s/us", - r#"102,test_device_tedge-mapper-c8y,"service",tedge-mapper-c8y,"unknown""#; - "service-monitoring-empty-status-and-type" + r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,unknown"#; + "service-monitoring-empty-status" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", "{}", "c8y/s/us", - r#"102,test_device_tedge-mapper-c8y,"service",tedge-mapper-c8y,"unknown""#; + r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,unknown"#; "service-monitoring-empty-health-message" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", - r#"{"type":"thin,edge","status":"up,down"}"#, + r#"{"status":"up,down"}"#, "c8y/s/us", - r#"102,test_device_tedge-mapper-c8y,"thin,edge",tedge-mapper-c8y,"up,down""#; + r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,"up,down""#; "service-monitoring-type-with-comma-health-message" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", - r#"{"type":"thin\"\"edge","status":"up\"down"}"#, + r#"{"status":"up\"down"}"#, "c8y/s/us", - r#"102,test_device_tedge-mapper-c8y,"thin""""edge",tedge-mapper-c8y,"up""down""#; + r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up""down"#; "service-monitoring-double-quotes-health-message" )] fn translate_health_status_to_c8y_service_monitoring_message( @@ -209,12 +163,30 @@ mod tests { c8y_monitor_payload.as_bytes(), ); - let msg = convert_health_status_message( - &entity, - &health_message, - device_name.into(), - "service".into(), - ); + let main_device_registration = + EntityRegistrationMessage::main_device(device_name.to_string()); + let mut entity_store = EntityStore::with_main_device( + main_device_registration, + crate::converter::CumulocityConverter::map_to_c8y_external_id, + ) + .unwrap(); + + let entity_registration = EntityRegistrationMessage { + topic_id: entity.clone(), + external_id: None, + r#type: EntityType::Service, + parent: None, + other: serde_json::json!({}), + }; + + entity_store + .auto_register_entity(&entity_registration.topic_id) + .unwrap(); + entity_store.update(entity_registration).unwrap(); + + let entity = entity_store.get(&entity).unwrap(); + + let msg = convert_health_status_message(&entity_store, entity, &health_message); assert_eq!(msg[0], expected_message); } } From 372b82b7a933ba142d0341cf872915cce3f7adfe Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Thu, 5 Oct 2023 11:12:04 +0000 Subject: [PATCH 5/8] Remove dead module c8y_api::smartrest::monitor Signed-off-by: Marcel Guzik --- crates/core/c8y_api/src/smartrest/monitor.rs | 11 ----------- 1 file changed, 11 deletions(-) delete mode 100644 crates/core/c8y_api/src/smartrest/monitor.rs diff --git a/crates/core/c8y_api/src/smartrest/monitor.rs b/crates/core/c8y_api/src/smartrest/monitor.rs deleted file mode 100644 index fcb3e98025d..00000000000 --- a/crates/core/c8y_api/src/smartrest/monitor.rs +++ /dev/null @@ -1,11 +0,0 @@ -use mqtt_channel::{Message, Topic}; - -pub fn service_monitor_status_down_message(device_name: &str, daemon_name: &str, status: &str, service_type: &str, child_id: Option ) -> Message { - Message { - topic: Topic::new_unchecked("c8y/s/us"), - payload: format!("102,{device_name}_{daemon_name},thin-edge.io,{daemon_name},down") - .into_bytes(), - qos: mqtt_channel::QoS::AtLeastOnce, - retain: true, - } -} From 83ba0354a3bf508a153232365a983fdfdeca9b07 Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Thu, 5 Oct 2023 20:18:25 +0000 Subject: [PATCH 6/8] Use service.type as default service type --- .../core/c8y_api/src/smartrest/inventory.rs | 42 ++++++++- crates/core/c8y_api/src/smartrest/message.rs | 2 + crates/core/tedge_api/src/entity_store.rs | 58 +++++++++++-- crates/core/tedge_api/src/mqtt_topics.rs | 4 + crates/core/tedge_mapper/src/c8y/mapper.rs | 2 +- crates/core/tedge_mapper/src/core/mapper.rs | 17 +++- .../c8y_mapper_ext/src/converter.rs | 15 ++-- .../c8y_mapper_ext/src/service_monitor.rs | 87 +++++++------------ crates/extensions/c8y_mapper_ext/src/tests.rs | 2 +- .../service_monitoring.robot | 31 +++---- 10 files changed, 174 insertions(+), 86 deletions(-) diff --git a/crates/core/c8y_api/src/smartrest/inventory.rs b/crates/core/c8y_api/src/smartrest/inventory.rs index d7bfecc74cf..6c6ce07e172 100644 --- a/crates/core/c8y_api/src/smartrest/inventory.rs +++ b/crates/core/c8y_api/src/smartrest/inventory.rs @@ -1,12 +1,22 @@ //! This module provides some helper functions to create SmartREST messages //! that can be used to create various managed objects in Cumulocity inventory. + +// TODO: Have different SmartREST messages be different types, so we can see +// where these messages are used, not only created. +// +// TODO: both `C8yTopic::smartrest_response_topic(&EntityMetadata)` and +// `publish_topic_from_ancestors(&[String])` produce C8y MQTT topics on which +// smartrest messages are sent. There should be one comprehensive API for +// generating them. + use crate::smartrest::topic::publish_topic_from_ancestors; use mqtt_channel::Message; +use super::message::sanitize_for_smartrest; + /// Create a SmartREST message for creating a child device under the given ancestors. /// The provided ancestors list must contain all the parents of the given device /// starting from its immediate parent device. -// XXX: if any arguments contain commas, output will be wrong pub fn child_device_creation_message( child_id: &str, device_name: Option<&str>, @@ -15,6 +25,7 @@ pub fn child_device_creation_message( ) -> Message { Message::new( &publish_topic_from_ancestors(ancestors), + // XXX: if any arguments contain commas, output will be wrong format!( "101,{},{},{}", child_id, @@ -27,7 +38,6 @@ pub fn child_device_creation_message( /// Create a SmartREST message for creating a service on device. /// The provided ancestors list must contain all the parents of the given service /// starting from its immediate parent device. -// XXX: if any arguments contain commas, output will be wrong pub fn service_creation_message( service_id: &str, service_name: &str, @@ -37,9 +47,37 @@ pub fn service_creation_message( ) -> Message { Message::new( &publish_topic_from_ancestors(ancestors), + // XXX: if any arguments contain commas, output will be wrong format!( "102,{},{},{},{}", service_id, service_type, service_name, service_status ), ) } + +/// Create a SmartREST message for updating service status. +/// +/// `service_status` can be any string, but `"up"`, `"down"`, and `"unknown"` +/// have known meanings and are displayed in the UI in different ways. +/// +/// `external_ids` differs from what is returned by `ancestors_external_ids` in +/// that it also contains the external ID of the current entity (the one we want +/// to set the status of). +/// +/// https://cumulocity.com/guides/reference/smartrest-two/#104 +pub fn service_status_update_message(external_ids: &[String], service_status: &str) -> Message { + let topic = publish_topic_from_ancestors(external_ids); + + let mut service_status = sanitize_for_smartrest( + service_status.into(), + super::message::MAX_PAYLOAD_LIMIT_IN_BYTES, + ); + + if service_status.contains(',') { + service_status = format!("\"{service_status}\""); + } + + let payload = format!("104,{service_status}"); + + Message::new(&topic, payload) +} diff --git a/crates/core/c8y_api/src/smartrest/message.rs b/crates/core/c8y_api/src/smartrest/message.rs index f68d38f73b8..17146b7cf53 100644 --- a/crates/core/c8y_api/src/smartrest/message.rs +++ b/crates/core/c8y_api/src/smartrest/message.rs @@ -32,6 +32,8 @@ pub fn get_smartrest_template_id(payload: &str) -> String { /// - Remove all control characters except for `\n`, `\t`, `\r`. /// - Double quote is escaped as `\"\"`. /// - Strip the input according to `max_size`. +// TODO: make this return Result +// TODO: make a variant which assumes `max_size = MAX_PAYLOAD_LIMIT_IN_BYTES` pub fn sanitize_for_smartrest(input: Vec, max_size: usize) -> String { String::from_utf8(input) .unwrap_or_else(|err| { diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 0a01d30c483..14f99ee56cb 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -30,8 +30,10 @@ use thiserror::Error; const MQTT_ROOT: &str = "te"; /// Represents externally provided unique ID of an entity. +/// /// Although this struct doesn't enforce any restrictions for the values, /// the consumers may impose restrictions on the accepted values. + #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct EntityExternalId(String); @@ -41,6 +43,10 @@ impl AsRef for EntityExternalId { } } +// XXX: As `EntityExternalId` is used as a part of cloudbound MQTT topic, it +// can't contain characters invalid in topics, i.e. `+` and `#`. ([MQTT-4.7]). +// If it's derived from a MQTT topic, this holds, but if created from a string, +// this isn't checked, which is invalid! impl From<&str> for EntityExternalId { fn from(val: &str) -> Self { Self(val.to_string()) @@ -121,6 +127,8 @@ pub struct EntityStore { entity_id_index: HashMap, external_id_mapper: ExternalIdMapperFn, external_id_validator_fn: ExternalIdValidatorFn, + // TODO: this is a c8y cloud specific concern and it'd be better to put it somewhere else. + default_service_type: String, } impl EntityStore { @@ -131,6 +139,27 @@ impl EntityStore { external_id_mapper_fn: MF, external_id_validator_fn: SF, ) -> Option + where + MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, + MF: 'static + Send + Sync, + SF: Fn(&str) -> Result, + SF: 'static + Send + Sync, + { + Self::with_main_device_and_default_service_type( + main_device, + "service".to_string(), + external_id_mapper_fn, + external_id_validator_fn, + ) + } + + #[must_use] + pub fn with_main_device_and_default_service_type( + main_device: EntityRegistrationMessage, + default_service_type: String, + external_id_mapper_fn: MF, + external_id_validator_fn: SF, + ) -> Option where MF: Fn(&EntityTopicId, &EntityExternalId) -> EntityExternalId, MF: 'static + Send + Sync, @@ -156,6 +185,7 @@ impl EntityStore { entity_id_index: HashMap::from([(entity_id, main_device.topic_id)]), external_id_mapper: Box::new(external_id_mapper_fn), external_id_validator_fn: Box::new(external_id_validator_fn), + default_service_type, }) } @@ -314,12 +344,22 @@ impl EntityStore { } } }; + + let JsonValue::Object(mut other) = message.other else { + return Err(Error::EntityRegistrationOtherNotMap); + }; + + // XXX: this is c8y-specific, entity store shouldn't do this! + other + .entry("type".to_string()) + .or_insert(JsonValue::String(self.default_service_type.clone())); + let entity_metadata = EntityMetadata { topic_id: topic_id.clone(), r#type: message.r#type, external_id: external_id.clone(), parent, - other: message.other, + other: JsonValue::Object(other), }; // device is affected if it was previously registered and was updated @@ -401,7 +441,7 @@ impl EntityStore { external_id: Some(service_external_id), r#type: EntityType::Service, parent: Some(parent_device_id), - other: json!({ "name": service_id, "type": "systemd" }), + other: json!({ "name": service_id, "type": self.default_service_type }), }; register_messages.push(service_register_message.clone()); self.update(service_register_message)?; @@ -420,6 +460,9 @@ pub struct EntityMetadata { pub parent: Option, pub r#type: EntityType, pub external_id: EntityExternalId, + + // TODO: use a dedicated struct for cloud-specific fields, have `EntityMetadata` be generic over + // cloud we're currently connected to pub other: JsonValue, } @@ -474,6 +517,11 @@ pub enum Error { #[error(transparent)] InvalidExternalIdError(#[from] InvalidExternalIdError), + + // In practice won't be thrown because usually it is a map + // TODO: remove this error variant when `EntityRegistrationMessage` is changed + #[error("`EntityRegistrationMessage::other` field needs to be a Map")] + EntityRegistrationOtherNotMap, } /// An object representing a valid entity registration message. @@ -974,7 +1022,7 @@ mod tests { r#type: EntityType::Service, external_id: Some("device:child1:service:service1".into()), parent: Some(EntityTopicId::from_str("device/child1//").unwrap()), - other: json!({ "name": "service1", "type": "systemd" }), + other: json!({ "name": "service1", "type": "service" }), } ] ); @@ -1029,7 +1077,7 @@ mod tests { parent: None, r#type: EntityType::MainDevice, external_id: "test-device".into(), - other: json!({}), + other: json!({"type": "service"}), }; // Assert main device registered with custom topic scheme assert_eq!( @@ -1058,7 +1106,7 @@ mod tests { parent: Some(main_topic_id), r#type: EntityType::Service, external_id: "custom:main:service:collectd".into(), - other: json!({}), + other: json!({"type": "service"}), }; // Assert service registered under main device with custom topic scheme assert_eq!( diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 933b2727c1d..6c4d216503f 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -376,6 +376,10 @@ pub struct Service { pub struct ServiceTopicId(EntityTopicId); impl ServiceTopicId { + pub fn new(entity_topic_id: EntityTopicId) -> Self { + Self(entity_topic_id) + } + pub fn as_str(&self) -> &str { self.0.as_str() } diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index c3d5d5cd837..a999dd29a1a 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -82,7 +82,7 @@ pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result() + .unwrap(), + ), + device_topic_id: DeviceTopicId::new("device/main//".parse::().unwrap()), + }; + let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone()); + let health_actor = + HealthMonitorBuilder::from_service_topic_id(service, &mut mqtt_actor, &mqtt_schema); // Shutdown on SIGINT let signal_actor = SignalActor::builder(&runtime.get_handle()); diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index c6e8991d16f..7737168826b 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -97,6 +97,7 @@ use tokio::time::Duration; use tracing::debug; use tracing::info; use tracing::log::error; +use tracing::trace; const C8Y_CLOUD: &str = "c8y"; const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "device/inventory.json"; @@ -221,8 +222,9 @@ impl CumulocityConverter { }; let main_device = entity_store::EntityRegistrationMessage::main_device(device_id.clone()); - let entity_store = EntityStore::with_main_device( + let entity_store = EntityStore::with_main_device_and_default_service_type( main_device, + service_type.clone(), Self::map_to_c8y_external_id, Self::validate_external_id, ) @@ -293,7 +295,7 @@ impl CumulocityConverter { .default_service_name() .unwrap_or(external_id.as_ref()) }), - display_type.unwrap_or("service"), + display_type.unwrap_or(&self.service_type), "up", &ancestors_external_ids, ); @@ -499,8 +501,9 @@ impl CumulocityConverter { .get(entity) .expect("entity was registered"); + let ancestors_external_ids = self.entity_store.ancestors_external_ids(entity)?; let mut message = - convert_health_status_message(&self.entity_store, entity_metadata, message); + convert_health_status_message(entity_metadata, &ancestors_external_ids, message); mqtt_messages.append(&mut message); Ok(mqtt_messages) @@ -843,6 +846,8 @@ impl CumulocityConverter { &mut self, message: &Message, ) -> Result, ConversionError> { + debug!("Mapping message on topic: {}", message.topic.name); + trace!("Message content: {:?}", message.payload_str()); match self.mqtt_schema.entity_channel_of(&message.topic) { Ok((source, channel)) => self.try_convert_te_topics(source, channel, message).await, Err(_) => self.try_convert_tedge_topics(message).await, @@ -2159,7 +2164,7 @@ mod tests { let expected_service_monitor_smart_rest_message = Message::new( &Topic::new_unchecked("c8y/s/us/test-device:device:child1"), - r#"102,test-device:device:child1:service:child-service-c8y,systemd,child-service-c8y,up"#, + r#"102,test-device:device:child1:service:child-service-c8y,service,child-service-c8y,up"#, ); let out_messages = converter.convert(&in_message).await; @@ -2210,7 +2215,7 @@ mod tests { let expected_service_monitor_smart_rest_message = Message::new( &Topic::new_unchecked("c8y/s/us"), - r#"102,test-device:device:main:service:test-tedge-mapper-c8y,systemd,test-tedge-mapper-c8y,up"#, + r#"102,test-device:device:main:service:test-tedge-mapper-c8y,service,test-tedge-mapper-c8y,up"#, ); // Test the output messages contains SmartREST and C8Y JSON. diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 29eafbc71a8..f483a8a956b 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -3,7 +3,6 @@ use serde::Deserialize; use serde::Serialize; use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityType; -use tedge_api::EntityStore; use tedge_mqtt_ext::Message; #[derive(Deserialize, Serialize, Debug, Default)] @@ -16,13 +15,12 @@ fn default_status() -> String { "unknown".to_string() } -// TODO: instead of passing entity store, pass information about parent as part of the entity -// also reduce number of arguments pub fn convert_health_status_message( - entity_store: &EntityStore, entity: &EntityMetadata, + ancestors_external_ids: &[String], message: &Message, ) -> Vec { + // TODO: introduce type to remove entity type guards if entity.r#type != EntityType::Service { return vec![]; } @@ -42,40 +40,12 @@ pub fn convert_health_status_message( health_status = "unknown".into(); } - // TODO: make a "smartrest payload" type that contains appropriately escaped and sanitised data - let mut health_status = smartrest::message::sanitize_for_smartrest( - health_status.into_bytes(), - smartrest::message::MAX_PAYLOAD_LIMIT_IN_BYTES, - ); - - if health_status.contains(',') { - health_status = format!(r#""{health_status}""#); - } - - let service_name = entity - .other - .get("name") - .and_then(|n| n.as_str()) - .or(entity.topic_id.default_service_name()) - .unwrap_or(entity.external_id.as_ref()); - - let service_type = entity - .other - .get("type") - .and_then(|t| t.as_str()) - .unwrap_or("service"); - - let ancestors_external_ids = entity_store - .ancestors_external_ids(&entity.topic_id) - .unwrap(); - - let status_message = c8y_api::smartrest::inventory::service_creation_message( - entity.external_id.as_ref(), - service_name, - service_type, - &health_status, - &ancestors_external_ids, - ); + // FIXME: `ancestors_external_ids` gives external ids starting from the parent, but for health + // we need XID of current device as well + let mut external_ids = vec![entity.external_id.as_ref().to_string()]; + external_ids.extend_from_slice(ancestors_external_ids); + let status_message = + smartrest::inventory::service_status_update_message(&external_ids, &health_status); mqtt_messages.push(status_message); @@ -86,6 +56,7 @@ pub fn convert_health_status_message( mod tests { use super::*; use tedge_api::entity_store::EntityRegistrationMessage; + use tedge_api::entity_store::EntityStore; use tedge_api::mqtt_topics::MqttSchema; use tedge_mqtt_ext::Topic; use test_case::test_case; @@ -93,56 +64,56 @@ mod tests { "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"pid":"1234","status":"up"}"#, - "c8y/s/us", - r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#; + "c8y/s/us/test_device:device:main:service:tedge-mapper-c8y", + r#"104,up"#; "service-monitoring-thin-edge-device" )] #[test_case( "test_device", "te/device/child/service/tedge-mapper-c8y/status/health", r#"{"pid":"1234","status":"up"}"#, - "c8y/s/us/test_device:device:child", - r#"102,test_device:device:child:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up"#; + "c8y/s/us/test_device:device:child/test_device:device:child:service:tedge-mapper-c8y", + r#"104,up"#; "service-monitoring-thin-edge-child-device" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"pid":"123456"}"#, - "c8y/s/us", - r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,unknown"#; + "c8y/s/us/test_device:device:main:service:tedge-mapper-c8y", + r#"104,unknown"#; "service-monitoring-thin-edge-no-status" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"status":""}"#, - "c8y/s/us", - r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,unknown"#; + "c8y/s/us/test_device:device:main:service:tedge-mapper-c8y", + r#"104,unknown"#; "service-monitoring-empty-status" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", "{}", - "c8y/s/us", - r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,unknown"#; + "c8y/s/us/test_device:device:main:service:tedge-mapper-c8y", + r#"104,unknown"#; "service-monitoring-empty-health-message" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"status":"up,down"}"#, - "c8y/s/us", - r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,"up,down""#; + "c8y/s/us/test_device:device:main:service:tedge-mapper-c8y", + r#"104,"up,down""#; "service-monitoring-type-with-comma-health-message" )] #[test_case( "test_device", "te/device/main/service/tedge-mapper-c8y/status/health", r#"{"status":"up\"down"}"#, - "c8y/s/us", - r#"102,test_device:device:main:service:tedge-mapper-c8y,service,tedge-mapper-c8y,up""down"#; + "c8y/s/us/test_device:device:main:service:tedge-mapper-c8y", + r#"104,up""down"#; "service-monitoring-double-quotes-health-message" )] fn translate_health_status_to_c8y_service_monitoring_message( @@ -155,7 +126,7 @@ mod tests { let topic = Topic::new_unchecked(health_topic); let mqtt_schema = MqttSchema::new(); - let (entity, _) = mqtt_schema.entity_channel_of(&topic).unwrap(); + let (entity_topic_id, _) = mqtt_schema.entity_channel_of(&topic).unwrap(); let health_message = Message::new(&topic, health_payload.as_bytes().to_owned()); let expected_message = Message::new( @@ -168,11 +139,12 @@ mod tests { let mut entity_store = EntityStore::with_main_device( main_device_registration, crate::converter::CumulocityConverter::map_to_c8y_external_id, + crate::converter::CumulocityConverter::validate_external_id, ) .unwrap(); let entity_registration = EntityRegistrationMessage { - topic_id: entity.clone(), + topic_id: entity_topic_id.clone(), external_id: None, r#type: EntityType::Service, parent: None, @@ -184,9 +156,12 @@ mod tests { .unwrap(); entity_store.update(entity_registration).unwrap(); - let entity = entity_store.get(&entity).unwrap(); + let entity = entity_store.get(&entity_topic_id).unwrap(); + let ancestors_external_ids = entity_store + .ancestors_external_ids(&entity_topic_id) + .unwrap(); - let msg = convert_health_status_message(&entity_store, entity, &health_message); + let msg = convert_health_status_message(entity, &ancestors_external_ids, &health_message); assert_eq!(msg[0], expected_message); } } diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index d13d45d147b..8f62dc5375c 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -1756,7 +1756,7 @@ async fn inventory_registers_unknown_entity_once() { .expect("Service register message payload must be JSON"); assert_json_include!( actual: service_register_payload, - expected: json!({"@type": "service", "type": "systemd"}) + expected: json!({"@type": "service", "type": "service"}) ); assert!( diff --git a/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot b/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot index df29f5e031a..4f12a017852 100644 --- a/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot +++ b/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot @@ -57,7 +57,7 @@ Test if all c8y services using default service type when service type configured Check health status of tedge-mapper-c8y service on broker stop start Custom Test Setup - Device Should Exist ${DEVICE_SN}_tedge-mapper-c8y show_info=False + Device Should Exist ${DEVICE_SN}:device:main:service:tedge-mapper-c8y show_info=False ${SERVICE}= Cumulocity.Device Should Have Fragment Values status\=up Should Be Equal ${SERVICE["name"]} tedge-mapper-c8y Should Be Equal ${SERVICE["status"]} up @@ -65,7 +65,7 @@ Check health status of tedge-mapper-c8y service on broker stop start ThinEdgeIO.Stop Service mosquitto.service ThinEdgeIO.Service Should Be Stopped mosquitto.service - Device Should Exist ${DEVICE_SN}_tedge-mapper-c8y show_info=False + Device Should Exist ${DEVICE_SN}:device:main:service:tedge-mapper-c8y show_info=False ${SERVICE}= Cumulocity.Device Should Have Fragment Values status\=down Should Be Equal ${SERVICE["name"]} tedge-mapper-c8y Should Be Equal ${SERVICE["status"]} down @@ -83,7 +83,7 @@ Check health status of tedge-mapper-c8y service on broker restart [Documentation] Test tedge-mapper-c8y on mqtt broker restart Custom Test Setup - Device Should Exist ${DEVICE_SN}_tedge-mapper-c8y show_info=False + Device Should Exist ${DEVICE_SN}:device:main:service:tedge-mapper-c8y show_info=False ${SERVICE}= Cumulocity.Device Should Have Fragment Values status\=up timeout=${TIMEOUT} Should Be Equal ${SERVICE["name"]} tedge-mapper-c8y Should Be Equal ${SERVICE["status"]} up @@ -92,7 +92,7 @@ Check health status of tedge-mapper-c8y service on broker restart ThinEdgeIO.Service Should Be Running mosquitto.service Sleep 5s reason=Wait for any potential status changes to be sent to Cumulocity IoT - Device Should Exist ${DEVICE_SN}_tedge-mapper-c8y show_info=False + Device Should Exist ${DEVICE_SN}:device:main:service:tedge-mapper-c8y show_info=False ${SERVICE}= Cumulocity.Device Should Have Fragment Values status\=up timeout=${TIMEOUT} Should Be Equal ${SERVICE["name"]} tedge-mapper-c8y Should Be Equal ${SERVICE["status"]} up @@ -103,17 +103,18 @@ Check health status of child device service [Documentation] Test service status of child device services # Create the child device by sending the service status on tedge/health// Date: Wed, 11 Oct 2023 13:52:23 +0000 Subject: [PATCH 7/8] Services publish registration message on startup Signed-off-by: Marcel Guzik --- Cargo.lock | 4 + crates/bin/c8y-device-management/Cargo.toml | 1 + crates/bin/c8y-device-management/src/main.rs | 31 +++- crates/core/tedge_agent/src/agent.rs | 3 + crates/core/tedge_api/src/entity_store.rs | 47 +++++- crates/core/tedge_api/src/mqtt_topics.rs | 136 +++++++++++++++--- crates/core/tedge_mapper/src/c8y/mapper.rs | 31 +++- crates/core/tedge_mapper/src/core/mapper.rs | 8 +- .../tedge_watchdog/src/systemd_watchdog.rs | 49 +++++-- .../extensions/tedge_health_ext/src/actor.rs | 14 +- crates/extensions/tedge_health_ext/src/lib.rs | 95 +++++------- .../extensions/tedge_health_ext/src/tests.rs | 39 ++++- plugins/c8y_configuration_plugin/Cargo.toml | 7 +- plugins/c8y_configuration_plugin/src/lib.rs | 31 +++- plugins/c8y_firmware_plugin/Cargo.toml | 7 +- plugins/c8y_firmware_plugin/src/lib.rs | 31 +++- plugins/c8y_log_plugin/Cargo.toml | 7 +- plugins/c8y_log_plugin/src/main.rs | 37 ++++- plugins/tedge_configuration_plugin/src/lib.rs | 30 +++- plugins/tedge_log_plugin/src/lib.rs | 30 +++- .../MQTT_health_endpoints.robot | 4 +- .../health_tedge-mapper-collectd.robot | 3 +- .../software_management/software.robot | 4 +- 23 files changed, 513 insertions(+), 136 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2098733b4ec..65f65753e84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -500,6 +500,7 @@ dependencies = [ "c8y_http_proxy", "clap", "tedge_actors", + "tedge_api", "tedge_config", "tedge_file_system_ext", "tedge_health_ext", @@ -525,6 +526,7 @@ dependencies = [ "env_logger", "log", "tedge_actors", + "tedge_api", "tedge_config", "tedge_downloader_ext", "tedge_file_system_ext", @@ -545,6 +547,7 @@ dependencies = [ "c8y_http_proxy", "clap", "tedge_actors", + "tedge_api", "tedge_config", "tedge_downloader_ext", "tedge_health_ext", @@ -564,6 +567,7 @@ dependencies = [ "c8y_log_manager", "clap", "tedge_actors", + "tedge_api", "tedge_config", "tedge_file_system_ext", "tedge_health_ext", diff --git a/crates/bin/c8y-device-management/Cargo.toml b/crates/bin/c8y-device-management/Cargo.toml index 24d8a2ad29d..30ac4bbc798 100644 --- a/crates/bin/c8y-device-management/Cargo.toml +++ b/crates/bin/c8y-device-management/Cargo.toml @@ -19,6 +19,7 @@ clap = { workspace = true } env_logger = { workspace = true } log = { workspace = true } tedge_actors = { workspace = true } +tedge_api = { workspace = true } tedge_config = { workspace = true } tedge_downloader_ext = { workspace = true } tedge_file_system_ext = { workspace = true } diff --git a/crates/bin/c8y-device-management/src/main.rs b/crates/bin/c8y-device-management/src/main.rs index 56372af0366..3175eec7e00 100644 --- a/crates/bin/c8y-device-management/src/main.rs +++ b/crates/bin/c8y-device-management/src/main.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use c8y_config_manager::ConfigManagerBuilder; use c8y_config_manager::ConfigManagerConfig; use c8y_firmware_manager::FirmwareManagerBuilder; @@ -9,6 +10,10 @@ use c8y_log_manager::LogManagerConfig; use clap::Parser; use std::path::PathBuf; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::DeviceTopicId; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_config::TEdgeConfigLocation; use tedge_config::TEdgeConfigRepository; use tedge_config::DEFAULT_TEDGE_CONFIG_PATH; @@ -92,7 +97,31 @@ async fn main() -> anyhow::Result<()> { )?; // Instantiate health monitor actor - let health_actor = HealthMonitorBuilder::new(PLUGIN_NAME, &mut mqtt_actor); + // TODO: take a user-configurable service topic id + let mqtt_device_topic_id = &tedge_config + .mqtt + .device_topic_id + .parse::() + .unwrap(); + + let service_topic_id = mqtt_device_topic_id + .to_default_service_topic_id(PLUGIN_NAME) + .with_context(|| { + format!( + "Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", + ) + })?; + let service = Service { + service_topic_id, + device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()), + }; + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.to_string()); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut mqtt_actor, + &mqtt_schema, + tedge_config.service.ty.clone(), + ); // Shutdown on SIGINT let signal_actor = SignalActor::builder(&runtime.get_handle()); diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 9338272b75b..d9a12f7fc6b 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -48,6 +48,7 @@ pub struct AgentConfig { pub data_dir: DataDir, pub mqtt_device_topic_id: EntityTopicId, pub mqtt_topic_root: Arc, + pub service_type: String, } impl AgentConfig { @@ -114,6 +115,7 @@ impl AgentConfig { log_dir, mqtt_topic_root, mqtt_device_topic_id, + service_type: tedge_config.service.ty.clone(), }) } } @@ -199,6 +201,7 @@ impl Agent { service, &mut mqtt_actor_builder, &mqtt_schema, + self.config.service_type.clone(), ); // Tedge to Te topic converter diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 14f99ee56cb..a46f2695c4d 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -8,7 +8,9 @@ // TODO: move entity business logic to its own module use crate::entity_store; +use crate::mqtt_topics::Channel; use crate::mqtt_topics::EntityTopicId; +use crate::mqtt_topics::MqttSchema; use crate::mqtt_topics::TopicIdError; use log::debug; use mqtt_channel::Message; @@ -16,6 +18,7 @@ use serde_json::json; use serde_json::Value as JsonValue; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::fmt::Display; use thiserror::Error; /// Represents an "Entity topic identifier" portion of the MQTT topic @@ -473,6 +476,16 @@ pub enum EntityType { Service, } +impl Display for EntityType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EntityType::MainDevice => write!(f, "device"), + EntityType::ChildDevice => write!(f, "child-device"), + EntityType::Service => write!(f, "service"), + } + } +} + impl EntityMetadata { /// Creates a entity metadata for the main device. pub fn main_device(device_id: String) -> Self { @@ -550,6 +563,12 @@ impl EntityRegistrationMessage { // Serialize/Deserialize. #[must_use] pub fn new(message: &Message) -> Option { + let topic_id = message + .topic + .name + .strip_prefix(MQTT_ROOT) + .and_then(|s| s.strip_prefix('/'))?; + let payload = parse_entity_register_payload(message.payload_bytes())?; let JsonValue::Object(mut properties) = payload else { @@ -596,12 +615,6 @@ impl EntityRegistrationMessage { None }; - let topic_id = message - .topic - .name - .strip_prefix(MQTT_ROOT) - .and_then(|s| s.strip_prefix('/'))?; - let other = JsonValue::Object(properties); assert_eq!(other.get("@id"), None); @@ -627,6 +640,28 @@ impl EntityRegistrationMessage { other: serde_json::json!({}), } } + + // TODO: manual serialize impl + pub fn to_mqtt_message(mut self, mqtt_schema: &MqttSchema) -> Message { + let mut props = serde_json::Map::new(); + + props.insert("@type".to_string(), self.r#type.to_string().into()); + + if let Some(external_id) = self.external_id { + props.insert("@id".to_string(), external_id.as_ref().to_string().into()); + } + + if let Some(parent) = self.parent { + props.insert("@parent".to_string(), parent.to_string().into()); + } + + props.append(self.other.as_object_mut().unwrap()); + + let message = serde_json::to_string(&props).unwrap(); + + let message_topic = mqtt_schema.topic_for(&self.topic_id, &Channel::EntityMetadata); + Message::new(&message_topic, message).with_retain() + } } impl TryFrom<&Message> for EntityRegistrationMessage { diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index 6c4d216503f..84207c3edd4 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -94,7 +94,12 @@ impl MqttSchema { /// ); /// ``` pub fn topic_for(&self, entity: &EntityTopicId, channel: &Channel) -> mqtt_channel::Topic { - let topic = format!("{}/{}/{}", self.root, entity, channel); + let channel = channel.to_string(); + let topic = if channel.is_empty() { + format!("{}/{entity}", self.root) + } else { + format!("{}/{entity}/{channel}", self.root) + }; mqtt_channel::Topic::new(&topic).unwrap() } @@ -272,6 +277,16 @@ impl EntityTopicId { format!("device/{child}/service/{service}").parse() } + /// Assuming `self` is a device in default MQTT scheme, create an + /// `EntityTopicId` for a service on that device. + /// + /// Returns `None` if `self` is not in default MQTT scheme or if `service` + /// is an invalid service name. + pub fn default_service_for_device(&self, service: &str) -> Option { + let device_name = self.default_device_name()?; + Self::default_child_service(device_name, service).ok() + } + /// Returns true if the current topic id matches the default topic scheme: /// - device/// : for devices /// - device//service/ : for services @@ -334,12 +349,8 @@ impl EntityTopicId { /// The device topic id must be in a format: "device/DEVICE_NAME//"; if not, /// `None` will be returned. pub fn to_default_service_topic_id(&self, service_name: &str) -> Option { - if let ["device", device_name, "", ""] = self.0.split('/').collect::>()[..] { - return Some(ServiceTopicId(EntityTopicId(format!( - "device/{device_name}/service/{service_name}" - )))); - } - None + self.default_service_for_device(service_name) + .map(ServiceTopicId) } /// Returns an array of all segments of this entity topic. @@ -387,16 +398,11 @@ impl ServiceTopicId { pub fn entity(&self) -> &EntityTopicId { &self.0 } +} - /// If in a default MQTT scheme, returns a device topic id of this service. - pub fn to_device_topic_id(&self) -> Option { - if let ["device", device_name, "service", _] = self.0.segments() { - Some(DeviceTopicId(EntityTopicId(format!( - "device/{device_name}//" - )))) - } else { - None - } +impl From for ServiceTopicId { + fn from(value: EntityTopicId) -> Self { + Self::new(value) } } @@ -427,6 +433,12 @@ impl DeviceTopicId { } } +impl From for DeviceTopicId { + fn from(value: EntityTopicId) -> Self { + Self::new(value) + } +} + #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum TopicIdError { #[error("An entity topic identifier has at most 4 segments")] @@ -725,4 +737,96 @@ mod tests { Err(TopicIdError::InvalidMqttTopic) ); } + + // TODO: we can forgot to update the test when adding variants, figure out a + // way to use type system to fail if not all values checked + #[test] + fn topic_for() { + let mqtt_schema = MqttSchema::new(); + + let device: EntityTopicId = "device/main//".parse().unwrap(); + + assert_eq!( + mqtt_schema.topic_for(&device, &Channel::EntityMetadata), + mqtt_channel::Topic::new_unchecked("te/device/main//") + ); + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::Measurement { + measurement_type: "type".to_string() + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///m/type") + ); + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::MeasurementMetadata { + measurement_type: "type".to_string() + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///m/type/meta") + ); + + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::Event { + event_type: "type".to_string() + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///e/type") + ); + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::EventMetadata { + event_type: "type".to_string() + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///e/type/meta") + ); + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::Alarm { + alarm_type: "type".to_string() + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///a/type") + ); + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::AlarmMetadata { + alarm_type: "type".to_string() + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///a/type/meta") + ); + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::Command { + operation: OperationType::Health, + cmd_id: "check".to_string() + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///cmd/health/check") + ); + assert_eq!( + mqtt_schema.topic_for( + &device, + &Channel::CommandMetadata { + operation: OperationType::LogUpload + } + ), + mqtt_channel::Topic::new_unchecked("te/device/main///cmd/log_upload") + ); + assert_eq!( + mqtt_schema.topic_for(&device, &Channel::Health), + mqtt_channel::Topic::new_unchecked("te/device/main///status/health") + ); + } } diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index a999dd29a1a..a60d14be9ad 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -1,13 +1,17 @@ use crate::core::component::TEdgeComponent; use crate::core::mapper::start_basic_actors; +use anyhow::Context; use async_trait::async_trait; use c8y_auth_proxy::actor::C8yAuthProxyBuilder; use c8y_http_proxy::credentials::C8YJwtRetriever; use c8y_http_proxy::C8YHttpProxyBuilder; use c8y_mapper_ext::actor::C8yMapperBuilder; use c8y_mapper_ext::config::C8yMapperConfig; +use c8y_mapper_ext::converter::CumulocityConverter; use mqtt_channel::Config; use std::path::Path; +use tedge_api::entity_store::EntityExternalId; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_config::TEdgeConfig; use tedge_downloader_ext::DownloaderActor; use tedge_file_system_ext::FsWatchActorBuilder; @@ -76,17 +80,30 @@ impl TEdgeComponent for CumulocityMapper { } pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result { - let device_name = tedge_config.device.id.try_read(tedge_config)?.to_string(); + let main_device_xid: EntityExternalId = tedge_config.device.id.try_read(tedge_config)?.into(); let service_type = tedge_config.service.ty.clone(); - // from this level we don't have access to the entity store and registered main device so best - // we can do for now is just guess it - // TODO: fix this, preferably use a HealthMonitorActor - let service_external_id = format!("{device_name}:device:main:service:tedge-mapper-c8y"); + // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme + + // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some + // predefined topic id instead of trying to derive it from current device? + let entity_topic_id: EntityTopicId = tedge_config + .mqtt + .device_topic_id + .clone() + .parse() + .context("Invalid device_topic_id")?; + + let mapper_service_topic_id = entity_topic_id + .default_service_for_device(CUMULOCITY_MAPPER_NAME) + .context("Can't derive service name if device topic id not in default scheme")?; + + let mapper_service_external_id = + CumulocityConverter::map_to_c8y_external_id(&mapper_service_topic_id, &main_device_xid); let last_will_message = c8y_api::smartrest::inventory::service_creation_message( - service_external_id.as_str(), - "tedge-mapper-c8y", + mapper_service_external_id.as_ref(), + CUMULOCITY_MAPPER_NAME, service_type.as_str(), "down", &[], diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index bc49327e006..3c71d9c5615 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -30,8 +30,12 @@ pub async fn start_basic_actors( device_topic_id: DeviceTopicId::new("device/main//".parse::().unwrap()), }; let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone()); - let health_actor = - HealthMonitorBuilder::from_service_topic_id(service, &mut mqtt_actor, &mqtt_schema); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut mqtt_actor, + &mqtt_schema, + config.service.ty.clone(), + ); // Shutdown on SIGINT let signal_actor = SignalActor::builder(&runtime.get_handle()); diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs index 7c7817c23bb..2aa76998811 100644 --- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs +++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs @@ -18,6 +18,9 @@ use std::time::Instant; use tedge_api::health::health_status_down_message; use tedge_api::health::health_status_up_message; use tedge_api::health::send_health_status; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; use tedge_config::TEdgeConfigLocation; use time::format_description; use time::OffsetDateTime; @@ -26,6 +29,7 @@ use tracing::error; use tracing::info; use tracing::warn; +// TODO: extract to common module #[derive(Debug, Serialize, Deserialize)] pub struct HealthStatus { status: String, @@ -71,6 +75,24 @@ async fn start_watchdog_for_self() -> Result<(), WatchdogError> { } async fn start_watchdog_for_tedge_services(tedge_config_dir: PathBuf) { + // let tedge_config_location = tedge_config::TEdgeConfigLocation::from_custom_root(&config_dir); + let tedge_config_location = + tedge_config::TEdgeConfigLocation::from_custom_root(tedge_config_dir.clone()); + let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()); + let tedge_config = config_repository.load().expect("Could not load config"); + + let mqtt_topic_root = tedge_config.mqtt.topic_root.clone(); + let mqtt_schema = MqttSchema::with_root(mqtt_topic_root); + + // TODO: now that we have entity registration, instead of hardcoding, the watchdog can see all + // running services by looking at registration messages + let device_topic_id = tedge_config + .mqtt + .device_topic_id + .parse::() + .expect("Services not in default scheme unsupported"); + + // let device_topic_id = tedge_config_dir let tedge_services = vec![ "tedge-mapper-c8y", "tedge-mapper-az", @@ -80,24 +102,31 @@ async fn start_watchdog_for_tedge_services(tedge_config_dir: PathBuf) { "tedge-log-plugin", "c8y-configuration-plugin", "c8y-firmware-plugin", - ]; + ] + .into_iter() + .map(|s| { + device_topic_id + .default_service_for_device(s) + .expect("Services not in default scheme unsupported") + }) + .collect::>(); let watchdog_tasks = FuturesUnordered::new(); for service in tedge_services { - match get_watchdog_sec(&format!("/lib/systemd/system/{service}.service")) { + let service_name = service.default_service_name().unwrap(); + match get_watchdog_sec(&format!("/lib/systemd/system/{service_name}.service")) { Ok(interval) => { - let req_topic = format!("tedge/health-check/{service}"); - let res_topic = format!("tedge/health/{service}"); - let tedge_config_location = - tedge_config::TEdgeConfigLocation::from_custom_root(tedge_config_dir.clone()); + let req_topic = format!("tedge/health-check/{service_name}"); + let res_topic = mqtt_schema.topic_for(&service, &Channel::Health); + let tedge_config_location = tedge_config_location.clone(); watchdog_tasks.push(tokio::spawn(async move { monitor_tedge_service( tedge_config_location, - service, + service.as_str(), &req_topic, - &res_topic, + res_topic, interval / 4, ) .await @@ -117,7 +146,7 @@ async fn monitor_tedge_service( tedge_config_location: TEdgeConfigLocation, name: &str, req_topic: &str, - res_topic: &str, + res_topic: Topic, interval: u64, ) -> Result<(), WatchdogError> { let client_id: &str = &format!("{}_{}", name, nanoid!()); @@ -126,7 +155,7 @@ async fn monitor_tedge_service( let mqtt_config = tedge_config .mqtt_config()? .with_session_name(client_id) - .with_subscriptions(res_topic.try_into()?) + .with_subscriptions(res_topic.into()) .with_initial_message(|| health_status_up_message("tedge-watchdog")) .with_last_will_message(health_status_down_message("tedge-watchdog")); let client = mqtt_channel::Connection::new(&mqtt_config).await?; diff --git a/crates/extensions/tedge_health_ext/src/actor.rs b/crates/extensions/tedge_health_ext/src/actor.rs index 1672aa14f2f..c50135f1c7d 100644 --- a/crates/extensions/tedge_health_ext/src/actor.rs +++ b/crates/extensions/tedge_health_ext/src/actor.rs @@ -5,19 +5,24 @@ use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_api::health::ServiceHealthTopic; +use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; pub struct HealthMonitorActor { + // TODO(marcel): move this + service_registration_message: Option, health_topic: ServiceHealthTopic, messages: SimpleMessageBox, } impl HealthMonitorActor { pub fn new( + service_registration_message: Option, health_topic: ServiceHealthTopic, messages: SimpleMessageBox, ) -> Self { Self { + service_registration_message, health_topic, messages, } @@ -39,11 +44,14 @@ impl Actor for HealthMonitorActor { } async fn run(mut self) -> Result<(), RuntimeError> { + if let Some(registration_message) = &self.service_registration_message { + self.messages.send(registration_message.clone()).await?; + } + self.messages.send(self.up_health_status()).await?; + while let Some(_message) = self.messages.recv().await { - { - self.messages.send(self.up_health_status()).await?; - } + self.messages.send(self.up_health_status()).await?; } Ok(()) } diff --git a/crates/extensions/tedge_health_ext/src/lib.rs b/crates/extensions/tedge_health_ext/src/lib.rs index 357606865fa..619c9f4b152 100644 --- a/crates/extensions/tedge_health_ext/src/lib.rs +++ b/crates/extensions/tedge_health_ext/src/lib.rs @@ -13,53 +13,23 @@ use tedge_actors::RuntimeRequestSink; use tedge_actors::ServiceConsumer; use tedge_actors::ServiceProvider; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::entity_store::EntityRegistrationMessage; +use tedge_api::entity_store::EntityType; use tedge_api::health::ServiceHealthTopic; -use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::MqttSchema; -use tedge_api::mqtt_topics::OperationType; use tedge_api::mqtt_topics::Service; +use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttConfig; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; pub struct HealthMonitorBuilder { - service_health_topic: ServiceHealthTopic, + registration_message: Option, + health_topic: ServiceHealthTopic, box_builder: SimpleMessageBoxBuilder, } impl HealthMonitorBuilder { - /// Creates a HealthMonitorBuilder that creates a HealthMonitorActor with - /// old topic scheme. - pub fn new( - service_name: &str, - mqtt: &mut (impl ServiceProvider + AsMut), - ) -> Self { - // Connect this actor to MQTT - let subscriptions = vec![ - "tedge/health-check", - &format!("tedge/health-check/{service_name}"), - ] - .try_into() - .expect("Failed to create the HealthMonitorActor topic filter"); - - let service_health_topic = - ServiceHealthTopic::from_old_topic(format!("tedge/health/{service_name}")).unwrap(); - - let mut box_builder = SimpleMessageBoxBuilder::new(service_name, 16); - box_builder - .set_request_sender(mqtt.connect_consumer(subscriptions, box_builder.get_sender())); - - let builder = HealthMonitorBuilder { - service_health_topic, - box_builder, - }; - - // Update the MQTT config - *mqtt.as_mut() = builder.set_init_and_last_will(mqtt.as_mut().clone()); - - builder - } - /// Creates a HealthMonitorBuilder that creates a HealthMonitorActor with /// a new topic scheme. pub fn from_service_topic_id( @@ -67,53 +37,52 @@ impl HealthMonitorBuilder { mqtt: &mut (impl ServiceProvider + AsMut), // TODO: pass it less annoying way mqtt_schema: &MqttSchema, + service_type: String, ) -> Self { - let Service { - service_topic_id, - device_topic_id, - } = service; - - let service_health_topic = - ServiceHealthTopic::from_new_topic(&service_topic_id, mqtt_schema); + let service_topic_id = &service.service_topic_id; let mut box_builder = SimpleMessageBoxBuilder::new(service_topic_id.as_str(), 16); + let service_name = service_topic_id.entity().default_service_name().unwrap(); let subscriptions = vec![ - mqtt_schema.topic_for( - service_topic_id.entity(), - &Channel::Command { - operation: OperationType::Health, - cmd_id: "check".to_string(), - }, - ), - mqtt_schema.topic_for( - device_topic_id.entity(), - &Channel::Command { - operation: OperationType::Health, - cmd_id: "check".to_string(), - }, - ), + "tedge/health-check", + &format!("tedge/health-check/{service_name}"), ] - .into_iter() - .map(|t| t.into()) - .collect::(); + .try_into() + .expect("Failed to create the HealthMonitorActor topic filter"); box_builder .set_request_sender(mqtt.connect_consumer(subscriptions, box_builder.get_sender())); + let registration_message = EntityRegistrationMessage { + topic_id: service_topic_id.entity().clone(), + external_id: None, + r#type: EntityType::Service, + parent: Some(service.device_topic_id.entity().clone()), + other: serde_json::json!({"type": service_type}), + }; + let registration_message = registration_message.to_mqtt_message(mqtt_schema); + + let health_topic = ServiceHealthTopic::from_new_topic(service_topic_id, mqtt_schema); + let builder = HealthMonitorBuilder { - service_health_topic, + health_topic, + registration_message: Some(registration_message), box_builder, }; // Update the MQTT config + + // XXX: if the same MqttActorBuilder is used in different actors, then + // this will override init messages that may have been set by other + // actors! *mqtt.as_mut() = builder.set_init_and_last_will(mqtt.as_mut().clone()); builder } fn set_init_and_last_will(&self, config: MqttConfig) -> MqttConfig { - let name = self.service_health_topic.to_owned(); + let name = self.health_topic.to_owned(); let _name = name.clone(); config .with_initial_message(move || _name.up_message()) @@ -132,7 +101,9 @@ impl Builder for HealthMonitorBuilder { fn try_build(self) -> Result { let message_box = self.box_builder.build(); - let actor = HealthMonitorActor::new(self.service_health_topic, message_box); + + let actor = + HealthMonitorActor::new(self.registration_message, self.health_topic, message_box); Ok(actor) } diff --git a/crates/extensions/tedge_health_ext/src/tests.rs b/crates/extensions/tedge_health_ext/src/tests.rs index 02d110b36e6..84e6b2b1f1c 100644 --- a/crates/extensions/tedge_health_ext/src/tests.rs +++ b/crates/extensions/tedge_health_ext/src/tests.rs @@ -1,6 +1,7 @@ use crate::HealthMonitorBuilder; use crate::TopicFilter; use std::time::Duration; +use tedge_actors::test_helpers::MessageReceiverExt; use tedge_actors::Actor; use tedge_actors::Builder; use tedge_actors::DynSender; @@ -9,6 +10,9 @@ use tedge_actors::Sender; use tedge_actors::ServiceProvider; use tedge_actors::SimpleMessageBox; use tedge_actors::SimpleMessageBoxBuilder; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_mqtt_ext::MqttConfig; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; @@ -24,8 +28,15 @@ async fn send_health_check_message_to_generic_topic() -> Result<(), anyhow::Erro let health_check_request = MqttMessage::new(&Topic::new_unchecked("tedge/health-check"), ""); mqtt_message_box.send(health_check_request).await.unwrap(); + // skip registration message + mqtt_message_box.skip(1).await; + if let Some(message) = timeout(TEST_TIMEOUT, mqtt_message_box.recv()).await? { - assert!(message.payload_str()?.contains("up")) + assert!( + message.payload_str()?.contains("up"), + "{} doesn't contain \"up\"", + message.payload_str().unwrap() + ) } Ok(()) @@ -40,8 +51,15 @@ async fn send_health_check_message_to_service_specific_topic() -> Result<(), any MqttMessage::new(&Topic::new_unchecked("tedge/health-check/health-test"), ""); mqtt_message_box.send(health_check_request).await.unwrap(); + // skip registration message + mqtt_message_box.skip(1).await; + if let Some(message) = timeout(TEST_TIMEOUT, mqtt_message_box.recv()).await? { - assert!(message.payload_str()?.contains("up")) + assert!( + message.payload_str()?.contains("up"), + "{} doesn't contain \"up\"", + message.payload_str().unwrap() + ) } Ok(()) @@ -53,7 +71,7 @@ async fn health_check_set_init_and_last_will_message() -> Result<(), anyhow::Err let _ = spawn_a_health_check_actor("test", &mut mqtt_config).await; let expected_last_will = MqttMessage::new( - &Topic::new_unchecked("tedge/health/test"), + &Topic::new_unchecked("te/device/main/service/test/status/health"), format!(r#"{{"pid":{},"status":"down"}}"#, std::process::id()), ); let expected_last_will = expected_last_will.with_retain(); @@ -68,7 +86,20 @@ async fn spawn_a_health_check_actor( ) -> SimpleMessageBox { let mut health_mqtt_builder = MqttActorBuilder::new(mqtt_config); - let health_actor = HealthMonitorBuilder::new(service_to_be_monitored, &mut health_mqtt_builder); + let mqtt_schema = MqttSchema::new(); + let service = Service { + service_topic_id: EntityTopicId::default_main_service(service_to_be_monitored) + .unwrap() + .into(), + device_topic_id: EntityTopicId::default_main_device().into(), + }; + + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut health_mqtt_builder, + &mqtt_schema, + "service".to_string(), + ); let actor = health_actor.build(); tokio::spawn(async move { actor.run().await }); diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml index beaf3408ad2..7e1211324d6 100644 --- a/plugins/c8y_configuration_plugin/Cargo.toml +++ b/plugins/c8y_configuration_plugin/Cargo.toml @@ -14,11 +14,12 @@ anyhow = { workspace = true } c8y_config_manager = { workspace = true } c8y_http_proxy = { workspace = true } clap = { workspace = true } -tedge_actors = { workspace = true } -tedge_config = { workspace = true } +tedge_actors = { workspace = true } +tedge_api = { workspace = true } +tedge_config = { workspace = true } tedge_file_system_ext = { workspace = true } tedge_health_ext = { workspace = true } -tedge_http_ext = { workspace = true } +tedge_http_ext = { workspace = true } tedge_mqtt_ext = { workspace = true } tedge_signal_ext = { workspace = true } tedge_timer_ext = { workspace = true } diff --git a/plugins/c8y_configuration_plugin/src/lib.rs b/plugins/c8y_configuration_plugin/src/lib.rs index b938d983486..7fcc6582a35 100644 --- a/plugins/c8y_configuration_plugin/src/lib.rs +++ b/plugins/c8y_configuration_plugin/src/lib.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use c8y_config_manager::ConfigManagerBuilder; use c8y_config_manager::ConfigManagerConfig; use c8y_http_proxy::credentials::C8YJwtRetriever; @@ -5,6 +6,10 @@ use c8y_http_proxy::C8YHttpProxyBuilder; use std::path::Path; use std::path::PathBuf; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::DeviceTopicId; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_config::system_services::get_log_level; use tedge_config::system_services::set_log_level; use tedge_config::CertificateError; @@ -97,7 +102,31 @@ async fn run_with( let mut mqtt_actor = MqttActorBuilder::new(mqtt_config.clone().with_session_name(PLUGIN_NAME)); // Instantiate health monitor actor - let health_actor = HealthMonitorBuilder::new(PLUGIN_NAME, &mut mqtt_actor); + // TODO: take a user-configurable service topic id + let mqtt_device_topic_id = &tedge_config + .mqtt + .device_topic_id + .parse::() + .unwrap(); + + let service_topic_id = mqtt_device_topic_id + .to_default_service_topic_id(PLUGIN_NAME) + .with_context(|| { + format!( + "Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", + ) + })?; + let service = Service { + service_topic_id, + device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()), + }; + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.to_string()); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut mqtt_actor, + &mqtt_schema, + tedge_config.service.ty.clone(), + ); // Instantiate config manager actor let config_manager_config = ConfigManagerConfig::from_tedge_config(config_dir, &tedge_config)?; diff --git a/plugins/c8y_firmware_plugin/Cargo.toml b/plugins/c8y_firmware_plugin/Cargo.toml index cf1001a0075..1f7587c2cf7 100644 --- a/plugins/c8y_firmware_plugin/Cargo.toml +++ b/plugins/c8y_firmware_plugin/Cargo.toml @@ -14,9 +14,10 @@ anyhow = { workspace = true } c8y_firmware_manager = { workspace = true } c8y_http_proxy = { workspace = true } clap = { workspace = true } -tedge_actors = { workspace = true } -tedge_config = { workspace = true } -tedge_downloader_ext = { workspace = true } +tedge_actors = { workspace = true } +tedge_api = { workspace = true } +tedge_config = { workspace = true } +tedge_downloader_ext = { workspace = true } tedge_health_ext = { workspace = true } tedge_mqtt_ext = { workspace = true } tedge_signal_ext = { workspace = true } diff --git a/plugins/c8y_firmware_plugin/src/lib.rs b/plugins/c8y_firmware_plugin/src/lib.rs index f00445437ab..d3361e07f16 100644 --- a/plugins/c8y_firmware_plugin/src/lib.rs +++ b/plugins/c8y_firmware_plugin/src/lib.rs @@ -1,8 +1,13 @@ +use anyhow::Context; use c8y_firmware_manager::FirmwareManagerBuilder; use c8y_firmware_manager::FirmwareManagerConfig; use c8y_http_proxy::credentials::C8YJwtRetriever; use std::path::PathBuf; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::DeviceTopicId; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_config::system_services::get_log_level; use tedge_config::system_services::set_log_level; use tedge_config::TEdgeConfig; @@ -84,7 +89,31 @@ async fn run_with(tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let mut mqtt_actor = MqttActorBuilder::new(mqtt_config.clone().with_session_name(PLUGIN_NAME)); //Instantiate health monitor actor - let health_actor = HealthMonitorBuilder::new(PLUGIN_NAME, &mut mqtt_actor); + // TODO: take a user-configurable service topic id + let mqtt_device_topic_id = &tedge_config + .mqtt + .device_topic_id + .parse::() + .unwrap(); + + let service_topic_id = mqtt_device_topic_id + .to_default_service_topic_id(PLUGIN_NAME) + .with_context(|| { + format!( + "Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", + ) + })?; + let service = Service { + service_topic_id, + device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()), + }; + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.to_string()); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut mqtt_actor, + &mqtt_schema, + tedge_config.service.ty.clone(), + ); // Instantiate firmware manager actor let firmware_manager_config = FirmwareManagerConfig::from_tedge_config(&tedge_config)?; diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 84df2279924..a76b060cb15 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -14,11 +14,12 @@ anyhow = { workspace = true } c8y_http_proxy = { workspace = true } c8y_log_manager = { workspace = true } clap = { workspace = true } -tedge_actors = { workspace = true } -tedge_config = { workspace = true } +tedge_actors = { workspace = true } +tedge_api = { workspace = true } +tedge_config = { workspace = true } tedge_file_system_ext = { workspace = true } tedge_health_ext = { workspace = true } -tedge_http_ext = { workspace = true } +tedge_http_ext = { workspace = true } tedge_mqtt_ext = { workspace = true } tedge_signal_ext = { workspace = true } tedge_utils = { workspace = true } diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index f0575c2c5a8..93e2bc5e1d6 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -1,3 +1,4 @@ +use anyhow::Context; use anyhow::Result; use c8y_http_proxy::credentials::C8YJwtRetriever; use c8y_http_proxy::C8YHttpProxyBuilder; @@ -7,6 +8,10 @@ use clap::Parser; use std::path::Path; use std::path::PathBuf; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::DeviceTopicId; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_config::system_services::get_log_level; use tedge_config::system_services::set_log_level; use tedge_config::CertificateError; @@ -27,7 +32,7 @@ const AFTER_HELP_TEXT: &str = r#"On start, `c8y-log-plugin` notifies the cloud t The thin-edge `CONFIG_DIR` is used to store: * c8y-log-plugin.toml - the configuration file that specifies which logs to be retrieved"#; -const C8Y_LOG_PLUGIN: &str = "c8y-log-plugin"; +const PLUGIN_NAME: &str = "c8y-log-plugin"; #[derive(Debug, clap::Parser, Clone)] #[clap( @@ -91,12 +96,38 @@ async fn run(config_dir: impl AsRef, tedge_config: TEdgeConfig) -> Result< let mut runtime = Runtime::try_new(runtime_events_logger).await?; let base_mqtt_config = mqtt_config(&tedge_config)?; - let mqtt_config = base_mqtt_config.clone().with_session_name(C8Y_LOG_PLUGIN); + let mqtt_config = base_mqtt_config.clone().with_session_name(PLUGIN_NAME); let c8y_http_config = (&tedge_config).try_into()?; let mut mqtt_actor = MqttActorBuilder::new(mqtt_config); - let health_actor = HealthMonitorBuilder::new(C8Y_LOG_PLUGIN, &mut mqtt_actor); + + // TODO: take a user-configurable service topic id + let mqtt_device_topic_id = &tedge_config + .mqtt + .device_topic_id + .parse::() + .unwrap(); + + let service_topic_id = mqtt_device_topic_id + .to_default_service_topic_id(PLUGIN_NAME) + .with_context(|| { + format!( + "Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", + ) + })?; + let service = Service { + service_topic_id, + device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()), + }; + let mqtt_schema = MqttSchema::with_root(tedge_config.mqtt.topic_root.to_string()); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut mqtt_actor, + &mqtt_schema, + tedge_config.service.ty.clone(), + ); + let mut jwt_actor = C8YJwtRetriever::builder(base_mqtt_config); let mut http_actor = HttpActor::new().builder(); let mut c8y_http_proxy_actor = diff --git a/plugins/tedge_configuration_plugin/src/lib.rs b/plugins/tedge_configuration_plugin/src/lib.rs index f1645781aa1..5afc94ac006 100644 --- a/plugins/tedge_configuration_plugin/src/lib.rs +++ b/plugins/tedge_configuration_plugin/src/lib.rs @@ -1,8 +1,12 @@ +use anyhow::Context; use clap::Parser; use std::path::PathBuf; use std::sync::Arc; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::DeviceTopicId; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_config::system_services::get_log_level; use tedge_config::system_services::set_log_level; use tedge_config::TEdgeConfig; @@ -30,7 +34,7 @@ The thin-edge `CONFIG_DIR` is used: ** `mqtt.topic_root` and `mqtt.device_topic_id`: for the MQTT topics to publish to and subscribe from * to find/store the `tedge-configuration-plugin.toml`: the plugin configuration file"#; -const TEDGE_CONFIGURATION_PLUGIN: &str = "tedge-configuration-plugin"; +const PLUGIN_NAME: &str = "tedge-configuration-plugin"; #[derive(Debug, Parser, Clone)] #[clap( @@ -93,12 +97,32 @@ async fn run_with( let mqtt_config = tedge_config.mqtt_config()?; let mut mqtt_actor = MqttActorBuilder::new(mqtt_config.clone().with_session_name(format!( - "{TEDGE_CONFIGURATION_PLUGIN}#{mqtt_topic_root}/{mqtt_device_topic_id}", + "{PLUGIN_NAME}#{mqtt_topic_root}/{mqtt_device_topic_id}", ))); let mut fs_watch_actor = FsWatchActorBuilder::new(); - let health_actor = HealthMonitorBuilder::new(TEDGE_CONFIGURATION_PLUGIN, &mut mqtt_actor); + // TODO: take a user-configurable service topic id + let mqtt_device_topic_id = mqtt_device_topic_id.parse::().unwrap(); + + let service_topic_id = mqtt_device_topic_id + .to_default_service_topic_id(PLUGIN_NAME) + .with_context(|| { + format!( + "Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", + ) + })?; + let service = Service { + service_topic_id, + device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()), + }; + let mqtt_schema = MqttSchema::with_root(mqtt_topic_root.to_string()); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut mqtt_actor, + &mqtt_schema, + tedge_config.service.ty.clone(), + ); let mut downloader_actor = DownloaderActor::new().builder(); diff --git a/plugins/tedge_log_plugin/src/lib.rs b/plugins/tedge_log_plugin/src/lib.rs index 3201f12e502..e1b5dcba68e 100644 --- a/plugins/tedge_log_plugin/src/lib.rs +++ b/plugins/tedge_log_plugin/src/lib.rs @@ -1,8 +1,12 @@ +use anyhow::Context; use clap::Parser; use std::path::PathBuf; use std::sync::Arc; use tedge_actors::Runtime; +use tedge_api::mqtt_topics::DeviceTopicId; +use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; +use tedge_api::mqtt_topics::Service; use tedge_config::system_services::get_log_level; use tedge_config::system_services::set_log_level; use tedge_config::TEdgeConfig; @@ -25,7 +29,7 @@ const AFTER_HELP_TEXT: &str = r#"The thin-edge `CONFIG_DIR` is used: ** `root.topic` and `device.topic`: for the MQTT topics to publish to and subscribe from * to find/store the `tedge-log-plugin.toml`: the configuration file that specifies which logs to be retrieved"#; -const TEDGE_LOG_PLUGIN: &str = "tedge-log-plugin"; +const PLUGIN_NAME: &str = "tedge-log-plugin"; #[derive(Debug, Parser, Clone)] #[clap( @@ -88,12 +92,32 @@ async fn run_with( let mqtt_config = tedge_config.mqtt_config()?; let mut mqtt_actor = MqttActorBuilder::new(mqtt_config.clone().with_session_name(format!( - "{TEDGE_LOG_PLUGIN}#{mqtt_topic_root}/{mqtt_device_topic_id}", + "{PLUGIN_NAME}#{mqtt_topic_root}/{mqtt_device_topic_id}", ))); let mut fs_watch_actor = FsWatchActorBuilder::new(); - let health_actor = HealthMonitorBuilder::new(TEDGE_LOG_PLUGIN, &mut mqtt_actor); + // TODO: take a user-configurable service topic id + let mqtt_device_topic_id = mqtt_device_topic_id.parse::().unwrap(); + + let service_topic_id = mqtt_device_topic_id + .to_default_service_topic_id(PLUGIN_NAME) + .with_context(|| { + format!( + "Device topic id {mqtt_device_topic_id} currently needs default scheme, e.g: 'device/DEVICE_NAME//'", + ) + })?; + let service = Service { + service_topic_id, + device_topic_id: DeviceTopicId::new(mqtt_device_topic_id.clone()), + }; + let mqtt_schema = MqttSchema::with_root(mqtt_topic_root.to_string()); + let health_actor = HealthMonitorBuilder::from_service_topic_id( + service, + &mut mqtt_actor, + &mqtt_schema, + tedge_config.service.ty.clone(), + ); let mut uploader_actor = UploaderActor::new().builder(); diff --git a/tests/RobotFramework/tests/MQTT_health_check/MQTT_health_endpoints.robot b/tests/RobotFramework/tests/MQTT_health_check/MQTT_health_endpoints.robot index ca27a27a018..10379f5b47e 100644 --- a/tests/RobotFramework/tests/MQTT_health_check/MQTT_health_endpoints.robot +++ b/tests/RobotFramework/tests/MQTT_health_check/MQTT_health_endpoints.robot @@ -17,7 +17,7 @@ tedge-log-plugin health status Sleep 5s reason=It fails without this! It needs a better way of queuing requests ${pid}= Execute Command pgrep -f '^/usr/bin/tedge-log-plugin' strip=${True} Execute Command sudo tedge mqtt pub 'tedge/health-check/tedge-log-plugin' '' - ${messages}= Should Have MQTT Messages tedge/health/tedge-log-plugin minimum=1 maximum=2 + ${messages}= Should Have MQTT Messages te/device/main/service/tedge-log-plugin/status/health minimum=1 maximum=2 Should Contain ${messages[0]} "pid":${pid} Should Contain ${messages[0]} "status":"up" @@ -27,6 +27,6 @@ c8y-configuration-plugin health status Sleep 5s reason=It fails without this! It needs a better way of queuing requests ${pid}= Execute Command pgrep -f '^/usr/bin/c8y[_-]configuration[_-]plugin' strip=${True} Execute Command sudo tedge mqtt pub 'tedge/health-check/c8y-configuration-plugin' '' - ${messages}= Should Have MQTT Messages tedge/health/c8y-configuration-plugin minimum=1 maximum=2 + ${messages}= Should Have MQTT Messages te/device/main/service/c8y-configuration-plugin/status/health minimum=1 maximum=2 Should Contain ${messages[0]} "pid":${pid} Should Contain ${messages[0]} "status":"up" diff --git a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot index 6e728d28a7c..199382761c3 100644 --- a/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot +++ b/tests/RobotFramework/tests/MQTT_health_check/health_tedge-mapper-collectd.robot @@ -46,6 +46,7 @@ Stop watchdog service Remove entry from service file Execute Command sudo sed -i '10d' /lib/systemd/system/tedge-mapper-collectd.service + Execute Command sudo systemctl daemon-reload tedge-collectd-mapper health status Execute Command sudo systemctl start tedge-mapper-collectd.service @@ -53,6 +54,6 @@ tedge-collectd-mapper health status Sleep 5s reason=It fails without this! It needs a better way of queuing requests ${pid}= Execute Command pgrep -f '^/usr/bin/tedge-mapper collectd' strip=${True} Execute Command sudo tedge mqtt pub 'tedge/health-check/tedge-mapper-collectd' '' - ${messages}= Should Have MQTT Messages tedge/health/tedge-mapper-collectd minimum=1 maximum=2 + ${messages}= Should Have MQTT Messages te/device/main/service/tedge-mapper-collectd/status/health minimum=1 maximum=2 Should Contain ${messages[0]} "pid":${pid} Should Contain ${messages[0]} "status":"up" diff --git a/tests/RobotFramework/tests/cumulocity/software_management/software.robot b/tests/RobotFramework/tests/cumulocity/software_management/software.robot index c72ff7e35cd..2e7fe272c30 100644 --- a/tests/RobotFramework/tests/cumulocity/software_management/software.robot +++ b/tests/RobotFramework/tests/cumulocity/software_management/software.robot @@ -40,12 +40,12 @@ Custom Setup ${DEVICE_SN}= Setup Device Should Exist ${DEVICE_SN} Set Test Variable $DEVICE_SN - Should Have MQTT Messages tedge/health/tedge-mapper-c8y + Should Have MQTT Messages te/device/main/service/tedge-mapper-c8y/status/health Execute Command sudo start-http-server.sh [Documentation] WORKAROUND: #1731 The tedge-mapper-c8y is restarted due to a suspected race condition between the mapper and tedge-agent which results in the software list message being lost ${timestamp}= Get Unix Timestamp Restart Service tedge-mapper-c8y - Should Have MQTT Messages tedge/health/tedge-mapper-c8y date_from=${timestamp} + Should Have MQTT Messages te/device/main/service/tedge-mapper-c8y/status/health date_from=${timestamp} Stop tedge-agent [Timeout] 5 seconds From 2a8ccccc586a8fbf328f1fe928eb638d81b5e84b Mon Sep 17 00:00:00 2001 From: Marcel Guzik Date: Tue, 17 Oct 2023 15:55:01 +0000 Subject: [PATCH 8/8] Remove mapping of health status messages --- .../src/tedge_to_te_converter/converter.rs | 20 ------- .../src/tedge_to_te_converter/tests.rs | 54 ------------------- .../service_monitoring.robot | 5 +- .../convert_tedge_topics_to_te_topics.robot | 28 ---------- 4 files changed, 3 insertions(+), 104 deletions(-) diff --git a/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs b/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs index 1ebb63e21f3..d31ec7ee30e 100644 --- a/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs +++ b/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs @@ -30,9 +30,6 @@ impl TedgetoTeConverter { } topic if topic.name.starts_with("tedge/events") => self.convert_event(message), topic if topic.name.starts_with("tedge/alarms") => self.convert_alarm(message), - topic if topic.name.starts_with("tedge/health") => { - self.convert_health_status_message(message) - } _ => vec![], } } @@ -101,23 +98,6 @@ impl TedgetoTeConverter { message.topic = topic; vec![message] } - - // tedge/health/service-name -> te/device/main/service//status/health - // tedge/health/child/service-name -> te/device/child/service//status/health - fn convert_health_status_message(&mut self, mut message: MqttMessage) -> Vec { - let topic = match message.topic.name.split('/').collect::>()[..] { - ["tedge", "health", service_name] => Topic::new_unchecked( - format!("te/device/main/service/{service_name}/status/health").as_str(), - ), - ["tedge", "health", cid, service_name] => Topic::new_unchecked( - format!("te/device/{cid}/service/{service_name}/status/health").as_str(), - ), - _ => return vec![], - }; - message.topic = topic; - message.retain = true; - vec![message] - } } #[cfg(test)] mod tests { diff --git a/crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs b/crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs index 1e0870a3f6f..51194638686 100644 --- a/crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs +++ b/crates/core/tedge_agent/src/tedge_to_te_converter/tests.rs @@ -311,58 +311,6 @@ async fn convert_incoming_child_device_event_topic() -> Result<(), DynError> { Ok(()) } -// tedge/health/service-name -> te/device/main/service//status/health -// tedge/health/child/service-name -> te/device/child/service//status/health -#[tokio::test] -async fn convert_incoming_main_device_service_health_status() -> Result<(), DynError> { - // Spawn incoming mqtt message converter - let mut mqtt_box = spawn_tedge_to_te_converter().await?; - - // Simulate health status of main device service MQTT message received. - let mqtt_message = MqttMessage::new( - &Topic::new_unchecked("tedge/health/myservice"), - r#"{""pid":1234,"status":"up","time":1674739912}"#, - ) - .with_retain(); - - let expected_mqtt_message = MqttMessage::new( - &Topic::new_unchecked("te/device/main/service/myservice/status/health"), - r#"{""pid":1234,"status":"up","time":1674739912}"#, - ) - .with_retain(); - - mqtt_box.send(mqtt_message).await?; - - // Assert health status message - mqtt_box.assert_received([expected_mqtt_message]).await; - Ok(()) -} - -#[tokio::test] -async fn convert_incoming_child_device_service_health_status() -> Result<(), DynError> { - // Spawn incoming mqtt message converter - let mut mqtt_box = spawn_tedge_to_te_converter().await?; - - // Simulate child device service health status MQTT message received. - let mqtt_message = MqttMessage::new( - &Topic::new_unchecked("tedge/health/child/myservice"), - r#"{""pid":1234,"status":"up","time":1674739912}"#, - ) - .with_retain(); - - let expected_mqtt_message = MqttMessage::new( - &Topic::new_unchecked("te/device/child/service/myservice/status/health"), - r#"{""pid":1234,"status":"up","time":1674739912}"#, - ) - .with_retain(); - - mqtt_box.send(mqtt_message).await?; - - // Assert health status mqtt message - mqtt_box.assert_received([expected_mqtt_message]).await; - Ok(()) -} - async fn spawn_tedge_to_te_converter( ) -> Result>, DynError> { // Tedge to Te topic converter @@ -374,8 +322,6 @@ async fn spawn_tedge_to_te_converter( "tedge/events/+/+", "tedge/alarms/+/+", "tedge/alarms/+/+/+", - "tedge/health/+", - "tedge/health/+/+", ] .try_into()?; diff --git a/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot b/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot index 4f12a017852..9b7b5fbd8a1 100644 --- a/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot +++ b/tests/RobotFramework/tests/cumulocity/service_monitoring/service_monitoring.robot @@ -101,12 +101,13 @@ Check health status of tedge-mapper-c8y service on broker restart Check health status of child device service [Documentation] Test service status of child device services - # Create the child device by sending the service status on tedge/health///service/