diff --git a/crates/core/tedge/src/bridge/aws.rs b/crates/core/tedge/src/bridge/aws.rs index 122c657b3d9..96cbfdef7e9 100644 --- a/crates/core/tedge/src/bridge/aws.rs +++ b/crates/core/tedge/src/bridge/aws.rs @@ -2,14 +2,15 @@ use super::BridgeConfig; use crate::bridge::config::BridgeLocation; use camino::Utf8PathBuf; use std::borrow::Cow; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; use tedge_config::HostPort; use tedge_config::ProfileName; use tedge_config::TopicPrefix; use tedge_config::MQTT_TLS_PORT; -const MOSQUITTO_BRIDGE_TOPIC: &str = "te/device/main/service/mosquitto-aws-bridge/status/health"; - -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub struct BridgeConfigAwsParams { pub mqtt_host: HostPort, pub config_file: Cow<'static, str>, @@ -20,6 +21,7 @@ pub struct BridgeConfigAwsParams { pub bridge_location: BridgeLocation, pub topic_prefix: TopicPrefix, pub profile_name: Option, + pub mqtt_schema: MqttSchema, } impl From for BridgeConfig { @@ -34,6 +36,7 @@ impl From for BridgeConfig { bridge_location, topic_prefix, profile_name, + mqtt_schema, } = params; let user_name = remote_clientid.to_string(); @@ -54,6 +57,11 @@ impl From for BridgeConfig { r#""" in 1 {topic_prefix}/connection-success thinedge/devices/{remote_clientid}/test-connection"# ); + let service_name = format!("mosquitto-{topic_prefix}-bridge"); + let health = mqtt_schema.topic_for( + &EntityTopicId::default_main_service(&service_name).unwrap(), + &Channel::Health, + ); Self { cloud_name: "aws".into(), config_file, @@ -83,7 +91,7 @@ impl From for BridgeConfig { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(), + notification_topic: health.name, bridge_attempt_unsubscribe: false, topics: vec![ pub_msg_topic, @@ -115,6 +123,7 @@ fn test_bridge_config_from_aws_params() -> anyhow::Result<()> { bridge_location: BridgeLocation::Mosquitto, topic_prefix: "aws".try_into().unwrap(), profile_name: None, + mqtt_schema: MqttSchema::with_root("te".into()), }; let bridge = BridgeConfig::from(params); @@ -147,7 +156,7 @@ fn test_bridge_config_from_aws_params() -> anyhow::Result<()> { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(), + notification_topic: "te/device/main/service/mosquitto-aws-bridge/status/health".into(), bridge_attempt_unsubscribe: false, bridge_location: BridgeLocation::Mosquitto, connection_check_attempts: 5, @@ -170,8 +179,9 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> { bridge_certfile: "./test-certificate.pem".into(), bridge_keyfile: "./test-private-key.pem".into(), bridge_location: BridgeLocation::Mosquitto, - topic_prefix: "custom".try_into().unwrap(), + topic_prefix: "aws-custom".try_into().unwrap(), profile_name: Some("profile".parse().unwrap()), + mqtt_schema: MqttSchema::with_root("te".into()), }; let bridge = BridgeConfig::from(params); @@ -191,11 +201,12 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> { use_mapper: true, use_agent: false, topics: vec![ - "td/# out 1 custom/ thinedge/alpha/".into(), - "cmd/# in 1 custom/ thinedge/alpha/".into(), - "shadow/# both 1 custom/ $aws/things/alpha/".into(), - r#""" out 1 custom/test-connection thinedge/devices/alpha/test-connection"#.into(), - r#""" in 1 custom/connection-success thinedge/devices/alpha/test-connection"#.into(), + "td/# out 1 aws-custom/ thinedge/alpha/".into(), + "cmd/# in 1 aws-custom/ thinedge/alpha/".into(), + "shadow/# both 1 aws-custom/ $aws/things/alpha/".into(), + r#""" out 1 aws-custom/test-connection thinedge/devices/alpha/test-connection"#.into(), + r#""" in 1 aws-custom/connection-success thinedge/devices/alpha/test-connection"# + .into(), ], try_private: false, start_type: "automatic".into(), @@ -204,7 +215,8 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(), + notification_topic: "te/device/main/service/mosquitto-aws-custom-bridge/status/health" + .into(), bridge_attempt_unsubscribe: false, bridge_location: BridgeLocation::Mosquitto, connection_check_attempts: 5, diff --git a/crates/core/tedge/src/bridge/azure.rs b/crates/core/tedge/src/bridge/azure.rs index de0e1147541..3b12846e7db 100644 --- a/crates/core/tedge/src/bridge/azure.rs +++ b/crates/core/tedge/src/bridge/azure.rs @@ -2,14 +2,15 @@ use super::BridgeConfig; use crate::bridge::config::BridgeLocation; use camino::Utf8PathBuf; use std::borrow::Cow; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; use tedge_config::HostPort; use tedge_config::ProfileName; use tedge_config::TopicPrefix; use tedge_config::MQTT_TLS_PORT; -const MOSQUITTO_BRIDGE_TOPIC: &str = "te/device/main/service/mosquitto-az-bridge/status/health"; - -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub struct BridgeConfigAzureParams { pub mqtt_host: HostPort, pub config_file: Cow<'static, str>, @@ -20,6 +21,7 @@ pub struct BridgeConfigAzureParams { pub bridge_location: BridgeLocation, pub topic_prefix: TopicPrefix, pub profile_name: Option, + pub mqtt_schema: MqttSchema, } impl From for BridgeConfig { @@ -34,6 +36,7 @@ impl From for BridgeConfig { bridge_location, topic_prefix, profile_name, + mqtt_schema, } = params; let address = mqtt_host.clone(); @@ -46,6 +49,12 @@ impl From for BridgeConfig { format!("messages/events/# out 1 {topic_prefix}/ devices/{remote_clientid}/"); let sub_msg_topic = format!("messages/devicebound/# in 1 {topic_prefix}/ devices/{remote_clientid}/"); + + let service_name = format!("mosquitto-{topic_prefix}-bridge"); + let health = mqtt_schema.topic_for( + &EntityTopicId::default_main_service(&service_name).unwrap(), + &Channel::Health, + ); Self { cloud_name: "az".into(), config_file, @@ -75,7 +84,7 @@ impl From for BridgeConfig { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(), + notification_topic: health.name, bridge_attempt_unsubscribe: false, topics: vec![ // See Azure IoT Hub documentation for detailed explanation on the topics @@ -112,6 +121,7 @@ fn test_bridge_config_from_azure_params() -> anyhow::Result<()> { bridge_location: BridgeLocation::Mosquitto, topic_prefix: "az".try_into().unwrap(), profile_name: None, + mqtt_schema: MqttSchema::with_root("te".into()), }; let bridge = BridgeConfig::from(params); @@ -146,7 +156,7 @@ fn test_bridge_config_from_azure_params() -> anyhow::Result<()> { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(), + notification_topic: "te/device/main/service/mosquitto-az-bridge/status/health".into(), bridge_attempt_unsubscribe: false, bridge_location: BridgeLocation::Mosquitto, connection_check_attempts: 1, @@ -171,8 +181,9 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> { bridge_certfile: "./test-certificate.pem".into(), bridge_keyfile: "./test-private-key.pem".into(), bridge_location: BridgeLocation::Mosquitto, - topic_prefix: "custom".try_into().unwrap(), + topic_prefix: "az-custom".try_into().unwrap(), profile_name: Some("profile".parse().unwrap()), + mqtt_schema: MqttSchema::with_root("te".into()), }; let bridge = BridgeConfig::from(params); @@ -192,13 +203,13 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> { use_mapper: true, use_agent: false, topics: vec![ - "messages/events/# out 1 custom/ devices/alpha/".into(), - "messages/devicebound/# in 1 custom/ devices/alpha/".into(), - "methods/POST/# in 1 custom/ $iothub/".into(), - "methods/res/# out 1 custom/ $iothub/".into(), - "twin/res/# in 1 custom/ $iothub/".into(), - "twin/GET/# out 1 custom/ $iothub/".into(), - "twin/PATCH/# out 1 custom/ $iothub/".into(), + "messages/events/# out 1 az-custom/ devices/alpha/".into(), + "messages/devicebound/# in 1 az-custom/ devices/alpha/".into(), + "methods/POST/# in 1 az-custom/ $iothub/".into(), + "methods/res/# out 1 az-custom/ $iothub/".into(), + "twin/res/# in 1 az-custom/ $iothub/".into(), + "twin/GET/# out 1 az-custom/ $iothub/".into(), + "twin/PATCH/# out 1 az-custom/ $iothub/".into(), ], try_private: false, start_type: "automatic".into(), @@ -207,7 +218,8 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(), + notification_topic: "te/device/main/service/mosquitto-az-custom-bridge/status/health" + .into(), bridge_attempt_unsubscribe: false, bridge_location: BridgeLocation::Mosquitto, connection_check_attempts: 1, diff --git a/crates/core/tedge/src/bridge/c8y.rs b/crates/core/tedge/src/bridge/c8y.rs index 7fbbf20a872..e2c8fbded08 100644 --- a/crates/core/tedge/src/bridge/c8y.rs +++ b/crates/core/tedge/src/bridge/c8y.rs @@ -3,6 +3,9 @@ use crate::bridge::config::BridgeLocation; use camino::Utf8PathBuf; use std::borrow::Cow; use std::process::Command; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::EntityTopicId; +use tedge_api::mqtt_topics::MqttSchema; use tedge_config::auth_method::AuthMethod; use tedge_config::AutoFlag; use tedge_config::HostPort; @@ -12,9 +15,7 @@ use tedge_config::TopicPrefix; use tedge_config::MQTT_TLS_PORT; use which::which; -const C8Y_BRIDGE_HEALTH_TOPIC: &str = "te/device/main/service/mosquitto-c8y-bridge/status/health"; - -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub struct BridgeConfigC8yParams { pub mqtt_host: HostPort, pub config_file: Cow<'static, str>, @@ -30,6 +31,7 @@ pub struct BridgeConfigC8yParams { pub bridge_location: BridgeLocation, pub topic_prefix: TopicPrefix, pub profile_name: Option, + pub mqtt_schema: MqttSchema, } impl From for BridgeConfig { @@ -49,6 +51,7 @@ impl From for BridgeConfig { bridge_location, topic_prefix, profile_name, + mqtt_schema, } = params; let mut topics: Vec = vec![ @@ -146,6 +149,11 @@ impl From for BridgeConfig { AuthMethod::Certificate }; + let service_name = format!("mosquitto-{topic_prefix}-bridge"); + let health = mqtt_schema.topic_for( + &EntityTopicId::default_main_service(&service_name).unwrap(), + &Channel::Health, + ); Self { cloud_name: "c8y".into(), config_file, @@ -175,9 +183,7 @@ impl From for BridgeConfig { local_clean_session: false, notifications: true, notifications_local_only: true, - - // FIXME: doesn't account for custom topic root, use MQTT scheme API here - notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(), + notification_topic: health.name, bridge_attempt_unsubscribe: false, topics, bridge_location, @@ -242,6 +248,7 @@ mod tests { bridge_location: BridgeLocation::Mosquitto, topic_prefix: "c8y".try_into().unwrap(), profile_name: None, + mqtt_schema: MqttSchema::with_root("te".into()), }; let bridge = BridgeConfig::from(params); @@ -306,7 +313,7 @@ mod tests { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(), + notification_topic: "te/device/main/service/mosquitto-c8y-bridge/status/health".into(), bridge_attempt_unsubscribe: false, bridge_location: BridgeLocation::Mosquitto, connection_check_attempts: 1, @@ -336,6 +343,7 @@ mod tests { bridge_location: BridgeLocation::Mosquitto, topic_prefix: "c8y".try_into().unwrap(), profile_name: Some("profile".parse().unwrap()), + mqtt_schema: MqttSchema::with_root("te".into()), }; let bridge = BridgeConfig::from(params); @@ -407,7 +415,7 @@ mod tests { local_clean_session: false, notifications: true, notifications_local_only: true, - notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(), + notification_topic: "te/device/main/service/mosquitto-c8y-bridge/status/health".into(), bridge_attempt_unsubscribe: false, bridge_location: BridgeLocation::Mosquitto, connection_check_attempts: 1, diff --git a/crates/core/tedge/src/cli/connect/command.rs b/crates/core/tedge/src/cli/connect/command.rs index 0388fc4ecff..0988567d061 100644 --- a/crates/core/tedge/src/cli/connect/command.rs +++ b/crates/core/tedge/src/cli/connect/command.rs @@ -365,6 +365,7 @@ pub fn bridge_config( true => BridgeLocation::BuiltIn, false => BridgeLocation::Mosquitto, }; + let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone()); match cloud { MaybeBorrowedCloud::Azure(profile) => { let az_config = config.az.try_get(profile.as_deref())?; @@ -381,6 +382,7 @@ pub fn bridge_config( bridge_location, topic_prefix: az_config.bridge.topic_prefix.clone(), profile_name: profile.clone().map(Cow::into_owned), + mqtt_schema, }; Ok(BridgeConfig::from(params)) @@ -400,6 +402,7 @@ pub fn bridge_config( bridge_location, topic_prefix: aws_config.bridge.topic_prefix.clone(), profile_name: profile.clone().map(Cow::into_owned), + mqtt_schema, }; Ok(BridgeConfig::from(params)) @@ -432,7 +435,7 @@ pub fn bridge_config( bridge_location, topic_prefix: c8y_config.bridge.topic_prefix.clone(), profile_name: profile.clone().map(Cow::into_owned), - mqtt_topic_root: config.mqtt.topic_root.clone(), + mqtt_schema, }; Ok(BridgeConfig::from(params)) diff --git a/crates/extensions/c8y_mapper_ext/src/config.rs b/crates/extensions/c8y_mapper_ext/src/config.rs index e775d3b82af..2d730f4c6a2 100644 --- a/crates/extensions/c8y_mapper_ext/src/config.rs +++ b/crates/extensions/c8y_mapper_ext/src/config.rs @@ -113,7 +113,7 @@ impl C8yMapperConfig { let bridge_service_name = if bridge_in_mapper { format!("tedge-mapper-bridge-{}", bridge_config.c8y_prefix) } else { - "mosquitto-c8y-bridge".into() + format!("mosquitto-{}-bridge", bridge_config.c8y_prefix) }; let bridge_health_topic = service_health_topic(&mqtt_schema, &device_topic_id, &bridge_service_name); diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 157e9fa558d..d3b775d6c8d 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -1079,7 +1079,7 @@ impl CumulocityConverter { tokio::spawn(async move { let op_name = op_name.as_str(); - let topic = C8yTopic::SmartRestResponse.to_topic(&c8y_prefix).unwrap(); + let topic = C8yTopic::upstream_topic(&c8y_prefix); if !skip_status_update { // mqtt client publishes executing @@ -1474,7 +1474,7 @@ impl CumulocityConverter { let device_data_message = self.inventory_device_type_update_message()?; let pending_operations_message = - create_get_pending_operations_message(&self.config.bridge_config.c8y_prefix)?; + create_get_pending_operations_message(&self.config.bridge_config.c8y_prefix); messages.append(&mut vec![ supported_operations_message, @@ -1512,11 +1512,11 @@ impl CumulocityConverter { } } -fn create_get_pending_operations_message( +pub fn create_get_pending_operations_message( prefix: &TopicPrefix, -) -> Result { - let topic = C8yTopic::SmartRestResponse.to_topic(prefix)?; - Ok(MqttMessage::new(&topic, request_pending_operations())) +) -> MqttMessage { + let topic = C8yTopic::upstream_topic(prefix); + MqttMessage::new(&topic, request_pending_operations()) } impl CumulocityConverter { diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index 46bcaa3176b..048194e93be 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -3,11 +3,14 @@ use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityType; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::HealthStatus; +use tedge_api::Status; use tedge_config::TopicPrefix; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tracing::error; +use crate::converter::create_get_pending_operations_message; + pub fn is_c8y_bridge_established( message: &MqttMessage, mqtt_schema: &MqttSchema, @@ -60,7 +63,16 @@ pub fn convert_health_status_message( return vec![]; }; - vec![status_message] + let mut value = vec![status_message]; + + if display_name == format!("mosquitto-{prefix}-bridge") && status == Status::Up { + // Receiving this message indicates mosquitto has reconnected (following a + // disconnection) to the cloud. We need to re-request operations in case any + // were triggered while we were down + value.push(create_get_pending_operations_message(prefix)); + } + + value } #[cfg(test)] diff --git a/tests/RobotFramework/tests/cumulocity/pending_operations/request_pending_operations.robot b/tests/RobotFramework/tests/cumulocity/pending_operations/request_pending_operations.robot new file mode 100644 index 00000000000..5526dc10bea --- /dev/null +++ b/tests/RobotFramework/tests/cumulocity/pending_operations/request_pending_operations.robot @@ -0,0 +1,93 @@ +*** Settings *** +Resource ../../../../resources/common.resource +Library Cumulocity +Library ThinEdgeIO + +Test Setup Custom Setup +Test Teardown Get Logs + +Test Tags theme:c8y theme:operation + + +*** Test Cases *** +Process any pending operations after connection disruptions to mosquitto bridge + ThinEdgeIO.Bridge Should Be Up c8y + + ThinEdgeIO.Disconnect From Network + ThinEdgeIO.Execute Command tedge connect c8y --test exp_exit_code=1 + + # Create cloud operation whilst the device is disconnected + ${operation}= Cumulocity.Get Configuration tedge-configuration-plugin + + # Restore connection + ThinEdgeIO.Connect To Network + ThinEdgeIO.Bridge Should Be Up c8y + + Operation Should Be SUCCESSFUL ${operation} + +Process any pending operations after connection disruptions to mosquitto bridge with custom topic-prefix + ThinEdgeIO.Execute Command tedge config set c8y.bridge.topic_prefix c8y-test + ThinEdgeIO.Execute Command tedge reconnect c8y + ThinEdgeIO.Bridge Should Be Up c8y-test + + ThinEdgeIO.Disconnect From Network + # Verify we are definitely disconnected from cloud + ThinEdgeIO.Execute Command tedge connect c8y --profile test --test exp_exit_code=1 + + # Create cloud operation whilst the device is disconnected + ${operation}= Cumulocity.Get Configuration tedge-configuration-plugin + + # Restore connection + ThinEdgeIO.Connect To Network + ThinEdgeIO.Bridge Should Be Up c8y-test + + Operation Should Be SUCCESSFUL ${operation} + +Process any pending operations after connection disruptions to built_in bridge + ThinEdgeIO.Execute Command tedge config set mqtt.bridge.built_in true + ThinEdgeIO.Execute Command tedge config set mqtt.bridge.reconnect_policy.initial_interval 0 + ThinEdgeIO.Execute Command tedge reconnect c8y + ThinEdgeIO.Bridge Should Be Up c8y + + ThinEdgeIO.Disconnect From Network + # Verify we are definitely disconnected from cloud + ThinEdgeIO.Execute Command tedge connect c8y --test exp_exit_code=1 + + # Create cloud operation whilst the device is disconnected + ${operation}= Cumulocity.Get Configuration tedge-configuration-plugin + + # Restore connection + ThinEdgeIO.Connect To Network + ThinEdgeIO.Bridge Should Be Up c8y + + Operation Should Be SUCCESSFUL ${operation} + +Process any pending operations after connection disruptions to profiled built-in bridge + ThinEdgeIO.Execute Command tedge config set mqtt.bridge.built_in true + ThinEdgeIO.Execute Command tedge config set mqtt.bridge.reconnect_policy.initial_interval 0 + ThinEdgeIO.Execute Command tedge config set c8y@test.url "$(tedge config get c8y.url)" + ThinEdgeIO.Execute Command tedge config set c8y@test.bridge.topic_prefix c8y-test + ThinEdgeIO.Execute Command tedge config unset c8y.url + ThinEdgeIO.Execute Command tedge disconnect c8y + ThinEdgeIO.Execute Command tedge connect c8y --profile test + ThinEdgeIO.Bridge Should Be Up c8y-test + + ThinEdgeIO.Disconnect From Network + # Verify we are definitely disconnected from cloud + ThinEdgeIO.Execute Command tedge connect c8y --profile test --test exp_exit_code=1 + + # Create cloud operation whilst the device is disconnected + ${operation}= Cumulocity.Get Configuration tedge-configuration-plugin + + # Restore connection + ThinEdgeIO.Connect To Network + ThinEdgeIO.Bridge Should Be Up c8y-test + + Operation Should Be SUCCESSFUL ${operation} + + +*** Keywords *** +Custom Setup + ${DEVICE_SN}= Setup + Set Suite Variable $DEVICE_SN + Device Should Exist ${DEVICE_SN}