diff --git a/container/Cargo.toml b/container/Cargo.toml index 6b3e2d65d..06abc9123 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -7,4 +7,5 @@ license = "MIT" [dependencies] columnation = { git = "https://github.com/frankmcsherry/columnation" } +flatcontainer = "0.1" serde = { version = "1.0"} diff --git a/container/src/columnation.rs b/container/src/columnation.rs index 54a00ca4e..3da6b30e6 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -124,6 +124,24 @@ impl TimelyStack { }); (length, capacity) } + + /// The length in items. + #[inline] + pub fn len(&self) -> usize { + self.local.len() + } + + /// The capacity of the local vector. + #[inline] + pub fn capacity(&self) -> usize { + self.local.capacity() + } + + /// Reserve space for `additional` elements. + #[inline] + pub fn reserve(&mut self, additional: usize) { + self.local.reserve(additional) + } } impl TimelyStack<(A, B)> { @@ -291,12 +309,14 @@ mod serde { } mod container { - use crate::{Container, PushPartitioned}; + use std::ops::Deref; + use crate::{Container, PushContainer}; use crate::columnation::{Columnation, TimelyStack}; impl Container for TimelyStack { - type Item = T; + type ItemRef<'a> = &'a T where Self: 'a; + type Item<'a> = &'a T where Self: 'a; fn len(&self) -> usize { self.local.len() @@ -306,38 +326,34 @@ mod container { self.local.is_empty() } - fn capacity(&self) -> usize { - self.local.capacity() - } - fn clear(&mut self) { TimelyStack::clear(self) } + + type Iter<'a> = std::slice::Iter<'a, T>; + + fn iter<'a>(&'a self) -> Self::Iter<'a> { + self.deref().iter() + } + + type DrainIter<'a> = std::slice::Iter<'a, T>; + + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + (&*self).iter() + } } - impl PushPartitioned for TimelyStack { - fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) - where - I: FnMut(&Self::Item) -> usize, - F: FnMut(usize, &mut Self), - { - fn ensure_capacity(this: &mut TimelyStack) { - let capacity = this.local.capacity(); - let desired_capacity = crate::buffer::default_capacity::(); - if capacity < desired_capacity { - this.local.reserve(desired_capacity - capacity); - } - } + impl PushContainer for TimelyStack { + fn capacity(&self) -> usize { + self.capacity() + } - for datum in &self[..] { - let index = index(&datum); - ensure_capacity(&mut buffers[index]); - buffers[index].copy(datum); - if buffers[index].len() == buffers[index].local.capacity() { - flush(index, &mut buffers[index]); - } - } - self.clear(); + fn preferred_capacity() -> usize { + crate::buffer::default_capacity::() + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional) } } } diff --git a/container/src/flatcontainer.rs b/container/src/flatcontainer.rs new file mode 100644 index 000000000..026eef5b9 --- /dev/null +++ b/container/src/flatcontainer.rs @@ -0,0 +1,50 @@ +//! Present a [`FlatStack`] as a timely container. + +pub use flatcontainer::*; +use crate::{buffer, Container, PushContainer, PushInto}; + +impl Container for FlatStack { + type ItemRef<'a> = R::ReadItem<'a> where Self: 'a; + type Item<'a> = R::ReadItem<'a> where Self: 'a; + + fn len(&self) -> usize { + self.len() + } + + fn clear(&mut self) { + self.clear() + } + + type Iter<'a> = <&'a Self as IntoIterator>::IntoIter; + + fn iter<'a>(&'a self) -> Self::Iter<'a> { + IntoIterator::into_iter(self) + } + + type DrainIter<'a> = Self::Iter<'a>; + + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + IntoIterator::into_iter(&*self) + } +} + +impl PushContainer for FlatStack { + fn capacity(&self) -> usize { + self.capacity() + } + + fn preferred_capacity() -> usize { + buffer::default_capacity::() + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } +} + +impl> PushInto> for T { + #[inline] + fn push_into(self, target: &mut FlatStack) { + target.copy(self); + } +} diff --git a/container/src/lib.rs b/container/src/lib.rs index eb9b3ce77..75a71ffc7 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -3,6 +3,7 @@ #![forbid(missing_docs)] pub mod columnation; +pub mod flatcontainer; /// A container transferring data through dataflow edges /// @@ -17,14 +18,18 @@ pub mod columnation; /// is efficient (which is not necessarily the case when deriving `Clone`.) /// TODO: Don't require `Container: Clone` pub trait Container: Default + Clone + 'static { - /// The type of elements this container holds. - type Item; + /// The type of elements when reading non-destructively from the container. + type ItemRef<'a> where Self: 'a; + + /// The type of elements when draining the continer. + type Item<'a> where Self: 'a; /// The number of elements in this container /// /// The length of a container must be consistent between sending and receiving it. /// When exchanging a container and partitioning it into pieces, the sum of the length - /// of all pieces must be equal to the length of the original container. + /// of all pieces must be equal to the length of the original container. When combining + /// containers, the length of the result must be the sum of the individual parts. fn len(&self) -> usize; /// Determine if the container contains any elements, corresponding to `len() == 0`. @@ -32,16 +37,55 @@ pub trait Container: Default + Clone + 'static { self.len() == 0 } - /// The capacity of the underlying container - fn capacity(&self) -> usize; - /// Remove all contents from `self` while retaining allocated memory. /// After calling `clear`, `is_empty` must return `true` and `len` 0. fn clear(&mut self); + + /// Iterator type when reading from the container. + type Iter<'a>: Iterator>; + + /// Returns an iterator that reads the contents of this container. + fn iter<'a>(&'a self) -> Self::Iter<'a>; + + /// Iterator type when draining the container. + type DrainIter<'a>: Iterator>; + + /// Returns an iterator that drains the contents of this container. + /// Drain leaves the container in an undefined state. + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a>; +} + +/// A type that can push itself into a container. +pub trait PushInto { + /// Push self into the target container. + fn push_into(self, target: &mut C); +} + +/// A type that has the necessary infrastructure to push elements, without specifying how pushing +/// itself works. For this, pushable types should implement [`PushInto`]. +// TODO: Reconsider this interface because it assumes +// * Containers have a capacity +// * Push presents single elements. +// * Instead of testing `len == cap`, we could have a `is_full` to test that we might +// not be able to absorb more data. +// * Example: A FlatStack with optimized offsets and deduplication can absorb many elements without reallocation. What does capacity mean in this context? +pub trait PushContainer: Container { + /// Push `item` into self + #[inline] + fn push>(&mut self, item: T) { + item.push_into(self) + } + /// Return the capacity of the container. + fn capacity(&self) -> usize; + /// Return the preferred capacity of the container. + fn preferred_capacity() -> usize; + /// Reserve space for `additional` elements, possibly increasing the capacity of the container. + fn reserve(&mut self, additional: usize); } impl Container for Vec { - type Item = T; + type ItemRef<'a> = &'a T where T: 'a; + type Item<'a> = T where T: 'a; fn len(&self) -> usize { Vec::len(self) @@ -51,20 +95,58 @@ impl Container for Vec { Vec::is_empty(self) } + fn clear(&mut self) { Vec::clear(self) } + + type Iter<'a> = std::slice::Iter<'a, T>; + + fn iter<'a>(&'a self) -> Self::Iter<'a> { + self.as_slice().iter() + } + + type DrainIter<'a> = std::vec::Drain<'a, T>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + self.drain(..) + } +} + +impl PushContainer for Vec { fn capacity(&self) -> usize { - Vec::capacity(self) + self.capacity() } - fn clear(&mut self) { Vec::clear(self) } + fn preferred_capacity() -> usize { + buffer::default_capacity::() + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } +} + +impl PushInto> for T { + #[inline] + fn push_into(self, target: &mut Vec) { + target.push(self) + } +} + +impl<'a, T: Clone> PushInto> for &'a T { + #[inline] + fn push_into(self, target: &mut Vec) { + target.push(self.clone()) + } } mod rc { + use std::ops::Deref; use std::rc::Rc; use crate::Container; impl Container for Rc { - type Item = T::Item; + type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; + type Item<'a> = T::ItemRef<'a> where Self: 'a; fn len(&self) -> usize { std::ops::Deref::deref(self).len() @@ -74,10 +156,6 @@ mod rc { std::ops::Deref::deref(self).is_empty() } - fn capacity(&self) -> usize { - std::ops::Deref::deref(self).capacity() - } - fn clear(&mut self) { // Try to reuse the allocation if possible if let Some(inner) = Rc::get_mut(self) { @@ -86,16 +164,30 @@ mod rc { *self = Self::default(); } } + + type Iter<'a> = T::Iter<'a>; + + fn iter(&self) -> Self::Iter<'_> { + self.deref().iter() + } + + type DrainIter<'a> = T::Iter<'a>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + self.iter() + } } } mod arc { + use std::ops::Deref; use std::sync::Arc; use crate::Container; impl Container for Arc { - type Item = T::Item; + type ItemRef<'a> = T::ItemRef<'a> where Self: 'a; + type Item<'a> = T::ItemRef<'a> where Self: 'a; fn len(&self) -> usize { std::ops::Deref::deref(self).len() @@ -105,10 +197,6 @@ mod arc { std::ops::Deref::deref(self).is_empty() } - fn capacity(&self) -> usize { - std::ops::Deref::deref(self).capacity() - } - fn clear(&mut self) { // Try to reuse the allocation if possible if let Some(inner) = Arc::get_mut(self) { @@ -117,43 +205,56 @@ mod arc { *self = Self::default(); } } + + type Iter<'a> = T::Iter<'a>; + + fn iter(&self) -> Self::Iter<'_> { + self.deref().iter() + } + + type DrainIter<'a> = T::Iter<'a>; + + fn drain(&mut self) -> Self::DrainIter<'_> { + self.iter() + } } } /// A container that can partition itself into pieces. -pub trait PushPartitioned: Container { +pub trait PushPartitioned: PushContainer { /// Partition and push this container. /// /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to /// append an element to. Call `flush` with an index and a buffer to send the data downstream. fn push_partitioned(&mut self, buffers: &mut [Self], index: I, flush: F) where - I: FnMut(&Self::Item) -> usize, + for<'a> I: FnMut(&Self::Item<'a>) -> usize, F: FnMut(usize, &mut Self); } -impl PushPartitioned for Vec { +impl PushPartitioned for T where for<'a> T::Item<'a>: PushInto { fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where - I: FnMut(&Self::Item) -> usize, + for<'a> I: FnMut(&Self::Item<'a>) -> usize, F: FnMut(usize, &mut Self), { - fn ensure_capacity(this: &mut Vec) { + let ensure_capacity = |this: &mut Self| { let capacity = this.capacity(); - let desired_capacity = buffer::default_capacity::(); + let desired_capacity = Self::preferred_capacity(); if capacity < desired_capacity { this.reserve(desired_capacity - capacity); } - } + }; - for datum in self.drain(..) { + for datum in self.drain() { let index = index(&datum); ensure_capacity(&mut buffers[index]); buffers[index].push(datum); - if buffers[index].len() == buffers[index].capacity() { + if buffers[index].len() >= buffers[index].capacity() { flush(index, &mut buffers[index]); } } + self.clear(); } } diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 976023a19..b73076c7c 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -53,14 +53,18 @@ impl ParallelizationContractCore for Pipeline { } /// An exchange between multiple observers by data -pub struct ExchangeCore { hash_func: F, phantom: PhantomData<(C, D)> } +pub struct ExchangeCore { hash_func: F, phantom: PhantomData } /// [ExchangeCore] specialized to vector-based containers. -pub type Exchange = ExchangeCore, D, F>; +pub type Exchange = ExchangeCore, F>; -implu64+'static> ExchangeCore { +impl ExchangeCore +where + C: PushPartitioned, + for<'a> F: FnMut(&C::Item<'a>)->u64 +{ /// Allocates a new `Exchange` pact from a distribution function. - pub fn new(func: F) -> ExchangeCore { + pub fn new(func: F) -> ExchangeCore { ExchangeCore { hash_func: func, phantom: PhantomData, @@ -69,11 +73,12 @@ implu64+'static> ExchangeCore { } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -implu64+'static> ParallelizationContractCore for ExchangeCore +impl ParallelizationContractCore for ExchangeCore where - C: Data + Container + PushPartitioned, + C: Data + PushPartitioned, + for<'a> H: FnMut(&C::Item<'a>) -> u64 { - type Pusher = ExchangePusher>>>, F>; + type Pusher = ExchangePusher>>>, H>; type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { @@ -83,7 +88,7 @@ where } } -impl Debug for ExchangeCore { +impl Debug for ExchangeCore { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Exchange").finish() } diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index d18a0f84d..f3b737f7d 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -1,7 +1,8 @@ //! Buffering and session mechanisms to provide the appearance of record-at-a-time sending, //! with the performance of batched sends. -use crate::dataflow::channels::{Bundle, BundleCore, Message}; +use timely_container::{PushContainer, PushInto}; +use crate::dataflow::channels::{BundleCore, Message}; use crate::progress::Timestamp; use crate::dataflow::operators::Capability; use crate::communication::Push; @@ -81,21 +82,22 @@ impl>> BufferCore where T: Eq } } -impl>> Buffer where T: Eq+Clone { +impl>> BufferCore where T: Eq+Clone { // internal method for use by `Session`. #[inline] - fn give(&mut self, data: D) { - if self.buffer.capacity() < crate::container::buffer::default_capacity::() { + fn give>(&mut self, data: D) { + if self.buffer.capacity() < C::preferred_capacity() { let to_reserve = crate::container::buffer::default_capacity::() - self.buffer.capacity(); self.buffer.reserve(to_reserve); } self.buffer.push(data); - // assert!(self.buffer.capacity() == Message::::default_length()); - if self.buffer.len() == self.buffer.capacity() { + if self.buffer.len() >= C::preferred_capacity() { self.flush(); } } +} +impl>>> Buffer where T: Eq+Clone { // Gives an entire message at a specific time. fn give_vec(&mut self, vector: &mut Vec) { // flush to ensure fifo-ness diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9ea271d31..93ca437e5 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -7,17 +7,22 @@ use crate::dataflow::channels::{BundleCore, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: FnMut(&D) -> u64> { +pub struct Exchange>, H> +where + for<'a> H: FnMut(&C::Item<'a>) -> u64 +{ pushers: Vec

