From 138d0b10e313a265bc70420a1e436dfd40e89d7e Mon Sep 17 00:00:00 2001 From: hippalus Date: Wed, 22 May 2024 15:58:23 +0200 Subject: [PATCH] refactor(rumqttc): `v5/state.rs` Replace Vec with FixedBitSet for QoS 2 publishes (#868) --- rumqttc/src/v5/state.rs | 53 ++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 33 deletions(-) diff --git a/rumqttc/src/v5/state.rs b/rumqttc/src/v5/state.rs index 0d8f11f65..6f37a4719 100644 --- a/rumqttc/src/v5/state.rs +++ b/rumqttc/src/v5/state.rs @@ -8,6 +8,7 @@ use super::mqttbytes::{self, Error as MqttError, QoS}; use super::{Event, Incoming, Outgoing, Request}; use bytes::Bytes; +use fixedbitset::FixedBitSet; use std::collections::{HashMap, VecDeque}; use std::{io, time::Instant}; @@ -104,9 +105,9 @@ pub struct MqttState { /// Outgoing QoS 1, 2 publishes which aren't acked yet pub(crate) outgoing_pub: Vec>, /// Packet ids of released QoS 2 publishes - pub(crate) outgoing_rel: Vec>, + pub(crate) outgoing_rel: FixedBitSet, /// Packet ids on incoming QoS 2 publishes - pub(crate) incoming_pub: Vec>, + pub(crate) incoming_pub: FixedBitSet, /// Last collision due to broker not acking in order pub collision: Option, /// Buffered incoming packets @@ -137,8 +138,8 @@ impl MqttState { inflight: 0, // index 0 is wasted as 0 is not a valid packet id outgoing_pub: vec![None; max_inflight as usize + 1], - outgoing_rel: vec![None; max_inflight as usize + 1], - incoming_pub: vec![None; u16::MAX as usize + 1], + outgoing_rel: FixedBitSet::with_capacity(max_inflight as usize + 1), + incoming_pub: FixedBitSet::with_capacity(u16::MAX as usize + 1), collision: None, // TODO: Optimize these sizes later events: VecDeque::with_capacity(100), @@ -163,17 +164,14 @@ impl MqttState { } // remove and collect pending releases - for rel in self.outgoing_rel.iter_mut() { - if let Some(pkid) = rel.take() { - let request = Request::PubRel(PubRel::new(pkid, None)); - pending.push(request); - } + for pkid in self.outgoing_rel.ones() { + let request = Request::PubRel(PubRel::new(pkid as u16, None)); + pending.push(request); } + self.outgoing_rel.clear(); // remove packed ids of incoming qos2 publishes - for id in self.incoming_pub.iter_mut() { - id.take(); - } + self.incoming_pub.clear(); self.await_pingresp = false; self.collision_ping_count = 0; @@ -349,7 +347,7 @@ impl MqttState { } QoS::ExactlyOnce => { let pkid = publish.pkid; - self.incoming_pub[pkid as usize] = Some(pkid); + self.incoming_pub.insert(pkid as usize); if !self.manual_acks { let pubrec = PubRec::new(pkid, None); @@ -416,7 +414,7 @@ impl MqttState { } // NOTE: Inflight - 1 for qos2 in comp - self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid); + self.outgoing_rel.insert(pubrec.pkid as usize); let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid)); self.events.push_back(event); @@ -424,15 +422,11 @@ impl MqttState { } fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result, StateError> { - let publish = self - .incoming_pub - .get_mut(pubrel.pkid as usize) - .ok_or(StateError::Unsolicited(pubrel.pkid))?; - - if publish.take().is_none() { + if !self.incoming_pub.contains(pubrel.pkid as usize) { error!("Unsolicited pubrel packet: {:?}", pubrel.pkid); return Err(StateError::Unsolicited(pubrel.pkid)); } + self.incoming_pub.set(pubrel.pkid as usize, false); if pubrel.reason != PubRelReason::Success { return Err(StateError::PubRelFail { @@ -456,14 +450,11 @@ impl MqttState { Packet::Publish(publish) }); - let pubrel = self - .outgoing_rel - .get_mut(pubcomp.pkid as usize) - .ok_or(StateError::Unsolicited(pubcomp.pkid))?; - if pubrel.take().is_none() { + if !self.outgoing_rel.contains(pubcomp.pkid as usize) { error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid); return Err(StateError::Unsolicited(pubcomp.pkid)); } + self.outgoing_rel.set(pubcomp.pkid as usize, false); if pubcomp.reason != PubCompReason::Success { return Err(StateError::PubCompFail { @@ -668,7 +659,7 @@ impl MqttState { _ => pubrel, }; - self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid); + self.outgoing_rel.insert(pubrel.pkid as usize); self.inflight += 1; Ok(pubrel) } @@ -824,10 +815,8 @@ mod test { mqtt.handle_incoming_publish(&mut publish2).unwrap(); mqtt.handle_incoming_publish(&mut publish3).unwrap(); - let pkid = mqtt.incoming_pub[3].unwrap(); - // only qos2 publish should be add to queue - assert_eq!(pkid, 3); + assert!(mqtt.incoming_pub.contains(3)); } #[test] @@ -870,9 +859,7 @@ mod test { mqtt.handle_incoming_publish(&mut publish2).unwrap(); mqtt.handle_incoming_publish(&mut publish3).unwrap(); - let pkid = mqtt.incoming_pub[3].unwrap(); - assert_eq!(pkid, 3); - + assert!(mqtt.incoming_pub.contains(3)); assert!(mqtt.events.is_empty()); } @@ -940,7 +927,7 @@ mod test { assert_eq!(backup.unwrap().pkid, 1); // check if the qos2 element's release pkid is 2 - assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2); + assert!(mqtt.outgoing_rel.contains(2)); } #[test]