diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 59a7e6e22..a89b89bd9 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use std::{ + ops::Add, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, Mutex, MutexGuard, @@ -127,37 +128,70 @@ impl StageInMutex { } } -struct Deadline { - deadline: Option>, - wait_time: Duration, +enum DeadlineSetting { + Immediate, + Infinite, + Finite(Instant), } -impl Deadline { - fn new(wait_time: Duration) -> Self { - let deadline = Self::calc_deadline(wait_time); +struct LazyDeadline { + deadline: Option, + wait_time: Option, +} + +impl LazyDeadline { + fn new(wait_time: Option) -> Self { Self { - deadline, + deadline: None, wait_time, } } - #[inline] - fn wait(&self, s_ref: &StageInRefill) -> bool { - match self.deadline { - None => false, - Some(None) => s_ref.wait(), - Some(Some(deadline)) => s_ref.wait_deadline(deadline), + fn advance(&mut self) { + let wait_time = self.wait_time; + match &mut self.deadline() { + DeadlineSetting::Immediate => {} + DeadlineSetting::Infinite => {} + DeadlineSetting::Finite(instant) => { + *instant = instant.add(unsafe { wait_time.unwrap_unchecked() }); + } } } #[inline] - fn on_next_fragment(&mut self) { - self.deadline = Self::calc_deadline(self.wait_time); + fn deadline(&mut self) -> &mut DeadlineSetting { + self.deadline.get_or_insert_with(|| match self.wait_time { + Some(wait_time) => match wait_time.is_zero() { + true => DeadlineSetting::Immediate, + false => DeadlineSetting::Finite(Instant::now().add(wait_time)), + }, + None => DeadlineSetting::Infinite, + }) + } +} + +struct Deadline { + lazy_deadline: LazyDeadline, +} + +impl Deadline { + fn new(wait_time: Option) -> Self { + Self { + lazy_deadline: LazyDeadline::new(wait_time), + } } #[inline] - fn calc_deadline(wait_time: Duration) -> Option> { - (!wait_time.is_zero()).then_some(Instant::now().checked_add(wait_time)) + fn wait(&mut self, s_ref: &StageInRefill) -> bool { + match self.lazy_deadline.deadline() { + DeadlineSetting::Immediate => false, + DeadlineSetting::Infinite => s_ref.wait(), + DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant), + } + } + + fn on_next_fragment(&mut self) { + self.lazy_deadline.advance(); } } @@ -198,7 +232,6 @@ impl StageIn { drop(c_guard); // Wait for an available batch until deadline if !deadline.wait(&self.s_ref) { -// tracing::warn!("Zenoh message dropped because of deadline",); // Still no available batch. // Restore the sequence number and drop the message $restore_sn; @@ -661,7 +694,7 @@ impl TransmissionPipelineProducer { } else { self.wait_before_close }; - let mut deadline = Deadline::new(wait_time); + let mut deadline = Deadline::new(Some(wait_time)); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); queue.push_network_message(&mut msg, priority, &mut deadline)