Skip to content

Commit

Permalink
refactor: break out if publish is unsolicited (#832)
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi authored Mar 26, 2024
1 parent 929a840 commit a7a0c54
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 91 deletions.
63 changes: 31 additions & 32 deletions rumqttc/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ impl MqttState {
.ok_or(StateError::Unsolicited(puback.pkid))?;

self.last_puback = puback.pkid;
publish.take().ok_or({

if publish.take().is_none() {
error!("Unsolicited puback packet: {:?}", puback.pkid);
StateError::Unsolicited(puback.pkid)
})?;
return Err(StateError::Unsolicited(puback.pkid));
}

self.inflight -= 1;
let packet = self.check_collision(puback.pkid).map(|publish| {
Expand All @@ -250,52 +251,50 @@ impl MqttState {
.outgoing_pub
.get_mut(pubrec.pkid as usize)
.ok_or(StateError::Unsolicited(pubrec.pkid))?;
match publish.take() {
Some(_) => {
// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
let pubrel = PubRel { pkid: pubrec.pkid };
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);

Ok(Some(Packet::PubRel(pubrel)))
}
None => {
error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
Err(StateError::Unsolicited(pubrec.pkid))
}
if publish.take().is_none() {
error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
return Err(StateError::Unsolicited(pubrec.pkid));
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
let pubrel = PubRel { pkid: pubrec.pkid };
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);

Ok(Some(Packet::PubRel(pubrel)))
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;
match publish.take() {
Some(_) => {
let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
let pubcomp = PubComp { pkid: pubrel.pkid };
self.events.push_back(event);

Ok(Some(Packet::PubComp(pubcomp)))
}
None => {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
Err(StateError::Unsolicited(pubrel.pkid))
}
if publish.take().is_none() {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}

let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
let pubcomp = PubComp { pkid: pubrel.pkid };
self.events.push_back(event);

Ok(Some(Packet::PubComp(pubcomp)))
}

fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<Option<Packet>, StateError> {
self.outgoing_rel
if self
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?
.take()
.ok_or({
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
StateError::Unsolicited(pubcomp.pkid)
})?;
.is_none()
{
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}

self.inflight -= 1;
let packet = self.check_collision(pubcomp.pkid).map(|publish| {
Expand Down
107 changes: 48 additions & 59 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,17 +362,13 @@ impl MqttState {
.outgoing_pub
.get_mut(puback.pkid as usize)
.ok_or(StateError::Unsolicited(puback.pkid))?;
let v = match publish.take() {
Some(_) => {
self.inflight -= 1;

Ok(None)
}
None => {
error!("Unsolicited puback packet: {:?}", puback.pkid);
Err(StateError::Unsolicited(puback.pkid))
}
};
if publish.take().is_none() {
error!("Unsolicited puback packet: {:?}", puback.pkid);
return Err(StateError::Unsolicited(puback.pkid));
}

self.inflight -= 1;

if puback.reason != PubAckReason::Success
&& puback.reason != PubAckReason::NoMatchingSubscribers
Expand All @@ -394,61 +390,57 @@ impl MqttState {
return Ok(Some(Packet::Publish(publish)));
}

v
Ok(None)
}

fn handle_incoming_pubrec(&mut self, pubrec: &PubRec) -> Result<Option<Packet>, StateError> {
let publish = self
.outgoing_pub
.get_mut(pubrec.pkid as usize)
.ok_or(StateError::Unsolicited(pubrec.pkid))?;
match publish.take() {
Some(_) => {
if pubrec.reason != PubRecReason::Success
&& pubrec.reason != PubRecReason::NoMatchingSubscribers
{
return Err(StateError::PubRecFail {
reason: pubrec.reason,
});
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);
if publish.take().is_none() {
error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
return Err(StateError::Unsolicited(pubrec.pkid));
}

Ok(Some(Packet::PubRel(PubRel::new(pubrec.pkid, None))))
}
None => {
error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
Err(StateError::Unsolicited(pubrec.pkid))
}
if pubrec.reason != PubRecReason::Success
&& pubrec.reason != PubRecReason::NoMatchingSubscribers
{
return Err(StateError::PubRecFail {
reason: pubrec.reason,
});
}

// NOTE: Inflight - 1 for qos2 in comp
self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
self.events.push_back(event);

Ok(Some(Packet::PubRel(PubRel::new(pubrec.pkid, None))))
}

fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<Option<Packet>, StateError> {
let publish = self
.incoming_pub
.get_mut(pubrel.pkid as usize)
.ok_or(StateError::Unsolicited(pubrel.pkid))?;
match publish.take() {
Some(_) => {
if pubrel.reason != PubRelReason::Success {
return Err(StateError::PubRelFail {
reason: pubrel.reason,
});
}

let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
self.events.push_back(event);
if publish.take().is_none() {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
return Err(StateError::Unsolicited(pubrel.pkid));
}

Ok(Some(Packet::PubComp(PubComp::new(pubrel.pkid, None))))
}
None => {
error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
Err(StateError::Unsolicited(pubrel.pkid))
}
if pubrel.reason != PubRelReason::Success {
return Err(StateError::PubRelFail {
reason: pubrel.reason,
});
}

let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
self.events.push_back(event);

Ok(Some(Packet::PubComp(PubComp::new(pubrel.pkid, None))))
}

fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<Option<Packet>, StateError> {
Expand All @@ -465,22 +457,19 @@ impl MqttState {
.outgoing_rel
.get_mut(pubcomp.pkid as usize)
.ok_or(StateError::Unsolicited(pubcomp.pkid))?;
match pubrel.take() {
Some(_) => {
if pubcomp.reason != PubCompReason::Success {
return Err(StateError::PubCompFail {
reason: pubcomp.reason,
});
}
if pubrel.take().is_none() {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
return Err(StateError::Unsolicited(pubcomp.pkid));
}

self.inflight -= 1;
Ok(outgoing)
}
None => {
error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
Err(StateError::Unsolicited(pubcomp.pkid))
}
if pubcomp.reason != PubCompReason::Success {
return Err(StateError::PubCompFail {
reason: pubcomp.reason,
});
}

self.inflight -= 1;
Ok(outgoing)
}

fn handle_incoming_pingresp(&mut self) -> Result<Option<Packet>, StateError> {
Expand Down

0 comments on commit a7a0c54

Please sign in to comment.