fix: fix pipeline congestion shortcut synchronization (#1740)
* fix: fix pipeline congestion shortcut synchronization

#1627 introduced a congestion shortcut mechanism, but it was not
correctly synchronized. Users experienced situations in which the
congested flag was set and never reset, causing all subsequent messages
to be dropped (effectively leaving the publisher dead).

The congestion flag is set after a message's deadline is reached and
unset when batches are refilled, using only relaxed atomic operations.
Moreover, once the deadline has been reached, the queue is not checked
again.

The most obvious problematic flow is the following: between the instant
the thread is woken up because the deadline has been reached and the
instant the congested flag is set, the tx task may be unblocked and all
the batches sent and refilled. The congested flag would then be set
afterwards, leaving no batch left to refill and unset it. This flow
seems hard to trigger when there are many batches in the queue, but it
remains theoretically possible. Moreover, when a fragmentation chain is
dropped in the middle, pushing the ephemeral batch takes additional time
before the congested flag is set; this is precisely the case in which
the bug was observed, which may indicate that the described flow is the
actual cause, though this is not certain.
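
To make the window concrete, here is a deterministic, single-threaded replay of the suspected interleaving (purely illustrative: it models only the congested flag and a batch counter, not the real zenoh queue):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let congested = AtomicBool::new(false);

    // 1. The publisher thread wakes up because its deadline was reached (no batch
    //    was available), but it has not yet stored the congested flag.

    // 2. The tx task runs in between: it sends everything and refills every batch.
    //    The refill path would only clear the flag if it observed it set, which it
    //    is not yet.
    let free_batches: usize = 4;
    if congested.swap(false, Ordering::Relaxed) {
        println!("flag cleared on refill");
    }

    // 3. Only now does the publisher store the flag, after the last refill happened.
    congested.store(true, Ordering::Relaxed);

    // Every later push sees the flag and is dropped before touching the queue, so
    // no batch is ever consumed or refilled again: nothing is left to reset the flag.
    assert!(congested.load(Ordering::Relaxed));
    assert_eq!(free_batches, 4);
}
```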

The proposed fix adds an additional synchronization step after setting
the congested flag: the message push is retried, this time with an
already expired deadline. If the batches were refilled in the meantime,
the message can be sent and the congestion flag is reset.

If batches were in fact refilled but the message is fragmented and still
ends up being dropped, some batches have nevertheless been pushed and
will be refilled later, which will unset the congested flag.
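
Condensed into a standalone sketch (with simplified stand-ins for the status flag and the stage-in queue; the actual change is in the diff below), the push path after the fix follows this pattern:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

struct Status {
    congested: AtomicBool,
}

// Simplified stand-in for StageIn::push_network_message: succeeds only if a batch
// becomes available before the deadline (the real function also serializes,
// fragments, and blocks on a condition variable instead of spinning).
fn try_push(free_batches: &mut usize, deadline: Instant) -> bool {
    while *free_batches == 0 {
        if Instant::now() >= deadline {
            return false;
        }
        std::thread::yield_now();
    }
    *free_batches -= 1;
    true
}

fn push(status: &Status, free_batches: &mut usize) -> bool {
    // Congestion shortcut: drop immediately while the pipeline is congested.
    if status.congested.load(Ordering::Relaxed) {
        return false;
    }
    let deadline = Instant::now() + Duration::from_millis(1);
    let mut sent = try_push(free_batches, deadline);
    if !sent {
        status.congested.store(true, Ordering::Relaxed);
        // Batches may have been refilled between the deadline wakeup and the store
        // above; retry with the now-expired deadline to close that window.
        sent = try_push(free_batches, deadline);
        if sent {
            status.congested.store(false, Ordering::Relaxed);
        }
    }
    sent
}

fn main() {
    let status = Status { congested: AtomicBool::new(false) };
    let mut free_batches = 1;
    assert!(push(&status, &mut free_batches)); // consumes the only batch
    assert!(!push(&status, &mut free_batches)); // times out and marks congestion
    assert!(status.congested.load(Ordering::Relaxed));
}
```

The retry reuses the already expired deadline, so it never blocks again; it only succeeds if the tx task refilled batches in the meantime.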

* Fix typo

* fix: typo

---------

Co-authored-by: Oussama Teffahi <[email protected]>
wyfo and oteffahi authored Jan 24, 2025
1 parent d37fc80 commit eb3a7a4
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -254,7 +254,7 @@ struct StageIn
 impl StageIn {
     fn push_network_message(
         &mut self,
-        msg: &mut NetworkMessage,
+        msg: &NetworkMessage,
         priority: Priority,
         deadline: &mut Deadline,
     ) -> Result<bool, TransportClosed> {
@@ -315,7 +315,7 @@ impl StageIn {
         // Get the current serialization batch.
         let mut batch = zgetbatch_rets!();
         // Attempt the serialization on the current batch
-        let e = match batch.encode(&*msg) {
+        let e = match batch.encode(msg) {
             Ok(_) => zretok!(batch, msg),
             Err(e) => e,
         };
@@ -335,7 +335,7 @@

         if let BatchError::NewFrame = e {
             // Attempt a serialization with a new frame
-            if batch.encode((&*msg, &frame)).is_ok() {
+            if batch.encode((msg, &frame)).is_ok() {
                 zretok!(batch, msg);
             }
         }
@@ -347,7 +347,7 @@
         }

         // Attempt a second serialization on fully empty batch
-        if batch.encode((&*msg, &frame)).is_ok() {
+        if batch.encode((msg, &frame)).is_ok() {
             zretok!(batch, msg);
         }

@@ -361,7 +361,7 @@

         let mut writer = self.fragbuf.writer();
         let codec = Zenoh080::new();
-        codec.write(&mut writer, &*msg).unwrap();
+        codec.write(&mut writer, msg).unwrap();

         // Fragment the whole message
         let mut fragment = FragmentHeader {
@@ -783,7 +783,7 @@ impl TransmissionPipelineProducer {
     #[inline]
     pub(crate) fn push_network_message(
         &self,
-        mut msg: NetworkMessage,
+        msg: NetworkMessage,
     ) -> Result<bool, TransportClosed> {
         // If the queue is not QoS, it means that we only have one priority with index 0.
         let (idx, priority) = if self.stage_in.len() > 1 {
@@ -806,9 +806,31 @@
         let mut deadline = Deadline::new(wait_time, max_wait_time);
         // Lock the channel. We are the only one that will be writing on it.
         let mut queue = zlock!(self.stage_in[idx]);
-        let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?;
+        // Check again for congestion in case it happens when blocking on the mutex.
+        if self.status.is_congested(priority) {
+            return Ok(false);
+        }
+        let mut sent = queue.push_network_message(&msg, priority, &mut deadline)?;
+        // If the message cannot be sent, mark the pipeline as congested.
         if !sent {
             self.status.set_congested(priority, true);
+            // During the time between deadline wakeup and setting the congested flag,
+            // all batches could have been refilled (especially if there is a single one),
+            // so try again with the same already expired deadline.
+            sent = queue.push_network_message(&msg, priority, &mut deadline)?;
+            // If the message is sent in the end, reset the status.
+            // Setting the status to `true` is only done with the stage_in mutex acquired,
+            // so it is not possible that further messages see the congestion flag set
+            // after this point.
+            if sent {
+                self.status.set_congested(priority, false);
+            }
+            // There is one edge case that is fortunately supported: if the message that
+            // has been pushed again is fragmented, we might have some batches actually
+            // refilled, but still end with dropping the message, not resetting the
+            // congested flag in that case. However, if some batches were available,
+            // that means that they would have still been pushed, so we can expect them to
+            // be refilled, and they will eventually unset the congested flag.
         }
         Ok(sent)
     }