Skip to content

Commit

Permalink
refactor: read and write methods on v4::Packet (#821)
Browse files Browse the repository at this point in the history
* refactor: `Packet::read`

* refactor: `Packet::write`

* test: fix changes in refactor
  • Loading branch information
Devdutt Shenoi committed Mar 24, 2024
1 parent a6d1116 commit 82bea4e
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 42 deletions.
3 changes: 2 additions & 1 deletion benchmarks/parsers/v4.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::{Buf, BytesMut};
use rumqttc::mqttbytes::v4;
use rumqttc::mqttbytes::QoS;
use rumqttc::Packet;
use std::time::Instant;

mod common;
Expand Down Expand Up @@ -31,7 +32,7 @@ fn main() {
let start = Instant::now();
let mut packets = Vec::with_capacity(count);
while output.has_remaining() {
let packet = v4::read(&mut output, 10 * 1024).unwrap();
let packet = Packet::read(&mut output, 10 * 1024).unwrap();
packets.push(packet);
}

Expand Down
4 changes: 2 additions & 2 deletions rumqttc/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Network {

pub async fn read(&mut self) -> io::Result<Incoming> {
loop {
let required = match read(&mut self.read, self.max_incoming_size) {
let required = match Packet::read(&mut self.read, self.max_incoming_size) {
Ok(packet) => return Ok(packet),
Err(mqttbytes::Error::InsufficientBytes(required)) => required,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e.to_string())),
Expand All @@ -75,7 +75,7 @@ impl Network {
pub async fn readb(&mut self, state: &mut MqttState) -> Result<(), StateError> {
let mut count = 0;
loop {
match read(&mut self.read, self.max_incoming_size) {
match Packet::read(&mut self.read, self.max_incoming_size) {
Ok(packet) => {
state.handle_incoming_packet(packet)?;

Expand Down
88 changes: 55 additions & 33 deletions rumqttc/src/mqttbytes/v4/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,45 +66,67 @@ impl Packet {
Self::Disconnect => Disconnect.size(),
}
}
}

/// Reads a stream of bytes and extracts next MQTT packet out of it
pub fn read(stream: &mut BytesMut, max_size: usize) -> Result<Packet, Error> {
let fixed_header = check(stream.iter(), max_size)?;
/// Reads a stream of bytes and extracts next MQTT packet out of it
pub fn read(stream: &mut BytesMut, max_size: usize) -> Result<Self, Error> {
let fixed_header = check(stream.iter(), max_size)?;

// Test with a stream with exactly the size to check border panics
let packet = stream.split_to(fixed_header.frame_length());
let packet_type = fixed_header.packet_type()?;

// Test with a stream with exactly the size to check border panics
let packet = stream.split_to(fixed_header.frame_length());
let packet_type = fixed_header.packet_type()?;
if fixed_header.remaining_len == 0 {
// no payload packets
return match packet_type {
PacketType::PingReq => Ok(Packet::PingReq),
PacketType::PingResp => Ok(Packet::PingResp),
PacketType::Disconnect => Ok(Packet::Disconnect),
_ => Err(Error::PayloadRequired),
};
}

if fixed_header.remaining_len == 0 {
// no payload packets
return match packet_type {
PacketType::PingReq => Ok(Packet::PingReq),
PacketType::PingResp => Ok(Packet::PingResp),
PacketType::Disconnect => Ok(Packet::Disconnect),
_ => Err(Error::PayloadRequired),
let packet = packet.freeze();
let packet = match packet_type {
PacketType::Connect => Packet::Connect(Connect::read(fixed_header, packet)?),
PacketType::ConnAck => Packet::ConnAck(ConnAck::read(fixed_header, packet)?),
PacketType::Publish => Packet::Publish(Publish::read(fixed_header, packet)?),
PacketType::PubAck => Packet::PubAck(PubAck::read(fixed_header, packet)?),
PacketType::PubRec => Packet::PubRec(PubRec::read(fixed_header, packet)?),
PacketType::PubRel => Packet::PubRel(PubRel::read(fixed_header, packet)?),
PacketType::PubComp => Packet::PubComp(PubComp::read(fixed_header, packet)?),
PacketType::Subscribe => Packet::Subscribe(Subscribe::read(fixed_header, packet)?),
PacketType::SubAck => Packet::SubAck(SubAck::read(fixed_header, packet)?),
PacketType::Unsubscribe => {
Packet::Unsubscribe(Unsubscribe::read(fixed_header, packet)?)
}
PacketType::UnsubAck => Packet::UnsubAck(UnsubAck::read(fixed_header, packet)?),
PacketType::PingReq => Packet::PingReq,
PacketType::PingResp => Packet::PingResp,
PacketType::Disconnect => Packet::Disconnect,
};
}

let packet = packet.freeze();
let packet = match packet_type {
PacketType::Connect => Packet::Connect(Connect::read(fixed_header, packet)?),
PacketType::ConnAck => Packet::ConnAck(ConnAck::read(fixed_header, packet)?),
PacketType::Publish => Packet::Publish(Publish::read(fixed_header, packet)?),
PacketType::PubAck => Packet::PubAck(PubAck::read(fixed_header, packet)?),
PacketType::PubRec => Packet::PubRec(PubRec::read(fixed_header, packet)?),
PacketType::PubRel => Packet::PubRel(PubRel::read(fixed_header, packet)?),
PacketType::PubComp => Packet::PubComp(PubComp::read(fixed_header, packet)?),
PacketType::Subscribe => Packet::Subscribe(Subscribe::read(fixed_header, packet)?),
PacketType::SubAck => Packet::SubAck(SubAck::read(fixed_header, packet)?),
PacketType::Unsubscribe => Packet::Unsubscribe(Unsubscribe::read(fixed_header, packet)?),
PacketType::UnsubAck => Packet::UnsubAck(UnsubAck::read(fixed_header, packet)?),
PacketType::PingReq => Packet::PingReq,
PacketType::PingResp => Packet::PingResp,
PacketType::Disconnect => Packet::Disconnect,
};
Ok(packet)
}

Ok(packet)
/// Serializes the MQTT packet into a stream of bytes
pub fn write(&self, stream: &mut BytesMut) -> Result<usize, Error> {
match self {
Packet::Connect(c) => c.write(stream),
Packet::ConnAck(c) => c.write(stream),
Packet::Publish(p) => p.write(stream),
Packet::PubAck(p) => p.write(stream),
Packet::PubRec(p) => p.write(stream),
Packet::PubRel(p) => p.write(stream),
Packet::PubComp(p) => p.write(stream),
Packet::Subscribe(s) => s.write(stream),
Packet::SubAck(s) => s.write(stream),
Packet::Unsubscribe(u) => u.write(stream),
Packet::UnsubAck(u) => u.write(stream),
Packet::PingReq => PingReq.write(stream),
Packet::PingResp => PingResp.write(stream),
Packet::Disconnect => Disconnect.write(stream),
}
}
}

/// Return number of remaining length bytes required for encoding length
Expand Down
10 changes: 5 additions & 5 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ mod test {
let publish = build_incoming_publish(QoS::ExactlyOnce, 1);

mqtt.handle_incoming_publish(&publish).unwrap();
let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
let packet = Packet::read(&mut mqtt.write, 10 * 1024).unwrap();
match packet {
Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
_ => panic!("Invalid network request: {:?}", packet),
Expand Down Expand Up @@ -770,14 +770,14 @@ mod test {

let publish = build_outgoing_publish(QoS::ExactlyOnce);
mqtt.outgoing_publish(publish).unwrap();
let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
let packet = Packet::read(&mut mqtt.write, 10 * 1024).unwrap();
match packet {
Packet::Publish(publish) => assert_eq!(publish.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
}

mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
let packet = Packet::read(&mut mqtt.write, 10 * 1024).unwrap();
match packet {
Packet::PubRel(pubrel) => assert_eq!(pubrel.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
Expand All @@ -790,14 +790,14 @@ mod test {
let publish = build_incoming_publish(QoS::ExactlyOnce, 1);

mqtt.handle_incoming_publish(&publish).unwrap();
let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
let packet = Packet::read(&mut mqtt.write, 10 * 1024).unwrap();
match packet {
Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
}

mqtt.handle_incoming_pubrel(&PubRel::new(1)).unwrap();
let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
let packet = Packet::read(&mut mqtt.write, 10 * 1024).unwrap();
match packet {
Packet::PubComp(pubcomp) => assert_eq!(pubcomp.pkid, 1),
packet => panic!("Invalid network request: {:?}", packet),
Expand Down
2 changes: 1 addition & 1 deletion rumqttc/tests/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl Network {
pub async fn readb(&mut self, incoming: &mut VecDeque<Incoming>) -> io::Result<()> {
let mut count = 0;
loop {
match read(&mut self.read, self.max_incoming_size) {
match Packet::read(&mut self.read, self.max_incoming_size) {
Ok(packet) => {
incoming.push_back(packet);
count += 1;
Expand Down

0 comments on commit 82bea4e

Please sign in to comment.