From d63d775dab0bfb47babdfcb4e953509b7f580554 Mon Sep 17 00:00:00 2001 From: GunnarMorrigan <13799935+GunnarMorrigan@users.noreply.github.com> Date: Tue, 16 Jan 2024 20:20:29 +0100 Subject: [PATCH] Send almost all incoming packets to user handler Before this commit only a select couple of packet types were actually send to the users handler. Now, all packets are send to the users handler except those that are not allowed to go to the users handler per MQTT spec. These packets are Publish packets with QoS 2. --- src/state_handler.rs | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/state_handler.rs b/src/state_handler.rs index f0b9c0a..5f92ec5 100644 --- a/src/state_handler.rs +++ b/src/state_handler.rs @@ -50,8 +50,8 @@ impl StateHandler { /// true is returned if the users handler should be called /// false otherwise pub fn handle_incoming_packet(&self, packet: &Packet) -> Result<(Option, bool), HandlerError> { - Ok(match packet { - Packet::Publish(publish) => self.handle_incoming_publish(publish), + let reply_packet = match packet { + Packet::Publish(publish) => return self.handle_incoming_publish(publish), Packet::PubAck(puback) => self.handle_incoming_puback(puback), Packet::PubRec(pubrec) => self.handle_incoming_pubrec(pubrec), Packet::PubRel(pubrel) => self.handle_incoming_pubrel(pubrel), @@ -59,7 +59,9 @@ impl StateHandler { Packet::SubAck(suback) => self.handle_incoming_suback(suback), Packet::UnsubAck(unsuback) => self.handle_incoming_unsuback(unsuback), a => return Err(HandlerError::UnexpectedPacket(a.packet_type())), - }?) + }?; + + Ok((reply_packet, true)) } fn handle_incoming_publish(&self, publish: &Publish) -> Result<(Option, bool), HandlerError> { @@ -87,12 +89,12 @@ impl StateHandler { } } - fn handle_incoming_puback(&self, puback: &PubAck) -> Result<(Option, bool), HandlerError> { + fn handle_incoming_puback(&self, puback: &PubAck) -> Result, HandlerError> { if self.state.remove_outgoing_pub(puback.packet_identifier).is_some() { #[cfg(feature = "logs")] debug!("Publish {:?} has been acknowledged", puback.packet_identifier); self.state.make_pkid_available(puback.packet_identifier)?; - Ok((None, false)) + Ok(None) } else { #[cfg(feature = "logs")] error!("Publish {:?} was not found, while receiving a PubAck for it", puback.packet_identifier,); @@ -100,7 +102,7 @@ impl StateHandler { } } - fn handle_incoming_pubrec(&self, pubrec: &PubRec) -> Result<(Option, bool), HandlerError> { + fn handle_incoming_pubrec(&self, pubrec: &PubRec) -> Result, HandlerError> { match self.state.remove_outgoing_pub(pubrec.packet_identifier) { Some(_) => match pubrec.reason_code { PubRecReasonCode::Success | PubRecReasonCode::NoMatchingSubscribers => { @@ -111,9 +113,9 @@ impl StateHandler { #[cfg(feature = "logs")] debug!("Publish {:?} has been PubReced", pubrec.packet_identifier); - Ok((Some(Packet::PubRel(pubrel)), false)) + Ok(Some(Packet::PubRel(pubrel))) } - _ => Ok((None, false)), + _ => Ok(None), }, None => { #[cfg(feature = "logs")] @@ -123,20 +125,20 @@ impl StateHandler { } } - fn handle_incoming_pubrel(&self, pubrel: &PubRel) -> Result<(Option, bool), HandlerError> { + fn handle_incoming_pubrel(&self, pubrel: &PubRel) -> Result, HandlerError> { let pubcomp = PubComp::new(pubrel.packet_identifier); if !self.state.remove_incoming_pub(pubrel.packet_identifier) { #[cfg(feature = "logs")] warn!("Received an unexpected / unsolicited PubRel packet with id {}", pubrel.packet_identifier); let _pubcomp = PubComp::new(pubrel.packet_identifier); } - Ok((Some(Packet::PubComp(pubcomp)), false)) + Ok(Some(Packet::PubComp(pubcomp))) } - fn handle_incoming_pubcomp(&self, pubcomp: &PubComp) -> Result<(Option, bool), HandlerError> { + fn handle_incoming_pubcomp(&self, pubcomp: &PubComp) -> Result, HandlerError> { if self.state.remove_outgoing_rel(&pubcomp.packet_identifier) { self.state.make_pkid_available(pubcomp.packet_identifier)?; - Ok((None, false)) + Ok(None) } else { #[cfg(feature = "logs")] error!("PubRel {} was not found, while receiving a PubComp for it", pubcomp.packet_identifier,); @@ -144,10 +146,10 @@ impl StateHandler { } } - fn handle_incoming_suback(&self, suback: &SubAck) -> Result<(Option, bool), HandlerError> { + fn handle_incoming_suback(&self, suback: &SubAck) -> Result, HandlerError> { if self.state.remove_outgoing_sub(suback.packet_identifier) { self.state.make_pkid_available(suback.packet_identifier)?; - Ok((None, false)) + Ok(None) } else { #[cfg(feature = "logs")] error!("Sub {} was not found, while receiving a SubAck for it", suback.packet_identifier,); @@ -155,10 +157,10 @@ impl StateHandler { } } - fn handle_incoming_unsuback(&self, unsuback: &UnsubAck) -> Result<(Option, bool), HandlerError> { + fn handle_incoming_unsuback(&self, unsuback: &UnsubAck) -> Result, HandlerError> { if self.state.remove_outgoing_unsub(unsuback.packet_identifier) { self.state.make_pkid_available(unsuback.packet_identifier)?; - Ok((None, false)) + Ok(None) } else { #[cfg(feature = "logs")] error!("Unsub {} was not found, while receiving a unsuback for it", unsuback.packet_identifier,);