diff --git a/crates/common/mqtt_channel/src/channel.rs b/crates/common/mqtt_channel/src/channel.rs index 2b9db268f4f..6729422b183 100644 --- a/crates/common/mqtt_channel/src/channel.rs +++ b/crates/common/mqtt_channel/src/channel.rs @@ -1,20 +1,20 @@ -use crate::Message; use crate::MqttError; +use crate::MqttMessage; use async_trait::async_trait; use futures::channel::mpsc; use futures::SinkExt; use futures::StreamExt; #[async_trait] -pub trait SubChannel: StreamExt + Unpin + Send {} +pub trait SubChannel: StreamExt + Unpin + Send {} #[async_trait] pub trait ErrChannel: StreamExt + Unpin + Send {} #[async_trait] -pub trait PubChannel: SinkExt + Unpin + Send { +pub trait PubChannel: SinkExt + Unpin + Send { /// Publish a message - unless the pub channel has been closed. - async fn publish(&mut self, message: Message) -> Result<(), MqttError> { + async fn publish(&mut self, message: MqttMessage) -> Result<(), MqttError> { Ok(self .send(message) .await @@ -23,10 +23,10 @@ pub trait PubChannel: SinkExt + Unpin + Send { } #[async_trait] -impl SubChannel for mpsc::UnboundedReceiver {} +impl SubChannel for mpsc::UnboundedReceiver {} #[async_trait] impl ErrChannel for mpsc::UnboundedReceiver {} #[async_trait] -impl PubChannel for mpsc::UnboundedSender {} +impl PubChannel for mpsc::UnboundedSender {} diff --git a/crates/common/mqtt_channel/src/config.rs b/crates/common/mqtt_channel/src/config.rs index 56b6525d6e8..fbd1a6cfe43 100644 --- a/crates/common/mqtt_channel/src/config.rs +++ b/crates/common/mqtt_channel/src/config.rs @@ -1,4 +1,4 @@ -use crate::Message; +use crate::MqttMessage; use crate::TopicFilter; use certificate::parse_root_certificate; use certificate::CertificateError; @@ -51,7 +51,7 @@ pub struct Config { /// LastWill message for a mqtt client /// /// Default: None - pub last_will_message: Option, + pub last_will_message: Option, /// With first message on connection /// @@ -130,17 +130,17 @@ impl zeroize::Zeroize for PrivateKey { #[derive(Clone)] pub struct InitMessageFn { - initfn: Arc Message + Send + Sync>>, + initfn: Arc MqttMessage + Send + Sync>>, } impl InitMessageFn { - pub fn new(call_back: impl Fn() -> Message + Sync + Send + 'static) -> InitMessageFn { + pub fn new(call_back: impl Fn() -> MqttMessage + Sync + Send + 'static) -> InitMessageFn { InitMessageFn { initfn: Arc::new(Box::new(call_back)), } } - pub fn new_init_message(&self) -> Message { + pub fn new_init_message(&self) -> MqttMessage { (*self.initfn)() } } @@ -241,7 +241,7 @@ impl Config { } /// Set the last will message, this will be published when the mqtt connection gets closed. - pub fn with_last_will_message(self, lwm: Message) -> Self { + pub fn with_last_will_message(self, lwm: MqttMessage) -> Self { Self { last_will_message: Some(lwm), ..self @@ -251,7 +251,7 @@ impl Config { /// Set the initial message pub fn with_initial_message( self, - initial_message: impl Fn() -> Message + Send + Sync + 'static, + initial_message: impl Fn() -> MqttMessage + Send + Sync + 'static, ) -> Self { Self { initial_message: Some(InitMessageFn::new(initial_message)), diff --git a/crates/common/mqtt_channel/src/connection.rs b/crates/common/mqtt_channel/src/connection.rs index 31cf0d31621..550a536dba1 100644 --- a/crates/common/mqtt_channel/src/connection.rs +++ b/crates/common/mqtt_channel/src/connection.rs @@ -1,7 +1,7 @@ use crate::Config; use crate::ErrChannel; -use crate::Message; use crate::MqttError; +use crate::MqttMessage; use crate::PubChannel; use crate::SubChannel; use futures::channel::mpsc; @@ -24,10 +24,10 @@ use tokio::time::sleep; /// A connection to some MQTT server pub struct Connection { /// The channel of the input messages received by this connection. - pub received: mpsc::UnboundedReceiver, + pub received: mpsc::UnboundedReceiver, /// The channel of the output messages to be published on this connection. - pub published: mpsc::UnboundedSender, + pub published: mpsc::UnboundedSender, /// The channel of the error messages received by this connection. pub errors: mpsc::UnboundedReceiver, @@ -120,7 +120,7 @@ impl Connection { async fn open( config: &Config, - mut message_sender: mpsc::UnboundedSender, + mut message_sender: mpsc::UnboundedSender, mut error_sender: mpsc::UnboundedSender, ) -> Result<(AsyncClient, EventLoop), MqttError> { const INSECURE_MQTT_PORT: u16 = 1883; @@ -198,7 +198,7 @@ impl Connection { mqtt_client: AsyncClient, config: Config, mut event_loop: EventLoop, - mut message_sender: mpsc::UnboundedSender, + mut message_sender: mpsc::UnboundedSender, mut error_sender: mpsc::UnboundedSender, ) -> Result<(), MqttError> { loop { @@ -269,9 +269,9 @@ impl Connection { async fn sender_loop( mqtt_client: AsyncClient, - mut messages_receiver: mpsc::UnboundedReceiver, + mut messages_receiver: mpsc::UnboundedReceiver, mut error_sender: mpsc::UnboundedSender, - last_will: Option, + last_will: Option, done: oneshot::Sender<()>, ) { loop { diff --git a/crates/common/mqtt_channel/src/lib.rs b/crates/common/mqtt_channel/src/lib.rs index 85ca4ab23f9..389b6f23dd7 100644 --- a/crates/common/mqtt_channel/src/lib.rs +++ b/crates/common/mqtt_channel/src/lib.rs @@ -1,7 +1,7 @@ //! A library to connect the local MQTT bus, publish messages and subscribe topics. //! //! ```no_run -//! use mqtt_channel::{Config, Connection, Message, Topic, MqttError, StreamExt, SinkExt}; +//! use mqtt_channel::{Config, Connection, MqttMessage, Topic, MqttError, StreamExt, SinkExt}; //! use std::convert::TryInto; //! //! #[tokio::main] @@ -17,7 +17,7 @@ //! //! // Messages are published by sending them on the published channel //! let output_topic = "test/output/topic".try_into()?; -//! published_messages.send(Message::new(&output_topic, "hello mqtt")).await?; +//! published_messages.send(MqttMessage::new(&output_topic, "hello mqtt")).await?; //! //! // Messages are received from the subscriptions on the received channel //! let message = received_messages.next().await.ok_or(MqttError::ReadOnClosedConnection)?; diff --git a/crates/common/mqtt_channel/src/messages.rs b/crates/common/mqtt_channel/src/messages.rs index fca31cd0acd..e2ca13c05c5 100644 --- a/crates/common/mqtt_channel/src/messages.rs +++ b/crates/common/mqtt_channel/src/messages.rs @@ -12,7 +12,7 @@ use std::fmt::Write; /// A message to be sent to or received from MQTT. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct Message { +pub struct MqttMessage { pub topic: Topic, pub payload: DebugPayload, #[serde(serialize_with = "serialize_qos", deserialize_with = "deserialize_qos")] @@ -135,12 +135,12 @@ impl DebugPayload { /// A message payload pub type Payload = Vec; -impl Message { - pub fn new(topic: &Topic, payload: B) -> Message +impl MqttMessage { + pub fn new(topic: &Topic, payload: B) -> MqttMessage where B: Into, { - Message { + MqttMessage { topic: topic.clone(), payload: DebugPayload(payload.into()), qos: QoS::AtLeastOnce, @@ -175,15 +175,15 @@ impl Message { } } -impl From for Publish { - fn from(val: Message) -> Self { +impl From for Publish { + fn from(val: MqttMessage) -> Self { let mut publish = Publish::new(&val.topic.name, val.qos, val.payload.0); publish.retain = val.retain; publish } } -impl From for Message { +impl From for MqttMessage { fn from(msg: Publish) -> Self { let Publish { topic, @@ -193,7 +193,7 @@ impl From for Message { .. } = msg; - Message { + MqttMessage { topic: Topic::new_unchecked(&topic), payload: DebugPayload(payload.to_vec()), qos, @@ -202,13 +202,13 @@ impl From for Message { } } -impl From<(T, U)> for Message +impl From<(T, U)> for MqttMessage where T: AsRef, U: AsRef, { fn from(value: (T, U)) -> Self { - Message::new(&Topic::new_unchecked(value.0.as_ref()), value.1.as_ref()) + MqttMessage::new(&Topic::new_unchecked(value.0.as_ref()), value.1.as_ref()) } } @@ -221,7 +221,7 @@ mod tests { #[test] fn check_null_terminated_messages() { let topic = Topic::new("trimmed").unwrap(); - let message = Message::new(&topic, &b"123\0"[..]); + let message = MqttMessage::new(&topic, &b"123\0"[..]); assert_eq!(message.payload_bytes(), b"123"); } @@ -229,7 +229,7 @@ mod tests { #[test] fn payload_bytes_removes_only_last_null_char() { let topic = Topic::new("trimmed").unwrap(); - let message = Message::new(&topic, &b"123\0\0"[..]); + let message = MqttMessage::new(&topic, &b"123\0\0"[..]); assert_eq!(message.payload_bytes(), b"123\0"); } @@ -237,21 +237,21 @@ mod tests { #[test] fn check_empty_messages() { let topic = Topic::new("trimmed").unwrap(); - let message = Message::new(&topic, &b""[..]); + let message = MqttMessage::new(&topic, &b""[..]); assert_eq!(message.payload_bytes(), b""); } #[test] fn check_non_null_terminated_messages() { let topic = Topic::new("trimmed").unwrap(); - let message = Message::new(&topic, &b"123"[..]); + let message = MqttMessage::new(&topic, &b"123"[..]); assert_eq!(message.payload_bytes(), b"123"); } #[test] fn payload_str_with_invalid_utf8_char_in_the_middle() { let topic = Topic::new("trimmed").unwrap(); - let message = Message::new(&topic, &b"temperature\xc3\x28"[..]); + let message = MqttMessage::new(&topic, &b"temperature\xc3\x28"[..]); assert_eq!( message.payload_str().unwrap_err().to_string(), "Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 11: temperature..." @@ -260,7 +260,7 @@ mod tests { #[test] fn payload_str_with_invalid_utf8_char_in_the_beginning() { let topic = Topic::new("trimmed").unwrap(); - let message = Message::new(&topic, &b"\xc3\x28"[..]); + let message = MqttMessage::new(&topic, &b"\xc3\x28"[..]); assert_eq!( message.payload_str().unwrap_err().to_string(), "Invalid UTF8 payload: invalid utf-8 sequence of 1 bytes from index 0: ..." @@ -269,7 +269,7 @@ mod tests { #[test] fn message_serialize_deserialize() { - let message = Message { + let message = MqttMessage { topic: Topic::new("test").unwrap(), payload: DebugPayload("test-payload".as_bytes().to_vec()), qos: QoS::AtMostOnce, @@ -278,7 +278,8 @@ mod tests { let json = serde_json::to_value(&message).expect("Serialization failed"); assert_eq!(json.get("payload").unwrap(), &json!("test-payload")); - let deserialized: Message = serde_json::from_value(json).expect("Deserialization failed"); + let deserialized: MqttMessage = + serde_json::from_value(json).expect("Deserialization failed"); assert_eq!(deserialized, message); } } diff --git a/crates/common/mqtt_channel/src/tests.rs b/crates/common/mqtt_channel/src/tests.rs index b4c49e74629..db6e4fd3859 100644 --- a/crates/common/mqtt_channel/src/tests.rs +++ b/crates/common/mqtt_channel/src/tests.rs @@ -45,18 +45,18 @@ async fn subscribing_to_messages() -> Result<(), anyhow::Error> { #[derive(Debug, Clone, Eq, PartialEq)] enum MaybeMessage { - Next(Message), + Next(MqttMessage), Eos, Timeout, } -fn message(t: &str, p: &str) -> Message { +fn message(t: &str, p: &str) -> MqttMessage { let topic = Topic::new(t).expect("a valid topic"); let payload = p.as_bytes(); - Message::new(&topic, payload) + MqttMessage::new(&topic, payload) } -async fn next_message(received: &mut (impl StreamExt + Unpin)) -> MaybeMessage { +async fn next_message(received: &mut (impl StreamExt + Unpin)) -> MaybeMessage { match tokio::time::timeout(TIMEOUT, received.next()).await { Ok(Some(msg)) => MaybeMessage::Next(msg), Ok(None) => MaybeMessage::Eos, @@ -181,7 +181,7 @@ async fn implementing_a_message_mapper() -> Result<(), anyhow::Error> { while let MaybeMessage::Next(msg) = next_message(&mut input).await { let req = msg.payload_str().expect("utf8 payload"); let res = req.to_uppercase(); - let msg = Message::new(&out_topic, res.as_bytes()); + let msg = MqttMessage::new(&out_topic, res.as_bytes()); if output.send(msg).await.is_err() { // the connection has been closed break; @@ -253,7 +253,7 @@ async fn testing_an_mqtt_client_without_mqtt() -> Result<(), anyhow::Error> { while let Some(msg) = input.next().await { let req = msg.payload_str().expect("utf8 payload"); let res = req.to_uppercase(); - let msg = Message::new(&out_topic, res.as_bytes()); + let msg = MqttMessage::new(&out_topic, res.as_bytes()); if output.send(msg).await.is_err() { break; } @@ -456,15 +456,15 @@ async fn ensure_that_all_messages_are_sent_before_disconnect() -> Result<(), any let mut con = Connection::new(&mqtt_config).await.expect("a connection"); con.published - .send(Message::new(&topic, "datum 1")) + .send(MqttMessage::new(&topic, "datum 1")) .await .expect("message sent"); con.published - .send(Message::new(&topic, "datum 2")) + .send(MqttMessage::new(&topic, "datum 2")) .await .expect("message sent"); con.published - .send(Message::new(&topic, "datum 3")) + .send(MqttMessage::new(&topic, "datum 3")) .await .expect("message sent"); @@ -498,7 +498,7 @@ async fn ensure_that_last_will_message_is_delivered() -> Result<(), anyhow::Erro let topic = Topic::new_unchecked(topic); let mqtt_config = Config::default() .with_port(broker.port) - .with_last_will_message(Message { + .with_last_will_message(MqttMessage { topic: topic.clone(), payload: "good bye".to_string().into(), qos: QoS::AtLeastOnce, @@ -507,17 +507,17 @@ async fn ensure_that_last_will_message_is_delivered() -> Result<(), anyhow::Erro let mut con = Connection::new(&mqtt_config).await.expect("a connection"); con.published - .send(Message::new(&topic, "hello 1")) + .send(MqttMessage::new(&topic, "hello 1")) .await .expect("message sent"); con.published - .send(Message::new(&topic, "hello 2")) + .send(MqttMessage::new(&topic, "hello 2")) .await .expect("message sent"); con.published - .send(Message::new(&topic, "hello 3")) + .send(MqttMessage::new(&topic, "hello 3")) .await .expect("message sent"); diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs index 5e4fd95c7fc..1a930f3704c 100644 --- a/crates/common/mqtt_channel/src/topics.rs +++ b/crates/common/mqtt_channel/src/topics.rs @@ -1,5 +1,5 @@ use crate::errors::MqttError; -use crate::Message; +use crate::MqttMessage; use rumqttc::QoS; use rumqttc::SubscribeFilter; use serde::Deserialize; @@ -115,7 +115,7 @@ impl TopicFilter { } /// Check if the given message matches this filter pattern. - pub fn accept(&self, msg: &Message) -> bool { + pub fn accept(&self, msg: &MqttMessage) -> bool { self.accept_topic(&msg.topic) } diff --git a/crates/common/tedge_utils/src/size_threshold.rs b/crates/common/tedge_utils/src/size_threshold.rs index 39cf3e1998d..a1edd7d7695 100644 --- a/crates/common/tedge_utils/src/size_threshold.rs +++ b/crates/common/tedge_utils/src/size_threshold.rs @@ -1,11 +1,11 @@ -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use thiserror::Error; #[derive(Debug)] pub struct SizeThreshold(pub usize); impl SizeThreshold { - pub fn validate(&self, input: &Message) -> Result<(), SizeThresholdExceededError> { + pub fn validate(&self, input: &MqttMessage) -> Result<(), SizeThresholdExceededError> { let size = input.payload_bytes().len(); let threshold = self.0; if size > threshold { diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 4a3a8dae5a9..5bf83cbc9d9 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -154,8 +154,11 @@ impl C8yMqttJwtTokenRetriever { mqtt_con .published .publish( - mqtt_channel::Message::new(&Topic::new_unchecked(&pub_topic), "".to_string()) - .with_qos(mqtt_channel::QoS::AtMostOnce), + mqtt_channel::MqttMessage::new( + &Topic::new_unchecked(&pub_topic), + "".to_string(), + ) + .with_qos(mqtt_channel::QoS::AtMostOnce), ) .await?; info!("JWT token requested"); diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 4628b6772a7..48bec29f3c0 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -360,7 +360,7 @@ mod tests { use crate::json_c8y::AlarmSeverity; use anyhow::Result; use assert_matches::assert_matches; - use mqtt_channel::Message; + use mqtt_channel::MqttMessage; use mqtt_channel::Topic; use serde_json::json; use std::collections::HashSet; @@ -723,7 +723,7 @@ mod tests { ) .unwrap(); - let child_registration = EntityRegistrationMessage::new(&Message::new( + let child_registration = EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new_unchecked("te/device/external_source//"), r#"{"@id": "external_source", "@type": "child-device"}"#, )) diff --git a/crates/core/c8y_api/src/smartrest/inventory.rs b/crates/core/c8y_api/src/smartrest/inventory.rs index 04b792c5718..42d3e473eb1 100644 --- a/crates/core/c8y_api/src/smartrest/inventory.rs +++ b/crates/core/c8y_api/src/smartrest/inventory.rs @@ -11,7 +11,7 @@ use crate::smartrest::csv::fields_to_csv_string; use crate::smartrest::topic::publish_topic_from_ancestors; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use tedge_config::TopicPrefix; use super::message::sanitize_for_smartrest; @@ -25,7 +25,7 @@ pub fn child_device_creation_message( device_type: Option<&str>, ancestors: &[String], prefix: &TopicPrefix, -) -> Result { +) -> Result { if child_id.is_empty() { return Err(InvalidValueError { field_name: "child_id".to_string(), @@ -45,7 +45,7 @@ pub fn child_device_creation_message( }); } - Ok(Message::new( + Ok(MqttMessage::new( &publish_topic_from_ancestors(ancestors, prefix), // XXX: if any arguments contain commas, output will be wrong format!( @@ -67,7 +67,7 @@ pub fn service_creation_message( service_status: &str, ancestors: &[String], prefix: &TopicPrefix, -) -> Result { +) -> Result { // TODO: most of this noise can be eliminated by implementing `Serialize`/`Deserialize` for smartrest format if service_id.is_empty() { return Err(InvalidValueError { @@ -94,7 +94,7 @@ pub fn service_creation_message( }); } - Ok(Message::new( + Ok(MqttMessage::new( &publish_topic_from_ancestors(ancestors, prefix), fields_to_csv_string(&[ "102", @@ -120,13 +120,13 @@ pub fn service_status_update_message( external_ids: &[impl AsRef], service_status: &str, prefix: &TopicPrefix, -) -> Message { +) -> MqttMessage { let topic = publish_topic_from_ancestors(external_ids, prefix); let service_status = sanitize_for_smartrest(service_status, super::message::MAX_PAYLOAD_LIMIT_IN_BYTES); - Message::new(&topic, fields_to_csv_string(&["104", &service_status])) + MqttMessage::new(&topic, fields_to_csv_string(&["104", &service_status])) } #[derive(thiserror::Error, Debug)] diff --git a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs index 4e70ff0a217..3f184dbd0ce 100644 --- a/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs +++ b/crates/core/c8y_api/src/smartrest/smartrest_serializer.rs @@ -2,7 +2,7 @@ use crate::smartrest::csv::fields_to_csv_string; use crate::smartrest::error::SmartRestSerializerError; use crate::smartrest::topic::C8yTopic; use csv::StringRecord; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use serde::ser::SerializeSeq; use serde::Serialize; use serde::Serializer; @@ -288,21 +288,21 @@ where /// Helper to generate a SmartREST operation status message pub trait OperationStatusMessage { - fn executing(prefix: &TopicPrefix) -> Message { + fn executing(prefix: &TopicPrefix) -> MqttMessage { Self::create_message(Self::status_executing(), prefix) } - fn successful(parameter: Option<&str>, prefix: &TopicPrefix) -> Message { + fn successful(parameter: Option<&str>, prefix: &TopicPrefix) -> MqttMessage { Self::create_message(Self::status_successful(parameter), prefix) } - fn failed(failure_reason: &str, prefix: &TopicPrefix) -> Message { + fn failed(failure_reason: &str, prefix: &TopicPrefix) -> MqttMessage { Self::create_message(Self::status_failed(failure_reason), prefix) } - fn create_message(payload: SmartRest, prefix: &TopicPrefix) -> Message { + fn create_message(payload: SmartRest, prefix: &TopicPrefix) -> MqttMessage { let topic = C8yTopic::SmartRestResponse.to_topic(prefix).unwrap(); // never fail - Message::new(&topic, payload) + MqttMessage::new(&topic, payload) } fn status_executing() -> SmartRest; diff --git a/crates/core/c8y_api/src/utils.rs b/crates/core/c8y_api/src/utils.rs index a91c97ef66f..1d6c83c22bc 100644 --- a/crates/core/c8y_api/src/utils.rs +++ b/crates/core/c8y_api/src/utils.rs @@ -1,10 +1,10 @@ pub mod bridge { - use mqtt_channel::Message; + use mqtt_channel::MqttMessage; use tedge_api::main_device_health_topic; use tedge_api::MQTT_BRIDGE_DOWN_PAYLOAD; use tedge_api::MQTT_BRIDGE_UP_PAYLOAD; - pub fn is_c8y_bridge_established(message: &Message, service: &str) -> bool { + pub fn is_c8y_bridge_established(message: &MqttMessage, service: &str) -> bool { let c8y_bridge_health_topic = main_device_health_topic(service); match message.payload_str() { Ok(payload) => { @@ -30,11 +30,11 @@ pub mod bridge { pub mod child_device { use crate::smartrest::topic::C8yTopic; - use mqtt_channel::Message; + use mqtt_channel::MqttMessage; use tedge_config::TopicPrefix; - pub fn new_child_device_message(child_id: &str, prefix: &TopicPrefix) -> Message { - Message::new( + pub fn new_child_device_message(child_id: &str, prefix: &TopicPrefix) -> MqttMessage { + MqttMessage::new( &C8yTopic::upstream_topic(prefix), format!("101,{child_id},{child_id},thin-edge.io-child"), ) @@ -43,7 +43,7 @@ pub mod child_device { #[cfg(test)] mod tests { - use mqtt_channel::Message; + use mqtt_channel::MqttMessage; use mqtt_channel::Topic; use test_case::test_case; @@ -59,7 +59,7 @@ mod tests { #[test_case("tedge/not/health/topic", "0", false)] fn test_bridge_is_established(topic: &str, payload: &str, expected: bool) { let topic = Topic::new(topic).unwrap(); - let message = Message::new(&topic, payload); + let message = MqttMessage::new(&topic, payload); let actual = is_c8y_bridge_established(&message, "tedge-mapper-bridge-c8y"); assert_eq!(actual, expected); diff --git a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs index e3c575a866a..3a625f76211 100644 --- a/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs +++ b/crates/core/tedge_agent/src/tedge_operation_converter/actor.rs @@ -29,7 +29,6 @@ use tedge_api::workflow::OperationAction; use tedge_api::workflow::WorkflowExecutionError; use tedge_api::workflow::WorkflowSupervisor; use tedge_api::Jsonify; -use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_script_ext::Execute; @@ -111,7 +110,7 @@ impl TedgeOperationConverterActor { let meta_topic = self .mqtt_schema .capability_topic_for(&self.device_topic_id, operation); - let message = Message::new(&meta_topic, payload.to_json()) + let message = MqttMessage::new(&meta_topic, payload.to_json()) .with_retain() .with_qos(QoS::AtLeastOnce); self.mqtt_publisher.send(message).await?; diff --git a/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs b/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs index d31ec7ee30e..6c77285303b 100644 --- a/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs +++ b/crates/core/tedge_agent/src/tedge_to_te_converter/converter.rs @@ -23,7 +23,7 @@ impl TedgetoTeConverter { TedgetoTeConverter {} } - fn try_convert(&mut self, message: MqttMessage) -> Vec { + fn try_convert(&mut self, message: MqttMessage) -> Vec { match message.topic.clone() { topic if topic.name.starts_with("tedge/measurements") => { self.convert_measurement(message) @@ -36,7 +36,10 @@ impl TedgetoTeConverter { // tedge/measurements -> te/device/main///m/ // tedge/measurements/child -> te/device/child///m/ - fn convert_measurement(&mut self, mut message: MqttMessage) -> Vec { + fn convert_measurement( + &mut self, + mut message: MqttMessage, + ) -> Vec { let te_topic = match message.topic.name.split('/').collect::>()[..] { ["tedge", "measurements"] => Topic::new_unchecked("te/device/main///m/"), ["tedge", "measurements", cid] => { @@ -84,7 +87,7 @@ impl TedgetoTeConverter { // tedge/events/event_type -> te/device/main///e/event_type // tedge/events/event_type/child -> te/device/child///e/event_type - fn convert_event(&mut self, mut message: MqttMessage) -> Vec { + fn convert_event(&mut self, mut message: MqttMessage) -> Vec { let topic = match message.topic.name.split('/').collect::>()[..] { ["tedge", "events", event_type] => { Topic::new_unchecked(format!("te/device/main///e/{event_type}").as_str()) diff --git a/crates/core/tedge_api/src/entity_store.rs b/crates/core/tedge_api/src/entity_store.rs index 1445f96a704..6be53ae2bb5 100644 --- a/crates/core/tedge_api/src/entity_store.rs +++ b/crates/core/tedge_api/src/entity_store.rs @@ -20,7 +20,7 @@ use log::debug; use log::error; use log::info; use log::warn; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use serde_json::json; use serde_json::Map; use serde_json::Value as JsonValue; @@ -119,10 +119,10 @@ type ExternalIdValidatorFn = /// # Examples /// /// ``` -/// # use mqtt_channel::{Message, Topic}; +/// # use mqtt_channel::{MqttMessage, Topic}; /// # use tedge_api::mqtt_topics::MqttSchema; /// # use tedge_api::entity_store::{EntityStore, EntityRegistrationMessage}; -/// let mqtt_message = Message::new( +/// let mqtt_message = MqttMessage::new( /// &Topic::new("te/device/main//").unwrap(), /// r#"{"@type": "device"}"#.to_string(), /// ); @@ -704,7 +704,7 @@ impl EntityStore { Ok(updated) } - pub fn cache_early_data_message(&mut self, message: Message) { + pub fn cache_early_data_message(&mut self, message: MqttMessage) { self.pending_entity_store.cache_early_data_message(message) } } @@ -844,7 +844,7 @@ impl EntityRegistrationMessage { // TODO: this is basically manual Deserialize implementation, better impl // Serialize/Deserialize. #[must_use] - pub fn new(message: &Message) -> Option { + pub fn new(message: &MqttMessage) -> Option { let topic_id = message .topic .name @@ -949,7 +949,7 @@ impl EntityRegistrationMessage { } // TODO: manual serialize impl - pub fn to_mqtt_message(mut self, mqtt_schema: &MqttSchema) -> Message { + pub fn to_mqtt_message(mut self, mqtt_schema: &MqttSchema) -> MqttMessage { let mut props = serde_json::Map::new(); props.insert("@type".to_string(), self.r#type.to_string().into()); @@ -967,14 +967,14 @@ impl EntityRegistrationMessage { let message = serde_json::to_string(&props).unwrap(); let message_topic = mqtt_schema.topic_for(&self.topic_id, &Channel::EntityMetadata); - Message::new(&message_topic, message).with_retain() + MqttMessage::new(&message_topic, message).with_retain() } } -impl TryFrom<&Message> for EntityRegistrationMessage { +impl TryFrom<&MqttMessage> for EntityRegistrationMessage { type Error = (); - fn try_from(value: &Message) -> Result { + fn try_from(value: &MqttMessage) -> Result { EntityRegistrationMessage::new(value).ok_or(()) } } @@ -1009,14 +1009,14 @@ impl EntityTwinMessage { } } - pub fn to_mqtt_message(self, mqtt_schema: &MqttSchema) -> Message { + pub fn to_mqtt_message(self, mqtt_schema: &MqttSchema) -> MqttMessage { let message_topic = mqtt_schema.topic_for( &self.topic_id, &Channel::EntityTwinData { fragment_key: self.fragment_key, }, ); - Message::new(&message_topic, self.fragment_value.to_string()).with_retain() + MqttMessage::new(&message_topic, self.fragment_value.to_string()).with_retain() } } @@ -1056,7 +1056,7 @@ mod tests { #[test] fn parse_entity_registration_message() { - let message = Message::new( + let message = MqttMessage::new( &Topic::new("te/device/child1//").unwrap(), json!({ "@type" : "child-device", @@ -1098,7 +1098,7 @@ mod tests { // child of the main device. let updated_entities = store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child1//").unwrap(), json!({"@type": "child-device"}).to_string(), )) @@ -1114,7 +1114,7 @@ mod tests { let updated_entities = store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child2//").unwrap(), json!({"@type": "child-device", "@parent": "device/main//"}).to_string(), )) @@ -1183,7 +1183,7 @@ mod tests { // Register service on main store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/main/service/collectd").unwrap(), json!({"@type": "service"}).to_string(), )) @@ -1202,7 +1202,7 @@ mod tests { // Register immediate child of main store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child1//").unwrap(), json!({"@type": "child-device"}).to_string(), )) @@ -1221,7 +1221,7 @@ mod tests { // Register service on child1 store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child1/service/collectd").unwrap(), json!({"@type": "service"}).to_string(), )) @@ -1240,7 +1240,7 @@ mod tests { // Register child2 as child of child1 store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child2//").unwrap(), json!({"@type": "child-device", "@parent": "device/child1//"}).to_string(), )) @@ -1259,7 +1259,7 @@ mod tests { // Register service on child2 store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child2/service/collectd").unwrap(), json!({"@type": "service"}).to_string(), )) @@ -1290,7 +1290,7 @@ mod tests { // Register service on main store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/main/service/collectd").unwrap(), json!({"@type": "service"}).to_string(), )) @@ -1309,7 +1309,7 @@ mod tests { // Register immediate child of main store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child1//").unwrap(), json!({"@type": "child-device"}).to_string(), )) @@ -1328,7 +1328,7 @@ mod tests { // Register service on child1 store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child1/service/collectd").unwrap(), json!({"@type": "service"}).to_string(), )) @@ -1349,7 +1349,7 @@ mod tests { // Register child2 as child of child1 store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child2//").unwrap(), json!({"@type": "child-device", "@parent": "device/child1//"}).to_string(), )) @@ -1368,7 +1368,7 @@ mod tests { // Register service on child2 store .update( - EntityRegistrationMessage::new(&Message::new( + EntityRegistrationMessage::new(&MqttMessage::new( &Topic::new("te/device/child2/service/collectd").unwrap(), json!({"@type": "service"}).to_string(), )) diff --git a/crates/core/tedge_api/src/health.rs b/crates/core/tedge_api/src/health.rs index a37c9968530..8ad91803b30 100644 --- a/crates/core/tedge_api/src/health.rs +++ b/crates/core/tedge_api/src/health.rs @@ -4,7 +4,7 @@ use crate::mqtt_topics::ServiceTopicId; use clock::Clock; use clock::WallClock; use log::error; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use mqtt_channel::Topic; use serde_json::json; use std::process; @@ -51,8 +51,8 @@ impl ServiceHealthTopic { &self.topic } - pub fn down_message(&self) -> Message { - Message { + pub fn down_message(&self) -> MqttMessage { + MqttMessage { topic: Topic::new_unchecked(self.as_str()), payload: json!({ "status": "down", @@ -64,7 +64,7 @@ impl ServiceHealthTopic { } } - pub fn up_message(&self) -> Message { + pub fn up_message(&self) -> MqttMessage { let now = WallClock.now(); let time_format = self.time_format; let timestamp = time_format.to_json(now).unwrap_or_else(|err| { @@ -83,7 +83,7 @@ impl ServiceHealthTopic { let response_topic_health = Topic::new_unchecked(self.as_str()); - Message::new(&response_topic_health, health_status) + MqttMessage::new(&response_topic_health, health_status) .with_qos(mqtt_channel::QoS::AtLeastOnce) .with_retain() } diff --git a/crates/core/tedge_api/src/lib.rs b/crates/core/tedge_api/src/lib.rs index 58df85a200e..b4636107a38 100644 --- a/crates/core/tedge_api/src/lib.rs +++ b/crates/core/tedge_api/src/lib.rs @@ -39,7 +39,7 @@ mod tests { use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; use crate::mqtt_topics::OperationType; - use mqtt_channel::Message; + use mqtt_channel::MqttMessage; use mqtt_channel::QoS; use mqtt_channel::Topic; @@ -91,7 +91,7 @@ mod tests { let device = EntityTopicId::default_child_device("abc").unwrap(); let request = SoftwareListCommand::new(&device, "1".to_string()); - let expected_msg = Message { + let expected_msg = MqttMessage { topic: Topic::new_unchecked("te/device/abc///cmd/software_list/1"), payload: r#"{"status":"init"}"#.to_string().into(), qos: QoS::AtLeastOnce, diff --git a/crates/core/tedge_api/src/message_log.rs b/crates/core/tedge_api/src/message_log.rs index d20b0dc4611..cbadb8df6b8 100644 --- a/crates/core/tedge_api/src/message_log.rs +++ b/crates/core/tedge_api/src/message_log.rs @@ -1,7 +1,7 @@ //! The message log is a persistent append-only log of MQTT messages. //! Each line is the JSON representation of that MQTT message. //! The underlying file is a JSON lines file. -use mqtt_channel::Message as MqttMessage; +use mqtt_channel::MqttMessage; use serde_json::json; use std::fs::File; use std::fs::OpenOptions; @@ -120,7 +120,7 @@ mod tests { use crate::message_log::MessageLogReader; use super::MessageLogWriter; - use mqtt_channel::Message; + use mqtt_channel::MqttMessage; use mqtt_channel::Topic; use tempfile::tempdir; @@ -131,7 +131,7 @@ mod tests { // Prepare some dummy messages let mut messages = vec![]; for i in 1..5 { - let message = Message::new( + let message = MqttMessage::new( &Topic::new(&format!("topic{i}")).unwrap(), format!("payload{i}"), ); diff --git a/crates/core/tedge_api/src/messages.rs b/crates/core/tedge_api/src/messages.rs index 977402a6d51..2a73da0ac2a 100644 --- a/crates/core/tedge_api/src/messages.rs +++ b/crates/core/tedge_api/src/messages.rs @@ -7,7 +7,7 @@ use crate::software::*; use crate::workflow::GenericCommandState; use crate::workflow::StateName; use download::DownloadInfo; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use mqtt_channel::QoS; use mqtt_channel::Topic; use serde::de::DeserializeOwned; @@ -78,10 +78,10 @@ where } /// Return the MQTT message to register support for this types of command - pub fn capability_message(schema: &MqttSchema, target: &EntityTopicId) -> Message { + pub fn capability_message(schema: &MqttSchema, target: &EntityTopicId) -> MqttMessage { let meta_topic = schema.capability_topic_for(target, Payload::operation_type()); let payload = "{}"; - Message::new(&meta_topic, payload) + MqttMessage::new(&meta_topic, payload) .with_retain() .with_qos(QoS::AtLeastOnce) } @@ -141,18 +141,18 @@ where } /// Return the MQTT message for this command - pub fn command_message(&self, schema: &MqttSchema) -> Message { + pub fn command_message(&self, schema: &MqttSchema) -> MqttMessage { let topic = self.topic(schema); let payload = self.payload.to_bytes(); - Message::new(&topic, payload) + MqttMessage::new(&topic, payload) .with_qos(QoS::AtLeastOnce) .with_retain() } /// Return the MQTT message to clear this command - pub fn clearing_message(&self, schema: &MqttSchema) -> Message { + pub fn clearing_message(&self, schema: &MqttSchema) -> MqttMessage { let topic = self.topic(schema); - Message::new(&topic, vec![]) + MqttMessage::new(&topic, vec![]) .with_qos(QoS::AtLeastOnce) .with_retain() } diff --git a/crates/core/tedge_api/src/pending_entity_store.rs b/crates/core/tedge_api/src/pending_entity_store.rs index 6acf4170a48..a4a65f28fae 100644 --- a/crates/core/tedge_api/src/pending_entity_store.rs +++ b/crates/core/tedge_api/src/pending_entity_store.rs @@ -4,7 +4,7 @@ use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; use crate::ring_buffer::RingBuffer; use log::error; -use mqtt_channel::Message as MqttMessage; +use mqtt_channel::MqttMessage; use std::collections::HashMap; /// A store for all the entities for which data messages are received before @@ -167,7 +167,7 @@ impl PendingEntityStore { #[cfg(test)] mod tests { - use mqtt_channel::Message; + use mqtt_channel::MqttMessage; use mqtt_channel::Topic; use serde_json::json; @@ -244,19 +244,19 @@ mod tests { fn take_cached_entity_filters_telemetry() { let mut store = build_pending_entity_store(); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"temperature": 50}).to_string(), )); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child2///m/environment"), json!({"temperature": 60}).to_string(), )); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"pressure": 40}).to_string(), )); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child3///m/environment"), json!({"pressure": 30}).to_string(), )); @@ -268,11 +268,11 @@ mod tests { assert_eq!( cached_entity.data_messages, vec![ - Message::new( + MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"temperature": 50}).to_string(), ), - Message::new( + MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"pressure": 40}).to_string(), ), @@ -285,7 +285,7 @@ mod tests { )); assert_eq!( cached_entity.data_messages, - vec![Message::new( + vec![MqttMessage::new( &Topic::new_unchecked("te/device/child2///m/environment"), json!({"temperature": 60}).to_string(), ),] @@ -297,7 +297,7 @@ mod tests { )); assert_eq!( cached_entity.data_messages, - vec![Message::new( + vec![MqttMessage::new( &Topic::new_unchecked("te/device/child3///m/environment"), json!({"pressure": 30}).to_string(), ),] @@ -308,19 +308,19 @@ mod tests { fn cached_entity_returns_metadata_before_telemetry() { let mut store = build_pending_entity_store(); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"temperature": 50}).to_string(), )); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child1///twin/maintenance_mode"), "true", )); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"pressure": 40}).to_string(), )); - store.cache_early_data_message(Message::new( + store.cache_early_data_message(MqttMessage::new( &Topic::new_unchecked("te/device/child1///twin/service_count"), "5", )); @@ -332,19 +332,19 @@ mod tests { assert_eq!( cached_entity.data_messages, vec![ - Message::new( + MqttMessage::new( &Topic::new_unchecked("te/device/child1///twin/maintenance_mode"), "true", ), - Message::new( + MqttMessage::new( &Topic::new_unchecked("te/device/child1///twin/service_count"), "5", ), - Message::new( + MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"temperature": 50}).to_string(), ), - Message::new( + MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/environment"), json!({"pressure": 40}).to_string(), ), diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index 62c1c821c0a..aebc2b5572c 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -9,7 +9,7 @@ use crate::mqtt_topics::EntityTopicId; use crate::mqtt_topics::MqttSchema; use crate::mqtt_topics::OperationType; pub use error::*; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use mqtt_channel::QoS; pub use script::*; use serde::Deserialize; @@ -192,7 +192,7 @@ impl OperationWorkflow { &self, schema: &MqttSchema, target: &EntityTopicId, - ) -> Option { + ) -> Option { match self.operation { // Need to treat SoftwareList and SoftwareUpdate as exceptions as they require software types in the payload OperationType::SoftwareList | OperationType::SoftwareUpdate => None, @@ -200,7 +200,7 @@ impl OperationWorkflow { let meta_topic = schema.capability_topic_for(target, self.operation.clone()); let payload = "{}".to_string(); Some( - Message::new(&meta_topic, payload) + MqttMessage::new(&meta_topic, payload) .with_retain() .with_qos(QoS::AtLeastOnce), ) @@ -216,7 +216,7 @@ impl OperationWorkflow { /// - `Err(error)` when the request is ill-formed pub fn get_operation_current_action( &self, - message: &Message, + message: &MqttMessage, ) -> Result, WorkflowExecutionError> { match GenericCommandState::from_command_message(message) { Ok(Some(command_state)) => { diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index 15341d8116e..59032a8fa3c 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -1,6 +1,6 @@ use crate::workflow::ExitHandlers; use crate::workflow::WorkflowExecutionError; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use mqtt_channel::QoS::AtLeastOnce; use mqtt_channel::Topic; use serde::Deserialize; @@ -25,7 +25,9 @@ pub struct GenericStateUpdate { impl GenericCommandState { /// Extract a command state from a json payload - pub fn from_command_message(message: &Message) -> Result, WorkflowExecutionError> { + pub fn from_command_message( + message: &MqttMessage, + ) -> Result, WorkflowExecutionError> { let payload = message.payload_bytes(); if payload.is_empty() { return Ok(None); @@ -42,11 +44,11 @@ impl GenericCommandState { } /// Build an MQTT message to publish the command state - pub fn into_message(mut self) -> Message { + pub fn into_message(mut self) -> MqttMessage { GenericCommandState::inject_text_property(&mut self.payload, "status", &self.status); let topic = &self.topic; let payload = self.payload.to_string(); - Message::new(topic, payload) + MqttMessage::new(topic, payload) .with_retain() .with_qos(AtLeastOnce) } @@ -291,7 +293,7 @@ mod tests { fn serde_generic_command_payload() { let topic = Topic::new_unchecked("te/device/main///cmd/make_it/123"); let payload = r#"{ "status":"init", "foo":42, "bar": { "extra": [1,2,3] }}"#; - let command = mqtt_channel::Message::new(&topic, payload); + let command = mqtt_channel::MqttMessage::new(&topic, payload); let cmd = GenericCommandState::from_command_message(&command) .expect("parsing error") .expect("no message"); @@ -348,7 +350,7 @@ mod tests { fn inject_json_into_parameters() { let topic = Topic::new_unchecked("te/device/main///cmd/make_it/123"); let payload = r#"{ "status":"init", "foo":42, "bar": { "extra": [1,2,3] }}"#; - let command = mqtt_channel::Message::new(&topic, payload); + let command = mqtt_channel::MqttMessage::new(&topic, payload); let cmd = GenericCommandState::from_command_message(&command) .expect("parsing error") .expect("no message"); diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 43aac4b53ba..929c4d832ee 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -57,7 +57,11 @@ impl WorkflowSupervisor { } /// List the capabilities provided by the registered workflows - pub fn capability_messages(&self, schema: &MqttSchema, target: &EntityTopicId) -> Vec { + pub fn capability_messages( + &self, + schema: &MqttSchema, + target: &EntityTopicId, + ) -> Vec { // To ease testing the capability messages are emitted in a deterministic order let mut operations = self.workflows.values().collect::>(); operations.sort_by(|&a, &b| a.operation.to_string().cmp(&b.operation.to_string())); @@ -73,7 +77,7 @@ impl WorkflowSupervisor { pub fn apply_external_update( &mut self, operation: &OperationType, - message: &Message, + message: &MqttMessage, ) -> Result, WorkflowExecutionError> { if !self.workflows.contains_key(operation) { return Err(WorkflowExecutionError::UnknownOperation { diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs index b0473aab061..fcab1da3bc5 100644 --- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs +++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs @@ -5,7 +5,7 @@ use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::SinkExt; use futures::StreamExt; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use mqtt_channel::PubChannel; use mqtt_channel::Topic; use serde::Deserialize; @@ -201,7 +201,7 @@ async fn monitor_tedge_service( .context("Could not send initial health status message")?; loop { - let message = Message::new(&req_topic, ""); + let message = MqttMessage::new(&req_topic, ""); let _ = publisher .publish(message) .await @@ -238,7 +238,7 @@ async fn monitor_tedge_service( async fn get_latest_health_status_message( request_timestamp: OffsetDateTime, - messages: &mut mpsc::UnboundedReceiver, + messages: &mut mpsc::UnboundedReceiver, ) -> Result { while let Some(message) = messages.next().await { if let Ok(message) = message.payload_str() { @@ -304,7 +304,7 @@ mod tests { #[tokio::test] async fn test_get_latest_health_status_message() -> Result<()> { - let (mut sender, mut receiver) = mpsc::unbounded::(); + let (mut sender, mut receiver) = mpsc::unbounded::(); let health_topic = Topic::new("te/device/main/service/test-service/status/health").expect("Valid topic"); let base_timestamp = OffsetDateTime::now_utc(); @@ -319,7 +319,7 @@ mod tests { "time": timestamp_str, }) .to_string(); - let health_message = Message::new(&health_topic, health_status); + let health_message = MqttMessage::new(&health_topic, health_status); sender.publish(health_message).await?; } @@ -344,7 +344,7 @@ mod tests { #[tokio::test] async fn test_get_latest_health_status_message_unix() { - let (mut sender, mut receiver) = mpsc::unbounded::(); + let (mut sender, mut receiver) = mpsc::unbounded::(); let health_topic = Topic::new("te/device/main/service/test-service/status/health").expect("Valid topic"); let request_timestamp = OffsetDateTime::parse( @@ -361,7 +361,7 @@ mod tests { "time": payload_timestamp, }) .to_string(); - let health_message = Message::new(&health_topic, health_status); + let health_message = MqttMessage::new(&health_topic, health_status); sender.publish(health_message).await.unwrap(); sender.close_channel(); diff --git a/crates/extensions/c8y_mapper_ext/src/actor.rs b/crates/extensions/c8y_mapper_ext/src/actor.rs index b599099221d..dde40b29cda 100644 --- a/crates/extensions/c8y_mapper_ext/src/actor.rs +++ b/crates/extensions/c8y_mapper_ext/src/actor.rs @@ -34,7 +34,6 @@ use tedge_api::main_device_health_topic; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; use tedge_file_system_ext::FsWatchEvent; -use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::TopicFilter; @@ -213,8 +212,8 @@ impl C8yMapperActor { } }; - let c8y_notification = Message::new(&queued_data.smartrest_topic, payload); - let clear_local_cmd = Message::new(&queued_data.clear_cmd_topic, "") + let c8y_notification = MqttMessage::new(&queued_data.smartrest_topic, payload); + let clear_local_cmd = MqttMessage::new(&queued_data.clear_cmd_topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); for converted_message in [c8y_notification, clear_local_cmd] { diff --git a/crates/extensions/c8y_mapper_ext/src/alarm_converter.rs b/crates/extensions/c8y_mapper_ext/src/alarm_converter.rs index 6e1a0ccd03d..9526fee57da 100644 --- a/crates/extensions/c8y_mapper_ext/src/alarm_converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/alarm_converter.rs @@ -8,7 +8,7 @@ use tedge_api::alarm::ThinEdgeAlarmDeserializerError; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::EntityStore; use tedge_config::TopicPrefix; -use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use crate::error::ConversionError; @@ -19,8 +19,8 @@ const C8Y_JSON_MQTT_ALARMS_TOPIC: &str = "alarm/alarms/create"; #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum AlarmConverter { Syncing { - pending_alarms_map: HashMap, - old_alarms_map: HashMap, + pending_alarms_map: HashMap, + old_alarms_map: HashMap, }, Synced, } @@ -36,12 +36,12 @@ impl AlarmConverter { pub(crate) fn try_convert_alarm( &mut self, source: &EntityTopicId, - input_message: &Message, + input_message: &MqttMessage, alarm_type: &str, entity_store: &EntityStore, c8y_prefix: &TopicPrefix, - ) -> Result, ConversionError> { - let mut output_messages: Vec = Vec::new(); + ) -> Result, ConversionError> { + let mut output_messages: Vec = Vec::new(); match self { Self::Syncing { pending_alarms_map, @@ -73,7 +73,8 @@ impl AlarmConverter { let c8y_json_mqtt_alarms_topic = format!("{c8y_prefix}/{C8Y_JSON_MQTT_ALARMS_TOPIC}"); let c8y_alarm_topic = Topic::new_unchecked(&c8y_json_mqtt_alarms_topic); - output_messages.push(Message::new(&c8y_alarm_topic, cumulocity_alarm_json)); + output_messages + .push(MqttMessage::new(&c8y_alarm_topic, cumulocity_alarm_json)); } _ => { // SmartREST @@ -81,7 +82,7 @@ impl AlarmConverter { let smartrest_topic = C8yTopic::from(&c8y_alarm) .to_topic(c8y_prefix) .expect("Infallible"); - output_messages.push(Message::new(&smartrest_topic, smartrest_alarm)); + output_messages.push(MqttMessage::new(&smartrest_topic, smartrest_alarm)); } } @@ -89,15 +90,15 @@ impl AlarmConverter { let alarm_id = input_message.topic.name.to_string(); let topic = Topic::new_unchecked(format!("{INTERNAL_ALARMS_TOPIC}{alarm_id}").as_str()); - let alarm_copy = - Message::new(&topic, input_message.payload_bytes().to_owned()).with_retain(); + let alarm_copy = MqttMessage::new(&topic, input_message.payload_bytes().to_owned()) + .with_retain(); output_messages.push(alarm_copy); } } Ok(output_messages) } - pub(crate) fn process_internal_alarm(&mut self, input: &Message) { + pub(crate) fn process_internal_alarm(&mut self, input: &MqttMessage) { match self { Self::Syncing { pending_alarms_map: _, @@ -131,8 +132,8 @@ impl AlarmConverter { /// is one that was raised while the mapper process was down. /// An alarm present in both, if their payload is the same, is one that was already processed before the restart /// and hence can be ignored during sync. - pub(crate) fn sync(&mut self) -> Vec { - let mut sync_messages: Vec = Vec::new(); + pub(crate) fn sync(&mut self) -> Vec { + let mut sync_messages: Vec = Vec::new(); match self { Self::Syncing { @@ -146,7 +147,7 @@ impl AlarmConverter { // it is assumed to have been cleared while the mapper process was down Entry::Vacant(_) => { let topic = Topic::new_unchecked(&alarm_id); - let message = Message::new(&topic, vec![]).with_retain(); + let message = MqttMessage::new(&topic, vec![]).with_retain(); // Recreate the clear alarm message and add it to the pending alarms list to be processed later sync_messages.push(message); } diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 293f7997e02..22dea1542ee 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -89,7 +89,6 @@ use tedge_api::Jsonify; use tedge_config::SoftwareManagementApiFlag; use tedge_config::TEdgeConfigError; use tedge_config::TopicPrefix; -use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_utils::file::create_directory_with_defaults; @@ -121,35 +120,35 @@ pub struct MapperConfig { } impl CumulocityConverter { - pub async fn convert(&mut self, input: &Message) -> Vec { + pub async fn convert(&mut self, input: &MqttMessage) -> Vec { let messages_or_err = self.try_convert(input).await; self.wrap_errors(messages_or_err) } pub fn wrap_errors( &self, - messages_or_err: Result, ConversionError>, - ) -> Vec { + messages_or_err: Result, ConversionError>, + ) -> Vec { messages_or_err.unwrap_or_else(|error| vec![self.new_error_message(error)]) } - pub fn wrap_error(&self, message_or_err: Result) -> Message { + pub fn wrap_error(&self, message_or_err: Result) -> MqttMessage { message_or_err.unwrap_or_else(|error| self.new_error_message(error)) } - pub fn new_error_message(&self, error: ConversionError) -> Message { + pub fn new_error_message(&self, error: ConversionError) -> MqttMessage { error!("Mapping error: {}", error); - Message::new(&self.get_mapper_config().errors_topic, error.to_string()) + MqttMessage::new(&self.get_mapper_config().errors_topic, error.to_string()) } /// This function will be the first method that's called on the converter after it's instantiated. /// Return any initialization messages that must be processed before the converter starts converting regular messages. - pub fn init_messages(&mut self) -> Vec { + pub fn init_messages(&mut self) -> Vec { match self.try_init_messages() { Ok(messages) => messages, Err(error) => { error!("Mapping error: {}", error); - vec![Message::new( + vec![MqttMessage::new( &self.get_mapper_config().errors_topic, error.to_string(), )] @@ -157,11 +156,11 @@ impl CumulocityConverter { } } - pub fn process_operation_update_message(&mut self, message: DiscoverOp) -> Message { + pub fn process_operation_update_message(&mut self, message: DiscoverOp) -> MqttMessage { let message_or_err = self.try_process_operation_update_message(&message); match message_or_err { Ok(Some(msg)) => msg, - Ok(None) => Message::new( + Ok(None) => MqttMessage::new( &self.get_mapper_config().errors_topic, "No operation update required", ), @@ -308,7 +307,7 @@ impl CumulocityConverter { pub fn try_convert_entity_registration( &mut self, input: &EntityRegistrationMessage, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { // Parse the optional fields let display_name = input.other.get("name").and_then(|v| v.as_str()); let display_type = input.other.get("type").and_then(|v| v.as_str()); @@ -441,10 +440,10 @@ impl CumulocityConverter { fn try_convert_measurement( &mut self, source: &EntityTopicId, - input: &Message, + input: &MqttMessage, measurement_type: &str, - ) -> Result, ConversionError> { - let mut mqtt_messages: Vec = Vec::new(); + ) -> Result, ConversionError> { + let mut mqtt_messages: Vec = Vec::new(); if let Some(entity) = self.entity_store.get(source) { // Need to check if the input Thin Edge JSON is valid before adding a child ID to list @@ -452,7 +451,7 @@ impl CumulocityConverter { json::from_thin_edge_json(input.payload_str()?, entity, measurement_type)?; if c8y_json_payload.len() < self.size_threshold.0 { - mqtt_messages.push(Message::new( + mqtt_messages.push(MqttMessage::new( &self.mapper_config.out_topic, c8y_json_payload, )); @@ -471,9 +470,9 @@ impl CumulocityConverter { async fn try_convert_event( &mut self, source: &EntityTopicId, - input: &Message, + input: &MqttMessage, event_type: &str, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let mut messages = Vec::new(); let event_type = match event_type.is_empty() { @@ -505,7 +504,7 @@ impl CumulocityConverter { let message = if c8y_event.extras.is_empty() { let smartrest_event = Self::serialize_to_smartrest(&c8y_event)?; let smartrest_topic = C8yTopic::upstream_topic(&self.config.c8y_prefix); - Message::new(&smartrest_topic, smartrest_event) + MqttMessage::new(&smartrest_topic, smartrest_event) } else { // If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON let cumulocity_event_json = serde_json::to_string(&c8y_event)?; @@ -513,7 +512,7 @@ impl CumulocityConverter { "{}/{C8Y_JSON_MQTT_EVENTS_TOPIC}", self.config.c8y_prefix )); - Message::new(&json_mqtt_topic, cumulocity_event_json) + MqttMessage::new(&json_mqtt_topic, cumulocity_event_json) }; if self.can_send_over_mqtt(&message) { @@ -538,9 +537,9 @@ impl CumulocityConverter { pub fn process_alarm_messages( &mut self, source: &EntityTopicId, - input: &Message, + input: &MqttMessage, alarm_type: &str, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { self.size_threshold.validate(input)?; let mqtt_messages = self.alarm_converter.try_convert_alarm( @@ -557,8 +556,8 @@ impl CumulocityConverter { pub async fn process_health_status_message( &mut self, entity: &EntityTopicId, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { let entity_metadata = self .entity_store .get(entity) @@ -575,8 +574,8 @@ impl CumulocityConverter { async fn parse_c8y_devicecontrol_topic( &mut self, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { let operation = C8yOperation::from_json(message.payload.as_str()?)?; let device_xid = operation.external_source.external_id; let cmd_id = self.command_id.new_id_with_str(&operation.op_id); @@ -599,7 +598,7 @@ impl CumulocityConverter { device_xid: String, cmd_id: String, extras: &HashMap, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { let msgs = match C8yDeviceControlOperation::from_json_object(extras)? { C8yDeviceControlOperation::Restart(_) => { self.forward_restart_request(device_xid, cmd_id)? @@ -653,9 +652,9 @@ impl CumulocityConverter { async fn parse_c8y_smartrest_topics( &mut self, - message: &Message, - ) -> Result, ConversionError> { - let mut output: Vec = Vec::new(); + message: &MqttMessage, + ) -> Result, ConversionError> { + let mut output: Vec = Vec::new(); for smartrest_message in collect_smartrest_messages(message.payload_str()?) { let result = self.process_smartrest(smartrest_message.as_str()).await; let mut msgs = self.handle_c8y_operation_result(&result); @@ -666,8 +665,8 @@ impl CumulocityConverter { fn handle_c8y_operation_result( &mut self, - result: &Result, CumulocityMapperError>, - ) -> Vec { + result: &Result, CumulocityMapperError>, + ) -> Vec { match result { Err( err @ CumulocityMapperError::FromSmartRestDeserializer( @@ -684,8 +683,8 @@ impl CumulocityConverter { let topic = C8yTopic::SmartRestResponse .to_topic(&self.config.c8y_prefix) .unwrap(); - let msg1 = Message::new(&topic, set_operation_executing(operation)); - let msg2 = Message::new(&topic, fail_operation(operation, &err.to_string())); + let msg1 = MqttMessage::new(&topic, set_operation_executing(operation)); + let msg2 = MqttMessage::new(&topic, fail_operation(operation, &err.to_string())); error!("{err}"); vec![msg1, msg2] } @@ -701,7 +700,7 @@ impl CumulocityConverter { async fn process_smartrest( &mut self, payload: &str, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { match get_smartrest_device_id(payload) { Some(device_id) => { match get_smartrest_template_id(payload).as_str() { @@ -727,7 +726,7 @@ impl CumulocityConverter { device_xid: String, cmd_id: String, software_update_request: C8ySoftwareUpdate, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { let entity_xid: EntityExternalId = device_xid.into(); let target = self.entity_store.try_get_by_external_id(&entity_xid)?; let mut command = @@ -755,7 +754,7 @@ impl CumulocityConverter { &mut self, device_xid: String, cmd_id: String, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { let entity_xid: EntityExternalId = device_xid.into(); let target = self.entity_store.try_get_by_external_id(&entity_xid)?; let command = RestartCommand::new(&target.topic_id, cmd_id); @@ -763,7 +762,7 @@ impl CumulocityConverter { Ok(vec![message]) } - fn request_software_list(&self, target: &EntityTopicId) -> Message { + fn request_software_list(&self, target: &EntityTopicId) -> MqttMessage { let cmd_id = self.command_id.new_id(); let request = SoftwareListCommand::new(target, cmd_id); request.command_message(&self.mqtt_schema) @@ -773,7 +772,7 @@ impl CumulocityConverter { &mut self, payload: &str, template: &str, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { if let Some(operation) = self.operations.matching_smartrest_template(template) { if let Some(command) = operation.command() { self.execute_operation( @@ -842,7 +841,7 @@ impl CumulocityConverter { let topic = C8yTopic::SmartRestResponse.to_topic(&c8y_prefix).unwrap(); let executing_str = set_operation_executing(op_name); mqtt_publisher - .send(Message::new(&topic, executing_str.as_str())) + .send(MqttMessage::new(&topic, executing_str.as_str())) .await .unwrap_or_else(|err| { error!("Failed to publish a message: {executing_str}. Error: {err}") @@ -866,7 +865,7 @@ impl CumulocityConverter { }; let success_message = succeed_operation(op_name, result); match success_message { - Ok(message) => mqtt_publisher.send(Message::new(&topic, message.as_str())).await + Ok(message) => mqtt_publisher.send(MqttMessage::new(&topic, message.as_str())).await .unwrap_or_else(|err| { error!("Failed to publish a message: {message}. Error: {err}") }), @@ -874,7 +873,7 @@ impl CumulocityConverter { let fail_message = fail_operation( op_name, &format!("{:?}", anyhow::Error::from(e).context("Custom operation process exited successfully, but couldn't convert output to valid SmartREST message"))); - mqtt_publisher.send(Message::new(&topic, fail_message.as_str())).await.unwrap_or_else(|err| { + mqtt_publisher.send(MqttMessage::new(&topic, fail_message.as_str())).await.unwrap_or_else(|err| { error!("Failed to publish a message: {fail_message}. Error: {err}") }) } @@ -888,7 +887,7 @@ impl CumulocityConverter { let payload = fail_operation(op_name, &failure_reason); mqtt_publisher - .send(Message::new(&topic, payload.as_bytes())) + .send(MqttMessage::new(&topic, payload.as_bytes())) .await .unwrap_or_else(|err| { error!( @@ -917,7 +916,7 @@ impl CumulocityConverter { )) } - fn can_send_over_mqtt(&self, message: &Message) -> bool { + fn can_send_over_mqtt(&self, message: &MqttMessage) -> bool { message.payload_bytes().len() < self.size_threshold.0 } } @@ -944,8 +943,8 @@ impl CumulocityConverter { pub async fn try_convert( &mut self, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { debug!("Mapping message on topic: {}", message.topic.name); trace!("Message content: {:?}", message.payload_str()); match self.mqtt_schema.entity_channel_of(&message.topic) { @@ -958,9 +957,9 @@ impl CumulocityConverter { &mut self, source: EntityTopicId, channel: Channel, - message: &Message, - ) -> Result, ConversionError> { - let mut registration_messages: Vec = vec![]; + message: &MqttMessage, + ) -> Result, ConversionError> { + let mut registration_messages: Vec = vec![]; match &channel { Channel::EntityMetadata => { if let Ok(register_message) = EntityRegistrationMessage::try_from(message) { @@ -1035,8 +1034,8 @@ impl CumulocityConverter { &mut self, source: EntityTopicId, channel: Channel, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { match &channel { Channel::EntityTwinData { fragment_key } => { self.try_convert_entity_twin_data(&source, message, fragment_key) @@ -1131,7 +1130,7 @@ impl CumulocityConverter { async fn process_cached_entity_data( &mut self, cached_entity: PendingEntityData, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let mut converted_messages = vec![]; for message in cached_entity.data_messages { let (source, channel) = self.mqtt_schema.entity_channel_of(&message.topic).unwrap(); @@ -1164,7 +1163,7 @@ impl CumulocityConverter { pub fn register_and_convert_entity( &mut self, registration_message: &EntityRegistrationMessage, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let entity_topic_id = ®istration_message.topic_id; self.entity_store.update(registration_message.clone())?; if registration_message.r#type == EntityType::ChildDevice { @@ -1187,7 +1186,10 @@ impl CumulocityConverter { Ok(registration_messages) } - fn convert_entity_registration_message(&self, value: &EntityRegistrationMessage) -> Message { + fn convert_entity_registration_message( + &self, + value: &EntityRegistrationMessage, + ) -> MqttMessage { let entity_topic_id = value.topic_id.clone(); let mut register_payload: Map = Map::new(); @@ -1209,7 +1211,7 @@ impl CumulocityConverter { register_payload.extend(value.other.clone()); - Message::new( + MqttMessage::new( &Topic::new(&format!("{}/{entity_topic_id}", self.mqtt_schema.root)).unwrap(), serde_json::to_string(&Value::Object(register_payload)).unwrap(), ) @@ -1218,8 +1220,8 @@ impl CumulocityConverter { async fn try_convert_tedge_and_c8y_topics( &mut self, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { let messages = match &message.topic { topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => { self.alarm_converter.process_internal_alarm(message); @@ -1240,7 +1242,7 @@ impl CumulocityConverter { Ok(messages) } - fn try_init_messages(&mut self) -> Result, ConversionError> { + fn try_init_messages(&mut self) -> Result, ConversionError> { let mut messages = self.parse_base_inventory_file()?; let supported_operations_message = @@ -1263,7 +1265,7 @@ impl CumulocityConverter { &self, path: &Path, prefix: &TopicPrefix, - ) -> Result { + ) -> Result { let topic = if is_child_operation_path(path) { let child_id = get_child_external_id(path)?; let child_external_id = Self::validate_external_id(&child_id)?; @@ -1273,14 +1275,14 @@ impl CumulocityConverter { C8yTopic::upstream_topic(prefix) }; - Ok(Message::new( + Ok(MqttMessage::new( &topic, Operations::try_new(path)?.create_smartrest_ops_message(), )) } - pub fn sync_messages(&mut self) -> Vec { - let sync_messages: Vec = self.alarm_converter.sync(); + pub fn sync_messages(&mut self) -> Vec { + let sync_messages: Vec = self.alarm_converter.sync(); self.alarm_converter = AlarmConverter::Synced; sync_messages } @@ -1288,7 +1290,7 @@ impl CumulocityConverter { fn try_process_operation_update_message( &mut self, message: &DiscoverOp, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let needs_cloud_update = self.update_operations(&message.ops_dir)?; if needs_cloud_update { @@ -1318,9 +1320,11 @@ fn get_child_external_id(dir_path: &Path) -> Result { } } -fn create_get_pending_operations_message(prefix: &TopicPrefix) -> Result { +fn create_get_pending_operations_message( + prefix: &TopicPrefix, +) -> Result { let topic = C8yTopic::SmartRestResponse.to_topic(prefix)?; - Ok(Message::new(&topic, request_pending_operations())) + Ok(MqttMessage::new(&topic, request_pending_operations())) } fn is_child_operation_path(path: &Path) -> bool { @@ -1343,7 +1347,7 @@ impl CumulocityConverter { &mut self, target: &EntityTopicId, c8y_operation_name: &str, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let device = self.entity_store.try_get(target)?; let ops_dir = match device.r#type { EntityType::MainDevice => self.ops_dir.clone(), @@ -1398,7 +1402,7 @@ impl CumulocityConverter { async fn register_restart_operation( &mut self, target: &EntityTopicId, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { match self.register_operation(target, "c8y_Restart") { Err(_) => { error!("Fail to register `restart` operation for unknown device: {target}"); @@ -1412,8 +1416,8 @@ impl CumulocityConverter { &mut self, target: &EntityTopicId, cmd_id: &str, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { let command = match RestartCommand::try_from( target.clone(), cmd_id.to_owned(), @@ -1435,7 +1439,7 @@ impl CumulocityConverter { CommandStatus::Executing => { let smartrest_set_operation = set_operation_executing(CumulocitySupportedOperations::C8yRestartRequest); - Ok(vec![Message::new(&topic, smartrest_set_operation)]) + Ok(vec![MqttMessage::new(&topic, smartrest_set_operation)]) } CommandStatus::Successful => { let smartrest_set_operation = @@ -1443,7 +1447,7 @@ impl CumulocityConverter { Ok(vec![ command.clearing_message(&self.mqtt_schema), - Message::new(&topic, smartrest_set_operation), + MqttMessage::new(&topic, smartrest_set_operation), ]) } CommandStatus::Failed { ref reason } => { @@ -1454,7 +1458,7 @@ impl CumulocityConverter { Ok(vec![ command.clearing_message(&self.mqtt_schema), - Message::new(&topic, smartrest_set_operation), + MqttMessage::new(&topic, smartrest_set_operation), ]) } _ => { @@ -1468,7 +1472,7 @@ impl CumulocityConverter { &mut self, target: &EntityTopicId, c8y_op_name: &str, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { match self.register_operation(target, c8y_op_name) { Err(_) => { error!("Fail to register `{c8y_op_name}` operation for entity: {target}"); @@ -1481,8 +1485,8 @@ impl CumulocityConverter { async fn register_software_list_operation( &self, target: &EntityTopicId, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { if !self.config.software_management_with_types { debug!("Publishing c8y_SupportedSoftwareTypes is disabled. To enable it, run `tedge config set c8y.software_management.with_types true`."); return Ok(vec![]); @@ -1493,13 +1497,13 @@ impl CumulocityConverter { let payload = json!({"c8y_SupportedSoftwareTypes": data.types}).to_string(); let topic = self.get_inventory_update_topic(target)?; - Ok(vec![Message::new(&topic, payload)]) + Ok(vec![MqttMessage::new(&topic, payload)]) } async fn register_software_update_operation( &mut self, target: &EntityTopicId, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let mut registration = match self.register_operation(target, "c8y_SoftwareUpdate") { Err(_) => { error!("Fail to register `software-list` operation for unknown device: {target}"); @@ -1516,8 +1520,8 @@ impl CumulocityConverter { &self, target: &EntityTopicId, cmd_id: &str, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { let response = match SoftwareUpdateCommand::try_from( target.clone(), cmd_id.to_string(), @@ -1544,14 +1548,17 @@ impl CumulocityConverter { CommandStatus::Executing => { let smartrest_set_operation_status = set_operation_executing(CumulocitySupportedOperations::C8ySoftwareUpdate); - Ok(vec![Message::new(&topic, smartrest_set_operation_status)]) + Ok(vec![MqttMessage::new( + &topic, + smartrest_set_operation_status, + )]) } CommandStatus::Successful => { let smartrest_set_operation = succeed_operation_no_payload(CumulocitySupportedOperations::C8ySoftwareUpdate); Ok(vec![ - Message::new(&topic, smartrest_set_operation), + MqttMessage::new(&topic, smartrest_set_operation), response.clearing_message(&self.mqtt_schema), self.request_software_list(target), ]) @@ -1561,7 +1568,7 @@ impl CumulocityConverter { fail_operation(CumulocitySupportedOperations::C8ySoftwareUpdate, &reason); Ok(vec![ - Message::new(&topic, smartrest_set_operation), + MqttMessage::new(&topic, smartrest_set_operation), response.clearing_message(&self.mqtt_schema), self.request_software_list(target), ]) @@ -1573,8 +1580,8 @@ impl CumulocityConverter { &mut self, target: &EntityTopicId, cmd_id: &str, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { let response = match SoftwareListCommand::try_from( target.clone(), cmd_id.to_owned(), @@ -1614,9 +1621,9 @@ impl CumulocityConverter { let payloads = get_advanced_software_list_payloads(&response, SOFTWARE_LIST_CHUNK_SIZE); - let mut messages: Vec = Vec::new(); + let mut messages: Vec = Vec::new(); for payload in payloads { - messages.push(Message::new(&topic, payload)) + messages.push(MqttMessage::new(&topic, payload)) } messages.push(response.clearing_message(&self.mqtt_schema)); Ok(messages) @@ -1700,7 +1707,6 @@ pub(crate) mod tests { use tedge_config::SoftwareManagementApiFlag; use tedge_config::TEdgeConfig; use tedge_mqtt_ext::test_helpers::assert_messages_matching; - use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_test_utils::fs::TempTedgeDir; @@ -1714,7 +1720,7 @@ pub(crate) mod tests { let alarm_topic = "te/device/main///a/temperature_alarm"; let alarm_payload = r#"{ "severity": "critical", "text": "Temperature very high" }"#; - let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload); + let alarm_message = MqttMessage::new(&Topic::new_unchecked(alarm_topic), alarm_payload); // During the sync phase, alarms are not converted immediately, but only cached to be synced later assert!(converter.convert(&alarm_message).await.is_empty()); @@ -1722,14 +1728,14 @@ pub(crate) mod tests { let non_alarm_topic = "te/device/main///m/"; let non_alarm_payload = r#"{"temp": 1}"#; let non_alarm_message = - Message::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload); + MqttMessage::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload); // But non-alarms are converted immediately, even during the sync phase assert!(!converter.convert(&non_alarm_message).await.is_empty()); let internal_alarm_topic = "c8y-internal/alarms/te/device/main///a/pressure_alarm"; let internal_alarm_payload = r#"{ "severity": "major", "text": "Temperature very high" }"#; - let internal_alarm_message = Message::new( + let internal_alarm_message = MqttMessage::new( &Topic::new_unchecked(internal_alarm_topic), internal_alarm_payload, ); @@ -1769,7 +1775,7 @@ pub(crate) mod tests { let alarm_topic = "te/device/external_sensor///a/temperature_alarm"; let alarm_payload = r#"{ "severity": "critical", "text": "Temperature very high" }"#; - let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload); + let alarm_message = MqttMessage::new(&Topic::new_unchecked(alarm_topic), alarm_payload); // Child device creation messages are published. let device_creation_msgs = converter.convert(&alarm_message).await; @@ -1789,7 +1795,7 @@ pub(crate) mod tests { }) ); - let second_msg = Message::new( + let second_msg = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:external_sensor,external_sensor,thin-edge.io-child", ); @@ -1801,7 +1807,7 @@ pub(crate) mod tests { let non_alarm_topic = "te/device/external_sensor///m/"; let non_alarm_payload = r#"{"temp": 1}"#; let non_alarm_message = - Message::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload); + MqttMessage::new(&Topic::new_unchecked(non_alarm_topic), non_alarm_payload); // But non-alarms are converted immediately, even during the sync phase assert!(!converter.convert(&non_alarm_message).await.is_empty()); @@ -1809,7 +1815,7 @@ pub(crate) mod tests { let internal_alarm_topic = "c8y-internal/alarms/te/device/external_sensor///a/pressure_alarm"; let internal_alarm_payload = r#"{ "severity": "major", "text": "Temperature very high" }"#; - let internal_alarm_message = Message::new( + let internal_alarm_message = MqttMessage::new( &Topic::new_unchecked(internal_alarm_topic), internal_alarm_payload, ); @@ -1847,7 +1853,7 @@ pub(crate) mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let in_message = Message::new( + let in_message = MqttMessage::new( &Topic::new_unchecked("te/device/child1///m/"), json!({ "temp": 1, @@ -1899,7 +1905,7 @@ pub(crate) mod tests { async fn convert_measurement_with_nested_child_device() { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/immediate_child//"), json!({ "@type":"child-device", @@ -1910,7 +1916,7 @@ pub(crate) mod tests { ); let _ = converter.convert(®_message).await; - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/nested_child//"), json!({ "@type":"child-device", @@ -1923,9 +1929,9 @@ pub(crate) mod tests { let in_topic = "te/device/nested_child///m/"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), json!({ "externalSource":{"externalId":"nested_child","type":"c8y_Serial"}, @@ -1945,7 +1951,7 @@ pub(crate) mod tests { async fn convert_measurement_with_nested_child_service() { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/immediate_child//"), json!({ "@type":"child-device", @@ -1956,7 +1962,7 @@ pub(crate) mod tests { ); let _ = converter.convert(®_message).await; - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/nested_child//"), json!({ "@type":"child-device", @@ -1967,7 +1973,7 @@ pub(crate) mod tests { ); let _ = converter.convert(®_message).await; - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/nested_child/service/nested_service"), json!({ "@type":"service", @@ -1980,9 +1986,9 @@ pub(crate) mod tests { let in_topic = "te/device/nested_child/service/nested_service/m/"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), json!({ "externalSource":{"externalId":"nested_service","type":"c8y_Serial"}, @@ -2005,9 +2011,9 @@ pub(crate) mod tests { let in_topic = "te/device/child1/service/app1/m/m_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_child_create_msg = Message::new( + let expected_child_create_msg = MqttMessage::new( &Topic::new_unchecked("te/device/child1//"), json!({ "@id":"test-device:device:child1", @@ -2018,11 +2024,11 @@ pub(crate) mod tests { ) .with_retain(); - let expected_smart_rest_message_child = Message::new( + let expected_smart_rest_message_child = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:child1,child1,thin-edge.io-child", ); - let expected_service_create_msg = Message::new( + let expected_service_create_msg = MqttMessage::new( &Topic::new_unchecked("te/device/child1/service/app1"), json!({ "@id":"test-device:device:child1:service:app1", @@ -2035,11 +2041,11 @@ pub(crate) mod tests { ) .with_retain(); - let expected_smart_rest_message_service = Message::new( + let expected_smart_rest_message_service = MqttMessage::new( &Topic::new_unchecked("c8y/s/us/test-device:device:child1"), "102,test-device:device:child1:service:app1,service,app1,up", ); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), json!({ "externalSource":{ @@ -2077,9 +2083,9 @@ pub(crate) mod tests { let in_topic = "te/device/main/service/appm/m/m_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_create_service_msg = Message::new( + let expected_create_service_msg = MqttMessage::new( &Topic::new_unchecked("te/device/main/service/appm"), json!({ "@id":"test-device:device:main:service:appm", @@ -2091,7 +2097,7 @@ pub(crate) mod tests { ) .with_retain(); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), json!({ "externalSource":{ @@ -2104,7 +2110,7 @@ pub(crate) mod tests { .to_string(), ); - let expected_smart_rest_message_service = Message::new( + let expected_smart_rest_message_service = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "102,test-device:device:main:service:appm,service,appm,up", ); @@ -2133,12 +2139,13 @@ pub(crate) mod tests { let in_topic = "te/device/child1///m/"; let in_invalid_payload = r#"{"temp": invalid}"#; let in_valid_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; - let in_first_message = Message::new(&Topic::new_unchecked(in_topic), in_invalid_payload); - let in_second_message = Message::new(&Topic::new_unchecked(in_topic), in_valid_payload); + let in_first_message = + MqttMessage::new(&Topic::new_unchecked(in_topic), in_invalid_payload); + let in_second_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_valid_payload); // First convert invalid Thin Edge JSON message. let out_first_messages = converter.convert(&in_first_message).await; - let expected_error_message = Message::new( + let expected_error_message = MqttMessage::new( &Topic::new_unchecked("te/errors"), "Invalid JSON: expected value at line 1 column 10: `invalid}\n`", ); @@ -2151,11 +2158,11 @@ pub(crate) mod tests { .into_iter() .filter(|m| m.topic.name.starts_with("c8y")) .collect(); - let expected_smart_rest_message = Message::new( + let expected_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:child1,child1,thin-edge.io-child", ); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), r#"{"externalSource":{"externalId":"test-device:device:child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"ThinEdgeMeasurement"}"#, ); @@ -2173,18 +2180,18 @@ pub(crate) mod tests { // First message from "child1" let in_first_message = - Message::new(&Topic::new_unchecked("te/device/child1///m/"), in_payload); + MqttMessage::new(&Topic::new_unchecked("te/device/child1///m/"), in_payload); let out_first_messages: Vec<_> = converter .convert(&in_first_message) .await .into_iter() .filter(|m| m.topic.name.starts_with("c8y")) .collect(); - let expected_first_smart_rest_message = Message::new( + let expected_first_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:child1,child1,thin-edge.io-child", ); - let expected_first_c8y_json_message = Message::new( + let expected_first_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), r#"{"externalSource":{"externalId":"test-device:device:child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"ThinEdgeMeasurement"}"#, ); @@ -2198,18 +2205,18 @@ pub(crate) mod tests { // Second message from "child2" let in_second_message = - Message::new(&Topic::new_unchecked("te/device/child2///m/"), in_payload); + MqttMessage::new(&Topic::new_unchecked("te/device/child2///m/"), in_payload); let out_second_messages: Vec<_> = converter .convert(&in_second_message) .await .into_iter() .filter(|m| m.topic.name.starts_with("c8y")) .collect(); - let expected_second_smart_rest_message = Message::new( + let expected_second_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:child2,child2,thin-edge.io-child", ); - let expected_second_c8y_json_message = Message::new( + let expected_second_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), r#"{"externalSource":{"externalId":"test-device:device:child2","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"ThinEdgeMeasurement"}"#, ); @@ -2229,9 +2236,9 @@ pub(crate) mod tests { let in_topic = "te/device/main///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), r#"{"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"test_type"}"#, ); @@ -2253,9 +2260,9 @@ pub(crate) mod tests { let in_topic = "te/device/main///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00","type":"type_in_payload"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), r#"{"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"type_in_payload"}"#, ); @@ -2277,14 +2284,14 @@ pub(crate) mod tests { let in_topic = "te/device/child///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_smart_rest_message = Message::new( + let expected_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:child,child,thin-edge.io-child", ); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), r#"{"externalSource":{"externalId":"test-device:device:child","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"test_type"}"#, ); @@ -2312,13 +2319,13 @@ pub(crate) mod tests { let in_topic = "te/device/child2///m/test_type"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00","type":"type_in_payload"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); - let expected_smart_rest_message = Message::new( + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); + let expected_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:child2,child2,thin-edge.io-child", ); - let expected_c8y_json_message = Message::new( + let expected_c8y_json_message = MqttMessage::new( &Topic::new_unchecked("c8y/measurement/measurements/create"), r#"{"externalSource":{"externalId":"test-device:device:child2","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00","type":"type_in_payload"}"#, ); @@ -2347,7 +2354,7 @@ pub(crate) mod tests { let alarm_topic = "te/device/main///a/temperature_alarm"; let big_alarm_text = create_packet(1024 * 20); let alarm_payload = json!({ "text": big_alarm_text }).to_string(); - let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload); + let alarm_message = MqttMessage::new(&Topic::new_unchecked(alarm_topic), alarm_payload); assert_matches!( converter.try_convert(&alarm_message).await, @@ -2367,7 +2374,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; let event_topic = "te/device/main///e/"; let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; - let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); let converted_events = converter.convert(&event_message).await; assert_eq!(converted_events.len(), 1); @@ -2386,7 +2393,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; let event_topic = "te/device/main///e/topic_event"; let event_payload = r#"{ "type": "payload event", "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; - let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); let converted_events = converter.convert(&event_message).await; assert_eq!(converted_events.len(), 1); @@ -2405,7 +2412,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; let event_topic = "te/device/main///e/click_event"; let event_payload = r#"{ "type": "payload event", "text": "tick", "foo": "bar" }"#; - let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); let converted_events = converter.convert(&event_message).await; assert_eq!(converted_events.len(), 1); @@ -2429,7 +2436,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; let event_topic = "te/device/main///e/click_event"; let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; - let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); let converted_events = converter.convert(&event_message).await; assert_eq!(converted_events.len(), 1); @@ -2453,7 +2460,7 @@ pub(crate) mod tests { let (mut converter, _) = create_c8y_converter_from_config(config); let event_topic = "te/device/main///e/click_event"; let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#; - let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); let converted_events = converter.convert(&event_message).await; assert_eq!(converted_events.len(), 1); @@ -2472,7 +2479,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; let event_topic = "te/device/main///e/click_event"; let event_payload = r#"{ "text": "tick", "foo": "bar" }"#; - let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + let event_message = MqttMessage::new(&Topic::new_unchecked(event_topic), event_payload); let converted_events = converter.convert(&event_message).await; assert_eq!(converted_events.len(), 1); @@ -2507,7 +2514,8 @@ pub(crate) mod tests { let event_topic = "te/device/main///e/click_event"; let big_event_text = create_packet((16 + 1) * 1024); // Event payload > size_threshold let big_event_payload = json!({ "text": big_event_text }).to_string(); - let big_event_message = Message::new(&Topic::new_unchecked(event_topic), big_event_payload); + let big_event_message = + MqttMessage::new(&Topic::new_unchecked(event_topic), big_event_payload); assert!(converter.convert(&big_event_message).await.is_empty()); } @@ -2530,7 +2538,8 @@ pub(crate) mod tests { let event_topic = "te/device/child1///e/click_event"; let big_event_text = create_packet((16 + 1) * 1024); // Event payload > size_threshold let big_event_payload = json!({ "text": big_event_text }).to_string(); - let big_event_message = Message::new(&Topic::new_unchecked(event_topic), big_event_payload); + let big_event_message = + MqttMessage::new(&Topic::new_unchecked(event_topic), big_event_payload); let child_registration_messages = converter.convert(&big_event_message).await; @@ -2547,7 +2556,7 @@ pub(crate) mod tests { let measurement_topic = "te/device/main///m/"; let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json - let big_measurement_message = Message::new( + let big_measurement_message = MqttMessage::new( &Topic::new_unchecked(measurement_topic), big_measurement_payload, ); @@ -2567,7 +2576,7 @@ pub(crate) mod tests { let measurement_topic = "te/device/main///m/"; let big_measurement_payload = create_thin_edge_measurement(20); // Measurement payload size is 20 bytes - let big_measurement_message = Message::new( + let big_measurement_message = MqttMessage::new( &Topic::new_unchecked(measurement_topic), big_measurement_payload, ); @@ -2591,7 +2600,7 @@ pub(crate) mod tests { let measurement_topic = "te/device/child1///m/"; let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json - let big_measurement_message = Message::new( + let big_measurement_message = MqttMessage::new( &Topic::new_unchecked(measurement_topic), big_measurement_payload, ); @@ -2611,7 +2620,7 @@ pub(crate) mod tests { let measurement_topic = "te/device/child1///m/"; let big_measurement_payload = create_thin_edge_measurement(20); // Measurement payload size is 20 bytes - let big_measurement_message = Message::new( + let big_measurement_message = MqttMessage::new( &Topic::new_unchecked(measurement_topic), big_measurement_payload, ); @@ -2640,17 +2649,17 @@ pub(crate) mod tests { let in_topic = "te/device/child1/service/child-service-c8y/status/health"; let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); let mqtt_schema = MqttSchema::new(); let (in_entity, _in_channel) = mqtt_schema.entity_channel_of(&in_message.topic).unwrap(); - let expected_child_create_smart_rest_message = Message::new( + let expected_child_create_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), "101,test-device:device:child1,child1,thin-edge.io-child", ); - let expected_service_monitor_smart_rest_message = Message::new( + let expected_service_monitor_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us/test-device:device:child1"), r#"102,test-device:device:child1:service:child-service-c8y,service,child-service-c8y,up"#, ); @@ -2696,12 +2705,12 @@ pub(crate) mod tests { let in_topic = "te/device/main/service/test-tedge-mapper-c8y/status/health"; let in_payload = r#"{"pid":"1234","status":"up","time":"2021-11-16T17:45:40.571760714+01:00","type":"thin-edge.io"}"#; - let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + let in_message = MqttMessage::new(&Topic::new_unchecked(in_topic), in_payload); let mqtt_schema = MqttSchema::new(); let (in_entity, _in_channel) = mqtt_schema.entity_channel_of(&in_message.topic).unwrap(); - let expected_service_monitor_smart_rest_message = Message::new( + let expected_service_monitor_smart_rest_message = MqttMessage::new( &Topic::new_unchecked("c8y/s/us"), r#"102,test-device:device:main:service:test-tedge-mapper-c8y,service,test-tedge-mapper-c8y,up"#, ); @@ -2773,7 +2782,7 @@ pub(crate) mod tests { let registration = registrations.get(0).unwrap().clone(); assert_eq!( registration, - Message::new( + MqttMessage::new( &Topic::new_unchecked("te/device/childId//"), r#"{"@id":"test-device:device:childId","@type":"child-device","name":"childId"}"#, ) @@ -2889,7 +2898,7 @@ pub(crate) mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let measurement_message = Message::new( + let measurement_message = MqttMessage::new( &Topic::new_unchecked("te/device/main/service/my_measurement_service/m/my_type"), r#"{"temperature": 21.37}"#, ); @@ -2925,7 +2934,7 @@ pub(crate) mod tests { let (mut converter, _) = create_c8y_converter_from_config(config); - let service_health_message = Message::new( + let service_health_message = MqttMessage::new( &Topic::new_unchecked("te/device/main/service/service1/status/health"), serde_json::to_string(&json!({"status": "up"})).unwrap(), ); @@ -2962,7 +2971,7 @@ pub(crate) mod tests { // Register main device service let _ = converter - .convert(&Message::new( + .convert(&MqttMessage::new( &Topic::new_unchecked("te/device/main/service/dummy"), json!({ "@type":"service", @@ -2972,7 +2981,7 @@ pub(crate) mod tests { .await; // Register immediate child device let _ = converter - .convert(&Message::new( + .convert(&MqttMessage::new( &Topic::new_unchecked("te/device/immediate_child//"), json!({ "@type":"child-device", @@ -2982,7 +2991,7 @@ pub(crate) mod tests { .await; // Register immediate child device service let _ = converter - .convert(&Message::new( + .convert(&MqttMessage::new( &Topic::new_unchecked("te/device/immediate_child/service/dummy"), json!({ "@type":"service", @@ -2992,7 +3001,7 @@ pub(crate) mod tests { .await; // Register nested child device let _ = converter - .convert(&Message::new( + .convert(&MqttMessage::new( &Topic::new_unchecked("te/device/nested_child//"), json!({ "@type":"child-device", @@ -3003,7 +3012,7 @@ pub(crate) mod tests { .await; // Register nested child device service let _ = converter - .convert(&Message::new( + .convert(&MqttMessage::new( &Topic::new_unchecked("te/device/nested_child/service/dummy"), json!({ "@type":"service", @@ -3014,7 +3023,7 @@ pub(crate) mod tests { for device_id in ["main", "immediate_child", "nested_child"] { let messages = converter - .convert(&Message::new( + .convert(&MqttMessage::new( &Topic::new_unchecked(&format!( "te/device/{device_id}/service/dummy/cmd/{op_type}" )), @@ -3040,10 +3049,10 @@ pub(crate) mod tests { let alarm_topic = "te/custom/child2///m/"; let alarm_payload = json!({ "text": "" }).to_string(); - let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload); + let alarm_message = MqttMessage::new(&Topic::new_unchecked(alarm_topic), alarm_payload); begin_capture(); let res = converter.try_convert(&alarm_message).await.unwrap(); - let expected_err_msg = Message::new(&Topic::new_unchecked("te/errors"), "Auto registration of the entity with topic id custom/child2// failed as it does not match the default topic scheme: 'device//service/'. Try explicit registration instead."); + let expected_err_msg = MqttMessage::new(&Topic::new_unchecked("te/errors"), "Auto registration of the entity with topic id custom/child2// failed as it does not match the default topic scheme: 'device//service/'. Try explicit registration instead."); assert_eq!(res[0], expected_err_msg); let expected_log = "Auto registration of the entity with topic id custom/child2// failed as it does not match the default topic scheme: 'device//service/'. Try explicit registration instead."; // skip other log messages @@ -3064,7 +3073,7 @@ pub(crate) mod tests { // Publish some measurements that are only cached and not converted for i in 0..3 { - let measurement_message = Message::new( + let measurement_message = MqttMessage::new( &Topic::new_unchecked("te/custom/child1///m/environment"), json!({ "temperature": i }).to_string(), ); @@ -3076,7 +3085,7 @@ pub(crate) mod tests { } // Publish a twin message which is also cached - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/custom/child1///twin/foo"), r#"5.6789"#, ); @@ -3087,7 +3096,7 @@ pub(crate) mod tests { ); // Publish the registration message which will trigger the conversion of cached messages as well - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/custom/child1//"), json!({"@type": "child-device", "@id": "child1", "name": "child1"}).to_string(), ); @@ -3154,7 +3163,7 @@ pub(crate) mod tests { let (mut converter, _http_proxy) = create_c8y_converter_from_config(config); // Publish great-grand-child registration before grand-child and child - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/child000//"), json!({ "@type": "child-device", @@ -3171,7 +3180,7 @@ pub(crate) mod tests { ); // Publish grand-child registration before child - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/child00//"), json!({ "@type": "child-device", @@ -3188,7 +3197,7 @@ pub(crate) mod tests { ); // Register the immediate child device which will trigger the conversion of cached messages as well - let reg_message = Message::new( + let reg_message = MqttMessage::new( &Topic::new_unchecked("te/device/child0//"), json!({ "@type": "child-device", diff --git a/crates/extensions/c8y_mapper_ext/src/inventory.rs b/crates/extensions/c8y_mapper_ext/src/inventory.rs index 72ac36cbcbc..be864449fc6 100644 --- a/crates/extensions/c8y_mapper_ext/src/inventory.rs +++ b/crates/extensions/c8y_mapper_ext/src/inventory.rs @@ -11,7 +11,7 @@ use std::path::Path; use tedge_api::entity_store::EntityTwinMessage; use tedge_api::mqtt_topics::Channel; use tedge_api::mqtt_topics::EntityTopicId; -use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tracing::info; use tracing::warn; @@ -22,7 +22,9 @@ const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "inventory/managedObjects/update"; impl CumulocityConverter { /// Creates the inventory update message with fragments from inventory.json file /// while also updating the live `inventory_model` of this converter - pub(crate) fn parse_base_inventory_file(&mut self) -> Result, ConversionError> { + pub(crate) fn parse_base_inventory_file( + &mut self, + ) -> Result, ConversionError> { let mut messages = vec![]; let inventory_file_path = self.cfg_dir.join(INVENTORY_FRAGMENTS_FILE_LOCATION); let mut inventory_base = Self::get_inventory_fragments(&inventory_file_path)?; @@ -63,10 +65,10 @@ impl CumulocityConverter { entity: &EntityTopicId, fragment_key: String, fragment_value: JsonValue, - ) -> Message { + ) -> MqttMessage { let twin_channel = Channel::EntityTwinData { fragment_key }; let topic = self.mqtt_schema.topic_for(entity, &twin_channel); - Message::new(&topic, fragment_value.to_string()).with_retain() + MqttMessage::new(&topic, fragment_value.to_string()).with_retain() } /// Convert a twin metadata message into Cumulocity inventory update messages. @@ -75,9 +77,9 @@ impl CumulocityConverter { pub(crate) fn try_convert_entity_twin_data( &mut self, source: &EntityTopicId, - message: &Message, + message: &MqttMessage, mut fragment_key: &str, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { if fragment_key == "name" || fragment_key == "type" { warn!("Updating the entity `name` and `type` fields via the twin/ topic channel is not supported"); return Ok(vec![]); @@ -112,17 +114,19 @@ impl CumulocityConverter { &self, source: &EntityTopicId, fragment_value: JsonValue, - ) -> Result { + ) -> Result { let inventory_update_topic = self.get_inventory_update_topic(source)?; - Ok(Message::new( + Ok(MqttMessage::new( &inventory_update_topic, fragment_value.to_string(), )) } /// Create the inventory update message to update the `type` of the main device - pub(crate) fn inventory_device_type_update_message(&self) -> Result { + pub(crate) fn inventory_device_type_update_message( + &self, + ) -> Result { let device_data = C8yDeviceDataFragment::from_type(&self.device_type)?; let device_type_fragment = device_data.to_json()?; @@ -187,7 +191,7 @@ mod tests { use crate::converter::tests::create_c8y_converter; use serde_json::json; use tedge_mqtt_ext::test_helpers::assert_messages_matching; - use tedge_mqtt_ext::Message; + use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::Topic; use tedge_test_utils::fs::TempTedgeDir; @@ -202,7 +206,7 @@ mod tests { "version": "11" }); let twin_message = - Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); + MqttMessage::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); let inventory_messages = converter.convert(&twin_message).await; assert_messages_matching( @@ -225,7 +229,7 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/foo"), r#""bar""#, // String values must be quoted to be valid JSON string values ); @@ -248,7 +252,7 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/foo"), "unquoted value", ); @@ -261,7 +265,7 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/foo"), r#"5.6789"#, ); @@ -284,7 +288,7 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/enabled"), r#"false"#, ); @@ -307,7 +311,7 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/name"), r#"New Name"#, ); @@ -326,14 +330,14 @@ mod tests { let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; // Register a twin data fragment first - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/foo"), "\"bar\"", ); let _ = converter.convert(&twin_message).await; // Clear that fragment - let twin_message = Message::new(&Topic::new_unchecked("te/device/main///twin/foo"), ""); + let twin_message = MqttMessage::new(&Topic::new_unchecked("te/device/main///twin/foo"), ""); let inventory_messages = converter.convert(&twin_message).await; assert_messages_matching( @@ -356,7 +360,7 @@ mod tests { "version": "11" }); let twin_message = - Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); + MqttMessage::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); let inventory_messages = converter.convert(&twin_message).await; assert_messages_matching( @@ -382,7 +386,7 @@ mod tests { ); // Assert that the same payload with different key order is also ignored - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked(twin_topic), r#"{"version": "11","family": "Debian"}"#, ); @@ -405,7 +409,7 @@ mod tests { "version": "11" }); let twin_message = - Message::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); + MqttMessage::new(&Topic::new_unchecked(twin_topic), twin_payload.to_string()); let inventory_messages = converter.convert(&twin_message).await; let expected_message = ( @@ -421,7 +425,7 @@ mod tests { assert_messages_matching(&inventory_messages, [expected_message.clone()]); let clear_message = - Message::new(&Topic::new_unchecked("te/device/main///twin/device_os"), ""); + MqttMessage::new(&Topic::new_unchecked("te/device/main///twin/device_os"), ""); let _ = converter.convert(&clear_message).await; // Assert duplicate payload converted after it was cleared @@ -434,7 +438,7 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/main///twin/firmware"), r#"{"name":"firmware", "version":"1.0"}"#, ); @@ -455,7 +459,7 @@ mod tests { let tmp_dir = TempTedgeDir::new(); let (mut converter, _http_proxy) = create_c8y_converter(&tmp_dir).await; - let twin_message = Message::new( + let twin_message = MqttMessage::new( &Topic::new_unchecked("te/device/child1///twin/firmware"), r#"{"name":"firmware", "version":"1.0"}"#, ); diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs index 55ba717443d..3fd1fa18ba4 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs @@ -25,7 +25,7 @@ use tedge_api::mqtt_topics::OperationType; use tedge_api::Jsonify; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; -use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::TopicFilter; use tedge_uploader_ext::UploadRequest; @@ -54,7 +54,7 @@ impl CumulocityConverter { device_xid: String, cmd_id: String, config_upload_request: C8yUploadConfigFile, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { let target = self .entity_store .try_get_by_external_id(&device_xid.into())?; @@ -82,7 +82,9 @@ impl CumulocityConverter { }; // Command messages must be retained - Ok(vec![Message::new(&topic, request.to_json()).with_retain()]) + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) } /// Address received ThinEdge config_snapshot command. If its status is @@ -93,8 +95,8 @@ impl CumulocityConverter { &mut self, topic_id: &EntityTopicId, cmd_id: &str, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { if !self.config.capabilities.config_snapshot { warn!( "Received a config_snapshot command, however, config_snapshot feature is disabled" @@ -111,7 +113,10 @@ impl CumulocityConverter { CommandStatus::Executing => { let smartrest_operation_status = set_operation_executing(CumulocitySupportedOperations::C8yUploadConfigFile); - vec![Message::new(&smartrest_topic, smartrest_operation_status)] + vec![MqttMessage::new( + &smartrest_topic, + smartrest_operation_status, + )] } CommandStatus::Successful => { // Send a request to the Downloader to download the file asynchronously from FTS @@ -154,8 +159,9 @@ impl CumulocityConverter { CommandStatus::Failed { reason } => { let smartrest_operation_status = fail_operation(CumulocitySupportedOperations::C8yUploadConfigFile, reason); - let c8y_notification = Message::new(&smartrest_topic, smartrest_operation_status); - let clear_local_cmd = Message::new(&message.topic, "") + let c8y_notification = + MqttMessage::new(&smartrest_topic, smartrest_operation_status); + let clear_local_cmd = MqttMessage::new(&message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); vec![c8y_notification, clear_local_cmd] @@ -175,7 +181,7 @@ impl CumulocityConverter { cmd_id: CmdId, download_result: DownloadResult, fts_download: FtsDownloadOperationData, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let target = self.entity_store.try_get(&fts_download.entity_topic_id)?; let smartrest_topic = self.smartrest_publish_topic_for_entity(&fts_download.entity_topic_id)?; @@ -190,8 +196,8 @@ impl CumulocityConverter { &format!("tedge-mapper-c8y failed to download configuration snapshot from file-transfer service: {err}"), ); - let c8y_notification = Message::new(&smartrest_topic, smartrest_error); - let clean_operation = Message::new(&fts_download.message.topic, "") + let c8y_notification = MqttMessage::new(&smartrest_topic, smartrest_error); + let clean_operation = MqttMessage::new(&fts_download.message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); @@ -246,8 +252,8 @@ impl CumulocityConverter { pub fn convert_config_snapshot_metadata( &mut self, topic_id: &EntityTopicId, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { if !self.config.capabilities.config_snapshot { warn!( "Received config_snapshot metadata, however, config_snapshot feature is disabled" diff --git a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs index 5d591b839d5..9eb47ae0084 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/config_update.rs @@ -18,7 +18,7 @@ use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; use tedge_api::Jsonify; -use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::TopicFilter; use tracing::log::warn; @@ -47,8 +47,8 @@ impl CumulocityConverter { &mut self, topic_id: &EntityTopicId, _cmd_id: &str, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { if !self.config.capabilities.config_update { warn!("Received a config_update command, however, config_update feature is disabled"); return Ok(vec![]); @@ -63,14 +63,14 @@ impl CumulocityConverter { let smartrest_operation_status = set_operation_executing(CumulocitySupportedOperations::C8yDownloadConfigFile); - vec![Message::new(&sm_topic, smartrest_operation_status)] + vec![MqttMessage::new(&sm_topic, smartrest_operation_status)] } CommandStatus::Successful => { let smartrest_operation_status = succeed_operation_no_payload( CumulocitySupportedOperations::C8yDownloadConfigFile, ); - let c8y_notification = Message::new(&sm_topic, smartrest_operation_status); - let clear_local_cmd = Message::new(&message.topic, "") + let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status); + let clear_local_cmd = MqttMessage::new(&message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); @@ -79,8 +79,8 @@ impl CumulocityConverter { CommandStatus::Failed { reason } => { let smartrest_operation_status = fail_operation(CumulocitySupportedOperations::C8yDownloadConfigFile, reason); - let c8y_notification = Message::new(&sm_topic, smartrest_operation_status); - let clear_local_cmd = Message::new(&message.topic, "") + let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status); + let clear_local_cmd = MqttMessage::new(&message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); @@ -101,7 +101,7 @@ impl CumulocityConverter { device_xid: String, cmd_id: String, config_download_request: C8yDownloadConfigFile, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { let entity_xid: EntityExternalId = device_xid.into(); let target = self.entity_store.try_get_by_external_id(&entity_xid)?; @@ -116,8 +116,8 @@ impl CumulocityConverter { pub fn convert_config_update_metadata( &mut self, topic_id: &EntityTopicId, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { if !self.config.capabilities.config_update { warn!("Received config_update metadata, however, config_update feature is disabled"); return Ok(vec![]); @@ -130,7 +130,7 @@ impl CumulocityConverter { cmd_id: Arc, config_download_request: &C8yDownloadConfigFile, target: &EntityMetadata, - ) -> Vec { + ) -> Vec { let channel = Channel::Command { operation: OperationType::ConfigUpdate, cmd_id: cmd_id.to_string(), @@ -153,7 +153,7 @@ impl CumulocityConverter { }; // Command messages must be retained - vec![Message::new(&topic, request.to_json()).with_retain()] + vec![MqttMessage::new(&topic, request.to_json()).with_retain()] } } diff --git a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs index 83077e29b8e..73ae4a925a1 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/firmware_update.rs @@ -18,7 +18,7 @@ use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; use tedge_api::CommandStatus; use tedge_api::Jsonify; -use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::TopicFilter; use tracing::error; @@ -40,7 +40,7 @@ impl CumulocityConverter { device_xid: String, cmd_id: String, firmware_request: C8yFirmware, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { let entity_xid: EntityExternalId = device_xid.into(); let target = self.entity_store.try_get_by_external_id(&entity_xid)?; @@ -67,7 +67,9 @@ impl CumulocityConverter { }; // Command messages must be retained - Ok(vec![Message::new(&topic, request.to_json()).with_retain()]) + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) } /// Address a received ThinEdge firmware_update command. If its status is @@ -77,8 +79,8 @@ impl CumulocityConverter { pub async fn handle_firmware_update_state_change( &mut self, topic_id: &EntityTopicId, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { if !self.config.capabilities.firmware_update { warn!( "Received a firmware_update command, however, firmware_update feature is disabled" @@ -95,14 +97,14 @@ impl CumulocityConverter { let smartrest_operation_status = set_operation_executing(CumulocitySupportedOperations::C8yFirmware); - vec![Message::new(&sm_topic, smartrest_operation_status)] + vec![MqttMessage::new(&sm_topic, smartrest_operation_status)] } CommandStatus::Successful => { let smartrest_operation_status = succeed_operation_no_payload(CumulocitySupportedOperations::C8yFirmware); - let c8y_notification = Message::new(&sm_topic, smartrest_operation_status); + let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status); - let clear_local_cmd = Message::new(&message.topic, "") + let clear_local_cmd = MqttMessage::new(&message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); @@ -120,7 +122,7 @@ impl CumulocityConverter { }; let twin_metadata = - Message::new(&twin_metadata_topic, twin_metadata_payload.to_json()) + MqttMessage::new(&twin_metadata_topic, twin_metadata_payload.to_json()) .with_retain() .with_qos(QoS::AtLeastOnce); @@ -129,8 +131,8 @@ impl CumulocityConverter { CommandStatus::Failed { reason } => { let smartrest_operation_status = fail_operation(CumulocitySupportedOperations::C8yFirmware, reason); - let c8y_notification = Message::new(&sm_topic, smartrest_operation_status); - let clear_local_cmd = Message::new(&message.topic, "") + let c8y_notification = MqttMessage::new(&sm_topic, smartrest_operation_status); + let clear_local_cmd = MqttMessage::new(&message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); @@ -147,7 +149,7 @@ impl CumulocityConverter { pub fn register_firmware_update_operation( &mut self, topic_id: &EntityTopicId, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { if !self.config.capabilities.firmware_update { warn!( "Received firmware_update metadata, however, firmware_update feature is disabled" diff --git a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs index 2edb9ffd053..034ef4644ac 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs @@ -27,7 +27,6 @@ use tedge_api::mqtt_topics::OperationType; use tedge_api::Jsonify; use tedge_downloader_ext::DownloadRequest; use tedge_downloader_ext::DownloadResult; -use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::QoS; use tedge_mqtt_ext::TopicFilter; @@ -54,7 +53,7 @@ impl CumulocityConverter { device_xid: String, cmd_id: String, log_request: C8yLogfileRequest, - ) -> Result, CumulocityMapperError> { + ) -> Result, CumulocityMapperError> { let target = self .entity_store .try_get_by_external_id(&device_xid.into())?; @@ -84,7 +83,9 @@ impl CumulocityConverter { }; // Command messages must be retained - Ok(vec![Message::new(&topic, request.to_json()).with_retain()]) + Ok(vec![ + MqttMessage::new(&topic, request.to_json()).with_retain() + ]) } /// Address a received log_upload command. If its status is @@ -95,8 +96,8 @@ impl CumulocityConverter { &mut self, topic_id: &EntityTopicId, cmd_id: &str, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { debug!("Handling log_upload command"); if !self.config.capabilities.log_upload { @@ -113,7 +114,10 @@ impl CumulocityConverter { CommandStatus::Executing => { let smartrest_operation_status = set_operation_executing(CumulocitySupportedOperations::C8yLogFileRequest); - vec![Message::new(&smartrest_topic, smartrest_operation_status)] + vec![MqttMessage::new( + &smartrest_topic, + smartrest_operation_status, + )] } CommandStatus::Successful => { // Send a request to the Downloader to download the file asynchronously from FTS @@ -155,8 +159,9 @@ impl CumulocityConverter { CommandStatus::Failed { reason } => { let smartrest_operation_status = fail_operation(CumulocitySupportedOperations::C8yLogFileRequest, reason); - let c8y_notification = Message::new(&smartrest_topic, smartrest_operation_status); - let clean_operation = Message::new(&message.topic, "") + let c8y_notification = + MqttMessage::new(&smartrest_topic, smartrest_operation_status); + let clean_operation = MqttMessage::new(&message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); vec![c8y_notification, clean_operation] @@ -176,7 +181,7 @@ impl CumulocityConverter { cmd_id: CmdId, download_result: DownloadResult, fts_download: FtsDownloadOperationData, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let topic_id = fts_download.entity_topic_id; let target = self.entity_store.try_get(&topic_id)?; let smartrest_topic = self.smartrest_publish_topic_for_entity(&topic_id)?; @@ -192,8 +197,8 @@ impl CumulocityConverter { ), ); - let c8y_notification = Message::new(&smartrest_topic, smartrest_error); - let clean_operation = Message::new(&fts_download.message.topic, "") + let c8y_notification = MqttMessage::new(&smartrest_topic, smartrest_error); + let clean_operation = MqttMessage::new(&fts_download.message.topic, "") .with_retain() .with_qos(QoS::AtLeastOnce); return Ok(vec![c8y_notification, clean_operation]); @@ -248,8 +253,8 @@ impl CumulocityConverter { pub fn convert_log_metadata( &mut self, topic_id: &EntityTopicId, - message: &Message, - ) -> Result, ConversionError> { + message: &MqttMessage, + ) -> Result, ConversionError> { if !self.config.capabilities.log_upload { warn!("Received log_upload metadata, however, log_upload feature is disabled"); return Ok(vec![]); diff --git a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs index a4f6bb25cf3..6bf9b2ed7d9 100644 --- a/crates/extensions/c8y_mapper_ext/src/operations/mod.rs +++ b/crates/extensions/c8y_mapper_ext/src/operations/mod.rs @@ -18,7 +18,6 @@ use crate::error::ConversionError; use tedge_api::messages::ConfigMetadata; use tedge_api::mqtt_topics::EntityTopicId; use tedge_api::Jsonify; -use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; use tracing::error; @@ -57,9 +56,9 @@ impl CumulocityConverter { fn convert_config_metadata( &mut self, topic_id: &EntityTopicId, - message: &Message, + message: &MqttMessage, c8y_op_name: &str, - ) -> Result, ConversionError> { + ) -> Result, ConversionError> { let metadata = ConfigMetadata::from_json(message.payload_str()?)?; let mut messages = match self.register_operation(topic_id, c8y_op_name) { diff --git a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs index e5ca78b41c0..1a035943ac9 100644 --- a/crates/extensions/c8y_mapper_ext/src/service_monitor.rs +++ b/crates/extensions/c8y_mapper_ext/src/service_monitor.rs @@ -4,7 +4,7 @@ use serde::Serialize; use tedge_api::entity_store::EntityMetadata; use tedge_api::entity_store::EntityType; use tedge_config::TopicPrefix; -use tedge_mqtt_ext::Message; +use tedge_mqtt_ext::MqttMessage; use tracing::error; #[derive(Deserialize, Serialize, Debug, Default)] @@ -20,9 +20,9 @@ fn default_status() -> String { pub fn convert_health_status_message( entity: &EntityMetadata, ancestors_external_ids: &[String], - message: &Message, + message: &MqttMessage, prefix: &TopicPrefix, -) -> Vec { +) -> Vec { // TODO: introduce type to remove entity type guards if entity.r#type != EntityType::Service { return vec![]; @@ -140,8 +140,8 @@ mod tests { let mqtt_schema = MqttSchema::new(); let (entity_topic_id, _) = mqtt_schema.entity_channel_of(&topic).unwrap(); - let health_message = Message::new(&topic, health_payload.as_bytes().to_owned()); - let expected_message = Message::new( + let health_message = MqttMessage::new(&topic, health_payload.as_bytes().to_owned()); + let expected_message = MqttMessage::new( &Topic::new_unchecked(c8y_monitor_topic), c8y_monitor_payload.as_bytes(), ); diff --git a/crates/extensions/tedge_health_ext/src/actor.rs b/crates/extensions/tedge_health_ext/src/actor.rs index c50135f1c7d..75cf580e87f 100644 --- a/crates/extensions/tedge_health_ext/src/actor.rs +++ b/crates/extensions/tedge_health_ext/src/actor.rs @@ -5,19 +5,18 @@ use tedge_actors::RuntimeError; use tedge_actors::Sender; use tedge_actors::SimpleMessageBox; use tedge_api::health::ServiceHealthTopic; -use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttMessage; pub struct HealthMonitorActor { // TODO(marcel): move this - service_registration_message: Option, + service_registration_message: Option, health_topic: ServiceHealthTopic, messages: SimpleMessageBox, } impl HealthMonitorActor { pub fn new( - service_registration_message: Option, + service_registration_message: Option, health_topic: ServiceHealthTopic, messages: SimpleMessageBox, ) -> Self { diff --git a/crates/extensions/tedge_health_ext/src/lib.rs b/crates/extensions/tedge_health_ext/src/lib.rs index c134a678b99..63b0fe93368 100644 --- a/crates/extensions/tedge_health_ext/src/lib.rs +++ b/crates/extensions/tedge_health_ext/src/lib.rs @@ -21,13 +21,12 @@ use tedge_api::mqtt_topics::MqttSchema; use tedge_api::mqtt_topics::OperationType; use tedge_api::mqtt_topics::Service; use tedge_config::TEdgeConfigReaderService; -use tedge_mqtt_ext::Message; use tedge_mqtt_ext::MqttConfig; use tedge_mqtt_ext::MqttMessage; use tedge_mqtt_ext::TopicFilter; pub struct HealthMonitorBuilder { - registration_message: Option, + registration_message: Option, health_topic: ServiceHealthTopic, box_builder: SimpleMessageBoxBuilder, } diff --git a/crates/extensions/tedge_mqtt_bridge/src/lib.rs b/crates/extensions/tedge_mqtt_bridge/src/lib.rs index abdb028b9da..f1640bee751 100644 --- a/crates/extensions/tedge_mqtt_bridge/src/lib.rs +++ b/crates/extensions/tedge_mqtt_bridge/src/lib.rs @@ -29,11 +29,10 @@ use tracing::error; use tracing::log::info; pub type MqttConfig = mqtt_channel::Config; -pub type MqttMessage = Message; pub use mqtt_channel::DebugPayload; -pub use mqtt_channel::Message; pub use mqtt_channel::MqttError; +pub use mqtt_channel::MqttMessage; pub use mqtt_channel::QoS; pub use mqtt_channel::Topic; pub use mqtt_channel::TopicFilter; diff --git a/crates/extensions/tedge_mqtt_ext/src/lib.rs b/crates/extensions/tedge_mqtt_ext/src/lib.rs index f0720d3976d..3f45453def4 100644 --- a/crates/extensions/tedge_mqtt_ext/src/lib.rs +++ b/crates/extensions/tedge_mqtt_ext/src/lib.rs @@ -23,10 +23,9 @@ use tedge_actors::RuntimeRequestSink; use tedge_actors::Sender; pub type MqttConfig = mqtt_channel::Config; -pub type MqttMessage = mqtt_channel::Message; pub use mqtt_channel::DebugPayload; -pub use mqtt_channel::Message; pub use mqtt_channel::MqttError; +pub use mqtt_channel::MqttMessage; pub use mqtt_channel::QoS; pub use mqtt_channel::Topic; pub use mqtt_channel::TopicFilter; diff --git a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs index 6d5a3d9fd9b..705a3ab1ad6 100644 --- a/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs +++ b/crates/extensions/tedge_mqtt_ext/src/test_helpers.rs @@ -1,6 +1,5 @@ -use crate::MqttMessage; use assert_json_diff::assert_json_include; -use mqtt_channel::Message; +use mqtt_channel::MqttMessage; use mqtt_channel::TopicFilter; use std::fmt::Debug; use tedge_actors::MessageReceiver; @@ -36,7 +35,7 @@ pub async fn assert_received_includes_json( } } -pub fn assert_message_contains_str(message: &Message, expected: (&str, &str)) { +pub fn assert_message_contains_str(message: &MqttMessage, expected: (&str, &str)) { let expected_topic = expected.0; let expected_payload = expected.1; assert!( @@ -51,7 +50,7 @@ pub fn assert_message_contains_str(message: &Message, expected: (&str, &str)) { ) } -pub fn assert_message_includes_json(message: &Message, expected: (S, serde_json::Value)) +pub fn assert_message_includes_json(message: &MqttMessage, expected: (S, serde_json::Value)) where S: AsRef, { @@ -92,7 +91,7 @@ impl From for MessagePayloadMatcher { pub fn assert_messages_matching<'a, M, I>(messages: M, expected: I) where - M: IntoIterator, + M: IntoIterator, I: IntoIterator, { let mut messages_iter = messages.into_iter();