Skip to content

Commit

Permalink
fix(rumqttc): validate filters for subscribe (#828)
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavgoel29 authored Mar 25, 2024
1 parent 7940002 commit 5e2c1de
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 14 deletions.
1 change: 1 addition & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

* Validate filters while creating subscription requests.
* Make v4::Connect::write return correct value

### Security
Expand Down
41 changes: 34 additions & 7 deletions rumqttc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::time::Duration;

use crate::mqttbytes::{v4::*, QoS};
use crate::{valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request};
use crate::{valid_filter, valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request};

use bytes::Bytes;
use flume::{SendError, Sender, TrySendError};
Expand Down Expand Up @@ -150,16 +150,24 @@ impl AsyncClient {

/// Sends a MQTT Subscribe to the `EventLoop`
pub async fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
}
self.request_tx.send_async(request).await?;
Ok(())
}

/// Attempts to send a MQTT Subscribe to the `EventLoop`
pub fn try_subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::TryRequest(request));
}
self.request_tx.try_send(request)?;
Ok(())
}
Expand All @@ -169,8 +177,13 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let subscribe = Subscribe::new_many(topics);
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}
self.request_tx.send_async(request).await?;
Ok(())
}
Expand All @@ -180,8 +193,13 @@ impl AsyncClient {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let subscribe = Subscribe::new_many(topics);
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::TryRequest(request));
}
self.request_tx.try_send(request)?;
Ok(())
}
Expand Down Expand Up @@ -323,8 +341,12 @@ impl Client {

/// Sends a MQTT Subscribe to the `EventLoop`
pub fn subscribe<S: Into<String>>(&self, topic: S, qos: QoS) -> Result<(), ClientError> {
let subscribe = Subscribe::new(topic.into(), qos);
let topic = topic.into();
let subscribe = Subscribe::new(&topic, qos);
let request = Request::Subscribe(subscribe);
if !valid_filter(&topic) {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(request)?;
Ok(())
}
Expand All @@ -340,8 +362,13 @@ impl Client {
where
T: IntoIterator<Item = SubscribeFilter>,
{
let subscribe = Subscribe::new_many(topics);
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(request)?;
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions rumqttc/src/mqttbytes/v4/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod subscribe;
mod unsuback;
mod unsubscribe;

pub use codec::*;
pub use connack::*;
pub use connect::*;
pub use disconnect::*;
Expand All @@ -28,7 +29,6 @@ pub use suback::*;
pub use subscribe::*;
pub use unsuback::*;
pub use unsubscribe::*;
pub use codec::*;

/// Encapsulates all MQTT packet types
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -116,7 +116,7 @@ impl Packet {
return Err(Error::OutgoingPacketTooLarge {
pkt_size: self.size(),
max: max_size,
})
});
}

match self {
Expand Down
3 changes: 2 additions & 1 deletion rumqttc/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ impl Proxy {
) -> Result<Box<dyn AsyncReadWrite>, ProxyError> {
let proxy_addr = format!("{}:{}", self.addr, self.port);

let tcp: Box<dyn AsyncReadWrite> = Box::new(socket_connect(proxy_addr, network_options).await?);
let tcp: Box<dyn AsyncReadWrite> =
Box::new(socket_connect(proxy_addr, network_options).await?);
let mut tcp = match self.ty {
ProxyType::Http => tcp,
#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
Expand Down
36 changes: 32 additions & 4 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::mqttbytes::v5::{
Filter, PubAck, PubRec, Publish, PublishProperties, Subscribe, SubscribeProperties,
Unsubscribe, UnsubscribeProperties,
};
use super::mqttbytes::QoS;
use super::mqttbytes::{valid_filter, QoS};
use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use crate::valid_topic;

Expand Down Expand Up @@ -256,8 +256,12 @@ impl AsyncClient {
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request: Request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::Request(request));
}
self.request_tx.send_async(request).await?;
Ok(())
}
Expand All @@ -283,8 +287,12 @@ impl AsyncClient {
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::TryRequest(request));
}
self.request_tx.try_send(request)?;
Ok(())
}
Expand All @@ -311,8 +319,14 @@ impl AsyncClient {
where
T: IntoIterator<Item = Filter>,
{
let subscribe = Subscribe::new_many(topics, properties);
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}

self.request_tx.send_async(request).await?;
Ok(())
}
Expand Down Expand Up @@ -344,8 +358,13 @@ impl AsyncClient {
where
T: IntoIterator<Item = Filter>,
{
let subscribe = Subscribe::new_many(topics, properties);
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::TryRequest(request));
}
self.request_tx.try_send(request)?;
Ok(())
}
Expand Down Expand Up @@ -589,8 +608,12 @@ impl Client {
properties: Option<SubscribeProperties>,
) -> Result<(), ClientError> {
let filter = Filter::new(topic, qos);
let is_filter_valid = valid_filter(&filter.path);
let subscribe = Subscribe::new(filter, properties);
let request = Request::Subscribe(subscribe);
if !is_filter_valid {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(request)?;
Ok(())
}
Expand Down Expand Up @@ -632,8 +655,13 @@ impl Client {
where
T: IntoIterator<Item = Filter>,
{
let subscribe = Subscribe::new_many(topics, properties);
let mut topics_iter = topics.into_iter();
let is_valid_filters = topics_iter.all(|filter| valid_filter(&filter.path));
let subscribe = Subscribe::new_many(topics_iter, properties);
let request = Request::Subscribe(subscribe);
if !is_valid_filters {
return Err(ClientError::Request(request));
}
self.client.request_tx.send(request)?;
Ok(())
}
Expand Down

0 comments on commit 5e2c1de

Please sign in to comment.