From aeafdf54a1cb616773704a0c176762d00b288179 Mon Sep 17 00:00:00 2001 From: Devin Kelley <105753233+devkelley@users.noreply.github.com> Date: Thu, 29 Feb 2024 16:44:08 -0800 Subject: [PATCH 1/3] Initial impl of send func --- Cargo.toml | 1 + src/lib.rs | 135 ++++++++++++++++++++++++++++++++++++++++++++++- src/transport.rs | 114 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 242 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b26c55a..be05c06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ version = "0.1.0" async-trait = { version = "0.1" } bytes = { version = "1.5" } paho-mqtt = { version = "0.12.1" } +protobuf = { version = "3.3" } rand = { version = "0.8" } regex = { version = "1.10" } serde = { version = "1.0", features = ["derive"] } diff --git a/src/lib.rs b/src/lib.rs index c84ed72..a336550 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,139 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ +use std::sync::Arc; + +use paho_mqtt::{self as mqtt, MQTT_VERSION_5}; +use protobuf::Message; +use serde::{Deserialize, Serialize}; +use up_rust::uprotocol::{UAttributes, UCode, UMessage, UPayload, UStatus}; + pub mod transport; -pub struct UPClientMqtt {} +pub type UListener = Box) + Send + Sync + 'static>; + +const UPAYLOAD_VERSION: u8 = 1; +const UATTRIBUTE_VERSION: u8 = 1; + +pub struct MqttCliConfig { + pub mqtt_port: String, + pub mqtt_namespace: String, +} + +#[derive(Serialize, Deserialize)] +pub struct UMessageBytes { + pub payload_bytes: UPayloadBytes, + pub attributes_bytes: UAttributesBytes, +} + +#[derive(Serialize, Deserialize)] +pub struct UPayloadBytes { + pub version: u8, + pub payload: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct UAttributesBytes { + pub version: u8, + pub attributes: Vec, +} + +pub struct UPClientMqtt { + pub mqtt_cli: Arc, +} + +impl UPClientMqtt { + pub async fn new(config: MqttCliConfig) -> Result { + let mqtt_uri = format!("mqtt://{}:{}", config.mqtt_namespace, config.mqtt_port); + + let mqtt_cli = mqtt::CreateOptionsBuilder::new() + .server_uri(mqtt_uri) + .mqtt_version(MQTT_VERSION_5) + .create_client() + .map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to create mqtt client: {e:?}"), + ) + })?; + + let conn_opts = mqtt::ConnectOptionsBuilder::new_v5() + .clean_session(true) + .finalize(); + + mqtt_cli.connect(conn_opts).await.map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to connect to mqtt broker: {e:?}"), + ) + })?; + + Ok(Self { + mqtt_cli: Arc::new(mqtt_cli), + }) + } + + fn uattributes_to_bytes(attributes: &UAttributes) -> Result { + let bytes = &attributes.write_to_bytes().map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to serialize uAttributes: {e:?}"), + ) + })?; + + Ok(UAttributesBytes { + version: UATTRIBUTE_VERSION, + attributes: bytes.to_vec(), + }) + } + + fn bytes_to_uattributes(bytes: &UAttributesBytes) -> Result { + if bytes.version != UATTRIBUTE_VERSION { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Invalid uAttributes version", + )); + } + + let attributes = Message::parse_from_bytes(&bytes.attributes).map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to deserialize uAttributes: {e:?}"), + ) + })?; + + Ok(attributes) + } + + fn upayload_to_bytes(payload: &UPayload) -> Result { + let bytes = &payload.write_to_bytes().map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to serialize uPayload: {e:?}"), + ) + })?; + + Ok(UPayloadBytes { + version: UPAYLOAD_VERSION, + payload: bytes.to_vec(), + }) + } + + fn deserialize_upayload(bytes: &UPayloadBytes) -> Result { + if bytes.version != UPAYLOAD_VERSION { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Invalid uPayload version", + )); + } + + let payload = Message::parse_from_bytes(&bytes.payload).map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to deserialize uPayload: {e:?}"), + ) + })?; + + Ok(payload) + } +} diff --git a/src/transport.rs b/src/transport.rs index 4f98355..ed0ed1f 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -13,21 +13,121 @@ use async_trait::async_trait; +use paho_mqtt::{self as mqtt}; + use up_rust::{ - transport::datamodel::UTransport, - uprotocol::{UMessage, UStatus, UUri}, + transport::{datamodel::UTransport, validator::Validators}, + uprotocol::{UAttributes, UCode, UMessage, UMessageType, UPayload, UStatus, UUri}, uuid::builder::UUIDBuilder, }; -use crate::UPClientMqtt; +use crate::{UMessageBytes, UPClientMqtt}; + +impl UPClientMqtt { + async fn send_msg( + &self, + topic: &str, + payload: UPayload, + attributes: UAttributes, + ) -> Result<(), UStatus> { + let payload_bytes = UPClientMqtt::upayload_to_bytes(&payload)?; + let attributes_bytes = UPClientMqtt::uattributes_to_bytes(&attributes)?; + + let msg_data = UMessageBytes { + payload_bytes, + attributes_bytes, + }; + + let msg = mqtt::MessageBuilder::new() + .topic(topic) + .payload(serde_json::to_vec(&msg_data).map_err(|e| { + UStatus::fail_with_code( + UCode::INTERNAL, + format!("Unable to serialize payload: {e:?}"), + ) + })?) + .qos(1) + .finalize(); + + self.mqtt_cli.publish(msg).await.map_err(|e| { + UStatus::fail_with_code(UCode::INTERNAL, format!("Unable to publish message: {e:?}")) + })?; + + Ok(()) + } +} #[async_trait] impl UTransport for UPClientMqtt { async fn send(&self, message: UMessage) -> Result<(), UStatus> { - // implementation goes here - println!("Sending message: {:?}", message); + // validate message + let attributes = *message.attributes.0.ok_or(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Invalid uAttributes", + ))?; - Ok(()) + let payload = *message.payload.0.ok_or(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Invalid uPayload", + ))?; + + // Match UAttributes type (Publish / Request / Response / Unspecified) + let topic = match attributes + .type_ + .enum_value() + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to parse type"))? + { + UMessageType::UMESSAGE_TYPE_PUBLISH => { + Validators::Publish + .validator() + .validate(&attributes) + .map_err(|e| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Wrong Publish UAttributes {e:?}"), + ) + })?; + + attributes.clone().source + } + UMessageType::UMESSAGE_TYPE_REQUEST => { + Validators::Request + .validator() + .validate(&attributes) + .map_err(|e| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Wrong Request UAttributes {e:?}"), + ) + })?; + + attributes.clone().sink + } + UMessageType::UMESSAGE_TYPE_RESPONSE => { + Validators::Response + .validator() + .validate(&attributes) + .map_err(|e| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + format!("Wrong Response UAttributes {e:?}"), + ) + })?; + + attributes.clone().sink + } + UMessageType::UMESSAGE_TYPE_UNSPECIFIED => { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Wrong Message type in UAttributes", + )) + } + }; + + // TODO: Does the UURI need to be transformed/encoded? + let topic_str = topic.to_string(); + + self.send_msg(&topic_str, payload, attributes).await } async fn register_listener( @@ -54,7 +154,7 @@ impl UTransport for UPClientMqtt { async fn receive(&self, _topic: UUri) -> Result { Err(UStatus::fail_with_code( - up_rust::uprotocol::UCode::UNIMPLEMENTED, + UCode::UNIMPLEMENTED, "This method is not implemented for mqtt. Use register_listener instead.", )) } From d3cd4695dbb7be559faa7450f0577ca389062a72 Mon Sep 17 00:00:00 2001 From: Devin Kelley <105753233+devkelley@users.noreply.github.com> Date: Fri, 1 Mar 2024 05:33:39 -0800 Subject: [PATCH 2/3] Better implementation of send message, still missing topic split --- src/lib.rs | 120 +++++++++++++++++++---------------------------- src/transport.rs | 108 ++++++++++++++++++++---------------------- 2 files changed, 100 insertions(+), 128 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a336550..758ebb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,50 +15,41 @@ use std::sync::Arc; use paho_mqtt::{self as mqtt, MQTT_VERSION_5}; use protobuf::Message; -use serde::{Deserialize, Serialize}; -use up_rust::uprotocol::{UAttributes, UCode, UMessage, UPayload, UStatus}; +use up_rust::uprotocol::{UCode, UMessage, UStatus, UUri, UUID}; pub mod transport; pub type UListener = Box) + Send + Sync + 'static>; -const UPAYLOAD_VERSION: u8 = 1; -const UATTRIBUTE_VERSION: u8 = 1; - -pub struct MqttCliConfig { +pub struct MqttConfig { pub mqtt_port: String, pub mqtt_namespace: String, -} - -#[derive(Serialize, Deserialize)] -pub struct UMessageBytes { - pub payload_bytes: UPayloadBytes, - pub attributes_bytes: UAttributesBytes, -} - -#[derive(Serialize, Deserialize)] -pub struct UPayloadBytes { - pub version: u8, - pub payload: Vec, -} - -#[derive(Serialize, Deserialize)] -pub struct UAttributesBytes { - pub version: u8, - pub attributes: Vec, + pub ssl_options: Option, } pub struct UPClientMqtt { - pub mqtt_cli: Arc, + mqtt_client: Arc, } +#[allow(dead_code)] impl UPClientMqtt { - pub async fn new(config: MqttCliConfig) -> Result { - let mqtt_uri = format!("mqtt://{}:{}", config.mqtt_namespace, config.mqtt_port); + pub async fn new(config: MqttConfig, client_id: UUID) -> Result { + let mqtt_protocol = if config.ssl_options.is_some() { + "mqtts" + } else { + "mqtt" + }; + + let mqtt_uri = format!( + "{}://{}:{}", + mqtt_protocol, config.mqtt_namespace, config.mqtt_port + ); let mqtt_cli = mqtt::CreateOptionsBuilder::new() .server_uri(mqtt_uri) + .client_id(client_id) .mqtt_version(MQTT_VERSION_5) + .max_buffered_messages(100) .create_client() .map_err(|e| { UStatus::fail_with_code( @@ -67,6 +58,7 @@ impl UPClientMqtt { ) })?; + // TODO: Integrate ssl options when connecting, may need a username, etc. let conn_opts = mqtt::ConnectOptionsBuilder::new_v5() .clean_session(true) .finalize(); @@ -79,71 +71,57 @@ impl UPClientMqtt { })?; Ok(Self { - mqtt_cli: Arc::new(mqtt_cli), + mqtt_client: Arc::new(mqtt_cli), }) } - fn uattributes_to_bytes(attributes: &UAttributes) -> Result { - let bytes = &attributes.write_to_bytes().map_err(|e| { + fn get_client_id(&self) -> String { + self.mqtt_client.client_id() + } + + // Serialize UMessage for transport over mqtt + fn serialize_umessage(message: &UMessage) -> Result, UStatus> { + let bytes = message.write_to_bytes().map_err(|e| { UStatus::fail_with_code( UCode::INTERNAL, - format!("Unable to serialize uAttributes: {e:?}"), + format!("Unable to serialize uMessage: {e:?}"), ) })?; - Ok(UAttributesBytes { - version: UATTRIBUTE_VERSION, - attributes: bytes.to_vec(), - }) + Ok(bytes) } - fn bytes_to_uattributes(bytes: &UAttributesBytes) -> Result { - if bytes.version != UATTRIBUTE_VERSION { - return Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Invalid uAttributes version", - )); - } - - let attributes = Message::parse_from_bytes(&bytes.attributes).map_err(|e| { + fn deserialize_umessage(bytes: &[u8]) -> Result { + let message = Message::parse_from_bytes(bytes).map_err(|e| { UStatus::fail_with_code( UCode::INTERNAL, - format!("Unable to deserialize uAttributes: {e:?}"), + format!("Unable to deserialize uMessage: {e:?}"), ) })?; - Ok(attributes) + Ok(message) } - fn upayload_to_bytes(payload: &UPayload) -> Result { - let bytes = &payload.write_to_bytes().map_err(|e| { - UStatus::fail_with_code( - UCode::INTERNAL, - format!("Unable to serialize uPayload: {e:?}"), - ) - })?; + fn mqtt_topic_from_uuri(uri: &UUri) -> Result { + //TODO: determine how to best split up the uri for an mqtt topic. + // This is a placeholder implementation. - Ok(UPayloadBytes { - version: UPAYLOAD_VERSION, - payload: bytes.to_vec(), - }) + Ok(uri.to_string()) } +} - fn deserialize_upayload(bytes: &UPayloadBytes) -> Result { - if bytes.version != UPAYLOAD_VERSION { - return Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Invalid uPayload version", - )); - } +#[cfg(test)] +mod tests { + use super::*; - let payload = Message::parse_from_bytes(&bytes.payload).map_err(|e| { - UStatus::fail_with_code( - UCode::INTERNAL, - format!("Unable to deserialize uPayload: {e:?}"), - ) - })?; + #[test] + fn test_serialize_umessage_and_back() { + let message = UMessage { + ..Default::default() + }; - Ok(payload) + let bytes = UPClientMqtt::serialize_umessage(&message).unwrap(); + let message_from_bytes = UPClientMqtt::deserialize_umessage(&bytes).unwrap(); + assert_eq!(message, message_from_bytes); } } diff --git a/src/transport.rs b/src/transport.rs index ed0ed1f..8e5ec6e 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -17,62 +17,17 @@ use paho_mqtt::{self as mqtt}; use up_rust::{ transport::{datamodel::UTransport, validator::Validators}, - uprotocol::{UAttributes, UCode, UMessage, UMessageType, UPayload, UStatus, UUri}, + uprotocol::{UAttributes, UCode, UMessage, UMessageType, UStatus, UUri}, + uri::validator::UriValidator, uuid::builder::UUIDBuilder, }; -use crate::{UMessageBytes, UPClientMqtt}; +use crate::UPClientMqtt; impl UPClientMqtt { - async fn send_msg( - &self, - topic: &str, - payload: UPayload, - attributes: UAttributes, - ) -> Result<(), UStatus> { - let payload_bytes = UPClientMqtt::upayload_to_bytes(&payload)?; - let attributes_bytes = UPClientMqtt::uattributes_to_bytes(&attributes)?; - - let msg_data = UMessageBytes { - payload_bytes, - attributes_bytes, - }; - - let msg = mqtt::MessageBuilder::new() - .topic(topic) - .payload(serde_json::to_vec(&msg_data).map_err(|e| { - UStatus::fail_with_code( - UCode::INTERNAL, - format!("Unable to serialize payload: {e:?}"), - ) - })?) - .qos(1) - .finalize(); - - self.mqtt_cli.publish(msg).await.map_err(|e| { - UStatus::fail_with_code(UCode::INTERNAL, format!("Unable to publish message: {e:?}")) - })?; - - Ok(()) - } -} - -#[async_trait] -impl UTransport for UPClientMqtt { - async fn send(&self, message: UMessage) -> Result<(), UStatus> { - // validate message - let attributes = *message.attributes.0.ok_or(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Invalid uAttributes", - ))?; - - let payload = *message.payload.0.ok_or(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Invalid uPayload", - ))?; - - // Match UAttributes type (Publish / Request / Response / Unspecified) - let topic = match attributes + async fn get_topic_from_attributes(&self, attributes: &UAttributes) -> Result { + // Match UAttributes type (Publish / Request / Response) to determine what uuri to use (source or sink) + let uri_topic = match attributes .type_ .enum_value() .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to parse type"))? @@ -80,7 +35,7 @@ impl UTransport for UPClientMqtt { UMessageType::UMESSAGE_TYPE_PUBLISH => { Validators::Publish .validator() - .validate(&attributes) + .validate(attributes) .map_err(|e| { UStatus::fail_with_code( UCode::INVALID_ARGUMENT, @@ -93,7 +48,7 @@ impl UTransport for UPClientMqtt { UMessageType::UMESSAGE_TYPE_REQUEST => { Validators::Request .validator() - .validate(&attributes) + .validate(attributes) .map_err(|e| { UStatus::fail_with_code( UCode::INVALID_ARGUMENT, @@ -106,7 +61,7 @@ impl UTransport for UPClientMqtt { UMessageType::UMESSAGE_TYPE_RESPONSE => { Validators::Response .validator() - .validate(&attributes) + .validate(attributes) .map_err(|e| { UStatus::fail_with_code( UCode::INVALID_ARGUMENT, @@ -124,10 +79,49 @@ impl UTransport for UPClientMqtt { } }; - // TODO: Does the UURI need to be transformed/encoded? - let topic_str = topic.to_string(); + // Validate that topic is resolved. + if !UriValidator::is_resolved(&uri_topic) { + return Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "UUri does not resolved", + )); + } + + // Convert UUri topic to valid mqtt topic. + let mqtt_topic = UPClientMqtt::mqtt_topic_from_uuri(&uri_topic)?; + + Ok(mqtt_topic) + } + + async fn send_message(&self, topic: &str, message: &UMessage) -> Result<(), UStatus> { + let data = UPClientMqtt::serialize_umessage(message)?; + + let msg = mqtt::MessageBuilder::new() + .topic(topic) + .payload(data) + .qos(1) + .finalize(); + + self.mqtt_client.publish(msg).await.map_err(|e| { + UStatus::fail_with_code(UCode::INTERNAL, format!("Unable to publish message: {e:?}")) + })?; + + Ok(()) + } +} + +#[async_trait] +impl UTransport for UPClientMqtt { + async fn send(&self, message: UMessage) -> Result<(), UStatus> { + // validate message + let attributes = message.attributes.as_ref().ok_or(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Invalid uAttributes", + ))?; + + let topic = self.get_topic_from_attributes(attributes).await?; - self.send_msg(&topic_str, payload, attributes).await + self.send_message(&topic, &message).await } async fn register_listener( From b37e7dfd42f831e73bd3460d9181c4f0b3930790 Mon Sep 17 00:00:00 2001 From: Devin Kelley <105753233+devkelley@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:57:11 -0700 Subject: [PATCH 3/3] Initial update to 1.5.8 spec --- Cargo.toml | 8 +- src/lib.rs | 629 ++++++++++++++++++++++++++++++++++++++++++++--- src/rpc.rs | 129 ++++++++++ src/transport.rs | 153 ++++-------- 4 files changed, 772 insertions(+), 147 deletions(-) create mode 100644 src/rpc.rs diff --git a/Cargo.toml b/Cargo.toml index be05c06..19ba765 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,15 +32,19 @@ rust-version = "1.72" version = "0.1.0" [dependencies] +async-std = { version = "1.12.0", features = ["attributes"] } async-trait = { version = "0.1" } bytes = { version = "1.5" } -paho-mqtt = { version = "0.12.1" } +env_logger = { version = "0.10" } +log = { version = "^0.4" } +paho-mqtt = { version = "0.12.3" } protobuf = { version = "3.3" } rand = { version = "0.8" } regex = { version = "1.10" } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } -up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", version = "0.1.5" } +tokio = { version = "1.38", features = [ "sync"] } +up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "f5248a89cf1db6232f463ee3ce7b1cb20d79cfdb" } url = { version = "2.5" } uuid = { version = "1.7", features = ["v8"] } diff --git a/src/lib.rs b/src/lib.rs index 758ebb5..b031587 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,29 +11,85 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use std::sync::Arc; +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, + sync::Arc, +}; +use async_std::{sync::RwLock, task::block_on}; +use log::info; +use mqtt::AsyncClient; use paho_mqtt::{self as mqtt, MQTT_VERSION_5}; -use protobuf::Message; -use up_rust::uprotocol::{UCode, UMessage, UStatus, UUri, UUID}; +use protobuf::Enum; +use up_rust::{ComparableListener, UAttributes, UCode, UMessage, UPriority, UStatus, UUri, UUID}; +pub mod rpc; pub mod transport; -pub type UListener = Box) + Send + Sync + 'static>; +/// Constants defining the protobuf field numbers for UAttributes. +// TODO: Convert this to a dynamically generated list. +pub const ID_NUM: &str = "1"; +pub const TYPE_NUM: &str = "2"; +pub const SOURCE_NUM: &str = "3"; +pub const SINK_NUM: &str = "4"; +pub const PRIORITY_NUM: &str = "5"; +pub const TTL_NUM: &str = "6"; +pub const PERM_LEVEL_NUM: &str = "7"; +pub const COMMSTATUS_NUM: &str = "8"; +pub const REQID_NUM: &str = "9"; +pub const TOKEN_NUM: &str = "10"; +pub const TRACEPARENT_NUM: &str = "11"; +pub const PAYLOAD_NUM: &str = "12"; +// URI Wildcard consts +const WILDCARD_AUTHORITY: &str = "*"; +const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF; +const WILDCARD_ENTITY_VERSION: u32 = 0x0000_00FF; +const WILDCARD_RESOURCE_ID: u32 = 0x0000_FFFF; + +/// Configuration for the mqtt client. pub struct MqttConfig { + /// Port of the mqtt broker to connect to. pub mqtt_port: String, - pub mqtt_namespace: String, + /// Hostname of the mqtt broker. + pub mqtt_hostname: String, + /// Optional SSL options for the mqtt connection. pub ssl_options: Option, } +/// UP Client for mqtt. pub struct UPClientMqtt { mqtt_client: Arc, + topic_listener_map: Arc>>>, + // My authority + authority_name: String, + // Indicates where client instance is running. + client_type: UPClientMqttType, +} + +pub enum UPClientMqttType { + Device, + Cloud, } #[allow(dead_code)] impl UPClientMqtt { - pub async fn new(config: MqttConfig, client_id: UUID) -> Result { + /// Create a new UPClientMqtt. + /// + /// # Arguments + /// * `config` - Configuration for the mqtt client. + /// * `client_id` - Client id for the mqtt client. + pub async fn new( + config: MqttConfig, + client_id: UUID, + authority_name: String, + client_type: UPClientMqttType, + ) -> Result { + let topic_listener_map = Arc::new(RwLock::new(HashMap::new())); + + let ptr = topic_listener_map.clone(); + let mqtt_protocol = if config.ssl_options.is_some() { "mqtts" } else { @@ -42,13 +98,12 @@ impl UPClientMqtt { let mqtt_uri = format!( "{}://{}:{}", - mqtt_protocol, config.mqtt_namespace, config.mqtt_port + mqtt_protocol, config.mqtt_hostname, config.mqtt_port ); let mqtt_cli = mqtt::CreateOptionsBuilder::new() .server_uri(mqtt_uri) .client_id(client_id) - .mqtt_version(MQTT_VERSION_5) .max_buffered_messages(100) .create_client() .map_err(|e| { @@ -58,9 +113,16 @@ impl UPClientMqtt { ) })?; + mqtt_cli.set_message_callback( + move |cli: &AsyncClient, message: std::option::Option| { + UPClientMqtt::on_receive(cli, message, ptr.clone()) + }, + ); + // TODO: Integrate ssl options when connecting, may need a username, etc. - let conn_opts = mqtt::ConnectOptionsBuilder::new_v5() - .clean_session(true) + let conn_opts = mqtt::ConnectOptionsBuilder::with_mqtt_version(MQTT_VERSION_5) + .clean_start(false) + .properties(mqtt::properties![mqtt::PropertyCode::SessionExpiryInterval => 3600]) .finalize(); mqtt_cli.connect(conn_opts).await.map_err(|e| { @@ -72,56 +134,555 @@ impl UPClientMqtt { Ok(Self { mqtt_client: Arc::new(mqtt_cli), + topic_listener_map, + authority_name, + client_type, }) } + /// Helper function that handles MQTT messages on reception. + /// + /// # Arguments + /// * `cli` - MQTT client that received the message. + /// * `message` - MQTT message received. + fn on_receive( + _cli: &AsyncClient, + message: Option, + topic_map: Arc>>>, + ) { + if let Some(msg) = message { + let topic = msg.topic(); + + // Get attributes from mqtt header. + let uattributes = + UPClientMqtt::get_uattributes_from_mqtt_properties(msg.properties()).unwrap(); + + let payload = msg.payload(); + let upayload = payload.to_vec(); + + let umessage = UMessage { + attributes: Some(uattributes).into(), + payload: Some(upayload.into()), + ..Default::default() + }; + + // Create UMessage from UAttributes and UPayload. + let topic_map_read = block_on(topic_map.read()); + let listeners = topic_map_read.get(topic); + + if let Some(listeners) = listeners { + for listener in listeners { + let umsg_clone = umessage.clone(); // need to clone outside of closure. + block_on(listener.on_receive(umsg_clone)); + } + } + } + } + + /// Get the client id of the mqtt client. fn get_client_id(&self) -> String { self.mqtt_client.client_id() } - // Serialize UMessage for transport over mqtt - fn serialize_umessage(message: &UMessage) -> Result, UStatus> { - let bytes = message.write_to_bytes().map_err(|e| { + /// Get the client indicator based on client type. + fn get_client_indicator(&self) -> String { + match self.client_type { + UPClientMqttType::Device => "d", + UPClientMqttType::Cloud => "c", + } + .to_string() + } + + /// Send UMessage to mqtt topic. + /// + /// # Arguments + /// * `topic` - Mqtt topic to send message to. + /// * `message` - UMessage to send. + /// * `attributes` - UAttributes to send with message. + async fn send_message( + &self, + topic: &str, + message: &UMessage, + attributes: &UAttributes, + ) -> Result<(), UStatus> { + let props = UPClientMqtt::create_mqtt_properties_from_uattributes(attributes)?; + let data = message.payload.clone().unwrap(); // TODO: Handle optional case and is this the best way to send data? + + let msg = mqtt::MessageBuilder::new() + .topic(topic) + .properties(props) + .payload(data) + .qos(1) + .finalize(); + + info!("Sending message: {:?}", msg); + + // UserProperty is not sent with the message.. + + self.mqtt_client.publish(msg).await.map_err(|e| { + UStatus::fail_with_code(UCode::INTERNAL, format!("Unable to publish message: {e:?}")) + })?; + + Ok(()) + } + + /// Add a UListener to an mqtt topic. + /// + /// # Arguments + /// * `topic` - Topic to add the listener to. + /// * `listener` - Listener to call when the topic recieves a message. + async fn add_listener( + &self, + topic: &str, + listener: Arc, + ) -> Result<(), UStatus> { + let mut topic_listener_map = self.topic_listener_map.write().await; + + if !topic_listener_map.contains_key(topic) { + // Subscribe to topic. + self.subscribe(topic).await?; + } + + let listeners = topic_listener_map + .entry(topic.to_string()) + .or_insert(HashSet::new()); + + // Add listener to hash set. + let comp_listener = ComparableListener::new(listener); + listeners.insert(comp_listener); + + Ok(()) + } + + /// Remove a UListener from an mqtt topic. + /// + /// # Arguments + /// * `topic` - Topic to remove the listener from. + /// * `listener` - Listener to remove from the topic subscription list. + async fn remove_listener( + &self, + topic: &str, + listener: Arc, + ) -> Result<(), UStatus> { + let mut topic_listener_map = self.topic_listener_map.write().await; + + if topic_listener_map.contains_key(topic) { + topic_listener_map + .entry(topic.to_string()) + .and_modify(|listeners| { + // Remove listener from hash set. + let comp_listener = ComparableListener::new(listener); + listeners.remove(&comp_listener); + }); + + // Remove topic if no listeners are left. + if topic_listener_map.get(topic).unwrap().is_empty() { + topic_listener_map.remove(topic); + + // Unsubscribe from topic. + self.unsubscribe(topic).await?; + } + } + + Ok(()) + } + + /// Helper function for subscribing the mqtt client to a topic. + /// + /// # Arguments + /// * `topic` - Topic to subscribe to. + async fn subscribe(&self, topic: &str) -> Result<(), UStatus> { + self.mqtt_client.subscribe(topic, 1).await.map_err(|e| { UStatus::fail_with_code( UCode::INTERNAL, - format!("Unable to serialize uMessage: {e:?}"), + format!("Unable to subscribe to topic: {e:?}"), ) })?; - Ok(bytes) + Ok(()) } - fn deserialize_umessage(bytes: &[u8]) -> Result { - let message = Message::parse_from_bytes(bytes).map_err(|e| { + /// Helper function for unsubscribing the mqtt client from a topic. + /// + /// # Arguments + /// * `topic` - Topic to unsubscribe from. + async fn unsubscribe(&self, topic: &str) -> Result<(), UStatus> { + self.mqtt_client.unsubscribe(topic).await.map_err(|e| { UStatus::fail_with_code( UCode::INTERNAL, - format!("Unable to deserialize uMessage: {e:?}"), + format!("Unable to unsubscribe from topic: {e:?}"), ) })?; - Ok(message) + Ok(()) + } + + /// Create mqtt header properties from UAttributes information. + /// + /// # Arguments + /// * `attributes` - UAttributes to create properties from. + fn create_mqtt_properties_from_uattributes( + attributes: &UAttributes, + ) -> Result { + let mut properties = mqtt::Properties::new(); + + // Map UAttributes.id + let id_val = attributes.id.as_ref().ok_or(UStatus::fail_with_code( + UCode::INTERNAL, + "No UAtrributes id found", + ))?; + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + ID_NUM, + &id_val.to_hyphenated_string(), + ) + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to add id property"))?; + + // Map UAttributes.type + let type_val = attributes.type_.value(); + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + TYPE_NUM, + &type_val.to_string(), + ) + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to add type property"))?; + + // Map UAttributes.source + let source_val = attributes.source.as_ref().ok_or(UStatus::fail_with_code( + UCode::INTERNAL, + "No UAtrributes source found", + ))?; + let source_string: String = source_val.into(); + properties + .push_string_pair(mqtt::PropertyCode::UserProperty, SOURCE_NUM, &source_string) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add source property") + })?; + + // Map UAttributes.sink + let msg_type = up_rust::UMessageType::from_i32(type_val).unwrap(); + if msg_type != up_rust::UMessageType::UMESSAGE_TYPE_PUBLISH { + let sink_val = attributes.sink.as_ref().ok_or(UStatus::fail_with_code( + UCode::INTERNAL, + "No UAtrributes sink found", + ))?; + let sink_string: String = sink_val.into(); + properties + .push_string_pair(mqtt::PropertyCode::UserProperty, SINK_NUM, &sink_string) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add sink property") + })?; + } + + // Map UAttributes.priority + let priority_enum = attributes + .priority + .enum_value() + .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to parse priority"))?; + + // If priority code is unspecified, default to CS1. + let priority_val = if priority_enum == UPriority::UPRIORITY_UNSPECIFIED { + UPriority::UPRIORITY_CS1.to_priority_code() + } else { + priority_enum.to_priority_code() + }; + + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + PRIORITY_NUM, + &priority_val, + ) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add priority property") + })?; + + // Map Optional UAttributes.ttl + if let Some(ttl_val) = attributes.ttl { + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + TTL_NUM, + &ttl_val.to_string(), + ) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add ttl property") + })?; + } + + // Map Optional UAttributes.permission_level + if let Some(perm_level_val) = attributes.permission_level { + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + PERM_LEVEL_NUM, + &perm_level_val.to_string(), + ) + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to add permission_level property", + ) + })?; + } + + // Map Optional UAttributes.commstatus + if let Some(commstatus) = attributes.commstatus { + let commstatus_val = commstatus.value(); + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + COMMSTATUS_NUM, + &commstatus_val.to_string(), + ) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add comm_status property") + })?; + } + + // Map UAttributes.reqid + // TODO: Need to make this non-optional + if let Some(reqid) = attributes.reqid.as_ref() { + let reqid_val = reqid.to_hyphenated_string(); + properties + .push_string_pair(mqtt::PropertyCode::UserProperty, REQID_NUM, &reqid_val) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add reqid property") + })?; + } + + // Map Optional UAttributes.token + if let Some(token_val) = attributes.token.as_ref() { + properties + .push_string_pair(mqtt::PropertyCode::UserProperty, TOKEN_NUM, token_val) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add token property") + })?; + } + + // Map Optional UAttributes.traceparent + if let Some(traceparent_val) = attributes.traceparent.as_ref() { + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + TRACEPARENT_NUM, + traceparent_val, + ) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add traceparent property") + })?; + } + + // Map UAttributes.payload_format + let payload_fmt_val = attributes.payload_format.value(); + properties + .push_string_pair( + mqtt::PropertyCode::UserProperty, + PAYLOAD_NUM, + &payload_fmt_val.to_string(), + ) + .map_err(|_| { + UStatus::fail_with_code(UCode::INTERNAL, "Unable to add payload_format property") + })?; + + Ok(properties) } - fn mqtt_topic_from_uuri(uri: &UUri) -> Result { - //TODO: determine how to best split up the uri for an mqtt topic. - // This is a placeholder implementation. + /// Get UAttributes from mqtt header properties. + /// + /// # Arguments + /// * `props` - Mqtt properties to get UAttributes from. + fn get_uattributes_from_mqtt_properties( + props: &mqtt::Properties, + ) -> Result { + let mut attributes = UAttributes::default(); + + props.user_iter().for_each(|(key, value)| { + match key.as_str() { + ID_NUM => { + let id = UUID::from_str(&value) + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse id from mqtt properties", + ) + }) + .unwrap(); + attributes.id = Some(id).into(); + } + TYPE_NUM => { + let type_val = value + .parse::() + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse type from mqtt properties", + ) + }) + .unwrap(); + attributes.type_ = up_rust::UMessageType::from_i32(type_val).unwrap().into(); + } + SOURCE_NUM => { + let source = UUri::from_str(&value) + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse source from mqtt properties", + ) + }) + .unwrap(); + attributes.source = Some(source).into(); + } + SINK_NUM => { + let sink = UUri::from_str(&value) + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse sink from mqtt properties", + ) + }) + .unwrap(); + attributes.sink = Some(sink).into(); + } + PRIORITY_NUM => { + println!("Value: {value:?}"); + let priority_val = + up_rust::UPriority::try_from_priority_code(value).map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse priority from mqtt properties", + ) + }); + println!("Priority: {priority_val:?}"); + attributes.priority = priority_val.unwrap().into(); + } + TTL_NUM => { + let ttl = value + .parse::() + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse ttl from mqtt properties", + ) + }) + .unwrap(); + attributes.ttl = Some(ttl); + } + PERM_LEVEL_NUM => { + let perm_level = value + .parse::() + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse permission level from mqtt properties", + ) + }) + .unwrap(); + attributes.permission_level = Some(perm_level); + } + COMMSTATUS_NUM => { + let commstatus_val = value + .parse::() + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse commstatus from mqtt properties", + ) + }) + .unwrap(); + attributes.commstatus = Some(UCode::from_i32(commstatus_val).unwrap().into()); + } + REQID_NUM => { + let reqid = UUID::from_str(&value) + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse reqid from mqtt properties", + ) + }) + .unwrap(); + attributes.reqid = Some(reqid).into(); + } + TOKEN_NUM => { + attributes.token = Some(value).into(); + } + TRACEPARENT_NUM => { + attributes.traceparent = Some(value).into(); + } + PAYLOAD_NUM => { + let payload_fmt_val = value + .parse::() + .map_err(|_| { + UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse payload format from mqtt properties", + ) + }) + .unwrap(); + attributes.payload_format = up_rust::UPayloadFormat::from_i32(payload_fmt_val) + .unwrap() + .into(); + } + _ => { + //TODO: Handle unknown user props} + println!("Unknown user property: {key:?} - {value:?}"); + } + } + }); - Ok(uri.to_string()) + Ok(attributes) } -} -#[cfg(test)] -mod tests { - use super::*; + /// Convert a UUri to a valid mqtt topic segment. + /// + /// # Arguments + /// * `uri` - UUri to convert to mqtt topic segment. + fn uri_to_mqtt_topic_segment(&self, uri: &UUri) -> String { + let authority = if uri.authority_name.is_empty() { + &self.authority_name + } else if uri.authority_name == WILDCARD_AUTHORITY { + "+" + } else { + &uri.authority_name + }; - #[test] - fn test_serialize_umessage_and_back() { - let message = UMessage { - ..Default::default() + let ue_id = if uri.ue_id == WILDCARD_ENTITY_ID { + "+".into() + } else { + format!("{:X}", uri.ue_id) + }; + + let ue_ver = if uri.ue_version_major == WILDCARD_ENTITY_VERSION { + "+".into() + } else { + format!("{:X}", uri.ue_version_major) + }; + + let res_id = if uri.resource_id == WILDCARD_RESOURCE_ID { + "+".into() + } else { + format!("{:X}", uri.resource_id) + }; + + format!("{authority}/{ue_id}/{ue_ver}/{res_id}") + } + + /// Create a valid mqtt topic based on a source and sink UUri. + /// + /// # Arguments + /// * `src_uri` - Source UUri. + /// * `sink_uri` - Optional sink UUri. + fn to_mqtt_topic_string(&self, src_uri: &UUri, sink_uri: Option<&UUri>) -> String { + let cli_indicator = &self.get_client_indicator(); + let src_segment = &self.uri_to_mqtt_topic_segment(src_uri); + let sink_segment = if let Some(sink) = sink_uri { + self.uri_to_mqtt_topic_segment(sink) + } else { + "///".into() }; - let bytes = UPClientMqtt::serialize_umessage(&message).unwrap(); - let message_from_bytes = UPClientMqtt::deserialize_umessage(&bytes).unwrap(); - assert_eq!(message, message_from_bytes); + format!("{cli_indicator}/{src_segment}/{sink_segment}") } } diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 0000000..00b108e --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,129 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use crate::UPClientMqtt; +use async_std::future::timeout; +use async_trait::async_trait; +use std::{sync::Arc, time::Duration}; +use tokio::sync::mpsc; +use up_rust::{ + RpcClient, RpcClientResult, UAttributesValidators, UListener, UMessage, UMessageError, UStatus, + UUri, +}; + +struct RpcListener { + sender: mpsc::Sender, + correlation_id: String, +} + +impl RpcListener { + pub fn new(sender: mpsc::Sender, correlation_id: String) -> Self { + Self { + sender, + correlation_id, + } + } +} + +#[async_trait] +impl UListener for RpcListener { + // TODO: Handle send errors + async fn on_error(&self, status: UStatus) { + println!("Error: {:?}", status); + + let _ = self + .sender + .send(RpcClientResult::Err(UMessageError::PayloadError( + status.get_message(), + ))) + .await; + } + + // TODO: Handle send and unwrap errors + async fn on_receive(&self, message: UMessage) { + println!("Received message: {:?}", message); + + if message + .attributes + .as_ref() + .unwrap() + .reqid + .as_ref() + .unwrap() + .to_hyphenated_string() + != self.correlation_id + { + // Ignore message + return; + } + + let _ = self.sender.send(RpcClientResult::Ok(message)).await; + } +} + +#[async_trait] +impl RpcClient for UPClientMqtt { + async fn invoke_method(&self, _method: UUri, request: UMessage) -> RpcClientResult { + let attributes = request.attributes.clone().unwrap(); + + // Validate uattributes is an RPC request + UAttributesValidators::Request + .validator() + .validate(&attributes)?; + + // Sink is required for RPC, and this should be caught by the UAttributesValidator. + let sink = attributes + .sink + .as_ref() + .ok_or(UMessageError::PayloadError("Sink not found".to_string()))?; + + // Request id is required for RPC, and this should be caught by the UAttributesValidator. + let req_id = attributes + .reqid + .as_ref() + .ok_or(UMessageError::PayloadError( + "Request ID not found".to_string(), + ))?; + + // Get MQTT request topic from UAttributes + let request_topic = self.to_mqtt_topic_string(&attributes.source, Some(&sink)); + + // Get MQTT response topic from UAttributes + let response_topic = self.to_mqtt_topic_string(&sink, Some(&attributes.source)); + + // Create a channel to receive RPC response from listener + let (sender, mut receiver) = mpsc::channel(100); + + let listener = RpcListener::new(sender, req_id.to_hyphenated_string()); + + self.add_listener(&response_topic, Arc::new(listener)) + .await + .map_err(|err| UMessageError::PayloadError(err.get_message()))?; + + // Send RPC request + self.send_message(&request_topic, &request, &attributes) + .await + .map_err(|err| UMessageError::PayloadError(err.get_message()))?; + + // Recieve RPC response, error on ttl. + let rpc_response = if let Some(ttl) = attributes.ttl { + timeout(Duration::from_millis(u64::from(ttl)), receiver.recv()).await + } else { + Ok(receiver.recv().await) + } + .map_err(|_| UMessageError::PayloadError("RPC response timeout".to_string()))? + .ok_or_else(|| UMessageError::PayloadError("RPC response error".to_string()))?; + + rpc_response + } +} diff --git a/src/transport.rs b/src/transport.rs index 8e5ec6e..8eecc2d 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -11,105 +11,14 @@ * SPDX-License-Identifier: Apache-2.0 ********************************************************************************/ -use async_trait::async_trait; +use std::sync::Arc; -use paho_mqtt::{self as mqtt}; +use async_trait::async_trait; -use up_rust::{ - transport::{datamodel::UTransport, validator::Validators}, - uprotocol::{UAttributes, UCode, UMessage, UMessageType, UStatus, UUri}, - uri::validator::UriValidator, - uuid::builder::UUIDBuilder, -}; +use up_rust::{UCode, UListener, UMessage, UStatus, UTransport, UUri}; use crate::UPClientMqtt; -impl UPClientMqtt { - async fn get_topic_from_attributes(&self, attributes: &UAttributes) -> Result { - // Match UAttributes type (Publish / Request / Response) to determine what uuri to use (source or sink) - let uri_topic = match attributes - .type_ - .enum_value() - .map_err(|_| UStatus::fail_with_code(UCode::INTERNAL, "Unable to parse type"))? - { - UMessageType::UMESSAGE_TYPE_PUBLISH => { - Validators::Publish - .validator() - .validate(attributes) - .map_err(|e| { - UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - format!("Wrong Publish UAttributes {e:?}"), - ) - })?; - - attributes.clone().source - } - UMessageType::UMESSAGE_TYPE_REQUEST => { - Validators::Request - .validator() - .validate(attributes) - .map_err(|e| { - UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - format!("Wrong Request UAttributes {e:?}"), - ) - })?; - - attributes.clone().sink - } - UMessageType::UMESSAGE_TYPE_RESPONSE => { - Validators::Response - .validator() - .validate(attributes) - .map_err(|e| { - UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - format!("Wrong Response UAttributes {e:?}"), - ) - })?; - - attributes.clone().sink - } - UMessageType::UMESSAGE_TYPE_UNSPECIFIED => { - return Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "Wrong Message type in UAttributes", - )) - } - }; - - // Validate that topic is resolved. - if !UriValidator::is_resolved(&uri_topic) { - return Err(UStatus::fail_with_code( - UCode::INVALID_ARGUMENT, - "UUri does not resolved", - )); - } - - // Convert UUri topic to valid mqtt topic. - let mqtt_topic = UPClientMqtt::mqtt_topic_from_uuri(&uri_topic)?; - - Ok(mqtt_topic) - } - - async fn send_message(&self, topic: &str, message: &UMessage) -> Result<(), UStatus> { - let data = UPClientMqtt::serialize_umessage(message)?; - - let msg = mqtt::MessageBuilder::new() - .topic(topic) - .payload(data) - .qos(1) - .finalize(); - - self.mqtt_client.publish(msg).await.map_err(|e| { - UStatus::fail_with_code(UCode::INTERNAL, format!("Unable to publish message: {e:?}")) - })?; - - Ok(()) - } -} - #[async_trait] impl UTransport for UPClientMqtt { async fn send(&self, message: UMessage) -> Result<(), UStatus> { @@ -119,34 +28,56 @@ impl UTransport for UPClientMqtt { "Invalid uAttributes", ))?; - let topic = self.get_topic_from_attributes(attributes).await?; + // validate source and sink uuri's contain no wildcards + let src_uri = attributes.source.as_ref().ok_or(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Invalid source: expected a source value, none was found", + ))?; - self.send_message(&topic, &message).await + src_uri.verify_no_wildcards().map_err(|e| { + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, format!("Invalid source: {e:?}")) + })?; + + let sink_uri = attributes.sink.as_ref(); + + if let Some(sink) = sink_uri { + sink.verify_no_wildcards().map_err(|e| { + UStatus::fail_with_code(UCode::INVALID_ARGUMENT, format!("Invalid sink: {e:?}")) + })?; + } + + let topic = self.to_mqtt_topic_string(src_uri, sink_uri); + + self.send_message(&topic, &message, attributes).await } async fn register_listener( &self, - topic: UUri, - listener: Box) + Send + Sync + 'static>, - ) -> Result { - // implementation goes here - println!("Registering listener for topic: {:?}", topic); - - listener(Ok(UMessage::new())); + source_filter: &UUri, + sink_filter: Option<&UUri>, + listener: Arc, + ) -> Result<(), UStatus> { + let topic = self.to_mqtt_topic_string(source_filter, sink_filter); - let listener_id = UUIDBuilder::new().build().to_string(); - - Ok(listener_id) + self.add_listener(&topic, listener).await } - async fn unregister_listener(&self, topic: UUri, listener: &str) -> Result<(), UStatus> { - // implementation goes here - println!("Unregistering listener: {listener} for topic: {:?}", topic); + async fn unregister_listener( + &self, + source_filter: &UUri, + sink_filter: Option<&UUri>, + listener: Arc, + ) -> Result<(), UStatus> { + let topic: String = self.to_mqtt_topic_string(source_filter, sink_filter); - Ok(()) + self.remove_listener(&topic, listener).await } - async fn receive(&self, _topic: UUri) -> Result { + async fn receive( + &self, + _source_filter: &UUri, + _sink_filter: Option<&UUri>, + ) -> Result { Err(UStatus::fail_with_code( UCode::UNIMPLEMENTED, "This method is not implemented for mqtt. Use register_listener instead.",