diff --git a/container/src/columnation.rs b/container/src/columnation.rs index 54a00ca4e..1ab942304 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -296,8 +296,6 @@ mod container { use crate::columnation::{Columnation, TimelyStack}; impl Container for TimelyStack { - type Item = T; - fn len(&self) -> usize { self.local.len() } @@ -316,6 +314,7 @@ mod container { } impl PushPartitioned for TimelyStack { + type Item = T; fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where I: FnMut(&Self::Item) -> usize, diff --git a/container/src/lib.rs b/container/src/lib.rs index eb9b3ce77..9cd2a12bc 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -17,9 +17,6 @@ pub mod columnation; /// is efficient (which is not necessarily the case when deriving `Clone`.) /// TODO: Don't require `Container: Clone` pub trait Container: Default + Clone + 'static { - /// The type of elements this container holds. - type Item; - /// The number of elements in this container /// /// The length of a container must be consistent between sending and receiving it. @@ -41,8 +38,6 @@ pub trait Container: Default + Clone + 'static { } impl Container for Vec { - type Item = T; - fn len(&self) -> usize { Vec::len(self) } @@ -64,8 +59,6 @@ mod rc { use crate::Container; impl Container for Rc { - type Item = T::Item; - fn len(&self) -> usize { std::ops::Deref::deref(self).len() } @@ -95,8 +88,6 @@ mod arc { use crate::Container; impl Container for Arc { - type Item = T::Item; - fn len(&self) -> usize { std::ops::Deref::deref(self).len() } @@ -122,6 +113,9 @@ mod arc { /// A container that can partition itself into pieces. pub trait PushPartitioned: Container { + /// Type of item to distribute among containers. + type Item; + /// Partition and push this container. /// /// Drain all elements from `self`, and use the function `index` to determine which `buffer` to @@ -133,6 +127,7 @@ pub trait PushPartitioned: Container { } impl PushPartitioned for Vec { + type Item = T; fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where I: FnMut(&Self::Item) -> usize, diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/inspect.rs index 6e26856d1..83087ad37 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/inspect.rs @@ -9,7 +9,7 @@ use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::Operator; /// Methods to inspect records and batches of records on a stream. -pub trait Inspect: InspectCore + Sized { +pub trait Inspect: InspectCore + Sized { /// Runs a supplied closure on each observed data element. /// /// # Examples @@ -21,7 +21,7 @@ pub trait Inspect: InspectCore + Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn inspect(&self, mut func: impl FnMut(&C::Item)+'static) -> Self { + fn inspect(&self, mut func: impl FnMut(&I)+'static) -> Self { self.inspect_batch(move |_, data| { for datum in data.iter() { func(datum); } }) @@ -38,7 +38,7 @@ pub trait Inspect: InspectCore + Sized { /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x)); /// }); /// ``` - fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &C::Item)+'static) -> Self { + fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &I)+'static) -> Self { self.inspect_batch(move |time, data| { for datum in data.iter() { func(&time, &datum); @@ -57,7 +57,7 @@ pub trait Inspect: InspectCore + Sized { /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len())); /// }); /// ``` - fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[C::Item])+'static) -> Self { + fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[I])+'static) -> Self { self.inspect_core(move |event| { if let Ok((time, data)) = event { func(time, data); @@ -84,25 +84,25 @@ pub trait Inspect: InspectCore + Sized { /// }); /// }); /// ``` - fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static; + fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[I]), &[G::Timestamp]>)+'static; } -impl Inspect> for StreamCore> { +impl Inspect, D> for StreamCore> { fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) } } -impl Inspect> for StreamCore> { +impl Inspect, D> for StreamCore> { fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static { self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..])))) } } -impl Inspect> for StreamCore> - where C: AsRef<[C::Item]> +impl Inspect, C> for StreamCore> + where C: AsRef<[C]> { - fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static { + fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C]), &[G::Timestamp]>) + 'static { self.inspect_container(move |r| func(r.map(|(t, c)| (t, c.as_ref().as_ref())))) } } diff --git a/timely/src/dataflow/operators/reclock.rs b/timely/src/dataflow/operators/reclock.rs index b656e0aaf..9510e8722 100644 --- a/timely/src/dataflow/operators/reclock.rs +++ b/timely/src/dataflow/operators/reclock.rs @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock>(&self, clock: &StreamCore) -> Self; + fn reclock(&self, clock: &StreamCore) -> Self; } impl Reclock for StreamCore { - fn reclock>(&self, clock: &StreamCore) -> StreamCore { + fn reclock(&self, clock: &StreamCore) -> StreamCore { let mut stash = vec![];