From f02c699d7c7007dba4fb001d53f092d7c12719b1 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Mon, 7 Oct 2024 22:36:07 -0700 Subject: [PATCH] chore: simplified generator relying purely on Polling (#2132) Signed-off-by: Vigith Maurice --- rust/numaflow-core/src/source/generator.rs | 84 ++++++++-------------- 1 file changed, 28 insertions(+), 56 deletions(-) diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs index 588df6126..611969c74 100644 --- a/rust/numaflow-core/src/source/generator.rs +++ b/rust/numaflow-core/src/source/generator.rs @@ -19,7 +19,7 @@ use std::time::Duration; /// 2 batches only 1 batch (no reread) 5 5 5 /// /// ``` -/// NOTE: The minimum granularity of duration is 5ms. +/// NOTE: The minimum granularity of duration is 10ms. mod stream_generator { use bytes::Bytes; use futures::Stream; @@ -27,7 +27,7 @@ mod stream_generator { use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; - use tokio::time::{Instant, MissedTickBehavior}; + use tokio::time::MissedTickBehavior; #[pin_project] pub(super) struct StreamGenerator { @@ -37,13 +37,9 @@ mod stream_generator { rpu: usize, /// batch size per read batch: usize, - /// unit of time-period over which the [rpu] is defined. - unit: Duration, /// the amount of credits used for the current time-period. /// remaining = (rpu - used) for that time-period used: usize, - /// the last time we generated data. now() - prev_time is compared against the duration. 
- prev_time: Instant, #[pin] tick: tokio::time::Interval, } @@ -58,12 +54,7 @@ mod stream_generator { rpu, // batch cannot > rpu batch: if batch > rpu { rpu } else { batch }, - unit, used: 0, - // rewind a bit to return on the first call - prev_time: Instant::now() - .checked_sub(unit) - .expect("subtraction cannot fail"), tick, } } @@ -76,52 +67,33 @@ mod stream_generator { mut self: Pin<&mut StreamGenerator>, cx: &mut Context<'_>, ) -> Poll> { - let this = self.as_mut().project(); - - // Calculate the elapsed time since the last poll - let elapsed = this.prev_time.elapsed(); - - // we can return the complete batch if enough time has passed, with a precision +- 5ms - if elapsed >= *this.unit { - // Reset the timer - *this.prev_time = Instant::now(); - - // Generate data that equals to batch data - let data = vec![this.content.clone(); *this.batch]; - // reset used quota - *this.used = *this.batch; - - Poll::Ready(Some(data)) - } else if this.used < this.rpu { - // even if enough time hasn't passed, we can still send data if we have - // quota (rpu - used) left - - // make sure we do not send more than desired - let to_send = if *this.rpu - *this.used < *this.batch { - *this.rpu - *this.used - } else { - *this.batch - }; - - // update the counters - *this.used += to_send; - - Poll::Ready(Some(vec![this.content.clone(); to_send])) - } else { - // we have to wait for the next tick because we are out of quota - let mut tick = this.tick; - match tick.poll_tick(cx) { - Poll::Ready(_) => { - // reset the prev_time as we are quite certain that we should be returning - // data, else we would have been in Pending - *this.prev_time = (*this.prev_time) - .checked_sub(elapsed) - .expect("subtraction cannot fail"); - - // we can recurse ourselves to return data since enough time has passed - self.poll_next(cx) + let mut this = self.as_mut().project(); + + match this.tick.poll_tick(cx) { + // Poll::Ready means enough time has passed, so we are ready to send the whole + // batch of
data. + Poll::Ready(_) => { + // generate data that equals to batch data + let data = vec![this.content.clone(); *this.batch]; + // reset used quota + *this.used = *this.batch; + + Poll::Ready(Some(data)) + } + Poll::Pending => { + // even if enough time hasn't passed, we can still send data if we have + // quota (rpu - used) left + if this.used < this.rpu { + // make sure we do not send more than desired + let to_send = std::cmp::min(*this.rpu - *this.used, *this.batch); + + // update the counters + *this.used += to_send; + + Poll::Ready(Some(vec![this.content.clone(); to_send])) + } else { + Poll::Pending } - Poll::Pending => Poll::Pending, } } }