diff --git a/container/src/lib.rs b/container/src/lib.rs index c11340026..8323b72ae 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -50,7 +50,7 @@ pub trait Container: Default + Clone + 'static { type DrainIter<'a>: Iterator>; /// Returns an iterator that drains the contents of this container. - // TODO: What invariants should hold after `drain` returns? + /// Drain leaves the container in an undefined state. fn drain<'a>(&'a mut self) -> Self::DrainIter<'a>; } @@ -63,6 +63,10 @@ pub trait PushInto { /// A type that has the necessary infrastructure to push elements, without specifying how pushing /// itself works. For this, pushable types should implement [`PushInto`]. pub trait PushContainer: Container { + /// Push `item` into self + fn push>(&mut self, item: T) { + item.push_into(self) + } /// Return the capacity of the container. fn capacity(&self) -> usize; /// Return the preferred capacity of the container. @@ -91,10 +95,10 @@ impl Container for Vec { self.as_slice().iter() } - type DrainIter<'a> = as IntoIterator>::IntoIter; + type DrainIter<'a> = std::vec::Drain<'a, T>; - fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { - IntoIterator::into_iter(std::mem::take(self)) + fn drain(&mut self) -> Self::DrainIter<'_> { + self.drain(..) } } @@ -235,7 +239,7 @@ impl PushPartitioned for T where for<'a> T::Item<'a> for datum in self.drain() { let index = index(&datum); ensure_capacity(&mut buffers[index]); - datum.push_into(&mut buffers[index]); + buffers[index].push(datum); if buffers[index].len() == buffers[index].capacity() { flush(index, &mut buffers[index]); } diff --git a/timely/examples/wordcount_flatcontainer.rs b/timely/examples/wordcount_flatcontainer.rs index ef4f34557..1b81d9305 100644 --- a/timely/examples/wordcount_flatcontainer.rs +++ b/timely/examples/wordcount_flatcontainer.rs @@ -3,7 +3,7 @@ extern crate timely; use std::collections::HashMap; use timely::dataflow::{InputHandle, ProbeHandle}; -use timely::dataflow::operators::containers::Map; +use timely::dataflow::operators::core::Map; use timely::dataflow::operators::{Operator, Inspect, Probe}; use timely::dataflow::channels::pact::ExchangeCore; use timely_container::flatcontainer::{Containerized, FlatStack}; diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index cfce8bbc2..b73076c7c 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -75,7 +75,7 @@ where // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. impl ParallelizationContractCore for ExchangeCore where - C: Data + Container + PushPartitioned, + C: Data + PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { type Pusher = ExchangePusher>>>, H>; diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 21da8e610..4ec5c9bb2 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -82,7 +82,7 @@ impl>> BufferCore where T: Eq } } -impl>> BufferCore where T: Eq+Clone { +impl>> BufferCore where T: Eq+Clone { // internal method for use by `Session`. #[inline] fn give>(&mut self, data: D) { @@ -124,7 +124,7 @@ impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> wh } } -impl<'a, T, C: Container + PushContainer, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T, C: PushContainer, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { /// Provides one record at the time specified by the `Session`. #[inline] pub fn give>(&mut self, data: D) { diff --git a/timely/src/dataflow/operators/broadcast.rs b/timely/src/dataflow/operators/broadcast.rs index a1d6f2378..4013f332f 100644 --- a/timely/src/dataflow/operators/broadcast.rs +++ b/timely/src/dataflow/operators/broadcast.rs @@ -2,8 +2,7 @@ use crate::ExchangeData; use crate::dataflow::{Scope, Stream}; -use crate::dataflow::operators::containers::Map; -use crate::dataflow::operators::{Exchange}; +use crate::dataflow::operators::{Map, Exchange}; /// Broadcast records to all workers. pub trait Broadcast { diff --git a/timely/src/dataflow/operators/containers/map.rs b/timely/src/dataflow/operators/core/map.rs similarity index 96% rename from timely/src/dataflow/operators/containers/map.rs rename to timely/src/dataflow/operators/core/map.rs index 2f50b91f5..d7d467a33 100644 --- a/timely/src/dataflow/operators/containers/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -11,7 +11,7 @@ pub trait Map { /// /// # Examples /// ``` - /// use timely::dataflow::operators::containers::Map; + /// use timely::dataflow::operators::core::Map; /// use timely::dataflow::operators::{ToStream, Inspect}; /// /// timely::example(|scope| { @@ -27,7 +27,7 @@ pub trait Map { /// /// # Examples /// ``` - /// use timely::dataflow::operators::containers::Map; + /// use timely::dataflow::operators::core::Map; /// use timely::dataflow::operators::{ToStream, Inspect}; /// /// timely::example(|scope| { diff --git a/timely/src/dataflow/operators/containers/mod.rs b/timely/src/dataflow/operators/core/mod.rs similarity index 100% rename from timely/src/dataflow/operators/containers/mod.rs rename to timely/src/dataflow/operators/core/mod.rs diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 99ac633a2..adf4e4a30 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -34,7 +34,7 @@ pub use self::generic::{Notificator, FrontierNotificator}; pub use self::reclock::Reclock; pub use self::count::Accumulate; -pub mod containers; +pub mod core; pub mod enterleave; pub mod input;