From e29b9ad214b6cba3ddbeea03c38181d6268db993 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 22 Feb 2024 15:59:58 +0000 Subject: [PATCH 01/16] feat: oneshot channel returns `Pkid` --- rumqttc/src/client.rs | 157 +++++++++++++++++------- rumqttc/src/lib.rs | 3 + rumqttc/src/mqttbytes/v4/publish.rs | 42 ++++++- rumqttc/src/mqttbytes/v4/subscribe.rs | 43 ++++++- rumqttc/src/mqttbytes/v4/unsubscribe.rs | 38 +++++- 5 files changed, 228 insertions(+), 55 deletions(-) diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 863b73191..cf74ede5b 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -3,7 +3,7 @@ use std::time::Duration; use crate::mqttbytes::{v4::*, QoS}; -use crate::{valid_topic, ConnectionError, Event, EventLoop, MqttOptions, Request}; +use crate::{valid_topic, ConnectionError, Event, EventLoop, MqttOptions, PkidPromise, Request}; use bytes::Bytes; use flume::{SendError, Sender, TrySendError}; @@ -72,7 +72,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: V, - ) -> Result<(), ClientError> + ) -> Result where S: Into, V: Into>, @@ -80,12 +80,16 @@ impl AsyncClient { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + publish.place_pkid_tx(pkid_tx); + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } self.request_tx.send_async(publish).await?; - Ok(()) + Ok(pkid_rx) } /// Attempts to send a MQTT Publish to the `EventLoop`. @@ -95,7 +99,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: V, - ) -> Result<(), ClientError> + ) -> Result where S: Into, V: Into>, @@ -103,12 +107,16 @@ impl AsyncClient { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + publish.place_pkid_tx(pkid_tx); + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } self.request_tx.try_send(publish)?; - Ok(()) + Ok(pkid_rx) } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. @@ -137,69 +145,105 @@ impl AsyncClient { qos: QoS, retain: bool, payload: Bytes, - ) -> Result<(), ClientError> + ) -> Result where S: Into, { let mut publish = Publish::from_bytes(topic, qos, payload); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + publish.place_pkid_tx(pkid_tx); + let publish = Request::Publish(publish); self.request_tx.send_async(publish).await?; - Ok(()) + Ok(pkid_rx) } /// Sends a MQTT Subscribe to the `EventLoop` - pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let subscribe = Subscribe::new(topic.into(), qos); + pub async fn subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { + let mut subscribe = Subscribe::new(topic.into(), qos); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } /// Attempts to send a MQTT Subscribe to the `EventLoop` - pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let subscribe = Subscribe::new(topic.into(), qos); + pub fn try_subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { + let mut subscribe = Subscribe::new(topic.into(), qos); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } /// Sends a MQTT Subscribe for multiple topics to the `EventLoop` - pub async fn subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub async fn subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics); + let mut subscribe = Subscribe::new_many(topics); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop` - pub fn try_subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics); + let mut subscribe = Subscribe::new_many(topics); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } /// Sends a MQTT Unsubscribe to the `EventLoop` - pub async fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic.into()); + pub async fn unsubscribe>(&self, topic: S) -> Result { + let mut unsubscribe = Unsubscribe::new(topic.into()); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } /// Attempts to send a MQTT Unsubscribe to the `EventLoop` - pub fn try_unsubscribe>(&self, topic: S) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic.into()); + pub fn try_unsubscribe>(&self, topic: S) -> Result { + let mut unsubscribe = Unsubscribe::new(topic.into()); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } /// Sends a MQTT disconnect to the `EventLoop` @@ -274,7 +318,7 @@ impl Client { qos: QoS, retain: bool, payload: V, - ) -> Result<(), ClientError> + ) -> Result where S: Into, V: Into>, @@ -282,12 +326,16 @@ impl Client { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + publish.place_pkid_tx(pkid_tx); + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } self.client.request_tx.send(publish)?; - Ok(()) + Ok(pkid_rx) } pub fn try_publish( @@ -296,13 +344,12 @@ impl Client { qos: QoS, retain: bool, payload: V, - ) -> Result<(), ClientError> + ) -> Result where S: Into, V: Into>, { - self.client.try_publish(topic, qos, retain, payload)?; - Ok(()) + self.client.try_publish(topic, qos, retain, payload) } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. @@ -322,31 +369,46 @@ impl Client { } /// Sends a MQTT Subscribe to the `EventLoop` - pub fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - let subscribe = Subscribe::new(topic.into(), qos); + pub fn subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { + let mut subscribe = Subscribe::new(topic.into(), qos); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } /// Sends a MQTT Subscribe to the `EventLoop` - pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { - self.client.try_subscribe(topic, qos)?; - Ok(()) + pub fn try_subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { + self.client.try_subscribe(topic, qos) } /// Sends a MQTT Subscribe for multiple topics to the `EventLoop` - pub fn subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics); + let mut subscribe = Subscribe::new_many(topics); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } - pub fn try_subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -354,17 +416,20 @@ impl Client { } /// Sends a MQTT Unsubscribe to the `EventLoop` - pub fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic.into()); + pub fn unsubscribe>(&self, topic: S) -> Result { + let mut unsubscribe = Unsubscribe::new(topic.into()); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } /// Sends a MQTT Unsubscribe to the `EventLoop` - pub fn try_unsubscribe>(&self, topic: S) -> Result<(), ClientError> { - self.client.try_unsubscribe(topic)?; - Ok(()) + pub fn try_unsubscribe>(&self, topic: S) -> Result { + self.client.try_unsubscribe(topic) } /// Sends a MQTT disconnect to the `EventLoop` diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 43dbb3bed..e1e809ebb 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -155,6 +155,9 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType}; pub type Incoming = Packet; +pub type Pkid = u16; +pub type PkidPromise = tokio::sync::oneshot::Receiver; + /// Current outgoing activity on the eventloop #[derive(Debug, Clone, PartialEq, Eq)] pub enum Outgoing { diff --git a/rumqttc/src/mqttbytes/v4/publish.rs b/rumqttc/src/mqttbytes/v4/publish.rs index 5d8978c46..8457f81aa 100644 --- a/rumqttc/src/mqttbytes/v4/publish.rs +++ b/rumqttc/src/mqttbytes/v4/publish.rs @@ -1,8 +1,10 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Publish packet -#[derive(Clone, PartialEq, Eq)] pub struct Publish { pub dup: bool, pub qos: QoS, @@ -10,8 +12,37 @@ pub struct Publish { pub topic: String, pub pkid: u16, pub payload: Bytes, + pub pkid_tx: Option>, +} + +// TODO: figure out if this is even required +impl Clone for Publish { + fn clone(&self) -> Self { + Self { + dup: self.dup, + qos: self.qos, + retain: self.retain, + topic: self.topic.clone(), + payload: self.payload.clone(), + pkid: self.pkid, + pkid_tx: None, + } + } } +impl PartialEq for Publish { + fn eq(&self, other: &Self) -> bool { + self.dup == other.dup + && self.qos == other.qos + && self.retain == other.retain + && self.topic == other.topic + && self.payload == other.payload + && self.pkid == other.pkid + } +} + +impl Eq for Publish {} + impl Publish { pub fn new, P: Into>>(topic: S, qos: QoS, payload: P) -> Publish { Publish { @@ -21,6 +52,7 @@ impl Publish { pkid: 0, topic: topic.into(), payload: Bytes::from(payload.into()), + pkid_tx: None, } } @@ -32,6 +64,7 @@ impl Publish { pkid: 0, topic: topic.into(), payload, + pkid_tx: None, } } @@ -77,6 +110,7 @@ impl Publish { pkid, topic, payload: bytes, + pkid_tx: None, }; Ok(publish) @@ -107,6 +141,10 @@ impl Publish { // TODO: Returned length is wrong in other packets. Fix it Ok(1 + count + len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } impl fmt::Debug for Publish { diff --git a/rumqttc/src/mqttbytes/v4/subscribe.rs b/rumqttc/src/mqttbytes/v4/subscribe.rs index 42ddb57b1..b5eec9243 100644 --- a/rumqttc/src/mqttbytes/v4/subscribe.rs +++ b/rumqttc/src/mqttbytes/v4/subscribe.rs @@ -1,13 +1,35 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Subscription packet -#[derive(Clone, PartialEq, Eq)] pub struct Subscribe { pub pkid: u16, pub filters: Vec, + pub pkid_tx: Option>, } +// TODO: figure out if this is even required +impl Clone for Subscribe { + fn clone(&self) -> Self { + Self { + pkid: self.pkid, + filters: self.filters.clone(), + pkid_tx: None, + } + } +} + +impl PartialEq for Subscribe { + fn eq(&self, other: &Self) -> bool { + self.pkid == other.pkid && self.filters == other.filters + } +} + +impl Eq for Subscribe {} + impl Subscribe { pub fn new>(path: S, qos: QoS) -> Subscribe { let filter = SubscribeFilter { @@ -18,6 +40,7 @@ impl Subscribe { Subscribe { pkid: 0, filters: vec![filter], + pkid_tx: None, } } @@ -27,7 +50,11 @@ impl Subscribe { { let filters: Vec = topics.into_iter().collect(); - Subscribe { pkid: 0, filters } + Subscribe { + pkid: 0, + filters, + pkid_tx: None, + } } pub fn add(&mut self, path: String, qos: QoS) -> &mut Self { @@ -71,7 +98,11 @@ impl Subscribe { match filters.len() { 0 => Err(Error::EmptySubscription), - _ => Ok(Subscribe { pkid, filters }), + _ => Ok(Subscribe { + pkid, + filters, + pkid_tx: None, + }), } } @@ -93,6 +124,10 @@ impl Subscribe { Ok(1 + remaining_len_bytes + remaining_len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } /// Subscription filter diff --git a/rumqttc/src/mqttbytes/v4/unsubscribe.rs b/rumqttc/src/mqttbytes/v4/unsubscribe.rs index da34fbcc6..e4b1ed18a 100644 --- a/rumqttc/src/mqttbytes/v4/unsubscribe.rs +++ b/rumqttc/src/mqttbytes/v4/unsubscribe.rs @@ -1,18 +1,42 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Unsubscribe packet -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug)] pub struct Unsubscribe { pub pkid: u16, pub topics: Vec, + pub pkid_tx: Option>, +} + +// TODO: figure out if this is even required +impl Clone for Unsubscribe { + fn clone(&self) -> Self { + Self { + pkid: self.pkid, + topics: self.topics.clone(), + pkid_tx: None, + } + } } +impl PartialEq for Unsubscribe { + fn eq(&self, other: &Self) -> bool { + self.pkid == other.pkid && self.topics == other.topics + } +} + +impl Eq for Unsubscribe {} + impl Unsubscribe { pub fn new>(topic: S) -> Unsubscribe { Unsubscribe { pkid: 0, topics: vec![topic.into()], + pkid_tx: None, } } @@ -42,7 +66,11 @@ impl Unsubscribe { topics.push(topic_filter); } - let unsubscribe = Unsubscribe { pkid, topics }; + let unsubscribe = Unsubscribe { + pkid, + topics, + pkid_tx: None, + }; Ok(unsubscribe) } @@ -58,4 +86,8 @@ impl Unsubscribe { } Ok(1 + remaining_len_bytes + remaining_len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } From 44a709045a0dfa4fabae4b970525f8ce91255f1e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 22 Feb 2024 16:10:44 +0000 Subject: [PATCH 02/16] fulfill promise on handling packets --- rumqttc/src/state.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index da33bd2f2..eb468744f 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -329,12 +329,19 @@ impl MqttState { /// Adds next packet identifier to QoS 1 and 2 publish packets and returns /// it buy wrapping publish in packet fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> { + // NOTE: pkid promise need not be fulfilled for QoS 0, + // user should know this but still handled in Client. if publish.qos != QoS::AtMostOnce { if publish.pkid == 0 { publish.pkid = self.next_pkid(); } let pkid = publish.pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = publish.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } + if self .outgoing_pub .get(publish.pkid as usize) @@ -434,6 +441,10 @@ impl MqttState { let pkid = self.next_pkid(); subscription.pkid = pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = subscription.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } debug!( "Subscribe. Topics = {:?}, Pkid = {:?}", @@ -450,6 +461,11 @@ impl MqttState { let pkid = self.next_pkid(); unsub.pkid = pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = unsub.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } + debug!( "Unsubscribe. Topics = {:?}, Pkid = {:?}", unsub.topics, unsub.pkid From 941fa5456bd3ea95ccd9f550d7a7fdd06077f63a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 22 Feb 2024 16:24:03 +0000 Subject: [PATCH 03/16] handle QoS 0 --- rumqttc/src/client.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index cf74ede5b..34b6d0ccf 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -82,7 +82,12 @@ impl AsyncClient { publish.retain = retain; let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - publish.place_pkid_tx(pkid_tx); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } let publish = Request::Publish(publish); if !valid_topic(&topic) { @@ -109,7 +114,11 @@ impl AsyncClient { publish.retain = retain; let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - publish.place_pkid_tx(pkid_tx); + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } let publish = Request::Publish(publish); if !valid_topic(&topic) { @@ -153,7 +162,11 @@ impl AsyncClient { publish.retain = retain; let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - publish.place_pkid_tx(pkid_tx); + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } let publish = Request::Publish(publish); self.request_tx.send_async(publish).await?; @@ -328,7 +341,11 @@ impl Client { publish.retain = retain; let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - publish.place_pkid_tx(pkid_tx); + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } let publish = Request::Publish(publish); if !valid_topic(&topic) { From 8fc2d4a363bcbe1dacd6fb84a0895349d14fa94f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 22 Feb 2024 16:46:18 +0000 Subject: [PATCH 04/16] doc: example usecase --- rumqttc/examples/pkid_promise.rs | 61 ++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 rumqttc/examples/pkid_promise.rs diff --git a/rumqttc/examples/pkid_promise.rs b/rumqttc/examples/pkid_promise.rs new file mode 100644 index 000000000..de46e9b43 --- /dev/null +++ b/rumqttc/examples/pkid_promise.rs @@ -0,0 +1,61 @@ +use tokio::{ + task::{self, JoinSet}, + time, +}; + +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + task::spawn(async move { + requests(client).await; + }); + + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {v:?}"); + } + Err(e) => { + println!("Error = {e:?}"); + return Ok(()); + } + } + } +} + +async fn requests(client: AsyncClient) { + let mut joins = JoinSet::new(); + joins.spawn( + client + .subscribe("hello/world", QoS::AtMostOnce) + .await + .unwrap(), + ); + + for i in 1..=10 { + joins.spawn( + client + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) + .await + .unwrap(), + ); + + time::sleep(Duration::from_secs(1)).await; + } + + // TODO: maybe rewrite to showcase in-between resolutions? + while let Some(Ok(Ok(pkid))) = joins.join_next().await { + println!("Pkid: {:?}", pkid); + } +} From 0b3406ff5187358430ab7137ec46d6d525161951 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 22 Feb 2024 16:52:40 +0000 Subject: [PATCH 05/16] get tests to compile, partially --- rumqttc/src/mqttbytes/v4/publish.rs | 4 ++++ rumqttc/src/mqttbytes/v4/subscribe.rs | 2 ++ rumqttc/src/state.rs | 4 ++++ 3 files changed, 10 insertions(+) diff --git a/rumqttc/src/mqttbytes/v4/publish.rs b/rumqttc/src/mqttbytes/v4/publish.rs index 8457f81aa..fa8e30564 100644 --- a/rumqttc/src/mqttbytes/v4/publish.rs +++ b/rumqttc/src/mqttbytes/v4/publish.rs @@ -204,6 +204,7 @@ mod test { topic: "a/b".to_owned(), pkid: 10, payload: Bytes::from(&payload[..]), + pkid_tx: None, } ); } @@ -240,6 +241,7 @@ mod test { topic: "a/b".to_owned(), pkid: 0, payload: Bytes::from(&[0x01, 0x02][..]), + pkid_tx: None, } ); } @@ -253,6 +255,7 @@ mod test { topic: "a/b".to_owned(), pkid: 10, payload: Bytes::from(vec![0xF1, 0xF2, 0xF3, 0xF4]), + pkid_tx: None, }; let mut buf = BytesMut::new(); @@ -287,6 +290,7 @@ mod test { topic: "a/b".to_owned(), pkid: 0, payload: Bytes::from(vec![0xE1, 0xE2, 0xE3, 0xE4]), + pkid_tx: None, }; let mut buf = BytesMut::new(); diff --git a/rumqttc/src/mqttbytes/v4/subscribe.rs b/rumqttc/src/mqttbytes/v4/subscribe.rs index b5eec9243..407100d3e 100644 --- a/rumqttc/src/mqttbytes/v4/subscribe.rs +++ b/rumqttc/src/mqttbytes/v4/subscribe.rs @@ -229,6 +229,7 @@ mod test { SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce), SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce) ], + pkid_tx: None, } ); } @@ -242,6 +243,7 @@ mod test { SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce), SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce), ], + pkid_tx: None, }; let mut buf = BytesMut::new(); diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index eb468744f..aeb188504 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -887,6 +887,7 @@ mod test { topic: "test".to_string(), pkid: 1, payload: "".into(), + pkid_tx: None, }), Some(Publish { dup: false, @@ -895,6 +896,7 @@ mod test { topic: "test".to_string(), pkid: 2, payload: "".into(), + pkid_tx: None, }), Some(Publish { dup: false, @@ -903,6 +905,7 @@ mod test { topic: "test".to_string(), pkid: 3, payload: "".into(), + pkid_tx: None, }), None, None, @@ -913,6 +916,7 @@ mod test { topic: "test".to_string(), pkid: 6, payload: "".into(), + pkid_tx: None, }), ] } From c7ac03ddba4ed372a2985d6cb7cb4f13956614c4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 01:32:19 +0000 Subject: [PATCH 06/16] ci: fix missing dep feature --- rumqttc/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rumqttc/Cargo.toml b/rumqttc/Cargo.toml index 8c5fa31ff..b465cf4f6 100644 --- a/rumqttc/Cargo.toml +++ b/rumqttc/Cargo.toml @@ -24,7 +24,7 @@ proxy = ["dep:async-http-proxy"] [dependencies] futures-util = { version = "0.3", default_features = false, features = ["std"] } -tokio = { version = "1.36", features = ["rt", "macros", "io-util", "net", "time"] } +tokio = { version = "1.36", features = ["rt", "macros", "io-util", "net", "time", "sync"] } bytes = "1.5" log = "0.4" flume = { version = "0.11", default-features = false, features = ["async"] } From 408b01cc2cf05ceb1552b814beb902e51af56416 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 01:47:32 +0000 Subject: [PATCH 07/16] make example fun --- Cargo.lock | 2 ++ rumqttc/Cargo.toml | 1 + rumqttc/examples/pkid_promise.rs | 34 ++++++++++++++++++++------------ 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7e3bc8a3..ad7143001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1953,6 +1953,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "url", "ws_stream_tungstenite", ] @@ -2555,6 +2556,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite", + "slab", "tokio", "tracing", ] diff --git a/rumqttc/Cargo.toml b/rumqttc/Cargo.toml index b465cf4f6..104d37f88 100644 --- a/rumqttc/Cargo.toml +++ b/rumqttc/Cargo.toml @@ -55,6 +55,7 @@ matches = "0.1" pretty_assertions = "1" pretty_env_logger = "0.5" serde = { version = "1", features = ["derive"] } +tokio-util = { version = "0.7", features = ["time"] } [[example]] name = "tls" diff --git a/rumqttc/examples/pkid_promise.rs b/rumqttc/examples/pkid_promise.rs index de46e9b43..bcb3fbd5b 100644 --- a/rumqttc/examples/pkid_promise.rs +++ b/rumqttc/examples/pkid_promise.rs @@ -1,7 +1,9 @@ use tokio::{ task::{self, JoinSet}, - time, + select }; +use tokio_util::time::DelayQueue; +use futures_util::stream::StreamExt; use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::error::Error; @@ -12,7 +14,7 @@ async fn main() -> Result<(), Box> { pretty_env_logger::init(); // color_backtrace::install(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883); mqttoptions.set_keep_alive(Duration::from_secs(5)); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); @@ -43,19 +45,25 @@ async fn requests(client: AsyncClient) { .unwrap(), ); + let mut queue = DelayQueue::new(); for i in 1..=10 { - joins.spawn( - client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) - .await - .unwrap(), - ); - - time::sleep(Duration::from_secs(1)).await; + queue.insert(i as usize, Duration::from_secs(i)); } - // TODO: maybe rewrite to showcase in-between resolutions? - while let Some(Ok(Ok(pkid))) = joins.join_next().await { - println!("Pkid: {:?}", pkid); + loop { + select!{ + Some(i) = queue.next() => { + joins.spawn( + client + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()]) + .await + .unwrap(), + ); + } + Some(Ok(Ok(pkid))) = joins.join_next() => { + println!("Pkid: {:?}", pkid); + } + else => break, + } } } From 76af76df2a6526c13680c73b1359f4e5a873d872 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 02:21:07 +0000 Subject: [PATCH 08/16] implement feature for v5 --- rumqttc/src/v5/client.rs | 229 ++++++++++++++------- rumqttc/src/v5/mqttbytes/v5/publish.rs | 43 +++- rumqttc/src/v5/mqttbytes/v5/subscribe.rs | 35 +++- rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs | 33 ++- rumqttc/src/v5/state.rs | 15 ++ 5 files changed, 279 insertions(+), 76 deletions(-) diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index 910da504f..ed7005794 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -8,7 +8,7 @@ use super::mqttbytes::v5::{ }; use super::mqttbytes::QoS; use super::{ConnectionError, Event, EventLoop, MqttOptions, Request}; -use crate::valid_topic; +use crate::{valid_topic, PkidPromise}; use bytes::Bytes; use flume::{SendError, Sender, TrySendError}; @@ -78,7 +78,7 @@ impl AsyncClient { retain: bool, payload: P, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -86,12 +86,21 @@ impl AsyncClient { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } self.request_tx.send_async(publish).await?; - Ok(()) + Ok(pkid_rx) } pub async fn publish_with_properties( @@ -101,7 +110,7 @@ impl AsyncClient { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -116,7 +125,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -132,7 +141,7 @@ impl AsyncClient { retain: bool, payload: P, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -140,12 +149,21 @@ impl AsyncClient { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } self.request_tx.try_send(publish)?; - Ok(()) + Ok(pkid_rx) } pub fn try_publish_with_properties( @@ -155,7 +173,7 @@ impl AsyncClient { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -169,7 +187,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -204,19 +222,28 @@ impl AsyncClient { retain: bool, payload: Bytes, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } self.request_tx.send_async(publish).await?; - Ok(()) + Ok(pkid_rx) } pub async fn publish_bytes_with_properties( @@ -226,7 +253,7 @@ impl AsyncClient { retain: bool, payload: Bytes, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, { @@ -240,7 +267,7 @@ impl AsyncClient { qos: QoS, retain: bool, payload: Bytes, - ) -> Result<(), ClientError> + ) -> Result where S: Into, { @@ -254,12 +281,17 @@ impl AsyncClient { topic: S, qos: QoS, properties: Option, - ) -> Result<(), ClientError> { + ) -> Result { let filter = Filter::new(topic, qos); - let subscribe = Subscribe::new(filter, properties); + let mut subscribe = Subscribe::new(filter, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + + subscribe.place_pkid_tx(pkid_tx); + let request: Request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } pub async fn subscribe_with_properties>( @@ -267,11 +299,15 @@ impl AsyncClient { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_subscribe(topic, qos, Some(properties)).await } - pub async fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub async fn subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.handle_subscribe(topic, qos, None).await } @@ -281,12 +317,16 @@ impl AsyncClient { topic: S, qos: QoS, properties: Option, - ) -> Result<(), ClientError> { + ) -> Result { let filter = Filter::new(topic, qos); - let subscribe = Subscribe::new(filter, properties); + let mut subscribe = Subscribe::new(filter, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn try_subscribe_with_properties>( @@ -294,11 +334,15 @@ impl AsyncClient { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_try_subscribe(topic, qos, Some(properties)) } - pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub fn try_subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.handle_try_subscribe(topic, qos, None) } @@ -307,28 +351,32 @@ impl AsyncClient { &self, topics: T, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut subscribe = Subscribe::new_many(topics, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } pub async fn subscribe_many_with_properties( &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { self.handle_subscribe_many(topics, Some(properties)).await } - pub async fn subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub async fn subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -340,28 +388,32 @@ impl AsyncClient { &self, topics: T, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut subscribe = Subscribe::new_many(topics, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn try_subscribe_many_with_properties( &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { self.handle_try_subscribe_many(topics, Some(properties)) } - pub fn try_subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -373,22 +425,26 @@ impl AsyncClient { &self, topic: S, properties: Option, - ) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic, properties); + ) -> Result { + let mut unsubscribe = Unsubscribe::new(topic, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.request_tx.send_async(request).await?; - Ok(()) + Ok(pkid_rx) } pub async fn unsubscribe_with_properties>( &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_unsubscribe(topic, Some(properties)).await } - pub async fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub async fn unsubscribe>(&self, topic: S) -> Result { self.handle_unsubscribe(topic, None).await } @@ -397,22 +453,26 @@ impl AsyncClient { &self, topic: S, properties: Option, - ) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic, properties); + ) -> Result { + let mut unsubscribe = Unsubscribe::new(topic, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.request_tx.try_send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn try_unsubscribe_with_properties>( &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_try_unsubscribe(topic, Some(properties)) } - pub fn try_unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub fn try_unsubscribe>(&self, topic: S) -> Result { self.handle_try_unsubscribe(topic, None) } @@ -490,7 +550,7 @@ impl Client { retain: bool, payload: P, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -498,12 +558,21 @@ impl Client { let topic = topic.into(); let mut publish = Publish::new(&topic, qos, payload, properties); publish.retain = retain; + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + // Fulfill instantly for QoS 0 + if qos == QoS::AtMostOnce { + _ = pkid_tx.send(0); + } else { + publish.place_pkid_tx(pkid_tx); + } + let publish = Request::Publish(publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } self.client.request_tx.send(publish)?; - Ok(()) + Ok(pkid_rx) } pub fn publish_with_properties( @@ -513,7 +582,7 @@ impl Client { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -527,7 +596,7 @@ impl Client { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -542,7 +611,7 @@ impl Client { retain: bool, payload: P, properties: PublishProperties, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -557,7 +626,7 @@ impl Client { qos: QoS, retain: bool, payload: P, - ) -> Result<(), ClientError> + ) -> Result where S: Into, P: Into, @@ -587,12 +656,16 @@ impl Client { topic: S, qos: QoS, properties: Option, - ) -> Result<(), ClientError> { + ) -> Result { let filter = Filter::new(topic, qos); - let subscribe = Subscribe::new(filter, properties); + let mut subscribe = Subscribe::new(filter, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn subscribe_with_properties>( @@ -600,11 +673,15 @@ impl Client { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_subscribe(topic, qos, Some(properties)) } - pub fn subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub fn subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.handle_subscribe(topic, qos, None) } @@ -614,12 +691,16 @@ impl Client { topic: S, qos: QoS, properties: SubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.client .try_subscribe_with_properties(topic, qos, properties) } - pub fn try_subscribe>(&self, topic: S, qos: QoS) -> Result<(), ClientError> { + pub fn try_subscribe>( + &self, + topic: S, + qos: QoS, + ) -> Result { self.client.try_subscribe(topic, qos) } @@ -628,28 +709,32 @@ impl Client { &self, topics: T, properties: Option, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { - let subscribe = Subscribe::new_many(topics, properties); + let mut subscribe = Subscribe::new_many(topics, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + subscribe.place_pkid_tx(pkid_tx); + let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn subscribe_many_with_properties( &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { self.handle_subscribe_many(topics, Some(properties)) } - pub fn subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -660,7 +745,7 @@ impl Client { &self, topics: T, properties: SubscribeProperties, - ) -> Result<(), ClientError> + ) -> Result where T: IntoIterator, { @@ -668,7 +753,7 @@ impl Client { .try_subscribe_many_with_properties(topics, properties) } - pub fn try_subscribe_many(&self, topics: T) -> Result<(), ClientError> + pub fn try_subscribe_many(&self, topics: T) -> Result where T: IntoIterator, { @@ -680,22 +765,26 @@ impl Client { &self, topic: S, properties: Option, - ) -> Result<(), ClientError> { - let unsubscribe = Unsubscribe::new(topic, properties); + ) -> Result { + let mut unsubscribe = Unsubscribe::new(topic, properties); + + let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); + unsubscribe.place_pkid_tx(pkid_tx); + let request = Request::Unsubscribe(unsubscribe); self.client.request_tx.send(request)?; - Ok(()) + Ok(pkid_rx) } pub fn unsubscribe_with_properties>( &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.handle_unsubscribe(topic, Some(properties)) } - pub fn unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub fn unsubscribe>(&self, topic: S) -> Result { self.handle_unsubscribe(topic, None) } @@ -704,12 +793,12 @@ impl Client { &self, topic: S, properties: UnsubscribeProperties, - ) -> Result<(), ClientError> { + ) -> Result { self.client .try_unsubscribe_with_properties(topic, properties) } - pub fn try_unsubscribe>(&self, topic: S) -> Result<(), ClientError> { + pub fn try_unsubscribe>(&self, topic: S) -> Result { self.client.try_unsubscribe(topic) } diff --git a/rumqttc/src/v5/mqttbytes/v5/publish.rs b/rumqttc/src/v5/mqttbytes/v5/publish.rs index 74fbee225..c787e6004 100644 --- a/rumqttc/src/v5/mqttbytes/v5/publish.rs +++ b/rumqttc/src/v5/mqttbytes/v5/publish.rs @@ -1,8 +1,11 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Publish packet -#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[derive(Debug, Default)] pub struct Publish { pub dup: bool, pub qos: QoS, @@ -11,8 +14,39 @@ pub struct Publish { pub pkid: u16, pub payload: Bytes, pub properties: Option, + pub pkid_tx: Option>, +} + +// TODO: figure out if this is even required +impl Clone for Publish { + fn clone(&self) -> Self { + Self { + dup: self.dup, + qos: self.qos, + retain: self.retain, + topic: self.topic.clone(), + payload: self.payload.clone(), + pkid: self.pkid, + properties: self.properties.clone(), + pkid_tx: None, + } + } +} + +impl PartialEq for Publish { + fn eq(&self, other: &Self) -> bool { + self.dup == other.dup + && self.qos == other.qos + && self.retain == other.retain + && self.topic == other.topic + && self.payload == other.payload + && self.pkid == other.pkid + && self.properties == other.properties + } } +impl Eq for Publish {} + impl Publish { pub fn new, P: Into>( topic: T, @@ -85,6 +119,7 @@ impl Publish { topic, payload: bytes, properties, + pkid_tx: None, }; Ok(publish) @@ -120,6 +155,10 @@ impl Publish { Ok(1 + count + len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } #[derive(Debug, Clone, PartialEq, Eq, Default)] diff --git a/rumqttc/src/v5/mqttbytes/v5/subscribe.rs b/rumqttc/src/v5/mqttbytes/v5/subscribe.rs index 4167cd671..28796fe30 100644 --- a/rumqttc/src/v5/mqttbytes/v5/subscribe.rs +++ b/rumqttc/src/v5/mqttbytes/v5/subscribe.rs @@ -1,14 +1,40 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Subscription packet -#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[derive(Debug, Default)] pub struct Subscribe { pub pkid: u16, pub filters: Vec, pub properties: Option, + pub pkid_tx: Option>, } +// TODO: figure out if this is even required +impl Clone for Subscribe { + fn clone(&self) -> Self { + Self { + pkid: self.pkid, + filters: self.filters.clone(), + properties: self.properties.clone(), + pkid_tx: None, + } + } +} + +impl PartialEq for Subscribe { + fn eq(&self, other: &Self) -> bool { + self.pkid == other.pkid + && self.filters == other.filters + && self.properties == other.properties + } +} + +impl Eq for Subscribe {} + impl Subscribe { pub fn new(filter: Filter, properties: Option) -> Self { Self { @@ -67,6 +93,7 @@ impl Subscribe { pkid, filters, properties, + pkid_tx: None, }), } } @@ -95,6 +122,10 @@ impl Subscribe { Ok(1 + remaining_len_bytes + remaining_len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } /// Subscription filter diff --git a/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs b/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs index 2b671ce39..92d1d5610 100644 --- a/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs +++ b/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs @@ -1,14 +1,38 @@ -use super::*; use bytes::{Buf, Bytes}; +use tokio::sync::oneshot::Sender; + +use super::*; +use crate::Pkid; /// Unsubscribe packet -#[derive(Debug, Clone, PartialEq, Eq, Default)] +#[derive(Debug, Default)] pub struct Unsubscribe { pub pkid: u16, pub filters: Vec, pub properties: Option, + pub pkid_tx: Option>, +} + +// TODO: figure out if this is even required +impl Clone for Unsubscribe { + fn clone(&self) -> Self { + Self { + pkid: self.pkid, + filters: self.filters.clone(), + properties: self.properties.clone(), + pkid_tx: None, + } + } +} + +impl PartialEq for Unsubscribe { + fn eq(&self, other: &Self) -> bool { + self.pkid == other.pkid && self.filters == other.filters + } } +impl Eq for Unsubscribe {} + impl Unsubscribe { pub fn new>(filter: S, properties: Option) -> Self { Self { @@ -59,6 +83,7 @@ impl Unsubscribe { pkid, filters, properties, + pkid_tx: None, }; Ok(unsubscribe) } @@ -86,6 +111,10 @@ impl Unsubscribe { Ok(1 + remaining_len_bytes + remaining_len) } + + pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { + self.pkid_tx = Some(pkid_tx) + } } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 8473f1f4c..24c75a21b 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -485,12 +485,19 @@ impl MqttState { /// Adds next packet identifier to QoS 1 and 2 publish packets and returns /// it buy wrapping publish in packet fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> { + // NOTE: pkid promise need not be fulfilled for QoS 0, + // user should know this but still handled in Client. if publish.qos != QoS::AtMostOnce { if publish.pkid == 0 { publish.pkid = self.next_pkid(); } let pkid = publish.pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = publish.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } + if self .outgoing_pub .get(publish.pkid as usize) @@ -604,6 +611,10 @@ impl MqttState { let pkid = self.next_pkid(); subscription.pkid = pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = subscription.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } debug!( "Subscribe. Topics = {:?}, Pkid = {:?}", @@ -620,6 +631,10 @@ impl MqttState { fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> { let pkid = self.next_pkid(); unsub.pkid = pkid; + // Fulfill the pkid promise + if let Some(pkid_tx) = unsub.pkid_tx.take() { + _ = pkid_tx.send(pkid); + } debug!( "Unsubscribe. Topics = {:?}, Pkid = {:?}", From 4999ae3c83a1abe9173d4cb50f53ccc631942ca0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 02:24:44 +0000 Subject: [PATCH 09/16] doc: v5 example --- rumqttc/examples/pkid_promise.rs | 6 +-- rumqttc/examples/pkid_promise_v5.rs | 69 +++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 rumqttc/examples/pkid_promise_v5.rs diff --git a/rumqttc/examples/pkid_promise.rs b/rumqttc/examples/pkid_promise.rs index bcb3fbd5b..3042c03a1 100644 --- a/rumqttc/examples/pkid_promise.rs +++ b/rumqttc/examples/pkid_promise.rs @@ -1,9 +1,9 @@ +use futures_util::stream::StreamExt; use tokio::{ + select, task::{self, JoinSet}, - select }; use tokio_util::time::DelayQueue; -use futures_util::stream::StreamExt; use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::error::Error; @@ -51,7 +51,7 @@ async fn requests(client: AsyncClient) { } loop { - select!{ + select! { Some(i) = queue.next() => { joins.spawn( client diff --git a/rumqttc/examples/pkid_promise_v5.rs b/rumqttc/examples/pkid_promise_v5.rs new file mode 100644 index 000000000..95e64a7d1 --- /dev/null +++ b/rumqttc/examples/pkid_promise_v5.rs @@ -0,0 +1,69 @@ +use futures_util::stream::StreamExt; +use tokio::{ + select, + task::{self, JoinSet}, +}; +use tokio_util::time::DelayQueue; + +use rumqttc::v5::{mqttbytes::QoS, AsyncClient, MqttOptions}; +use std::error::Error; +use std::time::Duration; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + // color_backtrace::install(); + + let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + task::spawn(async move { + requests(client).await; + }); + + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {v:?}"); + } + Err(e) => { + println!("Error = {e:?}"); + return Ok(()); + } + } + } +} + +async fn requests(client: AsyncClient) { + let mut joins = JoinSet::new(); + joins.spawn( + client + .subscribe("hello/world", QoS::AtMostOnce) + .await + .unwrap(), + ); + + let mut queue = DelayQueue::new(); + for i in 1..=10 { + queue.insert(i as usize, Duration::from_secs(i)); + } + + loop { + select! { + Some(i) = queue.next() => { + joins.spawn( + client + .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()]) + .await + .unwrap(), + ); + } + Some(Ok(Ok(pkid))) = joins.join_next() => { + println!("Pkid: {:?}", pkid); + } + else => break, + } + } +} From 20eca903c860b926febcbc89c502b02102a21c0c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 02:28:19 +0000 Subject: [PATCH 10/16] add entry into changelog --- rumqttc/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/rumqttc/CHANGELOG.md b/rumqttc/CHANGELOG.md index c6fd3a88f..a710d9b8d 100644 --- a/rumqttc/CHANGELOG.md +++ b/rumqttc/CHANGELOG.md @@ -29,6 +29,7 @@ To update your code simply remove `Key::ECC()` or `Key::RSA()` from the initiali `rusttls-pemfile` to `2.0.0`, `async-tungstenite` to `0.24.0`, `ws_stream_tungstenite` to `0.12.0` and `http` to `1.0.0`. This is a breaking change as types from some of these crates are part of the public API. +- `publish` / `subscribe` / `unsubscribe` methods on `AsyncClient` and `Client` now return a `PkidPromise` which resolves into the identifier value chosen by the `EventLoop` when handling the packet. ### Deprecated From 8a06b11e0bd40d29eec0a5accb2c8fa6e90c2c80 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 22:59:40 +0530 Subject: [PATCH 11/16] Abstract true nature of `PkidPromise` --- rumqttc/examples/pkid_promise.rs | 7 ++++--- rumqttc/examples/pkid_promise_v5.rs | 5 +++-- rumqttc/src/client.rs | 26 +++++++++++++------------- rumqttc/src/lib.rs | 24 +++++++++++++++++++++++- rumqttc/src/v5/client.rs | 26 +++++++++++++------------- 5 files changed, 56 insertions(+), 32 deletions(-) diff --git a/rumqttc/examples/pkid_promise.rs b/rumqttc/examples/pkid_promise.rs index 3042c03a1..caac8d122 100644 --- a/rumqttc/examples/pkid_promise.rs +++ b/rumqttc/examples/pkid_promise.rs @@ -42,7 +42,8 @@ async fn requests(client: AsyncClient) { client .subscribe("hello/world", QoS::AtMostOnce) .await - .unwrap(), + .unwrap() + .wait_async(), ); let mut queue = DelayQueue::new(); @@ -57,10 +58,10 @@ async fn requests(client: AsyncClient) { client .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()]) .await - .unwrap(), + .unwrap().wait_async(), ); } - Some(Ok(Ok(pkid))) = joins.join_next() => { + Some(Ok(Some(pkid))) = joins.join_next() => { println!("Pkid: {:?}", pkid); } else => break, diff --git a/rumqttc/examples/pkid_promise_v5.rs b/rumqttc/examples/pkid_promise_v5.rs index 95e64a7d1..e4bd3d31c 100644 --- a/rumqttc/examples/pkid_promise_v5.rs +++ b/rumqttc/examples/pkid_promise_v5.rs @@ -42,7 +42,8 @@ async fn requests(client: AsyncClient) { client .subscribe("hello/world", QoS::AtMostOnce) .await - .unwrap(), + .unwrap() + .wait_async(), ); let mut queue = DelayQueue::new(); @@ -57,7 +58,7 @@ async fn requests(client: AsyncClient) { client .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i.into_inner()]) .await - .unwrap(), + .unwrap().wait_async(), ); } Some(Ok(Ok(pkid))) = joins.join_next() => { diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index 34b6d0ccf..b234fec66 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -94,7 +94,7 @@ impl AsyncClient { return Err(ClientError::Request(publish)); } self.request_tx.send_async(publish).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Attempts to send a MQTT Publish to the `EventLoop`. @@ -125,7 +125,7 @@ impl AsyncClient { return Err(ClientError::TryRequest(publish)); } self.request_tx.try_send(publish)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT PubAck to the `EventLoop`. Only needed in if `manual_acks` flag is set. @@ -170,7 +170,7 @@ impl AsyncClient { let publish = Request::Publish(publish); self.request_tx.send_async(publish).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT Subscribe to the `EventLoop` @@ -186,7 +186,7 @@ impl AsyncClient { let request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Attempts to send a MQTT Subscribe to the `EventLoop` @@ -202,7 +202,7 @@ impl AsyncClient { let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT Subscribe for multiple topics to the `EventLoop` @@ -217,7 +217,7 @@ impl AsyncClient { let request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Attempts to send a MQTT Subscribe for multiple topics to the `EventLoop` @@ -232,7 +232,7 @@ impl AsyncClient { let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT Unsubscribe to the `EventLoop` @@ -244,7 +244,7 @@ impl AsyncClient { let request = Request::Unsubscribe(unsubscribe); self.request_tx.send_async(request).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Attempts to send a MQTT Unsubscribe to the `EventLoop` @@ -256,7 +256,7 @@ impl AsyncClient { let request = Request::Unsubscribe(unsubscribe); self.request_tx.try_send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT disconnect to the `EventLoop` @@ -352,7 +352,7 @@ impl Client { return Err(ClientError::Request(publish)); } self.client.request_tx.send(publish)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn try_publish( @@ -398,7 +398,7 @@ impl Client { let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT Subscribe to the `EventLoop` @@ -422,7 +422,7 @@ impl Client { let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn try_subscribe_many(&self, topics: T) -> Result @@ -441,7 +441,7 @@ impl Client { let request = Request::Unsubscribe(unsubscribe); self.client.request_tx.send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT Unsubscribe to the `EventLoop` diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index e1e809ebb..babb4286c 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -145,6 +145,7 @@ use rustls_native_certs::load_native_certs; pub use state::{MqttState, StateError}; #[cfg(any(feature = "use-rustls", feature = "use-native-tls"))] pub use tls::Error as TlsError; +use tokio::sync::oneshot; #[cfg(feature = "use-rustls")] pub use tokio_rustls; #[cfg(feature = "use-rustls")] @@ -156,7 +157,28 @@ pub use proxy::{Proxy, ProxyAuth, ProxyType}; pub type Incoming = Packet; pub type Pkid = u16; -pub type PkidPromise = tokio::sync::oneshot::Receiver; + +/// A token through which the user can access the assigned packet id of an associated request, once it +/// is processed by the [`EventLoop`]. +pub struct PkidPromise { + inner: oneshot::Receiver, +} + +impl PkidPromise { + pub fn new(inner: oneshot::Receiver) -> Self { + Self { inner } + } + + /// Wait for the pkid to resolve by blocking the current thread + pub fn wait(self) -> Option { + self.inner.blocking_recv().ok() + } + + /// Await pkid resolution without blocking the current thread + pub async fn wait_async(self) -> Option { + self.inner.await.ok() + } +} /// Current outgoing activity on the eventloop #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index ed7005794..e92df5df8 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -100,7 +100,7 @@ impl AsyncClient { return Err(ClientError::Request(publish)); } self.request_tx.send_async(publish).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub async fn publish_with_properties( @@ -163,7 +163,7 @@ impl AsyncClient { return Err(ClientError::TryRequest(publish)); } self.request_tx.try_send(publish)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn try_publish_with_properties( @@ -243,7 +243,7 @@ impl AsyncClient { return Err(ClientError::TryRequest(publish)); } self.request_tx.send_async(publish).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub async fn publish_bytes_with_properties( @@ -291,7 +291,7 @@ impl AsyncClient { let request: Request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub async fn subscribe_with_properties>( @@ -326,7 +326,7 @@ impl AsyncClient { let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn try_subscribe_with_properties>( @@ -362,7 +362,7 @@ impl AsyncClient { let request = Request::Subscribe(subscribe); self.request_tx.send_async(request).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub async fn subscribe_many_with_properties( @@ -399,7 +399,7 @@ impl AsyncClient { let request = Request::Subscribe(subscribe); self.request_tx.try_send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn try_subscribe_many_with_properties( @@ -433,7 +433,7 @@ impl AsyncClient { let request = Request::Unsubscribe(unsubscribe); self.request_tx.send_async(request).await?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub async fn unsubscribe_with_properties>( @@ -461,7 +461,7 @@ impl AsyncClient { let request = Request::Unsubscribe(unsubscribe); self.request_tx.try_send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn try_unsubscribe_with_properties>( @@ -572,7 +572,7 @@ impl Client { return Err(ClientError::Request(publish)); } self.client.request_tx.send(publish)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn publish_with_properties( @@ -665,7 +665,7 @@ impl Client { let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn subscribe_with_properties>( @@ -720,7 +720,7 @@ impl Client { let request = Request::Subscribe(subscribe); self.client.request_tx.send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn subscribe_many_with_properties( @@ -773,7 +773,7 @@ impl Client { let request = Request::Unsubscribe(unsubscribe); self.client.request_tx.send(request)?; - Ok(pkid_rx) + Ok(PkidPromise::new(pkid_rx)) } pub fn unsubscribe_with_properties>( From ad34d6ef82283e16b6553602567288b96c5947fd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 23 Feb 2024 23:05:28 +0530 Subject: [PATCH 12/16] fix example --- rumqttc/examples/pkid_promise_v5.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rumqttc/examples/pkid_promise_v5.rs b/rumqttc/examples/pkid_promise_v5.rs index e4bd3d31c..c8cbd6422 100644 --- a/rumqttc/examples/pkid_promise_v5.rs +++ b/rumqttc/examples/pkid_promise_v5.rs @@ -61,7 +61,7 @@ async fn requests(client: AsyncClient) { .unwrap().wait_async(), ); } - Some(Ok(Ok(pkid))) = joins.join_next() => { + Some(Ok(Some(pkid))) = joins.join_next() => { println!("Pkid: {:?}", pkid); } else => break, From f0905557413b499c14c94729b69566c7640b632e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 24 Feb 2024 17:44:06 +0530 Subject: [PATCH 13/16] move tx out of mqttbytes --- rumqttc/src/client.rs | 84 +++++++++++----------- rumqttc/src/lib.rs | 61 +++++++++++++--- rumqttc/src/mqttbytes/v4/publish.rs | 46 +----------- rumqttc/src/mqttbytes/v4/subscribe.rs | 45 ++---------- rumqttc/src/mqttbytes/v4/unsubscribe.rs | 38 +--------- rumqttc/src/state.rs | 73 ++++++++++--------- rumqttc/src/v5/client.rs | 82 ++++++++++----------- rumqttc/src/v5/mod.rs | 52 ++++++++++++-- rumqttc/src/v5/mqttbytes/v5/publish.rs | 43 +---------- rumqttc/src/v5/mqttbytes/v5/subscribe.rs | 35 +-------- rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs | 30 +------- rumqttc/src/v5/state.rs | 73 +++++++++++-------- 12 files changed, 276 insertions(+), 386 deletions(-) diff --git a/rumqttc/src/client.rs b/rumqttc/src/client.rs index b234fec66..4738ac084 100644 --- a/rumqttc/src/client.rs +++ b/rumqttc/src/client.rs @@ -83,13 +83,14 @@ impl AsyncClient { let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); // Fulfill instantly for QoS 0 - if qos == QoS::AtMostOnce { + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } @@ -114,13 +115,15 @@ impl AsyncClient { publish.retain = retain; let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - if qos == QoS::AtMostOnce { + // Fulfill instantly for QoS 0 + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } @@ -162,13 +165,15 @@ impl AsyncClient { publish.retain = retain; let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - if qos == QoS::AtMostOnce { + // Fulfill instantly for QoS 0 + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); self.request_tx.send_async(publish).await?; Ok(PkidPromise::new(pkid_rx)) } @@ -179,12 +184,11 @@ impl AsyncClient { topic: S, qos: QoS, ) -> Result { - let mut subscribe = Subscribe::new(topic.into(), qos); + let subscribe = Subscribe::new(topic.into(), qos); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.send_async(request).await?; Ok(PkidPromise::new(pkid_rx)) } @@ -195,12 +199,11 @@ impl AsyncClient { topic: S, qos: QoS, ) -> Result { - let mut subscribe = Subscribe::new(topic.into(), qos); + let subscribe = Subscribe::new(topic.into(), qos); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.try_send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -210,12 +213,11 @@ impl AsyncClient { where T: IntoIterator, { - let mut subscribe = Subscribe::new_many(topics); + let subscribe = Subscribe::new_many(topics); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.send_async(request).await?; Ok(PkidPromise::new(pkid_rx)) } @@ -225,36 +227,33 @@ impl AsyncClient { where T: IntoIterator, { - let mut subscribe = Subscribe::new_many(topics); + let subscribe = Subscribe::new_many(topics); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.try_send(request)?; Ok(PkidPromise::new(pkid_rx)) } /// Sends a MQTT Unsubscribe to the `EventLoop` pub async fn unsubscribe>(&self, topic: S) -> Result { - let mut unsubscribe = Unsubscribe::new(topic.into()); + let unsubscribe = Unsubscribe::new(topic.into()); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - unsubscribe.place_pkid_tx(pkid_tx); - let request = Request::Unsubscribe(unsubscribe); + let request = Request::Unsubscribe(Some(pkid_tx), unsubscribe); self.request_tx.send_async(request).await?; Ok(PkidPromise::new(pkid_rx)) } /// Attempts to send a MQTT Unsubscribe to the `EventLoop` pub fn try_unsubscribe>(&self, topic: S) -> Result { - let mut unsubscribe = Unsubscribe::new(topic.into()); + let unsubscribe = Unsubscribe::new(topic.into()); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - unsubscribe.place_pkid_tx(pkid_tx); - let request = Request::Unsubscribe(unsubscribe); + let request = Request::Unsubscribe(Some(pkid_tx), unsubscribe); self.request_tx.try_send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -341,13 +340,15 @@ impl Client { publish.retain = retain; let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - if qos == QoS::AtMostOnce { + // Fulfill instantly for QoS 0 + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } @@ -391,12 +392,11 @@ impl Client { topic: S, qos: QoS, ) -> Result { - let mut subscribe = Subscribe::new(topic.into(), qos); + let subscribe = Subscribe::new(topic.into(), qos); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.client.request_tx.send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -415,12 +415,11 @@ impl Client { where T: IntoIterator, { - let mut subscribe = Subscribe::new_many(topics); + let subscribe = Subscribe::new_many(topics); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.client.request_tx.send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -434,12 +433,11 @@ impl Client { /// Sends a MQTT Unsubscribe to the `EventLoop` pub fn unsubscribe>(&self, topic: S) -> Result { - let mut unsubscribe = Unsubscribe::new(topic.into()); + let unsubscribe = Unsubscribe::new(topic.into()); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - unsubscribe.place_pkid_tx(pkid_tx); - let request = Request::Unsubscribe(unsubscribe); + let request = Request::Unsubscribe(Some(pkid_tx), unsubscribe); self.client.request_tx.send(request)?; Ok(PkidPromise::new(pkid_rx)) } diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index babb4286c..5d27b994a 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -209,35 +209,76 @@ pub enum Outgoing { /// Requests by the client to mqtt event loop. Request are /// handled one by one. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Debug)] pub enum Request { - Publish(Publish), + Publish(Option>, Publish), PubAck(PubAck), PubRec(PubRec), PubComp(PubComp), PubRel(PubRel), PingReq(PingReq), PingResp(PingResp), - Subscribe(Subscribe), + Subscribe(Option>, Subscribe), SubAck(SubAck), - Unsubscribe(Unsubscribe), + Unsubscribe(Option>, Unsubscribe), UnsubAck(UnsubAck), Disconnect(Disconnect), } +impl Clone for Request { + fn clone(&self) -> Self { + match self { + Self::Publish(_, p) => Self::Publish(None, p.clone()), + Self::PubAck(p) => Self::PubAck(p.clone()), + Self::PubRec(p) => Self::PubRec(p.clone()), + Self::PubRel(p) => Self::PubRel(p.clone()), + Self::PubComp(p) => Self::PubComp(p.clone()), + Self::Subscribe(_, p) => Self::Subscribe(None, p.clone()), + Self::SubAck(p) => Self::SubAck(p.clone()), + Self::PingReq(p) => Self::PingReq(p.clone()), + Self::PingResp(p) => Self::PingResp(p.clone()), + Self::Disconnect(p) => Self::Disconnect(p.clone()), + Self::Unsubscribe(_, p) => Self::Unsubscribe(None, p.clone()), + Self::UnsubAck(p) => Self::UnsubAck(p.clone()), + } + } +} + +impl PartialEq for Request { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Publish(_, p1), Self::Publish(_, p2)) => p1 == p2, + (Self::PubAck(p1), Self::PubAck(p2)) => p1 == p2, + (Self::PubRec(p1), Self::PubRec(p2)) => p1 == p2, + (Self::PubRel(p1), Self::PubRel(p2)) => p1 == p2, + (Self::PubComp(p1), Self::PubComp(p2)) => p1 == p2, + (Self::Subscribe(_, p1), Self::Subscribe(_, p2)) => p1 == p2, + (Self::SubAck(p1), Self::SubAck(p2)) => p1 == p2, + (Self::PingReq(p1), Self::PingReq(p2)) => p1 == p2, + (Self::PingResp(p1), Self::PingResp(p2)) => p1 == p2, + (Self::Unsubscribe(_, p1), Self::Unsubscribe(_, p2)) => p1 == p2, + (Self::UnsubAck(p1), Self::UnsubAck(p2)) => p1 == p2, + (Self::Disconnect(p1), Self::Disconnect(p2)) => p1 == p2, + _ => false, + } + } +} + +impl Eq for Request {} + impl Request { fn size(&self) -> usize { match &self { - Request::Publish(publish) => publish.size(), + Request::Publish(_, publish) => publish.size(), Request::PubAck(puback) => puback.size(), Request::PubRec(pubrec) => pubrec.size(), Request::PubComp(pubcomp) => pubcomp.size(), Request::PubRel(pubrel) => pubrel.size(), Request::PingReq(pingreq) => pingreq.size(), Request::PingResp(pingresp) => pingresp.size(), - Request::Subscribe(subscribe) => subscribe.size(), + Request::Subscribe(_, subscribe) => subscribe.size(), Request::SubAck(suback) => suback.size(), - Request::Unsubscribe(unsubscribe) => unsubscribe.size(), + Request::Unsubscribe(_, unsubscribe) => unsubscribe.size(), Request::UnsubAck(unsuback) => unsuback.size(), Request::Disconnect(disconn) => disconn.size(), } @@ -246,19 +287,19 @@ impl Request { impl From for Request { fn from(publish: Publish) -> Request { - Request::Publish(publish) + Request::Publish(None, publish) } } impl From for Request { fn from(subscribe: Subscribe) -> Request { - Request::Subscribe(subscribe) + Request::Subscribe(None, subscribe) } } impl From for Request { fn from(unsubscribe: Unsubscribe) -> Request { - Request::Unsubscribe(unsubscribe) + Request::Unsubscribe(None, unsubscribe) } } diff --git a/rumqttc/src/mqttbytes/v4/publish.rs b/rumqttc/src/mqttbytes/v4/publish.rs index fa8e30564..d1a3ceba5 100644 --- a/rumqttc/src/mqttbytes/v4/publish.rs +++ b/rumqttc/src/mqttbytes/v4/publish.rs @@ -1,10 +1,8 @@ -use bytes::{Buf, Bytes}; -use tokio::sync::oneshot::Sender; - use super::*; -use crate::Pkid; +use bytes::{Buf, Bytes}; /// Publish packet +#[derive(Clone, Eq, PartialEq)] pub struct Publish { pub dup: bool, pub qos: QoS, @@ -12,37 +10,8 @@ pub struct Publish { pub topic: String, pub pkid: u16, pub payload: Bytes, - pub pkid_tx: Option>, -} - -// TODO: figure out if this is even required -impl Clone for Publish { - fn clone(&self) -> Self { - Self { - dup: self.dup, - qos: self.qos, - retain: self.retain, - topic: self.topic.clone(), - payload: self.payload.clone(), - pkid: self.pkid, - pkid_tx: None, - } - } } -impl PartialEq for Publish { - fn eq(&self, other: &Self) -> bool { - self.dup == other.dup - && self.qos == other.qos - && self.retain == other.retain - && self.topic == other.topic - && self.payload == other.payload - && self.pkid == other.pkid - } -} - -impl Eq for Publish {} - impl Publish { pub fn new, P: Into>>(topic: S, qos: QoS, payload: P) -> Publish { Publish { @@ -52,7 +21,6 @@ impl Publish { pkid: 0, topic: topic.into(), payload: Bytes::from(payload.into()), - pkid_tx: None, } } @@ -64,7 +32,6 @@ impl Publish { pkid: 0, topic: topic.into(), payload, - pkid_tx: None, } } @@ -110,7 +77,6 @@ impl Publish { pkid, topic, payload: bytes, - pkid_tx: None, }; Ok(publish) @@ -141,10 +107,6 @@ impl Publish { // TODO: Returned length is wrong in other packets. Fix it Ok(1 + count + len) } - - pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { - self.pkid_tx = Some(pkid_tx) - } } impl fmt::Debug for Publish { @@ -204,7 +166,6 @@ mod test { topic: "a/b".to_owned(), pkid: 10, payload: Bytes::from(&payload[..]), - pkid_tx: None, } ); } @@ -241,7 +202,6 @@ mod test { topic: "a/b".to_owned(), pkid: 0, payload: Bytes::from(&[0x01, 0x02][..]), - pkid_tx: None, } ); } @@ -255,7 +215,6 @@ mod test { topic: "a/b".to_owned(), pkid: 10, payload: Bytes::from(vec![0xF1, 0xF2, 0xF3, 0xF4]), - pkid_tx: None, }; let mut buf = BytesMut::new(); @@ -290,7 +249,6 @@ mod test { topic: "a/b".to_owned(), pkid: 0, payload: Bytes::from(vec![0xE1, 0xE2, 0xE3, 0xE4]), - pkid_tx: None, }; let mut buf = BytesMut::new(); diff --git a/rumqttc/src/mqttbytes/v4/subscribe.rs b/rumqttc/src/mqttbytes/v4/subscribe.rs index 407100d3e..8bc6497a1 100644 --- a/rumqttc/src/mqttbytes/v4/subscribe.rs +++ b/rumqttc/src/mqttbytes/v4/subscribe.rs @@ -1,35 +1,13 @@ -use bytes::{Buf, Bytes}; -use tokio::sync::oneshot::Sender; - use super::*; -use crate::Pkid; +use bytes::{Buf, Bytes}; /// Subscription packet +#[derive(Clone, Eq, PartialEq)] pub struct Subscribe { pub pkid: u16, pub filters: Vec, - pub pkid_tx: Option>, -} - -// TODO: figure out if this is even required -impl Clone for Subscribe { - fn clone(&self) -> Self { - Self { - pkid: self.pkid, - filters: self.filters.clone(), - pkid_tx: None, - } - } -} - -impl PartialEq for Subscribe { - fn eq(&self, other: &Self) -> bool { - self.pkid == other.pkid && self.filters == other.filters - } } -impl Eq for Subscribe {} - impl Subscribe { pub fn new>(path: S, qos: QoS) -> Subscribe { let filter = SubscribeFilter { @@ -40,7 +18,6 @@ impl Subscribe { Subscribe { pkid: 0, filters: vec![filter], - pkid_tx: None, } } @@ -50,11 +27,7 @@ impl Subscribe { { let filters: Vec = topics.into_iter().collect(); - Subscribe { - pkid: 0, - filters, - pkid_tx: None, - } + Subscribe { pkid: 0, filters } } pub fn add(&mut self, path: String, qos: QoS) -> &mut Self { @@ -98,11 +71,7 @@ impl Subscribe { match filters.len() { 0 => Err(Error::EmptySubscription), - _ => Ok(Subscribe { - pkid, - filters, - pkid_tx: None, - }), + _ => Ok(Subscribe { pkid, filters }), } } @@ -124,10 +93,6 @@ impl Subscribe { Ok(1 + remaining_len_bytes + remaining_len) } - - pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { - self.pkid_tx = Some(pkid_tx) - } } /// Subscription filter @@ -229,7 +194,6 @@ mod test { SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce), SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce) ], - pkid_tx: None, } ); } @@ -243,7 +207,6 @@ mod test { SubscribeFilter::new("#".to_owned(), QoS::AtLeastOnce), SubscribeFilter::new("a/b/c".to_owned(), QoS::ExactlyOnce), ], - pkid_tx: None, }; let mut buf = BytesMut::new(); diff --git a/rumqttc/src/mqttbytes/v4/unsubscribe.rs b/rumqttc/src/mqttbytes/v4/unsubscribe.rs index e4b1ed18a..8263752e3 100644 --- a/rumqttc/src/mqttbytes/v4/unsubscribe.rs +++ b/rumqttc/src/mqttbytes/v4/unsubscribe.rs @@ -1,42 +1,18 @@ -use bytes::{Buf, Bytes}; -use tokio::sync::oneshot::Sender; - use super::*; -use crate::Pkid; +use bytes::{Buf, Bytes}; /// Unsubscribe packet -#[derive(Debug)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Unsubscribe { pub pkid: u16, pub topics: Vec, - pub pkid_tx: Option>, -} - -// TODO: figure out if this is even required -impl Clone for Unsubscribe { - fn clone(&self) -> Self { - Self { - pkid: self.pkid, - topics: self.topics.clone(), - pkid_tx: None, - } - } } -impl PartialEq for Unsubscribe { - fn eq(&self, other: &Self) -> bool { - self.pkid == other.pkid && self.topics == other.topics - } -} - -impl Eq for Unsubscribe {} - impl Unsubscribe { pub fn new>(topic: S) -> Unsubscribe { Unsubscribe { pkid: 0, topics: vec![topic.into()], - pkid_tx: None, } } @@ -66,11 +42,7 @@ impl Unsubscribe { topics.push(topic_filter); } - let unsubscribe = Unsubscribe { - pkid, - topics, - pkid_tx: None, - }; + let unsubscribe = Unsubscribe { pkid, topics }; Ok(unsubscribe) } @@ -86,8 +58,4 @@ impl Unsubscribe { } Ok(1 + remaining_len_bytes + remaining_len) } - - pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { - self.pkid_tx = Some(pkid_tx) - } } diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index aeb188504..933461745 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -1,10 +1,11 @@ -use crate::{Event, Incoming, Outgoing, Request}; +use crate::{Event, Incoming, Outgoing, Pkid, Request}; use crate::mqttbytes::v4::*; use crate::mqttbytes::{self, *}; use bytes::BytesMut; use std::collections::VecDeque; use std::{io, time::Instant}; +use tokio::sync::oneshot; /// Errors during state handling #[derive(Debug, thiserror::Error)] @@ -114,7 +115,7 @@ impl MqttState { for publish in second_half.iter_mut().chain(first_half) { if let Some(publish) = publish.take() { - let request = Request::Publish(publish); + let request = Request::Publish(None, publish); pending.push(request); } } @@ -149,10 +150,10 @@ impl MqttState { // Enforce max outgoing packet size self.check_size(request.size())?; match request { - Request::Publish(publish) => self.outgoing_publish(publish)?, + Request::Publish(tx, publish) => self.outgoing_publish(publish, tx)?, Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?, - Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?, - Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?, + Request::Subscribe(tx, subscribe) => self.outgoing_subscribe(subscribe, tx)?, + Request::Unsubscribe(tx, unsubscribe) => self.outgoing_unsubscribe(unsubscribe, tx)?, Request::PingReq(_) => self.outgoing_ping()?, Request::Disconnect(_) => self.outgoing_disconnect()?, Request::PubAck(puback) => self.outgoing_puback(puback)?, @@ -328,7 +329,11 @@ impl MqttState { /// Adds next packet identifier to QoS 1 and 2 publish packets and returns /// it buy wrapping publish in packet - fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> { + fn outgoing_publish( + &mut self, + mut publish: Publish, + pkid_tx: Option>, + ) -> Result<(), StateError> { // NOTE: pkid promise need not be fulfilled for QoS 0, // user should know this but still handled in Client. if publish.qos != QoS::AtMostOnce { @@ -338,7 +343,7 @@ impl MqttState { let pkid = publish.pkid; // Fulfill the pkid promise - if let Some(pkid_tx) = publish.pkid_tx.take() { + if let Some(pkid_tx) = pkid_tx { _ = pkid_tx.send(pkid); } @@ -434,7 +439,11 @@ impl MqttState { Ok(()) } - fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> { + fn outgoing_subscribe( + &mut self, + mut subscription: Subscribe, + pkid_tx: Option>, + ) -> Result<(), StateError> { if subscription.filters.is_empty() { return Err(StateError::EmptySubscription); } @@ -442,7 +451,7 @@ impl MqttState { let pkid = self.next_pkid(); subscription.pkid = pkid; // Fulfill the pkid promise - if let Some(pkid_tx) = subscription.pkid_tx.take() { + if let Some(pkid_tx) = pkid_tx { _ = pkid_tx.send(pkid); } @@ -457,12 +466,16 @@ impl MqttState { Ok(()) } - fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> { + fn outgoing_unsubscribe( + &mut self, + mut unsub: Unsubscribe, + pkid_tx: Option>, + ) -> Result<(), StateError> { let pkid = self.next_pkid(); unsub.pkid = pkid; // Fulfill the pkid promise - if let Some(pkid_tx) = unsub.pkid_tx.take() { + if let Some(pkid_tx) = pkid_tx { _ = pkid_tx.send(pkid); } @@ -596,14 +609,14 @@ mod test { let small_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 100]); assert_eq!( - mqtt.handle_outgoing_packet(Request::Publish(small_publish)) + mqtt.handle_outgoing_packet(Request::Publish(None, small_publish)) .is_ok(), true ); let large_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 265]); assert_eq!( - mqtt.handle_outgoing_packet(Request::Publish(large_publish)) + mqtt.handle_outgoing_packet(Request::Publish(None, large_publish)) .is_ok(), false ); @@ -617,7 +630,7 @@ mod test { let publish = build_outgoing_publish(QoS::AtMostOnce); // QoS 0 publish shouldn't be saved in queue - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); assert_eq!(mqtt.last_pkid, 0); assert_eq!(mqtt.inflight, 0); @@ -625,12 +638,12 @@ mod test { let publish = build_outgoing_publish(QoS::AtLeastOnce); // Packet id should be set and publish should be saved in queue - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 1); assert_eq!(mqtt.inflight, 1); // Packet id should be incremented and publish should be saved in queue - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); assert_eq!(mqtt.last_pkid, 2); assert_eq!(mqtt.inflight, 2); @@ -638,12 +651,12 @@ mod test { let publish = build_outgoing_publish(QoS::ExactlyOnce); // Packet id should be set and publish should be saved in queue - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 3); assert_eq!(mqtt.inflight, 3); // Packet id should be incremented and publish should be saved in queue - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); assert_eq!(mqtt.last_pkid, 4); assert_eq!(mqtt.inflight, 4); } @@ -733,8 +746,8 @@ mod test { let publish1 = build_outgoing_publish(QoS::AtLeastOnce); let publish2 = build_outgoing_publish(QoS::ExactlyOnce); - mqtt.outgoing_publish(publish1).unwrap(); - mqtt.outgoing_publish(publish2).unwrap(); + mqtt.outgoing_publish(publish1, None).unwrap(); + mqtt.outgoing_publish(publish2, None).unwrap(); assert_eq!(mqtt.inflight, 2); mqtt.handle_incoming_puback(&PubAck::new(1)).unwrap(); @@ -766,8 +779,8 @@ mod test { let publish1 = build_outgoing_publish(QoS::AtLeastOnce); let publish2 = build_outgoing_publish(QoS::ExactlyOnce); - let _publish_out = mqtt.outgoing_publish(publish1); - let _publish_out = mqtt.outgoing_publish(publish2); + let _publish_out = mqtt.outgoing_publish(publish1, None); + let _publish_out = mqtt.outgoing_publish(publish2, None); mqtt.handle_incoming_pubrec(&PubRec::new(2)).unwrap(); assert_eq!(mqtt.inflight, 2); @@ -785,7 +798,7 @@ mod test { let mut mqtt = build_mqttstate(); let publish = build_outgoing_publish(QoS::ExactlyOnce); - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); let packet = read(&mut mqtt.write, 10 * 1024).unwrap(); match packet { Packet::Publish(publish) => assert_eq!(publish.pkid, 1), @@ -825,7 +838,7 @@ mod test { let mut mqtt = build_mqttstate(); let publish = build_outgoing_publish(QoS::ExactlyOnce); - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap(); mqtt.handle_incoming_pubcomp(&PubComp::new(1)).unwrap(); @@ -839,7 +852,7 @@ mod test { // network activity other than pingresp let publish = build_outgoing_publish(QoS::AtLeastOnce); - mqtt.handle_outgoing_packet(Request::Publish(publish)) + mqtt.handle_outgoing_packet(Request::Publish(None, publish)) .unwrap(); mqtt.handle_incoming_packet(Incoming::PubAck(PubAck::new(1))) .unwrap(); @@ -887,7 +900,6 @@ mod test { topic: "test".to_string(), pkid: 1, payload: "".into(), - pkid_tx: None, }), Some(Publish { dup: false, @@ -896,7 +908,6 @@ mod test { topic: "test".to_string(), pkid: 2, payload: "".into(), - pkid_tx: None, }), Some(Publish { dup: false, @@ -905,7 +916,6 @@ mod test { topic: "test".to_string(), pkid: 3, payload: "".into(), - pkid_tx: None, }), None, None, @@ -916,7 +926,6 @@ mod test { topic: "test".to_string(), pkid: 6, payload: "".into(), - pkid_tx: None, }), ] } @@ -926,7 +935,7 @@ mod test { let requests = mqtt.clean(); let res = vec![6, 1, 2, 3]; for (req, idx) in requests.iter().zip(res) { - if let Request::Publish(publish) = req { + if let Request::Publish(_, publish) = req { assert_eq!(publish.pkid, idx); } else { unreachable!() @@ -938,7 +947,7 @@ mod test { let requests = mqtt.clean(); let res = vec![1, 2, 3, 6]; for (req, idx) in requests.iter().zip(res) { - if let Request::Publish(publish) = req { + if let Request::Publish(_, publish) = req { assert_eq!(publish.pkid, idx); } else { unreachable!() @@ -950,7 +959,7 @@ mod test { let requests = mqtt.clean(); let res = vec![1, 2, 3, 6]; for (req, idx) in requests.iter().zip(res) { - if let Request::Publish(publish) = req { + if let Request::Publish(_, publish) = req { assert_eq!(publish.pkid, idx); } else { unreachable!() diff --git a/rumqttc/src/v5/client.rs b/rumqttc/src/v5/client.rs index e92df5df8..8098c827b 100644 --- a/rumqttc/src/v5/client.rs +++ b/rumqttc/src/v5/client.rs @@ -89,13 +89,14 @@ impl AsyncClient { let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); // Fulfill instantly for QoS 0 - if qos == QoS::AtMostOnce { + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } @@ -152,13 +153,14 @@ impl AsyncClient { let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); // Fulfill instantly for QoS 0 - if qos == QoS::AtMostOnce { + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } @@ -232,13 +234,14 @@ impl AsyncClient { let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); // Fulfill instantly for QoS 0 - if qos == QoS::AtMostOnce { + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); if !valid_topic(&topic) { return Err(ClientError::TryRequest(publish)); } @@ -283,13 +286,11 @@ impl AsyncClient { properties: Option, ) -> Result { let filter = Filter::new(topic, qos); - let mut subscribe = Subscribe::new(filter, properties); + let subscribe = Subscribe::new(filter, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - - let request: Request = Request::Subscribe(subscribe); + let request: Request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.send_async(request).await?; Ok(PkidPromise::new(pkid_rx)) } @@ -319,12 +320,11 @@ impl AsyncClient { properties: Option, ) -> Result { let filter = Filter::new(topic, qos); - let mut subscribe = Subscribe::new(filter, properties); + let subscribe = Subscribe::new(filter, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.try_send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -355,12 +355,11 @@ impl AsyncClient { where T: IntoIterator, { - let mut subscribe = Subscribe::new_many(topics, properties); + let subscribe = Subscribe::new_many(topics, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.send_async(request).await?; Ok(PkidPromise::new(pkid_rx)) } @@ -392,12 +391,11 @@ impl AsyncClient { where T: IntoIterator, { - let mut subscribe = Subscribe::new_many(topics, properties); + let subscribe = Subscribe::new_many(topics, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.request_tx.try_send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -426,12 +424,11 @@ impl AsyncClient { topic: S, properties: Option, ) -> Result { - let mut unsubscribe = Unsubscribe::new(topic, properties); + let unsubscribe = Unsubscribe::new(topic, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - unsubscribe.place_pkid_tx(pkid_tx); - let request = Request::Unsubscribe(unsubscribe); + let request = Request::Unsubscribe(Some(pkid_tx), unsubscribe); self.request_tx.send_async(request).await?; Ok(PkidPromise::new(pkid_rx)) } @@ -454,12 +451,11 @@ impl AsyncClient { topic: S, properties: Option, ) -> Result { - let mut unsubscribe = Unsubscribe::new(topic, properties); + let unsubscribe = Unsubscribe::new(topic, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - unsubscribe.place_pkid_tx(pkid_tx); - let request = Request::Unsubscribe(unsubscribe); + let request = Request::Unsubscribe(Some(pkid_tx), unsubscribe); self.request_tx.try_send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -561,13 +557,14 @@ impl Client { let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); // Fulfill instantly for QoS 0 - if qos == QoS::AtMostOnce { + let pkid_tx = if qos == QoS::AtMostOnce { _ = pkid_tx.send(0); + None } else { - publish.place_pkid_tx(pkid_tx); - } + Some(pkid_tx) + }; - let publish = Request::Publish(publish); + let publish = Request::Publish(pkid_tx, publish); if !valid_topic(&topic) { return Err(ClientError::Request(publish)); } @@ -658,12 +655,11 @@ impl Client { properties: Option, ) -> Result { let filter = Filter::new(topic, qos); - let mut subscribe = Subscribe::new(filter, properties); + let subscribe = Subscribe::new(filter, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.client.request_tx.send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -713,12 +709,11 @@ impl Client { where T: IntoIterator, { - let mut subscribe = Subscribe::new_many(topics, properties); + let subscribe = Subscribe::new_many(topics, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - subscribe.place_pkid_tx(pkid_tx); - let request = Request::Subscribe(subscribe); + let request = Request::Subscribe(Some(pkid_tx), subscribe); self.client.request_tx.send(request)?; Ok(PkidPromise::new(pkid_rx)) } @@ -766,12 +761,11 @@ impl Client { topic: S, properties: Option, ) -> Result { - let mut unsubscribe = Unsubscribe::new(topic, properties); + let unsubscribe = Unsubscribe::new(topic, properties); let (pkid_tx, pkid_rx) = tokio::sync::oneshot::channel(); - unsubscribe.place_pkid_tx(pkid_tx); - let request = Request::Unsubscribe(unsubscribe); + let request = Request::Unsubscribe(Some(pkid_tx), unsubscribe); self.client.request_tx.send(request)?; Ok(PkidPromise::new(pkid_rx)) } diff --git a/rumqttc/src/v5/mod.rs b/rumqttc/src/v5/mod.rs index 663cfd278..bb4013835 100644 --- a/rumqttc/src/v5/mod.rs +++ b/rumqttc/src/v5/mod.rs @@ -7,6 +7,7 @@ use std::{ pin::Pin, sync::Arc, }; +use tokio::sync::oneshot; mod client; mod eventloop; @@ -14,8 +15,8 @@ mod framed; pub mod mqttbytes; mod state; -use crate::Outgoing; use crate::{NetworkOptions, Transport}; +use crate::{Outgoing, Pkid}; use mqttbytes::v5::*; @@ -33,22 +34,63 @@ pub type Incoming = Packet; /// Requests by the client to mqtt event loop. Request are /// handled one by one. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Debug)] pub enum Request { - Publish(Publish), + Publish(Option>, Publish), PubAck(PubAck), PubRec(PubRec), PubComp(PubComp), PubRel(PubRel), PingReq, PingResp, - Subscribe(Subscribe), + Subscribe(Option>, Subscribe), SubAck(SubAck), - Unsubscribe(Unsubscribe), + Unsubscribe(Option>, Unsubscribe), UnsubAck(UnsubAck), Disconnect, } +impl Clone for Request { + fn clone(&self) -> Self { + match self { + Self::Publish(_, p) => Self::Publish(None, p.clone()), + Self::PubAck(p) => Self::PubAck(p.clone()), + Self::PubRec(p) => Self::PubRec(p.clone()), + Self::PubRel(p) => Self::PubRel(p.clone()), + Self::PubComp(p) => Self::PubComp(p.clone()), + Self::Subscribe(_, p) => Self::Subscribe(None, p.clone()), + Self::SubAck(p) => Self::SubAck(p.clone()), + Self::PingReq => Self::PingReq, + Self::PingResp => Self::PingResp, + Self::Disconnect => Self::Disconnect, + Self::Unsubscribe(_, p) => Self::Unsubscribe(None, p.clone()), + Self::UnsubAck(p) => Self::UnsubAck(p.clone()), + } + } +} + +impl PartialEq for Request { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Publish(_, p1), Self::Publish(_, p2)) => p1 == p2, + (Self::PubAck(p1), Self::PubAck(p2)) => p1 == p2, + (Self::PubRec(p1), Self::PubRec(p2)) => p1 == p2, + (Self::PubRel(p1), Self::PubRel(p2)) => p1 == p2, + (Self::PubComp(p1), Self::PubComp(p2)) => p1 == p2, + (Self::Subscribe(_, p1), Self::Subscribe(_, p2)) => p1 == p2, + (Self::SubAck(p1), Self::SubAck(p2)) => p1 == p2, + (Self::PingReq, Self::PingReq) + | (Self::PingResp, Self::PingResp) + | (Self::Disconnect, Self::Disconnect) => true, + (Self::Unsubscribe(_, p1), Self::Unsubscribe(_, p2)) => p1 == p2, + (Self::UnsubAck(p1), Self::UnsubAck(p2)) => p1 == p2, + _ => false, + } + } +} + +impl Eq for Request {} + #[cfg(feature = "websocket")] type RequestModifierFn = Arc< dyn Fn(http::Request<()>) -> Pin> + Send>> diff --git a/rumqttc/src/v5/mqttbytes/v5/publish.rs b/rumqttc/src/v5/mqttbytes/v5/publish.rs index c787e6004..6699e4b20 100644 --- a/rumqttc/src/v5/mqttbytes/v5/publish.rs +++ b/rumqttc/src/v5/mqttbytes/v5/publish.rs @@ -1,11 +1,8 @@ -use bytes::{Buf, Bytes}; -use tokio::sync::oneshot::Sender; - use super::*; -use crate::Pkid; +use bytes::{Buf, Bytes}; /// Publish packet -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Publish { pub dup: bool, pub qos: QoS, @@ -14,39 +11,8 @@ pub struct Publish { pub pkid: u16, pub payload: Bytes, pub properties: Option, - pub pkid_tx: Option>, -} - -// TODO: figure out if this is even required -impl Clone for Publish { - fn clone(&self) -> Self { - Self { - dup: self.dup, - qos: self.qos, - retain: self.retain, - topic: self.topic.clone(), - payload: self.payload.clone(), - pkid: self.pkid, - properties: self.properties.clone(), - pkid_tx: None, - } - } -} - -impl PartialEq for Publish { - fn eq(&self, other: &Self) -> bool { - self.dup == other.dup - && self.qos == other.qos - && self.retain == other.retain - && self.topic == other.topic - && self.payload == other.payload - && self.pkid == other.pkid - && self.properties == other.properties - } } -impl Eq for Publish {} - impl Publish { pub fn new, P: Into>( topic: T, @@ -119,7 +85,6 @@ impl Publish { topic, payload: bytes, properties, - pkid_tx: None, }; Ok(publish) @@ -155,10 +120,6 @@ impl Publish { Ok(1 + count + len) } - - pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { - self.pkid_tx = Some(pkid_tx) - } } #[derive(Debug, Clone, PartialEq, Eq, Default)] diff --git a/rumqttc/src/v5/mqttbytes/v5/subscribe.rs b/rumqttc/src/v5/mqttbytes/v5/subscribe.rs index 28796fe30..8b5667b6c 100644 --- a/rumqttc/src/v5/mqttbytes/v5/subscribe.rs +++ b/rumqttc/src/v5/mqttbytes/v5/subscribe.rs @@ -1,40 +1,14 @@ -use bytes::{Buf, Bytes}; -use tokio::sync::oneshot::Sender; - use super::*; -use crate::Pkid; +use bytes::{Buf, Bytes}; /// Subscription packet -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Subscribe { pub pkid: u16, pub filters: Vec, pub properties: Option, - pub pkid_tx: Option>, -} - -// TODO: figure out if this is even required -impl Clone for Subscribe { - fn clone(&self) -> Self { - Self { - pkid: self.pkid, - filters: self.filters.clone(), - properties: self.properties.clone(), - pkid_tx: None, - } - } -} - -impl PartialEq for Subscribe { - fn eq(&self, other: &Self) -> bool { - self.pkid == other.pkid - && self.filters == other.filters - && self.properties == other.properties - } } -impl Eq for Subscribe {} - impl Subscribe { pub fn new(filter: Filter, properties: Option) -> Self { Self { @@ -93,7 +67,6 @@ impl Subscribe { pkid, filters, properties, - pkid_tx: None, }), } } @@ -122,10 +95,6 @@ impl Subscribe { Ok(1 + remaining_len_bytes + remaining_len) } - - pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { - self.pkid_tx = Some(pkid_tx) - } } /// Subscription filter diff --git a/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs b/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs index 92d1d5610..5e357b96e 100644 --- a/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs +++ b/rumqttc/src/v5/mqttbytes/v5/unsubscribe.rs @@ -1,38 +1,15 @@ use bytes::{Buf, Bytes}; -use tokio::sync::oneshot::Sender; use super::*; -use crate::Pkid; /// Unsubscribe packet -#[derive(Debug, Default)] +#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Unsubscribe { pub pkid: u16, pub filters: Vec, pub properties: Option, - pub pkid_tx: Option>, } -// TODO: figure out if this is even required -impl Clone for Unsubscribe { - fn clone(&self) -> Self { - Self { - pkid: self.pkid, - filters: self.filters.clone(), - properties: self.properties.clone(), - pkid_tx: None, - } - } -} - -impl PartialEq for Unsubscribe { - fn eq(&self, other: &Self) -> bool { - self.pkid == other.pkid && self.filters == other.filters - } -} - -impl Eq for Unsubscribe {} - impl Unsubscribe { pub fn new>(filter: S, properties: Option) -> Self { Self { @@ -83,7 +60,6 @@ impl Unsubscribe { pkid, filters, properties, - pkid_tx: None, }; Ok(unsubscribe) } @@ -111,10 +87,6 @@ impl Unsubscribe { Ok(1 + remaining_len_bytes + remaining_len) } - - pub fn place_pkid_tx(&mut self, pkid_tx: Sender) { - self.pkid_tx = Some(pkid_tx) - } } #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 24c75a21b..8d39b68a3 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -1,3 +1,5 @@ +use crate::Pkid; + use super::mqttbytes::v5::{ ConnAck, ConnectReturnCode, Disconnect, DisconnectReasonCode, Packet, PingReq, PubAck, PubAckReason, PubComp, PubCompReason, PubRec, PubRecReason, PubRel, PubRelReason, Publish, @@ -11,6 +13,7 @@ use bytes::{Bytes, BytesMut}; use std::collections::{HashMap, VecDeque}; use std::convert::TryInto; use std::{io, time::Instant}; +use tokio::sync::oneshot; /// Errors during state handling #[derive(Debug, thiserror::Error)] @@ -151,7 +154,7 @@ impl MqttState { // remove and collect pending publishes for publish in self.outgoing_pub.iter_mut() { if let Some(publish) = publish.take() { - let request = Request::Publish(publish); + let request = Request::Publish(None, publish); pending.push(request); } } @@ -183,21 +186,21 @@ impl MqttState { /// be put on to the network by the eventloop pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> { match request { - Request::Publish(publish) => { + Request::Publish(tx, publish) => { self.check_size(publish.size())?; - self.outgoing_publish(publish)? + self.outgoing_publish(publish, tx)? } Request::PubRel(pubrel) => { self.check_size(pubrel.size())?; self.outgoing_pubrel(pubrel)? } - Request::Subscribe(subscribe) => { + Request::Subscribe(tx, subscribe) => { self.check_size(subscribe.size())?; - self.outgoing_subscribe(subscribe)? + self.outgoing_subscribe(subscribe, tx)? } - Request::Unsubscribe(unsubscribe) => { + Request::Unsubscribe(tx, unsubscribe) => { self.check_size(unsubscribe.size())?; - self.outgoing_unsubscribe(unsubscribe)? + self.outgoing_unsubscribe(unsubscribe, tx)? } Request::PingReq => self.outgoing_ping()?, Request::Disconnect => { @@ -484,7 +487,11 @@ impl MqttState { /// Adds next packet identifier to QoS 1 and 2 publish packets and returns /// it buy wrapping publish in packet - fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> { + fn outgoing_publish( + &mut self, + mut publish: Publish, + pkid_tx: Option>, + ) -> Result<(), StateError> { // NOTE: pkid promise need not be fulfilled for QoS 0, // user should know this but still handled in Client. if publish.qos != QoS::AtMostOnce { @@ -494,7 +501,7 @@ impl MqttState { let pkid = publish.pkid; // Fulfill the pkid promise - if let Some(pkid_tx) = publish.pkid_tx.take() { + if let Some(pkid_tx) = pkid_tx { _ = pkid_tx.send(pkid); } @@ -604,7 +611,11 @@ impl MqttState { Ok(()) } - fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> { + fn outgoing_subscribe( + &mut self, + mut subscription: Subscribe, + pkid_tx: Option>, + ) -> Result<(), StateError> { if subscription.filters.is_empty() { return Err(StateError::EmptySubscription); } @@ -612,7 +623,7 @@ impl MqttState { let pkid = self.next_pkid(); subscription.pkid = pkid; // Fulfill the pkid promise - if let Some(pkid_tx) = subscription.pkid_tx.take() { + if let Some(pkid_tx) = pkid_tx { _ = pkid_tx.send(pkid); } @@ -628,11 +639,15 @@ impl MqttState { Ok(()) } - fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> { + fn outgoing_unsubscribe( + &mut self, + mut unsub: Unsubscribe, + pkid_tx: Option>, + ) -> Result<(), StateError> { let pkid = self.next_pkid(); unsub.pkid = pkid; // Fulfill the pkid promise - if let Some(pkid_tx) = unsub.pkid_tx.take() { + if let Some(pkid_tx) = pkid_tx { _ = pkid_tx.send(pkid); } @@ -769,7 +784,7 @@ mod test { let publish = build_outgoing_publish(QoS::AtMostOnce); // QoS 0 publish shouldn't be saved in queue - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); assert_eq!(mqtt.last_pkid, 0); assert_eq!(mqtt.inflight, 0); @@ -777,12 +792,12 @@ mod test { let publish = build_outgoing_publish(QoS::AtLeastOnce); // Packet id should be set and publish should be saved in queue - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 1); assert_eq!(mqtt.inflight, 1); // Packet id should be incremented and publish should be saved in queue - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); assert_eq!(mqtt.last_pkid, 2); assert_eq!(mqtt.inflight, 2); @@ -790,12 +805,12 @@ mod test { let publish = build_outgoing_publish(QoS::ExactlyOnce); // Packet id should be set and publish should be saved in queue - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 3); assert_eq!(mqtt.inflight, 3); // Packet id should be incremented and publish should be saved in queue - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); assert_eq!(mqtt.last_pkid, 4); assert_eq!(mqtt.inflight, 4); } @@ -807,17 +822,17 @@ mod test { // QoS2 publish let publish = build_outgoing_publish(QoS::ExactlyOnce); - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 1); assert_eq!(mqtt.inflight, 1); // Packet id should be set back down to 0, since we hit the limit - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 0); assert_eq!(mqtt.inflight, 2); // This should cause a collition - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 1); assert_eq!(mqtt.inflight, 2); assert!(mqtt.collision.is_some()); @@ -827,7 +842,7 @@ mod test { assert_eq!(mqtt.inflight, 1); // Now there should be space in the outgoing queue - mqtt.outgoing_publish(publish.clone()).unwrap(); + mqtt.outgoing_publish(publish.clone(), None).unwrap(); assert_eq!(mqtt.last_pkid, 0); assert_eq!(mqtt.inflight, 2); } @@ -917,8 +932,8 @@ mod test { let publish1 = build_outgoing_publish(QoS::AtLeastOnce); let publish2 = build_outgoing_publish(QoS::ExactlyOnce); - mqtt.outgoing_publish(publish1).unwrap(); - mqtt.outgoing_publish(publish2).unwrap(); + mqtt.outgoing_publish(publish1, None).unwrap(); + mqtt.outgoing_publish(publish2, None).unwrap(); assert_eq!(mqtt.inflight, 2); mqtt.handle_incoming_puback(&PubAck::new(1, None)).unwrap(); @@ -952,8 +967,8 @@ mod test { let publish1 = build_outgoing_publish(QoS::AtLeastOnce); let publish2 = build_outgoing_publish(QoS::ExactlyOnce); - let _publish_out = mqtt.outgoing_publish(publish1); - let _publish_out = mqtt.outgoing_publish(publish2); + let _publish_out = mqtt.outgoing_publish(publish1, None); + let _publish_out = mqtt.outgoing_publish(publish2, None); mqtt.handle_incoming_pubrec(&PubRec::new(2, None)).unwrap(); assert_eq!(mqtt.inflight, 2); @@ -971,7 +986,7 @@ mod test { let mut mqtt = build_mqttstate(); let publish = build_outgoing_publish(QoS::ExactlyOnce); - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); let packet = Packet::read(&mut mqtt.write, Some(10 * 1024)).unwrap(); match packet { Packet::Publish(publish) => assert_eq!(publish.pkid, 1), @@ -1011,7 +1026,7 @@ mod test { let mut mqtt = build_mqttstate(); let publish = build_outgoing_publish(QoS::ExactlyOnce); - mqtt.outgoing_publish(publish).unwrap(); + mqtt.outgoing_publish(publish, None).unwrap(); mqtt.handle_incoming_pubrec(&PubRec::new(1, None)).unwrap(); mqtt.handle_incoming_pubcomp(&PubComp::new(1, None)) @@ -1026,7 +1041,7 @@ mod test { // network activity other than pingresp let publish = build_outgoing_publish(QoS::AtLeastOnce); - mqtt.handle_outgoing_packet(Request::Publish(publish)) + mqtt.handle_outgoing_packet(Request::Publish(None, publish)) .unwrap(); mqtt.handle_incoming_packet(Incoming::PubAck(PubAck::new(1, None))) .unwrap(); From 1949c2a600ff008d30e94120ed641ea3c0f9b40d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 24 Feb 2024 19:36:12 +0530 Subject: [PATCH 14/16] `PkidError` when no pkid --- rumqttc/examples/pkid_promise.rs | 2 +- rumqttc/examples/pkid_promise_v5.rs | 2 +- rumqttc/src/lib.rs | 21 +++++++++++++++++---- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/rumqttc/examples/pkid_promise.rs b/rumqttc/examples/pkid_promise.rs index caac8d122..0fefd093e 100644 --- a/rumqttc/examples/pkid_promise.rs +++ b/rumqttc/examples/pkid_promise.rs @@ -61,7 +61,7 @@ async fn requests(client: AsyncClient) { .unwrap().wait_async(), ); } - Some(Ok(Some(pkid))) = joins.join_next() => { + Some(Ok(Ok(pkid))) = joins.join_next() => { println!("Pkid: {:?}", pkid); } else => break, diff --git a/rumqttc/examples/pkid_promise_v5.rs b/rumqttc/examples/pkid_promise_v5.rs index c8cbd6422..e4bd3d31c 100644 --- a/rumqttc/examples/pkid_promise_v5.rs +++ b/rumqttc/examples/pkid_promise_v5.rs @@ -61,7 +61,7 @@ async fn requests(client: AsyncClient) { .unwrap().wait_async(), ); } - Some(Ok(Some(pkid))) = joins.join_next() => { + Some(Ok(Ok(pkid))) = joins.join_next() => { println!("Pkid: {:?}", pkid); } else => break, diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 5d27b994a..8095fcabd 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -164,19 +164,32 @@ pub struct PkidPromise { inner: oneshot::Receiver, } +#[derive(Debug, thiserror::Error)] +pub enum PkidError { + #[error("Eventloop dropped Sender: {0}")] + Recv(#[from] oneshot::error::RecvError), +} + impl PkidPromise { pub fn new(inner: oneshot::Receiver) -> Self { Self { inner } } /// Wait for the pkid to resolve by blocking the current thread - pub fn wait(self) -> Option { - self.inner.blocking_recv().ok() + /// + /// # Panics + /// Panics if called in an async context + pub fn wait(self) -> Result { + let pkid = self.inner.blocking_recv()?; + + Ok(pkid) } /// Await pkid resolution without blocking the current thread - pub async fn wait_async(self) -> Option { - self.inner.await.ok() + pub async fn wait_async(self) -> Result { + let pkid = self.inner.await?; + + Ok(pkid) } } From 33346a19f0a2e45754a9046199a361aaaa6c9c5c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 25 Feb 2024 09:46:21 +0000 Subject: [PATCH 15/16] don't expose `oneshot` --- rumqttc/src/lib.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 8095fcabd..de7e49a8b 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -166,8 +166,14 @@ pub struct PkidPromise { #[derive(Debug, thiserror::Error)] pub enum PkidError { - #[error("Eventloop dropped Sender: {0}")] - Recv(#[from] oneshot::error::RecvError), + #[error("Eventloop dropped Sender")] + Recv, +} + +impl From for PkidError { + fn from(_: oneshot::error::RecvError) -> Self { + Self::Recv + } } impl PkidPromise { From de57cf17c4e8347fdd1a6a5cec6655150187ee5a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 29 Mar 2024 06:31:15 +0000 Subject: [PATCH 16/16] test: remove test --- rumqttc/src/state.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 77a5e8476..b86257b37 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -580,25 +580,6 @@ mod test { } } - #[test] - fn outgoing_max_packet_size_check() { - let mut mqtt = MqttState::new(100, false); - - let small_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 100]); - assert_eq!( - mqtt.handle_outgoing_packet(Request::Publish(None, small_publish)) - .is_ok(), - true - ); - - let large_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 265]); - assert_eq!( - mqtt.handle_outgoing_packet(Request::Publish(None, large_publish)) - .is_ok(), - false - ); - } - #[test] fn outgoing_publish_should_set_pkid_and_add_publish_to_queue() { let mut mqtt = build_mqttstate();