Skip to content

Commit

Permalink
Address some feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Feb 17, 2024
1 parent 4c61247 commit 80ff1ea
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 14 deletions.
14 changes: 9 additions & 5 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub trait Container: Default + Clone + 'static {
type DrainIter<'a>: Iterator<Item=Self::Item<'a>>;

/// Returns an iterator that drains the contents of this container.
// TODO: What invariants should hold after `drain` returns?
/// Drain leaves the container in an undefined state.
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a>;
}

Expand All @@ -63,6 +63,10 @@ pub trait PushInto<C> {
/// A type that has the necessary infrastructure to push elements, without specifying how pushing
/// itself works. For this, pushable types should implement [`PushInto`].
pub trait PushContainer: Container {
/// Push `item` into self
fn push<T: PushInto<Self>>(&mut self, item: T) {
item.push_into(self)
}
/// Return the capacity of the container.
fn capacity(&self) -> usize;
/// Return the preferred capacity of the container.
Expand Down Expand Up @@ -91,10 +95,10 @@ impl<T: Clone + 'static> Container for Vec<T> {
self.as_slice().iter()
}

type DrainIter<'a> = <Vec<T> as IntoIterator>::IntoIter;
type DrainIter<'a> = std::vec::Drain<'a, T>;

fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
IntoIterator::into_iter(std::mem::take(self))
fn drain(&mut self) -> Self::DrainIter<'_> {
self.drain(..)
}
}

Expand Down Expand Up @@ -235,7 +239,7 @@ impl<T: PushContainer + 'static> PushPartitioned for T where for<'a> T::Item<'a>
for datum in self.drain() {
let index = index(&datum);
ensure_capacity(&mut buffers[index]);
datum.push_into(&mut buffers[index]);
buffers[index].push(datum);
if buffers[index].len() == buffers[index].capacity() {
flush(index, &mut buffers[index]);
}
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/wordcount_flatcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ extern crate timely;
use std::collections::HashMap;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::containers::Map;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::{Operator, Inspect, Probe};
use timely::dataflow::channels::pact::ExchangeCore;
use timely_container::flatcontainer::{Containerized, FlatStack};
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, C, H: 'static> ParallelizationContractCore<T, C> for ExchangeCore<C, H>
where
C: Data + Container + PushPartitioned,
C: Data + PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, H>;
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq
}
}

impl<T, C: Container + PushContainer, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
impl<T, C: PushContainer, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
// internal method for use by `Session`.
#[inline]
fn give<D: PushInto<C>>(&mut self, data: D) {
Expand Down Expand Up @@ -124,7 +124,7 @@ impl<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P> wh
}
}

impl<'a, T, C: Container + PushContainer, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
impl<'a, T, C: PushContainer, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
/// Provides one record at the time specified by the `Session`.
#[inline]
pub fn give<D: PushInto<C>>(&mut self, data: D) {
Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/operators/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

use crate::ExchangeData;
use crate::dataflow::{Scope, Stream};
use crate::dataflow::operators::containers::Map;
use crate::dataflow::operators::{Exchange};
use crate::dataflow::operators::{Map, Exchange};

/// Broadcast records to all workers.
pub trait Broadcast<C: ExchangeData> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub trait Map<S: Scope, C: Container> {
///
/// # Examples
/// ```
/// use timely::dataflow::operators::containers::Map;
/// use timely::dataflow::operators::core::Map;
/// use timely::dataflow::operators::{ToStream, Inspect};
///
/// timely::example(|scope| {
Expand All @@ -27,7 +27,7 @@ pub trait Map<S: Scope, C: Container> {
///
/// # Examples
/// ```
/// use timely::dataflow::operators::containers::Map;
/// use timely::dataflow::operators::core::Map;
/// use timely::dataflow::operators::{ToStream, Inspect};
///
/// timely::example(|scope| {
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use self::generic::{Notificator, FrontierNotificator};
pub use self::reclock::Reclock;
pub use self::count::Accumulate;

pub mod containers;
pub mod core;

pub mod enterleave;
pub mod input;
Expand Down

0 comments on commit 80ff1ea

Please sign in to comment.