Skip to content

Commit

Permalink
fix: state
Browse files Browse the repository at this point in the history
* ack incoming publishes if required
* remove unused write buffer
* update tests
  • Loading branch information
Devdutt Shenoi committed Mar 25, 2024
1 parent ae5ab16 commit 0eff631
Showing 1 changed file with 17 additions and 22 deletions.
39 changes: 17 additions & 22 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::mqttbytes::{self, Error as MqttError, QoS};

use super::{Event, Incoming, Outgoing, Request};

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use std::collections::{HashMap, VecDeque};
use std::{io, time::Instant};

Expand Down Expand Up @@ -109,8 +109,6 @@ pub struct MqttState {
pub collision: Option<Publish>,
/// Buffered incoming packets
pub events: VecDeque<Event>,
/// Write buffer
pub write: BytesMut,
/// Indicates if acknowledgements should be send immediately
pub manual_acks: bool,
/// Map of alias_id->topic
Expand Down Expand Up @@ -142,7 +140,6 @@ impl MqttState {
collision: None,
// TODO: Optimize these sizes later
events: VecDeque::with_capacity(100),
write: BytesMut::with_capacity(10 * 1024),
manual_acks,
topic_alises: HashMap::new(),
// Set via CONNACK
Expand Down Expand Up @@ -343,7 +340,7 @@ impl MqttState {
QoS::AtLeastOnce => {
if !self.manual_acks {
let puback = PubAck::new(publish.pkid, None);
self.outgoing_puback(puback)?;
return self.outgoing_puback(puback);
}
Ok(None)
}
Expand All @@ -353,7 +350,7 @@ impl MqttState {

if !self.manual_acks {
let pubrec = PubRec::new(pkid, None);
self.outgoing_pubrec(pubrec)?;
return self.outgoing_pubrec(pubrec);
}
Ok(None)
}
Expand Down Expand Up @@ -892,11 +889,9 @@ mod test {
let mut mqtt = build_mqttstate();
let mut publish = build_incoming_publish(QoS::ExactlyOnce, 1);

mqtt.handle_incoming_publish(&mut publish).unwrap();
let packet = Packet::read(&mut mqtt.write, Some(10 * 1024)).unwrap();
match packet {
match mqtt.handle_incoming_publish(&mut publish).unwrap().unwrap() {
Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
_ => panic!("Invalid network request: {:?}", packet),
packet => panic!("Invalid network request: {:?}", packet),
}
}

Expand Down Expand Up @@ -961,16 +956,16 @@ mod test {
let mut mqtt = build_mqttstate();

let publish = build_outgoing_publish(QoS::ExactlyOnce);
mqtt.outgoing_publish(publish).unwrap();
let packet = Packet::read(&mut mqtt.write, Some(10 * 1024)).unwrap();
match packet {
match mqtt.outgoing_publish(publish).unwrap().unwrap() {
Packet::Publish(publish) => assert_eq!(publish.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
}

mqtt.handle_incoming_pubrec(&PubRec::new(1, None)).unwrap();
let packet = Packet::read(&mut mqtt.write, Some(10 * 1024)).unwrap();
match packet {
match mqtt
.handle_incoming_pubrec(&PubRec::new(1, None))
.unwrap()
.unwrap()
{
Packet::PubRel(pubrel) => assert_eq!(pubrel.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
}
Expand All @@ -981,16 +976,16 @@ mod test {
let mut mqtt = build_mqttstate();
let mut publish = build_incoming_publish(QoS::ExactlyOnce, 1);

mqtt.handle_incoming_publish(&mut publish).unwrap();
let packet = Packet::read(&mut mqtt.write, Some(10 * 1024)).unwrap();
match packet {
match mqtt.handle_incoming_publish(&mut publish).unwrap().unwrap() {
Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
}

mqtt.handle_incoming_pubrel(&PubRel::new(1, None)).unwrap();
let packet = Packet::read(&mut mqtt.write, Some(10 * 1024)).unwrap();
match packet {
match mqtt
.handle_incoming_pubrel(&PubRel::new(1, None))
.unwrap()
.unwrap()
{
Packet::PubComp(pubcomp) => assert_eq!(pubcomp.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
}
Expand Down

0 comments on commit 0eff631

Please sign in to comment.