Skip to content

Commit

Permalink
optimizations for batching deadline
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Oct 8, 2024
1 parent 74f0ab6 commit 4375599
Showing 1 changed file with 52 additions and 19 deletions.
71 changes: 52 additions & 19 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
ops::Add,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc, Mutex, MutexGuard,
Expand Down Expand Up @@ -127,37 +128,70 @@ impl StageInMutex {
}
}

struct Deadline {
deadline: Option<Option<Instant>>,
wait_time: Duration,
enum DeadlineSetting {
Immediate,
Infinite,
Finite(Instant),
}

impl Deadline {
fn new(wait_time: Duration) -> Self {
let deadline = Self::calc_deadline(wait_time);
struct LazyDeadline {
deadline: Option<DeadlineSetting>,
wait_time: Option<Duration>,
}

impl LazyDeadline {
fn new(wait_time: Option<Duration>) -> Self {
Self {
deadline,
deadline: None,
wait_time,
}
}

#[inline]
fn wait(&self, s_ref: &StageInRefill) -> bool {
match self.deadline {
None => false,
Some(None) => s_ref.wait(),
Some(Some(deadline)) => s_ref.wait_deadline(deadline),
fn advance(&mut self) {
let wait_time = self.wait_time;
match &mut self.deadline() {
DeadlineSetting::Immediate => {}
DeadlineSetting::Infinite => {}
DeadlineSetting::Finite(instant) => {
*instant = instant.add(unsafe { wait_time.unwrap_unchecked() });
}
}
}

#[inline]
fn on_next_fragment(&mut self) {
self.deadline = Self::calc_deadline(self.wait_time);
fn deadline(&mut self) -> &mut DeadlineSetting {
self.deadline.get_or_insert_with(|| match self.wait_time {
Some(wait_time) => match wait_time.is_zero() {
true => DeadlineSetting::Immediate,
false => DeadlineSetting::Finite(Instant::now().add(wait_time)),
},
None => DeadlineSetting::Infinite,
})
}
}

struct Deadline {
lazy_deadline: LazyDeadline,
}

impl Deadline {
fn new(wait_time: Option<Duration>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(wait_time),
}
}

#[inline]
fn calc_deadline(wait_time: Duration) -> Option<Option<Instant>> {
(!wait_time.is_zero()).then_some(Instant::now().checked_add(wait_time))
fn wait(&mut self, s_ref: &StageInRefill) -> bool {
match self.lazy_deadline.deadline() {
DeadlineSetting::Immediate => false,
DeadlineSetting::Infinite => s_ref.wait(),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}

fn on_next_fragment(&mut self) {
self.lazy_deadline.advance();
}
}

Expand Down Expand Up @@ -198,7 +232,6 @@ impl StageIn {
drop(c_guard);
// Wait for an available batch until deadline
if !deadline.wait(&self.s_ref) {
// tracing::warn!("Zenoh message dropped because of deadline",);
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
Expand Down Expand Up @@ -661,7 +694,7 @@ impl TransmissionPipelineProducer {
} else {
self.wait_before_close
};
let mut deadline = Deadline::new(wait_time);
let mut deadline = Deadline::new(Some(wait_time));
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, &mut deadline)
Expand Down

0 comments on commit 4375599

Please sign in to comment.