Skip to content

Commit

Permalink
Buffer and session supports push into
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 12, 2024
1 parent 83780ec commit e639eb1
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,15 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P
}
}

impl<T, CB, P> Buffer<T, CB, P>
impl<T, CB, P, D> PushInto<D> for Buffer<T, CB, P>
where
T: Eq+Clone,
CB: ContainerBuilder,
CB: ContainerBuilder + PushInto<D>,
P: Push<Bundle<T, CB::Container>>
{
// Push a single item into the builder. Internal method for use by `Session`.
#[inline]
fn give<D>(&mut self, data: D) where CB: PushInto<D> {
self.builder.push_into(data);
fn push_into(&mut self, item: D) {
self.builder.push_into(item);
self.extract();
}
}
Expand Down Expand Up @@ -164,7 +163,7 @@ where
/// Provides one record at the time specified by the `Session`.
#[inline]
pub fn give<D>(&mut self, data: D) where CB: PushInto<D> {
self.buffer.give(data);
self.push_into(data);
}

/// Provides an iterator of records at the time specified by the `Session`.
Expand All @@ -175,11 +174,23 @@ where
CB: PushInto<I::Item>,
{
for item in iter {
self.give(item);
self.push_into(item);
}
}
}

impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
self.buffer.push_into(item);
}
}

/// A session which will flush itself when dropped.
pub struct AutoflushSession<'a, T, CB, P>
where
Expand All @@ -205,7 +216,7 @@ where
where
CB: PushInto<D>,
{
self.buffer.give(data);
self.push_into(data);
}

/// Transmits records produced by an iterator.
Expand All @@ -216,10 +227,21 @@ where
CB: PushInto<D>,
{
for item in iter {
self.give(item);
self.push_into(item);
}
}
}
impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
self.buffer.push_into(item);
}
}

impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
where
Expand Down

0 comments on commit e639eb1

Please sign in to comment.