diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index 88bc2d319..8f46b1347 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +* Validate filters while creating subscription requests. * Make v4::Connect::write return correct value ### Security diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 863b73191..15cd5f5ad 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -3,7 +3,7 @@ use std::time::Duration; use crate::mqttbytes::{v4::*, QoS}; -use crate::{valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request}; +use crate::{valid_filter, valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request}; use bytes::Bytes; use flume::{SendError, Sender, TrySendError}; @@ -150,16 +150,24 @@ impl AsyncClient { /// Sends a MQTT Subscribe to the `EventLoop` pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let subscribe = Subscribe::new(topic.into(), qos); + let topic = topic.into(); + let subscribe = Subscribe::new(&topic, qos); let request = Request::Subscribe(subscribe); + if !valid_filter(&topic) { + return Err(ClientError::Request(request)); + } self.request_tx.send_async(request).await?; Ok(()) } /// Attempts to send a MQTT Subscribe to the `EventLoop` pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let subscribe = Subscribe::new(topic.into(), qos); + let topic = topic.into(); + let subscribe = Subscribe::new(&topic, qos); let request = Request::Subscribe(subscribe); + if !valid_filter(&topic) { + return Err(ClientError::TryRequest(request)); + } self.request_tx.try_send(request)?; Ok(()) } @@ -169,8 +177,13 @@ impl AsyncClient { where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics); + let mut topics_iter = topics.into_iter(); + let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); + let subscribe = Subscribe::new_many(topics_iter); let request = Request::Subscribe(subscribe); + if !is_valid_filters { + return Err(ClientError::Request(request)); + } self.request_tx.send_async(request).await?; Ok(()) } @@ -180,8 +193,13 @@ impl AsyncClient { where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics); + let mut topics_iter = topics.into_iter(); + let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); + let subscribe = Subscribe::new_many(topics_iter); let request = Request::Subscribe(subscribe); + if !is_valid_filters { + return Err(ClientError::TryRequest(request)); + } self.request_tx.try_send(request)?; Ok(()) } @@ -323,8 +341,12 @@ impl Client { /// Sends a MQTT Subscribe to the `EventLoop` pub fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let subscribe = Subscribe::new(topic.into(), qos); + let topic = topic.into(); + let subscribe = Subscribe::new(&topic, qos); let request = Request::Subscribe(subscribe); + if !valid_filter(&topic) { + return Err(ClientError::Request(request)); + } self.client.request_tx.send(request)?; Ok(()) } @@ -340,8 +362,13 @@ impl Client { where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics); + let mut topics_iter = topics.into_iter(); + let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); + let subscribe = Subscribe::new_many(topics_iter); let request = Request::Subscribe(subscribe); + if !is_valid_filters { + return Err(ClientError::Request(request)); + } self.client.request_tx.send(request)?; Ok(()) } diff --git a/rumqttc/src/mqttbytes/v4/mod.rs b/rumqttc/src/mqttbytes/v4/mod.rs index ed438dd0f..3621945de 100644 --- a/rumqttc/src/mqttbytes/v4/mod.rs +++ b/rumqttc/src/mqttbytes/v4/mod.rs @@ -15,6 +15,7 @@ mod subscribe; mod unsuback; mod unsubscribe; +pub use codec::*; pub use connack::*; pub use connect::*; pub use disconnect::*; @@ -28,7 +29,6 @@ pub use suback::*; pub use subscribe::*; pub use unsuback::*; pub use unsubscribe::*; -pub use codec::*; /// Encapsulates all MQTT packet types #[derive(Debug, Clone, PartialEq, Eq)] @@ -116,7 +116,7 @@ impl Packet { return Err(Error::OutgoingPacketTooLarge { pkt_size: self.size(), max: max_size, - }) + }); } match self { diff --git a/rumqttc/src/proxy.rs b/rumqttc/src/proxy.rs index e7f84cd37..94c7aabd3 100644 --- a/rumqttc/src/proxy.rs +++ b/rumqttc/src/proxy.rs @@ -49,7 +49,8 @@ impl Proxy { ) -> Result, ProxyError> { let proxy_addr = format!("{}:{}", self.addr, self.port); - let tcp: Box = Box::new(socket_connect(proxy_addr, network_options).await?); + let tcp: Box = + Box::new(socket_connect(proxy_addr, network_options).await?); let mut tcp = match self.ty { ProxyType::Http => tcp, #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 910da504f..f8629b8c5 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -6,7 +6,7 @@ use super::mqttbytes::v5::{ Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties, Unsubscribe, UnsubscribeProperties, }; -use super::mqttbytes::QoS; +use super::mqttbytes::{valid_filter, QoS}; use super::{ConnectionError, Event, EventLoop, MqttOptions, Request}; use crate::valid_topic; @@ -256,8 +256,12 @@ impl AsyncClient { properties: Option, ) -> Result<(), ClientError> { let filter = Filter::new(topic, qos); + let is_filter_valid = valid_filter(&filter.path); let subscribe = Subscribe::new(filter, properties); let request: Request = Request::Subscribe(subscribe); + if !is_filter_valid { + return Err(ClientError::Request(request)); + } self.request_tx.send_async(request).await?; Ok(()) } @@ -283,8 +287,12 @@ impl AsyncClient { properties: Option, ) -> Result<(), ClientError> { let filter = Filter::new(topic, qos); + let is_filter_valid = valid_filter(&filter.path); let subscribe = Subscribe::new(filter, properties); let request = Request::Subscribe(subscribe); + if !is_filter_valid { + return Err(ClientError::TryRequest(request)); + } self.request_tx.try_send(request)?; Ok(()) } @@ -311,8 +319,14 @@ impl AsyncClient { where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut topics_iter = topics.into_iter(); + let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); + let subscribe = Subscribe::new_many(topics_iter, properties); let request = Request::Subscribe(subscribe); + if !is_valid_filters { + return Err(ClientError::Request(request)); + } + self.request_tx.send_async(request).await?; Ok(()) } @@ -344,8 +358,13 @@ impl AsyncClient { where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut topics_iter = topics.into_iter(); + let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); + let subscribe = Subscribe::new_many(topics_iter, properties); let request = Request::Subscribe(subscribe); + if !is_valid_filters { + return Err(ClientError::TryRequest(request)); + } self.request_tx.try_send(request)?; Ok(()) } @@ -589,8 +608,12 @@ impl Client { properties: Option, ) -> Result<(), ClientError> { let filter = Filter::new(topic, qos); + let is_filter_valid = valid_filter(&filter.path); let subscribe = Subscribe::new(filter, properties); let request = Request::Subscribe(subscribe); + if !is_filter_valid { + return Err(ClientError::Request(request)); + } self.client.request_tx.send(request)?; Ok(()) } @@ -632,8 +655,13 @@ impl Client { where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut topics_iter = topics.into_iter(); + let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path)); + let subscribe = Subscribe::new_many(topics_iter, properties); let request = Request::Subscribe(subscribe); + if !is_valid_filters { + return Err(ClientError::Request(request)); + } self.client.request_tx.send(request)?; Ok(()) }