Skip to content

Commit

Permalink
refactor(rumqttc): v5/state.rs Replace Vec with FixedBitSet for QoS…
Browse files Browse the repository at this point in the history
… 2 publishes (#868)
  • Loading branch information
hippalus committed May 22, 2024
1 parent 36f3372 commit 138d0b1
Showing 1 changed file with 20 additions and 33 deletions.
53 changes: 20 additions & 33 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::mqttbytes::{self, Error as MqttError, QoS};
use super::{Event, Incoming, Outgoing, Request};

use bytes::Bytes;
use fixedbitset::FixedBitSet;
use std::collections::{HashMap, VecDeque};
use std::{io, time::Instant};

Expand Down Expand Up @@ -104,9 +105,9 @@ pub struct MqttState {
/// Outgoing QoS 1, 2 publishes which aren't acked yet
pub(crate) outgoing_pub: Vec<Option<Publish>>,
/// Packet ids of released QoS 2 publishes
pub(crate) outgoing_rel: Vec<Option<u16>>,
pub(crate) outgoing_rel: FixedBitSet,
/// Packet ids on incoming QoS 2 publishes
pub(crate) incoming_pub: Vec<Option<u16>>,
pub(crate) incoming_pub: FixedBitSet,
/// Last collision due to broker not acking in order
pub collision: Option<Publish>,
/// Buffered incoming packets
Expand Down Expand Up @@ -137,8 +138,8 @@ impl MqttState {
inflight: 0,
// index 0 is wasted as 0 is not a valid packet id
outgoing_pub: vec![None; max_inflight as usize + 1],
outgoing_rel: vec![None; max_inflight as usize + 1],
incoming_pub: vec![None; u16::MAX as usize + 1],
outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1),
incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1),
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
Expand All @@ -163,17 +164,14 @@ impl MqttState {
}

// remove and collect pending releases
for rel in self.outgoing_rel.iter_mut() {
if let Some(pkid) = rel.take() {
let request = Request::PubRel(PubRel::new(pkid, None));
pending.push(request);
}
for pkid in self.outgoing_rel.ones() {
let request = Request::PubRel(PubRel::new(pkid as u16, None));
pending.push(request);
}
self.outgoing_rel.clear();

// remove packed ids of incoming qos2 publishes
for id in self.incoming_pub.iter_mut() {
id.take();
}
self.incoming_pub.clear();

self.await_pingresp = false;
self.collision_ping_count = 0;
Expand Down Expand Up @@ -349,7 +347,7 @@ impl MqttState {
}
QoS::ExactlyOnce => {
let pkid = publish.pkid;
self.incoming_pub[pkid as usize] = Some(pkid);
self.incoming_pub.insert(pkid as usize);

if !self.manual_acks {
let pubrec = PubRec::new(pkid, None);
Expand Down Expand Up @@ -416,23 +414,19 @@ impl MqttState {
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
self.outgoing_rel.insert(pubrec.pkid as usize);
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);

Ok(Some(Packet::PubRel(PubRel::new(pubrec.pkid, None))))
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;

if publish.take().is_none() {
if !self.incoming_pub.contains(pubrel.pkid as usize) {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}
self.incoming_pub.set(pubrel.pkid as usize, false);

if pubrel.reason != PubRelReason::Success {
return Err(StateError::PubRelFail {
Expand All @@ -456,14 +450,11 @@ impl MqttState {
Packet::Publish(publish)
});

let pubrel = self
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?;
if pubrel.take().is_none() {
if !self.outgoing_rel.contains(pubcomp.pkid as usize) {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}
self.outgoing_rel.set(pubcomp.pkid as usize, false);

if pubcomp.reason != PubCompReason::Success {
return Err(StateError::PubCompFail {
Expand Down Expand Up @@ -668,7 +659,7 @@ impl MqttState {
_ => pubrel,
};

self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
self.outgoing_rel.insert(pubrel.pkid as usize);
self.inflight += 1;
Ok(pubrel)
}
Expand Down Expand Up @@ -824,10 +815,8 @@ mod test {
mqtt.handle_incoming_publish(&mut publish2).unwrap();
mqtt.handle_incoming_publish(&mut publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();

// only qos2 publish should be add to queue
assert_eq!(pkid, 3);
assert!(mqtt.incoming_pub.contains(3));
}

#[test]
Expand Down Expand Up @@ -870,9 +859,7 @@ mod test {
mqtt.handle_incoming_publish(&mut publish2).unwrap();
mqtt.handle_incoming_publish(&mut publish3).unwrap();

let pkid = mqtt.incoming_pub[3].unwrap();
assert_eq!(pkid, 3);

assert!(mqtt.incoming_pub.contains(3));
assert!(mqtt.events.is_empty());
}

Expand Down Expand Up @@ -940,7 +927,7 @@ mod test {
assert_eq!(backup.unwrap().pkid, 1);

// check if the qos2 element's release pkid is 2
assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
assert!(mqtt.outgoing_rel.contains(2));
}

#[test]
Expand Down

0 comments on commit 138d0b1

Please sign in to comment.