diff --git a/container/src/columnation.rs b/container/src/columnation.rs index 83d986d62..0e7883660 100644 --- a/container/src/columnation.rs +++ b/container/src/columnation.rs @@ -314,7 +314,7 @@ mod container { use crate::columnation::{Columnation, TimelyStack}; - impl Container for TimelyStack { + impl Container for TimelyStack { type ItemRef<'a> = &'a T where Self: 'a; type Item<'a> = &'a T where Self: 'a; @@ -330,13 +330,13 @@ mod container { TimelyStack::clear(self) } - type Iter<'a> = std::slice::Iter<'a, T>; + type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a; fn iter(&self) -> Self::Iter<'_> { self.deref().iter() } - type DrainIter<'a> = std::slice::Iter<'a, T>; + type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a; fn drain(&mut self) -> Self::DrainIter<'_> { (*self).iter() diff --git a/container/src/lib.rs b/container/src/lib.rs index 61a4b32b9..fcb7ee85f 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -16,8 +16,7 @@ pub mod flatcontainer; /// We require the container to be cloneable to enable efficient copies when providing references /// of containers to operators. Care must be taken that the type's `clone_from` implementation /// is efficient (which is not necessarily the case when deriving `Clone`.) -/// TODO: Don't require `Container: Clone` -pub trait Container: Default + Clone + 'static { +pub trait Container: Default { /// The type of elements when reading non-destructively from the container. type ItemRef<'a> where Self: 'a; @@ -42,13 +41,13 @@ pub trait Container: Default + Clone + 'static { fn clear(&mut self); /// Iterator type when reading from the container. - type Iter<'a>: Iterator>; + type Iter<'a>: Iterator> where Self: 'a; /// Returns an iterator that reads the contents of this container. fn iter(&self) -> Self::Iter<'_>; /// Iterator type when draining the container. - type DrainIter<'a>: Iterator>; + type DrainIter<'a>: Iterator> where Self: 'a; /// Returns an iterator that drains the contents of this container. /// Drain leaves the container in an undefined state. @@ -83,7 +82,7 @@ pub trait PushContainer: Container { fn reserve(&mut self, additional: usize); } -impl Container for Vec { +impl Container for Vec { type ItemRef<'a> = &'a T where T: 'a; type Item<'a> = T where T: 'a; @@ -97,13 +96,13 @@ impl Container for Vec { fn clear(&mut self) { Vec::clear(self) } - type Iter<'a> = std::slice::Iter<'a, T>; + type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a; fn iter(&self) -> Self::Iter<'_> { self.as_slice().iter() } - type DrainIter<'a> = std::vec::Drain<'a, T>; + type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a; fn drain(&mut self) -> Self::DrainIter<'_> { self.drain(..) @@ -165,13 +164,13 @@ mod rc { } } - type Iter<'a> = T::Iter<'a>; + type Iter<'a> = T::Iter<'a> where Self: 'a; fn iter(&self) -> Self::Iter<'_> { self.deref().iter() } - type DrainIter<'a> = T::Iter<'a>; + type DrainIter<'a> = T::Iter<'a> where Self: 'a; fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() @@ -206,13 +205,13 @@ mod arc { } } - type Iter<'a> = T::Iter<'a>; + type Iter<'a> = T::Iter<'a> where Self: 'a; fn iter(&self) -> Self::Iter<'_> { self.deref().iter() } - type DrainIter<'a> = T::Iter<'a>; + type DrainIter<'a> = T::Iter<'a> where Self: 'a; fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() @@ -232,7 +231,7 @@ pub trait PushPartitioned: PushContainer { F: FnMut(usize, &mut Self); } -impl PushPartitioned for T where for<'a> T::Item<'a>: PushInto { +impl PushPartitioned for T where for<'a> T::Item<'a>: PushInto { fn push_partitioned(&mut self, buffers: &mut [Self], mut index: I, mut flush: F) where for<'a> I: FnMut(&Self::Item<'a>) -> usize, diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index abf6fef3a..69b3e1d1d 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -9,9 +9,9 @@ use std::{fmt::{self, Debug}, marker::PhantomData}; -use crate::Container; +use crate::{Container, ExchangeData}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; -use crate::communication::{Push, Pull, Data}; +use crate::communication::{Push, Pull}; use crate::container::PushPartitioned; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; use crate::dataflow::channels::{Bundle, Message}; @@ -33,7 +33,7 @@ pub trait ParallelizationContract { #[derive(Debug)] pub struct Pipeline; -impl ParallelizationContract for Pipeline { +impl ParallelizationContract for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { @@ -66,7 +66,7 @@ where // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. impl ParallelizationContract for ExchangeCore where - C: Data + PushPartitioned, + C: PushPartitioned + ExchangeData, for<'a> H: FnMut(&C::Item<'a>) -> u64 { type Pusher = ExchangePusher>>>, H>; diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 403da32cd..a03ddf0bb 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -3,7 +3,7 @@ use crate::communication::Push; use crate::container::PushPartitioned; use crate::dataflow::channels::{Bundle, Message}; -use crate::{Container, Data}; +use crate::Data; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. @@ -44,9 +44,9 @@ where } } -impl>, H, > Push> for Exchange +impl>, H, > Push> for Exchange where - C: PushPartitioned, + C: PushPartitioned+Data, for<'a> H: FnMut(&C::Item<'a>) -> u64 { #[inline(never)] diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index f5cbfb226..087af505e 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -17,7 +17,7 @@ pub struct Tee { shared: PushList, } -impl Push> for Tee { +impl Push> for Tee { #[inline] fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/branch.rs index 70e087abd..b3dacac8b 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/branch.rs @@ -94,7 +94,7 @@ pub trait BranchWhen: Sized { fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self); } -impl BranchWhen for StreamCore { +impl BranchWhen for StreamCore { fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index b14d3a6b5..3d7caeb6f 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -10,14 +10,14 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; -use crate::Container; +use crate::{Container, Data}; use crate::progress::ChangeBatch; use crate::progress::Timestamp; use super::{Event, EventPusher}; /// Capture a stream of timestamped data for later replay. -pub trait Capture { +pub trait Capture { /// Captures a stream of timestamped data for later replay. /// /// # Examples @@ -113,7 +113,7 @@ pub trait Capture { } } -impl Capture for StreamCore { +impl Capture for StreamCore { fn capture_into+'static>(&self, mut event_pusher: P) { let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope()); diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs index 2ab843f86..9a9adca50 100644 --- a/timely/src/dataflow/operators/core/capture/replay.rs +++ b/timely/src/dataflow/operators/core/capture/replay.rs @@ -46,7 +46,7 @@ use crate::progress::Timestamp; use super::Event; use super::event::EventIterator; -use crate::Container; +use crate::{Container, Data}; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { @@ -62,7 +62,7 @@ pub trait Replay : Sized { fn replay_core>(self, scope: &mut S, period: Option) -> StreamCore; } -impl Replay for I +impl Replay for I where I : IntoIterator, ::Item: EventIterator+'static, diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index f69fd98a7..104a7fc25 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -1,7 +1,7 @@ //! Merges the contents of multiple streams. -use crate::Container; +use crate::{Container, Data}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{StreamCore, Scope}; @@ -23,7 +23,7 @@ pub trait Concat { fn concat(&self, _: &StreamCore) -> StreamCore; } -impl Concat for StreamCore { +impl Concat for StreamCore { fn concat(&self, other: &StreamCore) -> StreamCore { self.scope().concatenate([self.clone(), other.clone()]) } @@ -52,7 +52,7 @@ pub trait Concatenate { I: IntoIterator>; } -impl Concatenate for StreamCore { +impl Concatenate for StreamCore { fn concatenate(&self, sources: I) -> StreamCore where I: IntoIterator> @@ -62,7 +62,7 @@ impl Concatenate for StreamCore { } } -impl Concatenate for G { +impl Concatenate for G { fn concatenate(&self, sources: I) -> StreamCore where I: IntoIterator> diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index 23da0e201..e40d21c3b 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -103,7 +103,7 @@ pub trait Leave { fn leave(&self) -> StreamCore; } -impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines> Leave for StreamCore, C> { +impl<'a, G: Scope, C: Container+Data, T: Timestamp+Refines> Leave for StreamCore, C> { fn leave(&self) -> StreamCore { let scope = self.scope(); @@ -130,14 +130,14 @@ impl<'a, G: Scope, C: Clone+Container, T: Timestamp+Refines> Leave } -struct IngressNub, TContainer: Container> { +struct IngressNub, TContainer: Container+Data> { targets: Counter>, phantom: ::std::marker::PhantomData, activator: crate::scheduling::Activator, active: bool, } -impl, TContainer: Container> Push> for IngressNub { +impl, TContainer: Container+Data> Push> for IngressNub { fn push(&mut self, element: &mut Option>) { if let Some(message) = element { let outer_message = message.as_mut(); diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index 0481bbb3c..0690bf6a1 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -1,6 +1,6 @@ //! Create cycles in a timely dataflow graph. -use crate::Container; +use crate::{Container, Data}; use crate::progress::{Timestamp, PathSummary}; use crate::progress::frontier::Antichain; @@ -37,7 +37,7 @@ pub trait Feedback { /// .connect_loop(handle); /// }); /// ``` - fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore); + fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore); } /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. @@ -65,12 +65,12 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>); + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>); } impl Feedback for G { - fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore) { + fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); @@ -80,13 +80,13 @@ impl Feedback for G { } impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>) { + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>) { self.feedback(Product::new(Default::default(), summary)) } } /// Connect a `Stream` to the input of a loop variable. -pub trait ConnectLoop { +pub trait ConnectLoop { /// Connect a `Stream` to be the input of a loop variable. /// /// # Examples @@ -107,7 +107,7 @@ pub trait ConnectLoop { fn connect_loop(&self, handle: Handle); } -impl ConnectLoop for StreamCore { +impl ConnectLoop for StreamCore { fn connect_loop(&self, handle: Handle) { let mut builder = handle.builder; @@ -134,7 +134,7 @@ impl ConnectLoop for StreamCore { /// A handle used to bind the source of a loop variable. #[derive(Debug)] -pub struct Handle { +pub struct Handle { builder: OperatorBuilder, summary: ::Summary, output: OutputWrapper>, diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs index c6585162a..2c301e0ae 100644 --- a/timely/src/dataflow/operators/core/filter.rs +++ b/timely/src/dataflow/operators/core/filter.rs @@ -1,5 +1,6 @@ //! Filters a stream by a predicate. use timely_container::{Container, PushContainer, PushInto}; +use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; @@ -23,7 +24,7 @@ pub trait Filter { fn filter)->bool+'static>(&self, predicate: P) -> Self; } -impl Filter for StreamCore +impl Filter for StreamCore where for<'a> C::Item<'a>: PushInto { diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 2b8ed58dc..843fb6ede 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -11,7 +11,7 @@ use crate::progress::frontier::Antichain; use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; use crate::progress::Source; -use crate::Container; +use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::{Scope, ScopeParent, StreamCore}; use crate::dataflow::channels::pushers::{Tee, Counter}; @@ -60,7 +60,7 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore); + fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore); /// Create a new stream from a supplied interactive handle. /// @@ -93,18 +93,18 @@ pub trait Input : Scope { /// } /// }); /// ``` - fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore; + fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { - fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore) { + fn new_input(&mut self) -> (Handle<::Timestamp, C>, StreamCore) { let mut handle = Handle::new(); let stream = self.input_from(&mut handle); (handle, stream) } - fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore { + fn input_from(&mut self, handle: &mut Handle<::Timestamp, C>) -> StreamCore { let (output, registrar) = Tee::<::Timestamp, C>::new(); let counter = Counter::new(output); let produced = counter.produced().clone(); @@ -174,7 +174,7 @@ impl Operate for Operator { /// A handle to an input `StreamCore`, used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct Handle { +pub struct Handle { activate: Vec, progress: Vec>>>, pushers: Vec>>, @@ -183,7 +183,7 @@ pub struct Handle { now_at: T, } -impl Handle { +impl Handle { /// Allocates a new input handle, from which one can create timely streams. /// /// # Examples @@ -390,7 +390,7 @@ impl Handle { } } -impl Handle { +impl Handle { #[inline] /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. /// @@ -427,13 +427,13 @@ impl Handle { } } -impl Default for Handle { +impl Default for Handle { fn default() -> Self { Self::new() } } -impl Drop for Handle { +impl Drop for Handle { fn drop(&mut self) { self.close_epoch(); } diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index d26ff1446..b175c2364 100644 --- a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -1,6 +1,6 @@ //! Extension trait and implementation for observing and action on streamed data. -use crate::Container; +use crate::{Container, Data}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::Operator; @@ -90,7 +90,7 @@ pub trait Inspect: InspectCore + Sized { fn inspect_core(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl Inspect for StreamCore { +impl Inspect for StreamCore { fn inspect_core(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static { self.inspect_container(move |r| func(r)) } @@ -120,7 +120,7 @@ pub trait InspectCore { fn inspect_container(&self, func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl InspectCore for StreamCore { +impl InspectCore for StreamCore { fn inspect_container(&self, mut func: F) -> StreamCore where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 2b52826dc..1422516f2 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -1,6 +1,7 @@ //! Extension methods for `StreamCore` based on record-by-record transformation. use timely_container::{Container, PushContainer, PushInto}; +use crate::Data; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; @@ -24,7 +25,7 @@ pub trait Map { /// ``` fn map(&self, mut logic: L) -> StreamCore where - C2: PushContainer, + C2: PushContainer+Data, D2: PushInto, L: FnMut(C::Item<'_>)->D2 + 'static, { @@ -46,20 +47,20 @@ pub trait Map { /// ``` fn flat_map(&self, logic: L) -> StreamCore where - C2: PushContainer, + C2: PushContainer+Data, I: IntoIterator, I::Item: PushInto, L: FnMut(C::Item<'_>)->I + 'static, ; } -impl Map for StreamCore { +impl Map for StreamCore { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. fn flat_map(&self, mut logic: L) -> StreamCore where - C2: PushContainer, + C2: PushContainer+Data, I: IntoIterator, I::Item: PushInto, L: FnMut(C::Item<'_>)->I + 'static, diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index c49b0513f..1894be397 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -1,6 +1,7 @@ //! Operators that separate one stream into two streams based on some condition use timely_container::{Container, PushContainer, PushInto}; +use crate::Data; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; @@ -33,23 +34,23 @@ pub trait OkErr { logic: L, ) -> (StreamCore, StreamCore) where - C1: PushContainer, + C1: PushContainer+Data, D1: PushInto, - C2: PushContainer, + C2: PushContainer+Data, D2: PushInto, L: FnMut(C::Item<'_>) -> Result+'static ; } -impl OkErr for StreamCore { +impl OkErr for StreamCore { fn ok_err( &self, mut logic: L, ) -> (StreamCore, StreamCore) where - C1: PushContainer, + C1: PushContainer+Data, D1: PushInto, - C2: PushContainer, + C2: PushContainer+Data, D2: PushInto, L: FnMut(C::Item<'_>) -> Result+'static { diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index 592899a22..431ce0d68 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -13,7 +13,7 @@ use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::dataflow::{StreamCore, Scope}; -use crate::Container; +use crate::{Container, Data}; /// Monitors progress at a `Stream`. pub trait Probe { @@ -79,7 +79,7 @@ pub trait Probe { fn probe_with(&self, handle: &Handle) -> StreamCore; } -impl Probe for StreamCore { +impl Probe for StreamCore { fn probe(&self) -> Handle { // the frontier is shared state; scope updates, handle reads. diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index c00108a7c..c5ae802bd 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -3,7 +3,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::Operator; use crate::dataflow::{Scope, StreamCore}; -use crate::Container; +use crate::{Container, Data}; use std::rc::Rc; /// Convert a stream into a stream of shared containers @@ -24,7 +24,7 @@ pub trait SharedStream { fn shared(&self) -> StreamCore>; } -impl SharedStream for StreamCore { +impl SharedStream for StreamCore { fn shared(&self) -> StreamCore> { let mut container = Default::default(); self.unary(Pipeline, "Shared", move |_, _| { diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 9510e8722..54dda1afe 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -1,6 +1,6 @@ //! Extension methods for `Stream` based on record-by-record transformation. -use crate::Container; +use crate::{Container, Data}; use crate::order::PartialOrder; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pact::Pipeline; @@ -45,11 +45,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock(&self, clock: &StreamCore) -> Self; + fn reclock(&self, clock: &StreamCore) -> Self; } -impl Reclock for StreamCore { - fn reclock(&self, clock: &StreamCore) -> StreamCore { +impl Reclock for StreamCore { + fn reclock(&self, clock: &StreamCore) -> StreamCore { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index a7d874be4..f0f0538ed 100644 --- a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -1,7 +1,7 @@ //! Conversion to the `StreamCore` type from iterators. use crate::container::{PushContainer, PushInto}; -use crate::Container; +use crate::{Container, Data}; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::{StreamCore, Scope}; @@ -26,7 +26,7 @@ pub trait ToStream { fn to_stream(self, scope: &mut S) -> StreamCore; } -impl ToStream for I where I::Item: PushInto { +impl ToStream for I where I::Item: PushInto { fn to_stream(self, scope: &mut S) -> StreamCore { source(scope, "ToStream", |capability, info| { diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 61c42daa5..278f639e6 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -2,7 +2,7 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::Container; +use crate::{Container, Data}; use crate::scheduling::{Schedule, ActivateOnDrop}; @@ -73,11 +73,11 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore); } impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore) { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore) { let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); @@ -145,11 +145,11 @@ impl Operate for UnorderedOperator { /// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. #[derive(Debug)] -pub struct UnorderedHandle { +pub struct UnorderedHandle { buffer: PushBuffer>>, } -impl UnorderedHandle { +impl UnorderedHandle { fn new(pusher: Counter>) -> UnorderedHandle { UnorderedHandle { buffer: PushBuffer::new(pusher), diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 3d2ebbe29..a9987a5dd 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -8,7 +8,7 @@ use crate::progress::{ChangeBatch, Timestamp}; use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; -use crate::Container; +use crate::{Container, Data}; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pushers::Counter as PushCounter; @@ -92,7 +92,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { + pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; self.new_output_connection(connection) } @@ -105,7 +105,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { let (tee, stream) = self.builder.new_output_connection(connection.clone()); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 24ce5b376..e4e59e5dc 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -12,7 +12,7 @@ use crate::dataflow::{Scope, StreamCore}; use super::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OperatorInfo; use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator}; -use crate::Container; +use crate::{Container, Data}; /// Methods to construct generic streaming and blocking operators. pub trait Operator { @@ -58,7 +58,7 @@ pub trait Operator { /// ``` fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + C2: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut OutputHandleCore>)+'static, @@ -93,7 +93,7 @@ pub trait Operator { /// }); /// } /// ``` - fn unary_notify, &mut OutputHandleCore>, &mut Notificator)+'static, @@ -130,7 +130,7 @@ pub trait Operator { /// ``` fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + C2: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut OutputHandleCore>)+'static, @@ -190,8 +190,8 @@ pub trait Operator { /// ``` fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - C2: Container, - C3: Container, + C2: Container+Data, + C3: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, @@ -244,8 +244,8 @@ pub trait Operator { /// } /// }).unwrap(); /// ``` - fn binary_notify, &mut InputHandleCore, &mut OutputHandleCore>, @@ -290,8 +290,8 @@ pub trait Operator { /// ``` fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - C2: Container, - C3: Container, + C2: Container+Data, + C3: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, @@ -328,11 +328,11 @@ pub trait Operator { P: ParallelizationContract; } -impl Operator for StreamCore { +impl Operator for StreamCore { fn unary_frontier(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + C2: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut OutputHandleCore>)+'static, @@ -358,7 +358,7 @@ impl Operator for StreamCore { stream } - fn unary_notify, &mut OutputHandleCore>, &mut Notificator)+'static, @@ -382,7 +382,7 @@ impl Operator for StreamCore { fn unary(&self, pact: P, name: &str, constructor: B) -> StreamCore where - C2: Container, + C2: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut OutputHandleCore>)+'static, @@ -410,8 +410,8 @@ impl Operator for StreamCore { fn binary_frontier(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - C2: Container, - C3: Container, + C2: Container+Data, + C3: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, @@ -441,8 +441,8 @@ impl Operator for StreamCore { stream } - fn binary_notify, &mut InputHandleCore, &mut OutputHandleCore>, @@ -470,8 +470,8 @@ impl Operator for StreamCore { fn binary(&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore where - C2: Container, - C3: Container, + C2: Container+Data, + C3: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, @@ -561,7 +561,7 @@ impl Operator for StreamCore { /// ``` pub fn source(scope: &G, name: &str, constructor: B) -> StreamCore where - C: Container, + C: Container+Data, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut OutputHandleCore>)+'static { @@ -603,7 +603,7 @@ where /// /// }); /// ``` -pub fn empty(scope: &G) -> StreamCore { +pub fn empty(scope: &G) -> StreamCore { source(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 4a9b41dfa..a85677dc0 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -29,4 +29,4 @@ impl ToStream for I where I::Item: Data { fn to_stream(self, scope: &mut S) -> Stream { ToStreamCore::to_stream(self, scope) } -} \ No newline at end of file +} diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 173e777af..7c7186148 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -19,7 +19,6 @@ use crate::Container; /// /// Internally `Stream` maintains a list of data recipients who should be presented with data /// produced by the source of the stream. -#[derive(Clone)] pub struct StreamCore { /// The progress identifier of the stream's data source. name: Source, @@ -29,6 +28,22 @@ pub struct StreamCore { ports: TeeHelper, } +impl Clone for StreamCore { + fn clone(&self) -> Self { + Self { + name: self.name, + scope: self.scope.clone(), + ports: self.ports.clone(), + } + } + + fn clone_from(&mut self, source: &Self) { + self.name.clone_from(&source.name); + self.scope.clone_from(&source.scope); + self.ports.clone_from(&source.ports); + } +} + /// A stream batching data in vectors. pub type Stream = StreamCore>;