Skip to content

Commit

Permalink
Send almost all incoming packets to user handler
Browse files Browse the repository at this point in the history
Before this commit only a select couple of packet types were actually send to the users handler.
Now, all packets are send to the users handler except those that are not allowed to go to the users handler per MQTT spec.
These packets are Publish packets with QoS 2.
  • Loading branch information
GunnarMorrigan committed Jan 16, 2024
1 parent 48f84bf commit d63d775
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions src/state_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,18 @@ impl StateHandler {
/// true is returned if the users handler should be called
/// false otherwise
pub fn handle_incoming_packet(&self, packet: &Packet) -> Result<(Option<Packet>, bool), HandlerError> {
Ok(match packet {
Packet::Publish(publish) => self.handle_incoming_publish(publish),
let reply_packet = match packet {
Packet::Publish(publish) => return self.handle_incoming_publish(publish),
Packet::PubAck(puback) => self.handle_incoming_puback(puback),
Packet::PubRec(pubrec) => self.handle_incoming_pubrec(pubrec),
Packet::PubRel(pubrel) => self.handle_incoming_pubrel(pubrel),
Packet::PubComp(pubcomp) => self.handle_incoming_pubcomp(pubcomp),
Packet::SubAck(suback) => self.handle_incoming_suback(suback),
Packet::UnsubAck(unsuback) => self.handle_incoming_unsuback(unsuback),
a => return Err(HandlerError::UnexpectedPacket(a.packet_type())),
}?)
}?;

Ok((reply_packet, true))
}

fn handle_incoming_publish(&self, publish: &Publish) -> Result<(Option<Packet>, bool), HandlerError> {
Expand Down Expand Up @@ -87,20 +89,20 @@ impl StateHandler {
}
}

fn handle_incoming_puback(&self, puback: &PubAck) -> Result<(Option<Packet>, bool), HandlerError> {
fn handle_incoming_puback(&self, puback: &PubAck) -> Result<Option<Packet>, HandlerError> {
if self.state.remove_outgoing_pub(puback.packet_identifier).is_some() {
#[cfg(feature = "logs")]
debug!("Publish {:?} has been acknowledged", puback.packet_identifier);
self.state.make_pkid_available(puback.packet_identifier)?;
Ok((None, false))
Ok(None)
} else {
#[cfg(feature = "logs")]
error!("Publish {:?} was not found, while receiving a PubAck for it", puback.packet_identifier,);
Err(HandlerError::Unsolicited(puback.packet_identifier, PacketType::PubAck))
}
}

fn handle_incoming_pubrec(&self, pubrec: &PubRec) -> Result<(Option<Packet>, bool), HandlerError> {
fn handle_incoming_pubrec(&self, pubrec: &PubRec) -> Result<Option<Packet>, HandlerError> {
match self.state.remove_outgoing_pub(pubrec.packet_identifier) {
Some(_) => match pubrec.reason_code {
PubRecReasonCode::Success | PubRecReasonCode::NoMatchingSubscribers => {
Expand All @@ -111,9 +113,9 @@ impl StateHandler {
#[cfg(feature = "logs")]
debug!("Publish {:?} has been PubReced", pubrec.packet_identifier);

Ok((Some(Packet::PubRel(pubrel)), false))
Ok(Some(Packet::PubRel(pubrel)))
}
_ => Ok((None, false)),
_ => Ok(None),
},
None => {
#[cfg(feature = "logs")]
Expand All @@ -123,42 +125,42 @@ impl StateHandler {
}
}

fn handle_incoming_pubrel(&self, pubrel: &PubRel) -> Result<(Option<Packet>, bool), HandlerError> {
fn handle_incoming_pubrel(&self, pubrel: &PubRel) -> Result<Option<Packet>, HandlerError> {
let pubcomp = PubComp::new(pubrel.packet_identifier);
if !self.state.remove_incoming_pub(pubrel.packet_identifier) {
#[cfg(feature = "logs")]
warn!("Received an unexpected / unsolicited PubRel packet with id {}", pubrel.packet_identifier);
let _pubcomp = PubComp::new(pubrel.packet_identifier);
}
Ok((Some(Packet::PubComp(pubcomp)), false))
Ok(Some(Packet::PubComp(pubcomp)))
}

fn handle_incoming_pubcomp(&self, pubcomp: &PubComp) -> Result<(Option<Packet>, bool), HandlerError> {
fn handle_incoming_pubcomp(&self, pubcomp: &PubComp) -> Result<Option<Packet>, HandlerError> {
if self.state.remove_outgoing_rel(&pubcomp.packet_identifier) {
self.state.make_pkid_available(pubcomp.packet_identifier)?;
Ok((None, false))
Ok(None)
} else {
#[cfg(feature = "logs")]
error!("PubRel {} was not found, while receiving a PubComp for it", pubcomp.packet_identifier,);
Err(HandlerError::Unsolicited(pubcomp.packet_identifier, PacketType::PubComp))
}
}

fn handle_incoming_suback(&self, suback: &SubAck) -> Result<(Option<Packet>, bool), HandlerError> {
fn handle_incoming_suback(&self, suback: &SubAck) -> Result<Option<Packet>, HandlerError> {
if self.state.remove_outgoing_sub(suback.packet_identifier) {
self.state.make_pkid_available(suback.packet_identifier)?;
Ok((None, false))
Ok(None)
} else {
#[cfg(feature = "logs")]
error!("Sub {} was not found, while receiving a SubAck for it", suback.packet_identifier,);
Err(HandlerError::Unsolicited(suback.packet_identifier, PacketType::SubAck))
}
}

fn handle_incoming_unsuback(&self, unsuback: &UnsubAck) -> Result<(Option<Packet>, bool), HandlerError> {
fn handle_incoming_unsuback(&self, unsuback: &UnsubAck) -> Result<Option<Packet>, HandlerError> {
if self.state.remove_outgoing_unsub(unsuback.packet_identifier) {
self.state.make_pkid_available(unsuback.packet_identifier)?;
Ok((None, false))
Ok(None)
} else {
#[cfg(feature = "logs")]
error!("Unsub {} was not found, while receiving a unsuback for it", unsuback.packet_identifier,);
Expand Down

0 comments on commit d63d775

Please sign in to comment.