From 8ad22231d17ca7e4019ec076b490084aaf8a94db Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 30 Sep 2024 16:58:54 +0530 Subject: [PATCH] feat: return pkid of ack --- rumqttc/src/lib.rs | 9 +++++---- rumqttc/src/state.rs | 10 +++++----- rumqttc/src/v5/state.rs | 10 +++++----- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/rumqttc/src/lib.rs b/rumqttc/src/lib.rs index 30081214d..041ea68f3 100644 --- a/rumqttc/src/lib.rs +++ b/rumqttc/src/lib.rs @@ -223,11 +223,12 @@ impl From for Request { } } -pub type AckPromise = oneshot::Receiver<()>; +pub type Pkid = u16; +pub type AckPromise = oneshot::Receiver; #[derive(Debug)] pub struct PromiseTx { - inner: oneshot::Sender<()>, + inner: oneshot::Sender, } impl PromiseTx { @@ -237,8 +238,8 @@ impl PromiseTx { (PromiseTx { inner }, promise) } - fn resolve(self) { - if self.inner.send(()).is_err() { + fn resolve(self, pkid: Pkid) { + if self.inner.send(pkid).is_err() { trace!("Promise was drpped") } } diff --git a/rumqttc/src/state.rs b/rumqttc/src/state.rs index 81a60c904..5b8c44095 100644 --- a/rumqttc/src/state.rs +++ b/rumqttc/src/state.rs @@ -206,7 +206,7 @@ impl MqttState { .any(|x| matches!(x, SubscribeReasonCode::Success(_))) { if let Some(tx) = self.ack_waiter[suback.pkid as usize].take() { - tx.resolve(); + tx.resolve(suback.pkid); } } @@ -223,7 +223,7 @@ impl MqttState { } if let Some(tx) = self.ack_waiter[unsuback.pkid as usize].take() { - tx.resolve(); + tx.resolve(unsuback.pkid); } Ok(None) @@ -271,7 +271,7 @@ impl MqttState { if let Some(tx) = self.ack_waiter[puback.pkid as usize].take() { // Resolve promise for QoS 1 - tx.resolve(); + tx.resolve(puback.pkid); } self.inflight -= 1; @@ -333,7 +333,7 @@ impl MqttState { if let Some(tx) = self.ack_waiter[pubcomp.pkid as usize].take() { // Resolve promise for QoS 2 - tx.resolve(); + tx.resolve(pubcomp.pkid); } self.outgoing_rel.set(pubcomp.pkid as usize, false); @@ -398,7 +398,7 @@ impl MqttState { let event = Event::Outgoing(Outgoing::Publish(publish.pkid)); self.events.push_back(event); match (publish.qos, tx) { - (QoS::AtMostOnce, Some(tx)) => tx.resolve(), + (QoS::AtMostOnce, Some(tx)) => tx.resolve(publish.pkid), (_, tx) => self.ack_waiter[publish.pkid as usize] = tx, } diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 85feb959a..ca6d0d00e 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -269,7 +269,7 @@ impl MqttState { .any(|x| matches!(x, SubscribeReasonCode::Success(_))) { if let Some(tx) = self.ack_waiter[suback.pkid as usize].take() { - tx.resolve(); + tx.resolve(suback.pkid); } } @@ -293,7 +293,7 @@ impl MqttState { if unsuback.reasons.contains(&UnsubAckReason::Success) { if let Some(tx) = self.ack_waiter[unsuback.pkid as usize].take() { - tx.resolve(); + tx.resolve(unsuback.pkid); } } @@ -401,7 +401,7 @@ impl MqttState { if let Some(tx) = self.ack_waiter[puback.pkid as usize].take() { // Resolve promise for QoS 1 - tx.resolve(); + tx.resolve(puback.pkid); } self.inflight -= 1; @@ -490,7 +490,7 @@ impl MqttState { if let Some(tx) = self.ack_waiter[pubcomp.pkid as usize].take() { // Resolve promise for QoS 2 - tx.resolve(); + tx.resolve(pubcomp.pkid); } self.outgoing_rel.set(pubcomp.pkid as usize, false); @@ -577,7 +577,7 @@ impl MqttState { let event = Event::Outgoing(Outgoing::Publish(pkid)); self.events.push_back(event); match (publish.qos, tx) { - (QoS::AtMostOnce, Some(tx)) => tx.resolve(), + (QoS::AtMostOnce, Some(tx)) => tx.resolve(0), (_, tx) => self.ack_waiter[publish.pkid as usize] = tx, }