Skip to content

Commit

Permalink
Rename mosquitto bridge notification topics to include topic prefix a…
Browse files Browse the repository at this point in the history
…nd handle dropped connections

Signed-off-by: James Rhodes <[email protected]>
  • Loading branch information
jarhodes314 committed Dec 11, 2024
1 parent bf8588d commit 6603b01
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 43 deletions.
36 changes: 24 additions & 12 deletions crates/core/tedge/src/bridge/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use super::BridgeConfig;
use crate::bridge::config::BridgeLocation;
use camino::Utf8PathBuf;
use std::borrow::Cow;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_config::HostPort;
use tedge_config::ProfileName;
use tedge_config::TopicPrefix;
use tedge_config::MQTT_TLS_PORT;

const MOSQUITTO_BRIDGE_TOPIC: &str = "te/device/main/service/mosquitto-aws-bridge/status/health";

#[derive(Debug, Eq, PartialEq)]
#[derive(Debug)]
pub struct BridgeConfigAwsParams {
pub mqtt_host: HostPort<MQTT_TLS_PORT>,
pub config_file: Cow<'static, str>,
Expand All @@ -20,6 +21,7 @@ pub struct BridgeConfigAwsParams {
pub bridge_location: BridgeLocation,
pub topic_prefix: TopicPrefix,
pub profile_name: Option<ProfileName>,
pub mqtt_schema: MqttSchema,
}

impl From<BridgeConfigAwsParams> for BridgeConfig {
Expand All @@ -34,6 +36,7 @@ impl From<BridgeConfigAwsParams> for BridgeConfig {
bridge_location,
topic_prefix,
profile_name,
mqtt_schema,
} = params;

let user_name = remote_clientid.to_string();
Expand All @@ -54,6 +57,11 @@ impl From<BridgeConfigAwsParams> for BridgeConfig {
r#""" in 1 {topic_prefix}/connection-success thinedge/devices/{remote_clientid}/test-connection"#
);

let service_name = format!("mosquitto-{topic_prefix}-bridge");
let health = mqtt_schema.topic_for(
&EntityTopicId::default_main_service(&service_name).unwrap(),
&Channel::Health,
);
Self {
cloud_name: "aws".into(),
config_file,
Expand Down Expand Up @@ -83,7 +91,7 @@ impl From<BridgeConfigAwsParams> for BridgeConfig {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(),
notification_topic: health.name,
bridge_attempt_unsubscribe: false,
topics: vec![
pub_msg_topic,
Expand Down Expand Up @@ -115,6 +123,7 @@ fn test_bridge_config_from_aws_params() -> anyhow::Result<()> {
bridge_location: BridgeLocation::Mosquitto,
topic_prefix: "aws".try_into().unwrap(),
profile_name: None,
mqtt_schema: MqttSchema::with_root("te".into()),
};

let bridge = BridgeConfig::from(params);
Expand Down Expand Up @@ -147,7 +156,7 @@ fn test_bridge_config_from_aws_params() -> anyhow::Result<()> {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(),
notification_topic: "te/device/main/service/mosquitto-aws-bridge/status/health".into(),
bridge_attempt_unsubscribe: false,
bridge_location: BridgeLocation::Mosquitto,
connection_check_attempts: 5,
Expand All @@ -170,8 +179,9 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> {
bridge_certfile: "./test-certificate.pem".into(),
bridge_keyfile: "./test-private-key.pem".into(),
bridge_location: BridgeLocation::Mosquitto,
topic_prefix: "custom".try_into().unwrap(),
topic_prefix: "aws-custom".try_into().unwrap(),
profile_name: Some("profile".parse().unwrap()),
mqtt_schema: MqttSchema::with_root("te".into()),
};

let bridge = BridgeConfig::from(params);
Expand All @@ -191,11 +201,12 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> {
use_mapper: true,
use_agent: false,
topics: vec![
"td/# out 1 custom/ thinedge/alpha/".into(),
"cmd/# in 1 custom/ thinedge/alpha/".into(),
"shadow/# both 1 custom/ $aws/things/alpha/".into(),
r#""" out 1 custom/test-connection thinedge/devices/alpha/test-connection"#.into(),
r#""" in 1 custom/connection-success thinedge/devices/alpha/test-connection"#.into(),
"td/# out 1 aws-custom/ thinedge/alpha/".into(),
"cmd/# in 1 aws-custom/ thinedge/alpha/".into(),
"shadow/# both 1 aws-custom/ $aws/things/alpha/".into(),
r#""" out 1 aws-custom/test-connection thinedge/devices/alpha/test-connection"#.into(),
r#""" in 1 aws-custom/connection-success thinedge/devices/alpha/test-connection"#
.into(),
],
try_private: false,
start_type: "automatic".into(),
Expand All @@ -204,7 +215,8 @@ fn test_bridge_config_aws_custom_topic_prefix() -> anyhow::Result<()> {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(),
notification_topic: "te/device/main/service/mosquitto-aws-custom-bridge/status/health"
.into(),
bridge_attempt_unsubscribe: false,
bridge_location: BridgeLocation::Mosquitto,
connection_check_attempts: 5,
Expand Down
40 changes: 26 additions & 14 deletions crates/core/tedge/src/bridge/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use super::BridgeConfig;
use crate::bridge::config::BridgeLocation;
use camino::Utf8PathBuf;
use std::borrow::Cow;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_config::HostPort;
use tedge_config::ProfileName;
use tedge_config::TopicPrefix;
use tedge_config::MQTT_TLS_PORT;

const MOSQUITTO_BRIDGE_TOPIC: &str = "te/device/main/service/mosquitto-az-bridge/status/health";

#[derive(Debug, Eq, PartialEq)]
#[derive(Debug)]
pub struct BridgeConfigAzureParams {
pub mqtt_host: HostPort<MQTT_TLS_PORT>,
pub config_file: Cow<'static, str>,
Expand All @@ -20,6 +21,7 @@ pub struct BridgeConfigAzureParams {
pub bridge_location: BridgeLocation,
pub topic_prefix: TopicPrefix,
pub profile_name: Option<ProfileName>,
pub mqtt_schema: MqttSchema,
}

impl From<BridgeConfigAzureParams> for BridgeConfig {
Expand All @@ -34,6 +36,7 @@ impl From<BridgeConfigAzureParams> for BridgeConfig {
bridge_location,
topic_prefix,
profile_name,
mqtt_schema,
} = params;

let address = mqtt_host.clone();
Expand All @@ -46,6 +49,12 @@ impl From<BridgeConfigAzureParams> for BridgeConfig {
format!("messages/events/# out 1 {topic_prefix}/ devices/{remote_clientid}/");
let sub_msg_topic =
format!("messages/devicebound/# in 1 {topic_prefix}/ devices/{remote_clientid}/");

let service_name = format!("mosquitto-{topic_prefix}-bridge");
let health = mqtt_schema.topic_for(
&EntityTopicId::default_main_service(&service_name).unwrap(),
&Channel::Health,
);
Self {
cloud_name: "az".into(),
config_file,
Expand Down Expand Up @@ -75,7 +84,7 @@ impl From<BridgeConfigAzureParams> for BridgeConfig {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(),
notification_topic: health.name,
bridge_attempt_unsubscribe: false,
topics: vec![
// See Azure IoT Hub documentation for detailed explanation on the topics
Expand Down Expand Up @@ -112,6 +121,7 @@ fn test_bridge_config_from_azure_params() -> anyhow::Result<()> {
bridge_location: BridgeLocation::Mosquitto,
topic_prefix: "az".try_into().unwrap(),
profile_name: None,
mqtt_schema: MqttSchema::with_root("te".into()),
};

let bridge = BridgeConfig::from(params);
Expand Down Expand Up @@ -146,7 +156,7 @@ fn test_bridge_config_from_azure_params() -> anyhow::Result<()> {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(),
notification_topic: "te/device/main/service/mosquitto-az-bridge/status/health".into(),
bridge_attempt_unsubscribe: false,
bridge_location: BridgeLocation::Mosquitto,
connection_check_attempts: 1,
Expand All @@ -171,8 +181,9 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> {
bridge_certfile: "./test-certificate.pem".into(),
bridge_keyfile: "./test-private-key.pem".into(),
bridge_location: BridgeLocation::Mosquitto,
topic_prefix: "custom".try_into().unwrap(),
topic_prefix: "az-custom".try_into().unwrap(),
profile_name: Some("profile".parse().unwrap()),
mqtt_schema: MqttSchema::with_root("te".into()),
};

let bridge = BridgeConfig::from(params);
Expand All @@ -192,13 +203,13 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> {
use_mapper: true,
use_agent: false,
topics: vec![
"messages/events/# out 1 custom/ devices/alpha/".into(),
"messages/devicebound/# in 1 custom/ devices/alpha/".into(),
"methods/POST/# in 1 custom/ $iothub/".into(),
"methods/res/# out 1 custom/ $iothub/".into(),
"twin/res/# in 1 custom/ $iothub/".into(),
"twin/GET/# out 1 custom/ $iothub/".into(),
"twin/PATCH/# out 1 custom/ $iothub/".into(),
"messages/events/# out 1 az-custom/ devices/alpha/".into(),
"messages/devicebound/# in 1 az-custom/ devices/alpha/".into(),
"methods/POST/# in 1 az-custom/ $iothub/".into(),
"methods/res/# out 1 az-custom/ $iothub/".into(),
"twin/res/# in 1 az-custom/ $iothub/".into(),
"twin/GET/# out 1 az-custom/ $iothub/".into(),
"twin/PATCH/# out 1 az-custom/ $iothub/".into(),
],
try_private: false,
start_type: "automatic".into(),
Expand All @@ -207,7 +218,8 @@ fn test_azure_bridge_config_with_custom_prefix() -> anyhow::Result<()> {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: MOSQUITTO_BRIDGE_TOPIC.into(),
notification_topic: "te/device/main/service/mosquitto-az-custom-bridge/status/health"
.into(),
bridge_attempt_unsubscribe: false,
bridge_location: BridgeLocation::Mosquitto,
connection_check_attempts: 1,
Expand Down
24 changes: 16 additions & 8 deletions crates/core/tedge/src/bridge/c8y.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use crate::bridge::config::BridgeLocation;
use camino::Utf8PathBuf;
use std::borrow::Cow;
use std::process::Command;
use tedge_api::mqtt_topics::Channel;
use tedge_api::mqtt_topics::EntityTopicId;
use tedge_api::mqtt_topics::MqttSchema;
use tedge_config::auth_method::AuthMethod;
use tedge_config::AutoFlag;
use tedge_config::HostPort;
Expand All @@ -12,9 +15,7 @@ use tedge_config::TopicPrefix;
use tedge_config::MQTT_TLS_PORT;
use which::which;

const C8Y_BRIDGE_HEALTH_TOPIC: &str = "te/device/main/service/mosquitto-c8y-bridge/status/health";

#[derive(Debug, Eq, PartialEq)]
#[derive(Debug)]
pub struct BridgeConfigC8yParams {
pub mqtt_host: HostPort<MQTT_TLS_PORT>,
pub config_file: Cow<'static, str>,
Expand All @@ -30,6 +31,7 @@ pub struct BridgeConfigC8yParams {
pub bridge_location: BridgeLocation,
pub topic_prefix: TopicPrefix,
pub profile_name: Option<ProfileName>,
pub mqtt_schema: MqttSchema,
}

impl From<BridgeConfigC8yParams> for BridgeConfig {
Expand All @@ -49,6 +51,7 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
bridge_location,
topic_prefix,
profile_name,
mqtt_schema,
} = params;

let mut topics: Vec<String> = vec![
Expand Down Expand Up @@ -146,6 +149,11 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
AuthMethod::Certificate
};

let service_name = format!("mosquitto-{topic_prefix}-bridge");
let health = mqtt_schema.topic_for(
&EntityTopicId::default_main_service(&service_name).unwrap(),
&Channel::Health,
);
Self {
cloud_name: "c8y".into(),
config_file,
Expand Down Expand Up @@ -175,9 +183,7 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
local_clean_session: false,
notifications: true,
notifications_local_only: true,

// FIXME: doesn't account for custom topic root, use MQTT scheme API here
notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(),
notification_topic: health.name,
bridge_attempt_unsubscribe: false,
topics,
bridge_location,
Expand Down Expand Up @@ -242,6 +248,7 @@ mod tests {
bridge_location: BridgeLocation::Mosquitto,
topic_prefix: "c8y".try_into().unwrap(),
profile_name: None,
mqtt_schema: MqttSchema::with_root("te".into()),
};

let bridge = BridgeConfig::from(params);
Expand Down Expand Up @@ -306,7 +313,7 @@ mod tests {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(),
notification_topic: "te/device/main/service/mosquitto-c8y-bridge/status/health".into(),
bridge_attempt_unsubscribe: false,
bridge_location: BridgeLocation::Mosquitto,
connection_check_attempts: 1,
Expand Down Expand Up @@ -336,6 +343,7 @@ mod tests {
bridge_location: BridgeLocation::Mosquitto,
topic_prefix: "c8y".try_into().unwrap(),
profile_name: Some("profile".parse().unwrap()),
mqtt_schema: MqttSchema::with_root("te".into()),
};

let bridge = BridgeConfig::from(params);
Expand Down Expand Up @@ -407,7 +415,7 @@ mod tests {
local_clean_session: false,
notifications: true,
notifications_local_only: true,
notification_topic: C8Y_BRIDGE_HEALTH_TOPIC.into(),
notification_topic: "te/device/main/service/mosquitto-c8y-bridge/status/health".into(),
bridge_attempt_unsubscribe: false,
bridge_location: BridgeLocation::Mosquitto,
connection_check_attempts: 1,
Expand Down
5 changes: 4 additions & 1 deletion crates/core/tedge/src/cli/connect/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ pub fn bridge_config(
true => BridgeLocation::BuiltIn,
false => BridgeLocation::Mosquitto,
};
let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone());
match cloud {
MaybeBorrowedCloud::Azure(profile) => {
let az_config = config.az.try_get(profile.as_deref())?;
Expand All @@ -381,6 +382,7 @@ pub fn bridge_config(
bridge_location,
topic_prefix: az_config.bridge.topic_prefix.clone(),
profile_name: profile.clone().map(Cow::into_owned),
mqtt_schema,
};

Ok(BridgeConfig::from(params))
Expand All @@ -400,6 +402,7 @@ pub fn bridge_config(
bridge_location,
topic_prefix: aws_config.bridge.topic_prefix.clone(),
profile_name: profile.clone().map(Cow::into_owned),
mqtt_schema,
};

Ok(BridgeConfig::from(params))
Expand Down Expand Up @@ -432,7 +435,7 @@ pub fn bridge_config(
bridge_location,
topic_prefix: c8y_config.bridge.topic_prefix.clone(),
profile_name: profile.clone().map(Cow::into_owned),
mqtt_topic_root: config.mqtt.topic_root.clone(),
mqtt_schema,
};

Ok(BridgeConfig::from(params))
Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_mapper_ext/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl C8yMapperConfig {
let bridge_service_name = if bridge_in_mapper {
format!("tedge-mapper-bridge-{}", bridge_config.c8y_prefix)
} else {
"mosquitto-c8y-bridge".into()
format!("mosquitto-{}-bridge", bridge_config.c8y_prefix)
};
let bridge_health_topic =
service_health_topic(&mqtt_schema, &device_topic_id, &bridge_service_name);
Expand Down
12 changes: 6 additions & 6 deletions crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ impl CumulocityConverter {

tokio::spawn(async move {
let op_name = op_name.as_str();
let topic = C8yTopic::SmartRestResponse.to_topic(&c8y_prefix).unwrap();
let topic = C8yTopic::upstream_topic(&c8y_prefix);

if !skip_status_update {
// mqtt client publishes executing
Expand Down Expand Up @@ -1474,7 +1474,7 @@ impl CumulocityConverter {
let device_data_message = self.inventory_device_type_update_message()?;

let pending_operations_message =
create_get_pending_operations_message(&self.config.bridge_config.c8y_prefix)?;
create_get_pending_operations_message(&self.config.bridge_config.c8y_prefix);

messages.append(&mut vec![
supported_operations_message,
Expand Down Expand Up @@ -1512,11 +1512,11 @@ impl CumulocityConverter {
}
}

fn create_get_pending_operations_message(
pub fn create_get_pending_operations_message(
prefix: &TopicPrefix,
) -> Result<MqttMessage, ConversionError> {
let topic = C8yTopic::SmartRestResponse.to_topic(prefix)?;
Ok(MqttMessage::new(&topic, request_pending_operations()))
) -> MqttMessage {
let topic = C8yTopic::upstream_topic(prefix);
MqttMessage::new(&topic, request_pending_operations())
}

impl CumulocityConverter {
Expand Down
Loading

0 comments on commit 6603b01

Please sign in to comment.