diff --git a/crates/core/c8y_api/src/smartrest/inventory.rs b/crates/core/c8y_api/src/smartrest/inventory.rs index 42d3e473eb1..f15c5a82246 100644 --- a/crates/core/c8y_api/src/smartrest/inventory.rs +++ b/crates/core/c8y_api/src/smartrest/inventory.rs @@ -68,6 +68,21 @@ pub fn service_creation_message( ancestors: &[String], prefix: &TopicPrefix, ) -> Result { + Ok(MqttMessage::new( + &publish_topic_from_ancestors(ancestors, prefix), + service_creation_message_payload(service_id, service_name, service_type, service_status)?, + )) +} + +/// Create a SmartREST message for creating a service on device. +/// The provided ancestors list must contain all the parents of the given service +/// starting from its immediate parent device. +pub fn service_creation_message_payload( + service_id: &str, + service_name: &str, + service_type: &str, + service_status: &str, +) -> Result { // TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format if service_id.is_empty() { return Err(InvalidValueError { @@ -94,16 +109,13 @@ pub fn service_creation_message( }); } - Ok(MqttMessage::new( - &publish_topic_from_ancestors(ancestors, prefix), - fields_to_csv_string(&[ - "102", - service_id, - service_type, - service_name, - service_status, - ]), - )) + Ok(fields_to_csv_string(&[ + "102", + service_id, + service_type, + service_name, + service_status, + ])) } /// Create a SmartREST message for updating service status. diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index fdac59ef3a5..a629010f006 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -17,9 +17,11 @@ use tedge_config::TEdgeConfig; use tedge_downloader_ext::DownloaderActor; use tedge_file_system_ext::FsWatchActorBuilder; use tedge_http_ext::HttpActor; +use tedge_mqtt_bridge::rumqttc::LastWill; use tedge_mqtt_bridge::use_key_and_cert; use tedge_mqtt_bridge::BridgeConfig; use tedge_mqtt_bridge::MqttBridgeActorBuilder; +use tedge_mqtt_bridge::QoS; use tedge_mqtt_ext::MqttActorBuilder; use tedge_timer_ext::TimerActor; use tedge_uploader_ext::UploaderActor; @@ -89,14 +91,68 @@ impl TEdgeComponent for CumulocityMapper { &tedge_config, )?; - let bridge_actor = MqttBridgeActorBuilder::new( - &tedge_config, - c8y_mapper_config.bridge_service_name(), - tc, - cloud_config, - ) - .await; - runtime.spawn(bridge_actor).await?; + let main_device_xid: EntityExternalId = + tedge_config.device.id.try_read(&tedge_config)?.into(); + let service_type = &tedge_config.service.ty; + let service_type = if service_type.is_empty() { + "service".to_string() + } else { + service_type.to_string() + }; + + // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme + + // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some + // predefined topic id instead of trying to derive it from current device? + let entity_topic_id: EntityTopicId = tedge_config + .mqtt + .device_topic_id + .clone() + .parse() + .context("Invalid device_topic_id")?; + + let mapper_service_topic_id = entity_topic_id + .default_service_for_device(CUMULOCITY_MAPPER_NAME) + .context("Can't derive service name if device topic id not in default scheme")?; + + let mapper_service_external_id = CumulocityConverter::map_to_c8y_external_id( + &mapper_service_topic_id, + &main_device_xid, + ); + + let last_will_message_mapper = + c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + CUMULOCITY_MAPPER_NAME, + service_type.as_str(), + "down", + )?; + let last_will_message_bridge = + c8y_api::smartrest::inventory::service_creation_message_payload( + mapper_service_external_id.as_ref(), + &c8y_mapper_config.bridge_service_name(), + service_type.as_str(), + "down", + )?; + + cloud_config.set_last_will(LastWill { + topic: "s/us".into(), + qos: QoS::AtLeastOnce, + message: format!("{last_will_message_bridge}\n{last_will_message_mapper}").into(), + retain: false, + }); + + runtime + .spawn( + MqttBridgeActorBuilder::new( + &tedge_config, + c8y_mapper_config.bridge_service_name(), + tc, + cloud_config, + ) + .await, + ) + .await?; } let mut jwt_actor = C8YJwtRetriever::builder( mqtt_config.clone(), @@ -157,44 +213,5 @@ impl TEdgeComponent for CumulocityMapper { } pub fn service_monitor_client_config(tedge_config: &TEdgeConfig) -> Result { - let main_device_xid: EntityExternalId = tedge_config.device.id.try_read(tedge_config)?.into(); - let service_type = &tedge_config.service.ty; - let service_type = if service_type.is_empty() { - "service".to_string() - } else { - service_type.to_string() - }; - - // FIXME: this will not work if `mqtt.device_topic_id` is not in default scheme - - // there is one mapper instance per cloud per thin-edge instance, perhaps we should use some - // predefined topic id instead of trying to derive it from current device? - let entity_topic_id: EntityTopicId = tedge_config - .mqtt - .device_topic_id - .clone() - .parse() - .context("Invalid device_topic_id")?; - - let mapper_service_topic_id = entity_topic_id - .default_service_for_device(CUMULOCITY_MAPPER_NAME) - .context("Can't derive service name if device topic id not in default scheme")?; - - let mapper_service_external_id = - CumulocityConverter::map_to_c8y_external_id(&mapper_service_topic_id, &main_device_xid); - - let last_will_message = c8y_api::smartrest::inventory::service_creation_message( - mapper_service_external_id.as_ref(), - CUMULOCITY_MAPPER_NAME, - service_type.as_str(), - "down", - &[], - &tedge_config.c8y.bridge.topic_prefix, - )?; - - let mqtt_config = tedge_config - .mqtt_config()? - .with_session_name("last_will_c8y_mapper") - .with_last_will_message(last_will_message); - Ok(mqtt_config) + Ok(tedge_config.mqtt_config()?) } diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index e0e38afcd53..ae8ef77e349 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use certificate::parse_root_certificate::create_tls_config; use futures::SinkExt; use futures::StreamExt; +pub use rumqttc; use rumqttc::AsyncClient; use rumqttc::ClientError; use rumqttc::Event; diff --git a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot index bb959b3b1ee..cd08c8142c5 100644 --- a/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot +++ b/tests/RobotFramework/tests/cumulocity/telemetry/thin-edge_device_telemetry_built-in_bridge.robot @@ -24,6 +24,7 @@ Bridge stops if mapper stops running ${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=CustomMeasurement series=temperature Log ${measurements} Execute Command systemctl stop tedge-mapper-c8y + Service Health Status Should Be Down tedge-mapper-bridge-custom-c8y-prefix Execute Command tedge mqtt pub ${C8Y_TOPIC_PREFIX}/s/us '200,CustomMeasurement,temperature,25' ${measurements}= Device Should Have Measurements minimum=1 maximum=1 type=CustomMeasurement series=temperature Log ${measurements}