Skip to content

Commit

Permalink
chore: simplified generator relying purely on Polling (#2132)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Oct 8, 2024
1 parent ceb8f5b commit f02c699
Showing 1 changed file with 28 additions and 56 deletions.
84 changes: 28 additions & 56 deletions rust/numaflow-core/src/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ use std::time::Duration;
/// 2 batches only 1 batch (no reread) 5 5 5
///
/// ```
/// NOTE: The minimum granularity of duration is 5ms.
/// NOTE: The minimum granularity of duration is 10ms.
mod stream_generator {
use bytes::Bytes;
use futures::Stream;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{Instant, MissedTickBehavior};
use tokio::time::MissedTickBehavior;

#[pin_project]
pub(super) struct StreamGenerator {
Expand All @@ -37,13 +37,9 @@ mod stream_generator {
rpu: usize,
/// batch size per read
batch: usize,
/// unit of time-period over which the [rpu] is defined.
unit: Duration,
/// the amount of credits used for the current time-period.
/// remaining = (rpu - used) for that time-period
used: usize,
/// the last time we generated data. now() - prev_time is compared against the duration.
prev_time: Instant,
#[pin]
tick: tokio::time::Interval,
}
Expand All @@ -58,12 +54,7 @@ mod stream_generator {
rpu,
// batch cannot exceed rpu, so clamp it to rpu
batch: if batch > rpu { rpu } else { batch },
unit,
used: 0,
// rewind a bit to return on the first call
prev_time: Instant::now()
.checked_sub(unit)
.expect("subtraction cannot fail"),
tick,
}
}
Expand All @@ -76,52 +67,33 @@ mod stream_generator {
mut self: Pin<&mut StreamGenerator>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();

// Calculate the elapsed time since the last poll
let elapsed = this.prev_time.elapsed();

// we can return the complete batch if enough time has passed, with a precision +- 5ms
if elapsed >= *this.unit {
// Reset the timer
*this.prev_time = Instant::now();

// Generate data that equals to batch data
let data = vec![this.content.clone(); *this.batch];
// reset used quota
*this.used = *this.batch;

Poll::Ready(Some(data))
} else if this.used < this.rpu {
// even if enough time hasn't passed, we can still send data if we have
// quota (rpu - used) left

// make sure we do not send more than desired
let to_send = if *this.rpu - *this.used < *this.batch {
*this.rpu - *this.used
} else {
*this.batch
};

// update the counters
*this.used += to_send;

Poll::Ready(Some(vec![this.content.clone(); to_send]))
} else {
// we have to wait for the next tick because we are out of quota
let mut tick = this.tick;
match tick.poll_tick(cx) {
Poll::Ready(_) => {
// reset the prev_time as we are quite certain that we should be returning
// data, else we would have been in Pending
*this.prev_time = (*this.prev_time)
.checked_sub(elapsed)
.expect("subtraction cannot fail");

// we can recurse ourselves to return data since enough time has passed
self.poll_next(cx)
let mut this = self.as_mut().project();

match this.tick.poll_tick(cx) {
// Poll::Ready means enough time has passed, so we are ready to send the
// whole batch of data.
Poll::Ready(_) => {
// generate one full batch of data (`batch` copies of the content)
let data = vec![this.content.clone(); *this.batch];
// reset the used quota: the batch we are returning now is the only usage so far
*this.used = *this.batch;

Poll::Ready(Some(data))
}
Poll::Pending => {
// even if enough time hasn't passed, we can still send data if we have
// quota (rpu - used) left
if this.used < this.rpu {
// make sure we do not send more than desired
let to_send = std::cmp::min(*this.rpu - *this.used, *this.batch);

// update the counters
*this.used += to_send;

Poll::Ready(Some(vec![this.content.clone(); to_send]))
} else {
Poll::Pending
}
Poll::Pending => Poll::Pending,
}
}
}
Expand Down

0 comments on commit f02c699

Please sign in to comment.