Skip to content

Commit

Permalink
Offer PushContainer::should_flush
Browse files Browse the repository at this point in the history
The `should_flush` function returns `true` if the holder of a container
should consider it full and flush it.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Apr 23, 2024
1 parent 819135e commit 4792520
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
8 changes: 8 additions & 0 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ pub trait PushContainer: Container {
fn preferred_capacity() -> usize;
/// Reserve space for `additional` elements, possibly increasing the capacity of the container.
fn reserve(&mut self, additional: usize);
/// Return `true` if the container should be flushed.
///
/// The default implementation returns `true` if the length is larger or equal
/// to the [`preferred_capacity`].
#[inline]
fn should_flush(&self) -> bool {
Self::preferred_capacity() <= self.len()
}
}

impl<T: Clone + 'static> Container for Vec<T> {
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl<T, C: PushContainer, P: Push<Bundle<T, C>>> Buffer<T, C, P> where T: Eq+Clo
self.buffer.reserve(to_reserve);
}
self.buffer.push(data);
if self.buffer.len() >= C::preferred_capacity() {
if self.buffer.should_flush() {
self.flush();
}
}
Expand Down
6 changes: 5 additions & 1 deletion timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,12 @@ impl<T: Timestamp, C: PushContainer> Handle<T, C> {
/// });
/// ```
pub fn send<D: PushInto<C>>(&mut self, data: D) {
if self.buffer1.capacity() < C::preferred_capacity() {
let to_reserve = C::preferred_capacity() - self.buffer1.capacity();
self.buffer1.reserve(to_reserve);
}
self.buffer1.push(data);
if self.buffer1.len() == self.buffer1.capacity() {
if self.buffer1.should_flush() {
self.flush();
}
}
Expand Down

0 comments on commit 4792520

Please sign in to comment.