diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index bc9319a7d1..d982a905a8 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -254,7 +254,7 @@ struct StageIn {
 impl StageIn {
     fn push_network_message(
         &mut self,
-        msg: &mut NetworkMessage,
+        msg: &NetworkMessage,
         priority: Priority,
         deadline: &mut Deadline,
     ) -> Result {
@@ -315,7 +315,7 @@ impl StageIn {
         // Get the current serialization batch.
         let mut batch = zgetbatch_rets!();
         // Attempt the serialization on the current batch
-        let e = match batch.encode(&*msg) {
+        let e = match batch.encode(msg) {
             Ok(_) => zretok!(batch, msg),
             Err(e) => e,
         };
@@ -335,7 +335,7 @@ impl StageIn {
 
         if let BatchError::NewFrame = e {
             // Attempt a serialization with a new frame
-            if batch.encode((&*msg, &frame)).is_ok() {
+            if batch.encode((msg, &frame)).is_ok() {
                 zretok!(batch, msg);
             }
         }
@@ -347,7 +347,7 @@ impl StageIn {
         }
 
         // Attempt a second serialization on fully empty batch
-        if batch.encode((&*msg, &frame)).is_ok() {
+        if batch.encode((msg, &frame)).is_ok() {
             zretok!(batch, msg);
         }
 
@@ -361,7 +361,7 @@ impl StageIn {
 
         let mut writer = self.fragbuf.writer();
         let codec = Zenoh080::new();
-        codec.write(&mut writer, &*msg).unwrap();
+        codec.write(&mut writer, msg).unwrap();
 
         // Fragment the whole message
         let mut fragment = FragmentHeader {
@@ -783,7 +783,7 @@ impl TransmissionPipelineProducer {
     #[inline]
     pub(crate) fn push_network_message(
         &self,
-        mut msg: NetworkMessage,
+        msg: NetworkMessage,
     ) -> Result {
         // If the queue is not QoS, it means that we only have one priority with index 0.
         let (idx, priority) = if self.stage_in.len() > 1 {
@@ -806,9 +806,31 @@ impl TransmissionPipelineProducer {
         let mut deadline = Deadline::new(wait_time, max_wait_time);
         // Lock the channel. We are the only one that will be writing on it.
         let mut queue = zlock!(self.stage_in[idx]);
-        let sent = queue.push_network_message(&mut msg, priority, &mut deadline)?;
+        // Check again for congestion in case it happens when blocking on the mutex.
+        if self.status.is_congested(priority) {
+            return Ok(false);
+        }
+        let mut sent = queue.push_network_message(&msg, priority, &mut deadline)?;
+        // If the message cannot be sent, mark the pipeline as congested.
         if !sent {
             self.status.set_congested(priority, true);
+            // During the time between deadline wakeup and setting the congested flag,
+            // all batches could have been refilled (especially if there is a single one),
+            // so try again with the same already expired deadline.
+            sent = queue.push_network_message(&msg, priority, &mut deadline)?;
+            // If the message is sent in the end, reset the status.
+            // Setting the status to `true` is only done with the stage_in mutex acquired,
+            // so it is not possible that further messages see the congestion flag set
+            // after this point.
+            if sent {
+                self.status.set_congested(priority, false);
+            }
+            // There is one edge case that is fortunately supported: if the message that
+            // has been pushed again is fragmented, we might have some batches actually
+            // refilled, but still end with dropping the message, not resetting the
+            // congested flag in that case. However, if some batches were available,
+            // that means that they would have still been pushed, so we can expect them to
+            // be refilled, and they will eventually unset the congested flag.
         }
         Ok(sent)
     }
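
For reference, below is a minimal, standalone sketch of the same check-under-lock, mark-congested, retry-once pattern that the last hunk introduces. It is not part of the patch: a toy bounded queue stands in for the real StageIn/Deadline/status machinery, and all names in it (ToyPipeline, try_push, capacity) are hypothetical.

// Standalone sketch (not part of the patch above): toy model of the congestion
// double-check and single retry performed by TransmissionPipelineProducer.
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct ToyPipeline {
    queue: Mutex<VecDeque<u8>>, // stands in for the per-priority stage_in queue
    capacity: usize,            // stands in for the number of available batches
    congested: AtomicBool,      // stands in for the per-priority congestion status
}

impl ToyPipeline {
    // Stand-in for `StageIn::push_network_message` called with an already
    // expired deadline: it succeeds only if a slot is free right now.
    fn try_push(&self, queue: &mut VecDeque<u8>, msg: u8) -> bool {
        if queue.len() < self.capacity {
            queue.push_back(msg);
            true
        } else {
            false
        }
    }

    fn push(&self, msg: u8) -> bool {
        // Fast path: drop immediately if the pipeline is already congested.
        if self.congested.load(Ordering::Relaxed) {
            return false;
        }
        let mut queue = self.queue.lock().unwrap();
        // Check again: congestion may have been flagged while we were blocked
        // on the mutex.
        if self.congested.load(Ordering::Relaxed) {
            return false;
        }
        let mut sent = self.try_push(&mut queue, msg);
        if !sent {
            self.congested.store(true, Ordering::Relaxed);
            // A slot may have been refilled between the failed attempt and
            // setting the flag, so retry once before giving up.
            sent = self.try_push(&mut queue, msg);
            if sent {
                // The flag is only set while the queue mutex is held, so no
                // later producer can observe it set after this reset.
                self.congested.store(false, Ordering::Relaxed);
            }
        }
        sent
    }
}

fn main() {
    let pipeline = ToyPipeline {
        queue: Mutex::new(VecDeque::new()),
        capacity: 2,
        congested: AtomicBool::new(false),
    };
    // The third push fails both attempts and sets the congestion flag; the
    // fourth is then dropped on the fast path without touching the queue.
    for msg in 0..4 {
        println!("push({msg}) -> {}", pipeline.push(msg));
    }
}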