, buffers: Vec, current: Option, hash_func: H, - phantom: std::marker::PhantomData, } -impl>, H: FnMut(&D) -> u64> Exchange { +impl>, H> Exchange +where + for<'a> H: FnMut(&C::Item<'a>) -> u64 +{ /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> Exchange { + pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; for _ in 0..pushers.len() { buffers.push(Default::default()); @@ -27,7 +32,6 @@ impl>, H: FnMut(&D) -> hash_func: key, buffers, current: None, - phantom: std::marker::PhantomData, } } #[inline] @@ -40,9 +44,10 @@ impl>, H: FnMut(&D) -> } } -impl>, H: FnMut(&D) -> u64> Push> for Exchange +impl>, H, > Push> for Exchange where - C: PushPartitioned + C: PushPartitioned, + for<'a> H: FnMut(&C::Item<'a>) -> u64 { #[inline(never)] fn push(&mut self, message: &mut Option>) { diff --git a/timely/src/dataflow/operators/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs similarity index 76% rename from timely/src/dataflow/operators/exchange.rs rename to timely/src/dataflow/operators/core/exchange.rs index 1f603df25..e88a9bc82 100644 --- a/timely/src/dataflow/operators/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -1,5 +1,6 @@ //! Exchange records between workers. +use timely_container::Container; use crate::ExchangeData; use crate::container::PushPartitioned; use crate::dataflow::channels::pact::ExchangeCore; @@ -7,7 +8,7 @@ use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::{Scope, StreamCore}; /// Exchange records between workers. -pub trait Exchange { +pub trait Exchange { /// Exchange records between workers. /// /// The closure supplied should map a reference to a record to a `u64`, @@ -23,15 +24,19 @@ pub trait Exchange { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn exchange(&self, route: impl FnMut(&D) -> u64 + 'static) -> Self; + fn exchange(&self, route: F) -> Self + where + for<'a> F: FnMut(&C::Item<'a>) -> u64; } -impl Exchange for StreamCore +impl Exchange for StreamCore where C: PushPartitioned + ExchangeData, - C::Item: ExchangeData, { - fn exchange(&self, route: impl FnMut(&C::Item) -> u64 + 'static) -> StreamCore { + fn exchange(&self, route: F) -> StreamCore + where + for<'a> F: FnMut(&C::Item<'a>) -> u64, + { let mut container = Default::default(); self.unary(ExchangeCore::new(route), "Exchange", |_, _| { move |input, output| { diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs similarity index 79% rename from timely/src/dataflow/operators/inspect.rs rename to timely/src/dataflow/operators/core/inspect.rs index 6e26856d1..d26ff1446 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -1,9 +1,6 @@ //! Extension trait and implementation for observing and action on streamed data. -use std::rc::Rc; -use timely_container::columnation::{Columnation, TimelyStack}; use crate::Container; -use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::Operator; @@ -21,7 +18,10 @@ pub trait Inspect: InspectCore + Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn inspect(&self, mut func: impl FnMut(&C::Item)+'static) -> Self { + fn inspect(&self, mut func: F) -> Self + where + for<'a> F: FnMut(C::ItemRef<'a>) + { self.inspect_batch(move |_, data| { for datum in data.iter() { func(datum); } }) @@ -38,10 +38,13 @@ pub trait Inspect: InspectCore + Sized { /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x)); /// }); /// ``` - fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &C::Item)+'static) -> Self { + fn inspect_time(&self, mut func: F) -> Self + where + for <'a> F: FnMut(&G::Timestamp, C::ItemRef<'a>), + { self.inspect_batch(move |time, data| { for datum in data.iter() { - func(&time, &datum); + func(&time, datum); } }) } @@ -57,7 +60,7 @@ pub trait Inspect: InspectCore + Sized { /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len())); /// }); /// ``` - fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[C::Item])+'static) -> Self { + fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self { self.inspect_core(move |event| { if let Ok((time, data)) = event { func(time, data); @@ -84,26 +87,12 @@ pub trait Inspect: InspectCore + Sized { /// }); /// }); /// ``` - fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static; -} - -impl Inspect> for StreamCore> { - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { - self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) - } -} - -impl Inspect> for StreamCore> { - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { - self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) - } + fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl Inspect> for StreamCore> - where C: AsRef<[C::Item]> -{ - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static { - self.inspect_container(move |r| func(r.map(|(t, c)| (t, c.as_ref().as_ref())))) +impl Inspect for StreamCore { + fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { + self.inspect_container(move |r| func(r)) } } diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs new file mode 100644 index 000000000..c70c5d945 --- /dev/null +++ b/timely/src/dataflow/operators/core/mod.rs @@ -0,0 +1,10 @@ +//! Extension traits for `Stream` implementing various operators that +//! are independent of specific container types. + +pub mod exchange; +pub mod inspect; +pub mod reclock; + +pub use exchange::Exchange; +pub use inspect::{Inspect, InspectCore}; +pub use reclock::Reclock; diff --git a/timely/src/dataflow/operators/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs similarity index 94% rename from timely/src/dataflow/operators/reclock.rs rename to timely/src/dataflow/operators/core/reclock.rs index b656e0aaf..9510e8722 100644 --- a/timely/src/dataflow/operators/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock>(&self, clock: &StreamCore) -> Self; + fn reclock(&self, clock: &StreamCore) -> Self; } impl Reclock for StreamCore { - fn reclock>(&self, clock: &StreamCore) -> StreamCore { + fn reclock(&self, clock: &StreamCore) -> StreamCore { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 1eec15cba..5650f6e97 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -33,6 +33,8 @@ pub use self::generic::{Notificator, FrontierNotificator}; pub use self::reclock::Reclock; pub use self::count::Accumulate; +pub mod core; + pub mod enterleave; pub mod input; pub mod flow_controlled; @@ -41,10 +43,10 @@ pub mod feedback; pub mod concat; pub mod partition; pub mod map; -pub mod inspect; +pub use self::core::inspect; pub mod filter; pub mod delay; -pub mod exchange; +pub use self::core::exchange; pub mod broadcast; pub mod probe; pub mod to_stream; @@ -57,7 +59,7 @@ pub mod result; pub mod aggregation; pub mod generic; -pub mod reclock; +pub use self::core::reclock; pub mod count; // keep "mint" module-private