diff --git a/src/lib.rs b/src/lib.rs index d620b44..8524fc6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,6 +64,7 @@ pub struct ChunksTimeout { cap: usize, // https://github.com/rust-lang-nursery/futures-rs/issues/1475 clock: Option, + clock_used: bool, duration: Duration, } @@ -74,6 +75,7 @@ where St: Stream, { unsafe_unpinned!(items: Vec); + unsafe_unpinned!(clock_used: bool); unsafe_pinned!(clock: Option); unsafe_pinned!(stream: Fuse); @@ -85,6 +87,7 @@ where items: Vec::with_capacity(capacity), cap: capacity, clock: None, + clock_used: false, duration, } } @@ -131,6 +134,9 @@ impl Stream for ChunksTimeout { type Item = Vec; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let duration = self.duration; + let make_clock = move || Delay::new(duration); + loop { match self.as_mut().stream().poll_next(cx) { Poll::Ready(item) => match item { @@ -139,11 +145,15 @@ impl Stream for ChunksTimeout { // the full one. Some(item) => { if self.items.is_empty() { - *self.as_mut().clock() = Some(Delay::new(self.duration)); + *self.as_mut().clock_used() = true; + self.as_mut() + .clock() + .get_or_insert_with(make_clock) + .reset(duration); } self.as_mut().items().push(item); if self.items.len() >= self.cap { - *self.as_mut().clock() = None; + *self.as_mut().clock_used() = false; return Poll::Ready(Some(self.as_mut().take())); } else { // Continue the loop @@ -168,26 +178,22 @@ impl Stream for ChunksTimeout { Poll::Pending => {} } - match self - .as_mut() - .clock() - .as_pin_mut() - .map(|clock| clock.poll(cx)) - { - Some(Poll::Ready(())) => { - *self.as_mut().clock() = None; + if !self.clock_used { + debug_assert!( + self.items().is_empty(), + "Inner buffer is empty, but clock is available." + ); + return Poll::Pending; + } + + let clock = self.as_mut().clock().get_mut().get_or_insert_with(make_clock); + match Pin::new(clock).poll(cx) { + Poll::Ready(()) => { + *self.as_mut().clock_used() = false; return Poll::Ready(Some(self.as_mut().take())); } - Some(Poll::Pending) => {} - None => { - debug_assert!( - self.items().is_empty(), - "Inner buffer is empty, but clock is available." - ); - } + Poll::Pending => return Poll::Pending, } - - return Poll::Pending; } }