Skip to content

Commit

Permalink
drop the next message in queue if it's stale
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Jan 31, 2024
1 parent cfa3275 commit fdd51e7
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ impl EnabledHandler {
});
}

// We may need to inform the behviour if we have a dropped a message. This gets set if that
// is the case.
let mut dropped_message = None;

// process outbound stream
loop {
match std::mem::replace(
Expand All @@ -271,10 +267,11 @@ impl EnabledHandler {
} => {
if Pin::new(timeout).poll(cx).is_ready() {
// Inform the behaviour and end the poll.
dropped_message = Some(HandlerEvent::MessageDropped(message));
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(message),
));
}
}
_ => {} // All other messages are not time-bound.
Expand Down Expand Up @@ -348,13 +345,7 @@ impl EnabledHandler {
}
}

// If there was a timeout in sending a message, inform the behaviour before restarting the
// poll
if let Some(handler_event) = dropped_message {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event));
}

// Handle inbound messages
// Handle inbound messages.
loop {
match std::mem::replace(
&mut self.inbound_substream,
Expand Down Expand Up @@ -419,6 +410,32 @@ impl EnabledHandler {
}
}

// Drop the next message in queue if it's stale.
let mut peakable = self.send_queue.clone().peekable();
if let Poll::Ready(Some(mut message)) = peakable.poll_next_unpin(cx) {
match message {
RpcOut::Publish {
message: _,
ref mut timeout,
}
| RpcOut::Forward {
message: _,
ref mut timeout,
} => {
if Pin::new(timeout).poll(cx).is_ready() {
// Drop the message.
let dropped = futures::ready!(self.send_queue.poll_next_unpin(cx))
.expect("There should be a message");
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::MessageDropped(dropped),
));
}
}
// the next message in queue is not time bound.
_ => {}
}
}

Poll::Pending
}
}
Expand Down

0 comments on commit fdd51e7

Please sign in to comment.