Skip to content

Commit

Permalink
feat: return pkid of ack
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Sep 30, 2024
1 parent 2632ab1 commit 8ad2223
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
9 changes: 5 additions & 4 deletions rumqttc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,12 @@ impl From<Unsubscribe> for Request {
}
}

pub type AckPromise = oneshot::Receiver<()>;
pub type Pkid = u16;
pub type AckPromise = oneshot::Receiver<Pkid>;

#[derive(Debug)]
pub struct PromiseTx {
inner: oneshot::Sender<()>,
inner: oneshot::Sender<Pkid>,
}

impl PromiseTx {
Expand All @@ -237,8 +238,8 @@ impl PromiseTx {
(PromiseTx { inner }, promise)
}

fn resolve(self) {
if self.inner.send(()).is_err() {
fn resolve(self, pkid: Pkid) {
if self.inner.send(pkid).is_err() {
trace!("Promise was drpped")
}
}
Expand Down
10 changes: 5 additions & 5 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl MqttState {
.any(|x| matches!(x, SubscribeReasonCode::Success(_)))
{
if let Some(tx) = self.ack_waiter[suback.pkid as usize].take() {
tx.resolve();
tx.resolve(suback.pkid);
}
}

Expand All @@ -223,7 +223,7 @@ impl MqttState {
}

if let Some(tx) = self.ack_waiter[unsuback.pkid as usize].take() {
tx.resolve();
tx.resolve(unsuback.pkid);
}

Ok(None)
Expand Down Expand Up @@ -271,7 +271,7 @@ impl MqttState {

if let Some(tx) = self.ack_waiter[puback.pkid as usize].take() {
// Resolve promise for QoS 1
tx.resolve();
tx.resolve(puback.pkid);
}

self.inflight -= 1;
Expand Down Expand Up @@ -333,7 +333,7 @@ impl MqttState {

if let Some(tx) = self.ack_waiter[pubcomp.pkid as usize].take() {
// Resolve promise for QoS 2
tx.resolve();
tx.resolve(pubcomp.pkid);
}

self.outgoing_rel.set(pubcomp.pkid as usize, false);
Expand Down Expand Up @@ -398,7 +398,7 @@ impl MqttState {
let event = Event::Outgoing(Outgoing::Publish(publish.pkid));
self.events.push_back(event);
match (publish.qos, tx) {
(QoS::AtMostOnce, Some(tx)) => tx.resolve(),
(QoS::AtMostOnce, Some(tx)) => tx.resolve(publish.pkid),
(_, tx) => self.ack_waiter[publish.pkid as usize] = tx,
}

Expand Down
10 changes: 5 additions & 5 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl MqttState {
.any(|x| matches!(x, SubscribeReasonCode::Success(_)))
{
if let Some(tx) = self.ack_waiter[suback.pkid as usize].take() {
tx.resolve();
tx.resolve(suback.pkid);
}
}

Expand All @@ -293,7 +293,7 @@ impl MqttState {

if unsuback.reasons.contains(&UnsubAckReason::Success) {
if let Some(tx) = self.ack_waiter[unsuback.pkid as usize].take() {
tx.resolve();
tx.resolve(unsuback.pkid);
}
}

Expand Down Expand Up @@ -401,7 +401,7 @@ impl MqttState {

if let Some(tx) = self.ack_waiter[puback.pkid as usize].take() {
// Resolve promise for QoS 1
tx.resolve();
tx.resolve(puback.pkid);
}

self.inflight -= 1;
Expand Down Expand Up @@ -490,7 +490,7 @@ impl MqttState {

if let Some(tx) = self.ack_waiter[pubcomp.pkid as usize].take() {
// Resolve promise for QoS 2
tx.resolve();
tx.resolve(pubcomp.pkid);
}

self.outgoing_rel.set(pubcomp.pkid as usize, false);
Expand Down Expand Up @@ -577,7 +577,7 @@ impl MqttState {
let event = Event::Outgoing(Outgoing::Publish(pkid));
self.events.push_back(event);
match (publish.qos, tx) {
(QoS::AtMostOnce, Some(tx)) => tx.resolve(),
(QoS::AtMostOnce, Some(tx)) => tx.resolve(0),
(_, tx) => self.ack_waiter[publish.pkid as usize] = tx,
}

Expand Down

0 comments on commit 8ad2223

Please sign in to comment.