diff --git a/timely/examples/barrier.rs b/timely/examples/barrier.rs index ff7f38283..f1d4c2a48 100644 --- a/timely/examples/barrier.rs +++ b/timely/examples/barrier.rs @@ -11,7 +11,7 @@ fn main() { timely::execute_from_args(std::env::args().skip(2), move |worker| { worker.dataflow(move |scope| { - let (handle, stream) = scope.feedback::(1); + let (handle, stream) = scope.feedback::>(1); stream.unary_notify( Pipeline, "Barrier", diff --git a/timely/src/dataflow/operators/concat.rs b/timely/src/dataflow/operators/core/concat.rs similarity index 100% rename from timely/src/dataflow/operators/concat.rs rename to timely/src/dataflow/operators/core/concat.rs diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs similarity index 57% rename from timely/src/dataflow/operators/feedback.rs rename to timely/src/dataflow/operators/core/feedback.rs index 80e623df1..0481bbb3c 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -1,6 +1,6 @@ //! Create cycles in a timely dataflow graph. -use crate::{Container, Data}; +use crate::Container; use crate::progress::{Timestamp, PathSummary}; use crate::progress::frontier::Antichain; @@ -8,17 +8,18 @@ use crate::order::Product; use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamCore, Scope, Stream}; +use crate::dataflow::{StreamCore, Scope}; use crate::dataflow::scopes::child::Iterative; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OutputWrapper; -/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. +/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. pub trait Feedback { - /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. + + /// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`. /// - /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with - /// its `Handle` passed as an argument. Data passed through the stream will have their + /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with + /// its `Handle` passed as an argument. Containers passed through the stream will have their /// timestamps advanced by `summary`. /// /// # Examples @@ -36,38 +37,15 @@ pub trait Feedback { /// .connect_loop(handle); /// }); /// ``` - fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream); - - /// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`. - /// - /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with - /// its `Handle` passed as an argument. Data passed through the stream will have their - /// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; - /// - /// timely::example(|scope| { - /// // circulate 0..10 for 100 iterations. - /// let (handle, cycle) = scope.feedback_core::>(1); - /// (0..10).to_stream(scope) - /// .concat(&cycle) - /// .inspect(|x| println!("seen: {:?}", x)) - /// .branch_when(|t| t < &100).1 - /// .connect_loop(handle); - /// }); - /// ``` - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); + fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore); } -/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. +/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. pub trait LoopVariable<'a, G: Scope, T: Timestamp> { - /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. + /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`. /// - /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with - /// its `Handle` passed as an argument. Data passed through the stream will have their + /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with + /// its `Handle` passed as an argument. Containers passed through the stream will have their /// timestamps advanced by `summary`. /// /// # Examples @@ -87,26 +65,23 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> { /// }); /// }); /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, C>, StreamCore, C>); + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>); } impl Feedback for G { - fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { - self.feedback_core(summary) - } - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { + fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); let (output, stream) = builder.new_output(); - (HandleCore { builder, summary, output }, stream) + (Handle { builder, summary, output }, stream) } } impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, C>, StreamCore, C>) { - self.feedback_core(Product::new(Default::default(), summary)) + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, C>, StreamCore, C>) { + self.feedback(Product::new(Default::default(), summary)) } } @@ -129,15 +104,15 @@ pub trait ConnectLoop { /// .connect_loop(handle); /// }); /// ``` - fn connect_loop(&self, _: HandleCore); + fn connect_loop(&self, handle: Handle); } impl ConnectLoop for StreamCore { - fn connect_loop(&self, helper: HandleCore) { + fn connect_loop(&self, handle: Handle) { - let mut builder = helper.builder; - let summary = helper.summary; - let mut output = helper.output; + let mut builder = handle.builder; + let summary = handle.summary; + let mut output = handle.output; let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]); @@ -159,11 +134,8 @@ impl ConnectLoop for StreamCore { /// A handle used to bind the source of a loop variable. #[derive(Debug)] -pub struct HandleCore { +pub struct Handle { builder: OperatorBuilder, summary: ::Summary, output: OutputWrapper>, } - -/// A `HandleCore` specialized for using `Vec` as container -pub type Handle = HandleCore>; diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index c70c5d945..758eb5231 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -1,10 +1,17 @@ //! Extension traits for `Stream` implementing various operators that //! are independent of specific container types. +pub mod concat; pub mod exchange; +pub mod feedback; pub mod inspect; +pub mod probe; +pub mod rc; pub mod reclock; +pub use concat::{Concat, Concatenate}; pub use exchange::Exchange; +pub use feedback::{Feedback, LoopVariable, ConnectLoop}; pub use inspect::{Inspect, InspectCore}; +pub use probe::Probe; pub use reclock::Reclock; diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/core/probe.rs similarity index 100% rename from timely/src/dataflow/operators/probe.rs rename to timely/src/dataflow/operators/core/probe.rs diff --git a/timely/src/dataflow/operators/rc.rs b/timely/src/dataflow/operators/core/rc.rs similarity index 100% rename from timely/src/dataflow/operators/rc.rs rename to timely/src/dataflow/operators/core/rc.rs diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 5650f6e97..29f62299b 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -11,8 +11,6 @@ pub use self::enterleave::{Enter, EnterAt, Leave}; pub use self::input::Input; pub use self::unordered_input::{UnorderedInput, UnorderedInputCore}; -pub use self::feedback::{Feedback, LoopVariable, ConnectLoop}; -pub use self::concat::{Concat, Concatenate}; pub use self::partition::Partition; pub use self::map::Map; pub use self::inspect::{Inspect, InspectCore}; @@ -20,7 +18,6 @@ pub use self::filter::Filter; pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; -pub use self::probe::Probe; pub use self::to_stream::{ToStream, ToStreamCore}; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; @@ -39,8 +36,8 @@ pub mod enterleave; pub mod input; pub mod flow_controlled; pub mod unordered_input; -pub mod feedback; -pub mod concat; +pub use self::core::feedback::{self, Feedback, LoopVariable, ConnectLoop}; +pub use self::core::concat::{self, Concat, Concatenate}; pub mod partition; pub mod map; pub use self::core::inspect; @@ -48,12 +45,12 @@ pub mod filter; pub mod delay; pub use self::core::exchange; pub mod broadcast; -pub mod probe; +pub use self::core::probe::{self, Probe}; pub mod to_stream; pub mod capture; pub mod branch; pub mod ok_err; -pub mod rc; +pub use self::core::rc; pub mod result; pub mod aggregation; diff --git a/timely/tests/barrier.rs b/timely/tests/barrier.rs index 9e627762b..200a37a78 100644 --- a/timely/tests/barrier.rs +++ b/timely/tests/barrier.rs @@ -17,7 +17,7 @@ fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) { }; timely::execute(config, move |worker| { worker.dataflow(move |scope| { - let (handle, stream) = scope.feedback::(1); + let (handle, stream) = scope.feedback::>(1); stream.unary_notify( Pipeline, "Barrier",