
fix: fix pipeline congestion shortcut synchronization #1740

Merged
io/zenoh-transport/src/common/pipeline.rs (36 changes: 29 additions & 7 deletions)

@@ -254,7 +254,7 @@ struct StageIn {
 impl StageIn {
     fn push_network_message(
         &mut self,
-        msg: &mut NetworkMessage,
+        msg: &NetworkMessage,
         priority: Priority,
         deadline: &mut Deadline,
     ) -> Result<bool, TransportClosed> {
@@ -315,7 +315,7 @@ impl StageIn {
         // Get the current serialization batch.
         let mut batch = zgetbatch_rets!();
         // Attempt the serialization on the current batch
-        let e = match batch.encode(&*msg) {
+        let e = match batch.encode(msg) {
            Ok(_) => zretok!(batch, msg),
            Err(e) => e,
        };
@@ -335,7 +335,7 @@

        if let BatchError::NewFrame = e {
            // Attempt a serialization with a new frame
-           if batch.encode((&*msg, &frame)).is_ok() {
+           if batch.encode((msg, &frame)).is_ok() {
                zretok!(batch, msg);
            }
        }
@@ -347,7 +347,7 @@
        }

        // Attempt a second serialization on fully empty batch
-       if batch.encode((&*msg, &frame)).is_ok() {
+       if batch.encode((msg, &frame)).is_ok() {
            zretok!(batch, msg);
        }

@@ -361,7 +361,7 @@

        let mut writer = self.fragbuf.writer();
        let codec = Zenoh080::new();
-       codec.write(&mut writer, &*msg).unwrap();
+       codec.write(&mut writer, msg).unwrap();

        // Fragment the whole message
        let mut fragment = FragmentHeader {
@@ -783,7 +783,7 @@ impl TransmissionPipelineProducer {
    #[inline]
    pub(crate) fn push_network_message(
        &self,
-       mut msg: NetworkMessage,
+       msg: NetworkMessage,
    ) -> Result<bool, TransportClosed> {
        // If the queue is not QoS, it means that we only have one priority with index 0.
        let (idx, priority) = if self.stage_in.len() > 1 {
@@ -806,9 +806,31 @@
        let mut deadline = Deadline::new(wait_time, max_wait_time);
        // Lock the channel. We are the only one that will be writing on it.
        let mut queue = zlock!(self.stage_in[idx]);
-       let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?;
+       // Check again for congestion in case it happens when blocking on the mutex.
+       if self.status.is_congested(priority) {
+           return Ok(false);
+       }
+       let mut sent = queue.push_network_message(&msg, priority, &mut deadline)?;
+       // If the message cannot be sent, mark the pipeline as congested.
+       if !sent {
+           self.status.set_congested(priority, true);
+           // During the time between deadline wakeup and setting the congested flag,
+           // all batches could have been refilled (especially if there is a single one),
+           // so try again with the same already expired deadline.
+           sent = queue.push_network_message(&msg, priority, &mut deadline)?;
+           // If the message is sent in the end, reset the status.
+           // Setting the status to `true` is only done with the stage_in mutex acquired,
+           // so it is not possible that further messages see the congestion flag set
+           // after this point.
+           if sent {
+               self.status.set_congested(priority, false);
+           }
+           // There is one edge case that is fortunately supported: if the message that
+           // has been pushed again is fragmented, we might have some batches actually
+           // refilled, but still end with dropping the message, not resetting the
+           // congested flag in that case. However, if some batches were available,
+           // that means that they would have still been pushed, so we can expect them to
+           // be refilled, and they will eventually unset the congested flag.
+       }
        Ok(sent)
    }

Inline review thread on the added congestion check:

Contributor Author: @Mallets are you ok with this addition?

Member: Yes
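The fix is a double-checked congestion flag: a lock-free shortcut drops messages early when the pipeline is congested, and the check is repeated under the stage_in mutex so a producer that was blocked on the lock cannot race past a freshly raised flag. The signature change from `&mut NetworkMessage` to `&NetworkMessage` supports the same logic: the retry pushes the identical message a second time, which a shared borrow makes straightforward. Below is a minimal, self-contained sketch of the pattern, not the actual zenoh API: Status, StageIn, and Producer are hypothetical stand-ins, the deadline handling is elided, and the pre-lock shortcut is only implied by the diff's "Check again" comment (it sits in context not shown above).

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Hypothetical stand-in for the pipeline's per-priority status flags.
struct Status {
    congested: AtomicBool,
}

impl Status {
    fn is_congested(&self) -> bool {
        self.congested.load(Ordering::Relaxed)
    }
    fn set_congested(&self, value: bool) {
        self.congested.store(value, Ordering::Relaxed);
    }
}

// Hypothetical stand-in for StageIn: push() reports whether the message
// fit into a batch before the deadline expired.
struct StageIn {
    batch_available: bool,
}

impl StageIn {
    fn push(&mut self, _msg: &str) -> bool {
        self.batch_available
    }
}

struct Producer {
    status: Status,
    stage_in: Mutex<StageIn>,
}

impl Producer {
    fn push_network_message(&self, msg: &str) -> bool {
        // Lock-free shortcut: drop early when already congested
        // (in the PR this check lives in context elided from the diff).
        if self.status.is_congested() {
            return false;
        }
        let mut queue = self.stage_in.lock().unwrap();
        // Re-check under the mutex: congestion may have been flagged
        // while this producer was blocked acquiring the lock.
        if self.status.is_congested() {
            return false;
        }
        let mut sent = queue.push(msg);
        if !sent {
            // Raise the flag, then retry once with the already expired
            // deadline: batches may have been refilled in the meantime.
            self.status.set_congested(true);
            sent = queue.push(msg);
            if sent {
                // Safe to reset: the flag is only ever raised while the
                // stage_in mutex is held, so no other producer can have
                // observed it set between the retry and this store.
                self.status.set_congested(false);
            }
        }
        sent
    }
}

fn main() {
    let producer = Producer {
        status: Status { congested: AtomicBool::new(false) },
        stage_in: Mutex::new(StageIn { batch_available: true }),
    };
    assert!(producer.push_network_message("hello"));
    println!("message sent");
}

The invariant that makes the reset sound is the one stated in the PR's comments: the flag is only ever set to true while the mutex is held, so the post-lock re-check and the set-then-retry sequence cannot interleave with another producer observing a stale flag.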