Skip to content

Commit

Permalink
Operator movement (into core) (TimelyDataflow#553)
Browse files Browse the repository at this point in the history
* Update Feedback

* Update Concat

* Update Rc

* Update Probe
  • Loading branch information
frankmcsherry authored Mar 21, 2024
1 parent 6d3f13e commit e9ad539
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 60 deletions.
2 changes: 1 addition & 1 deletion timely/examples/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn main() {
timely::execute_from_args(std::env::args().skip(2), move |worker| {

worker.dataflow(move |scope| {
let (handle, stream) = scope.feedback::<usize>(1);
let (handle, stream) = scope.feedback::<Vec<usize>>(1);
stream.unary_notify(
Pipeline,
"Barrier",
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
//! Create cycles in a timely dataflow graph.

use crate::{Container, Data};
use crate::Container;

use crate::progress::{Timestamp, PathSummary};
use crate::progress::frontier::Antichain;
use crate::order::Product;

use crate::dataflow::channels::pushers::Tee;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{StreamCore, Scope, Stream};
use crate::dataflow::{StreamCore, Scope};
use crate::dataflow::scopes::child::Iterative;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::operators::generic::OutputWrapper;

/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
pub trait Feedback<G: Scope> {
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.

/// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Data passed through the stream will have their
/// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Containers passed through the stream will have their
/// timestamps advanced by `summary`.
///
/// # Examples
Expand All @@ -36,38 +37,15 @@ pub trait Feedback<G: Scope> {
/// .connect_loop(handle);
/// });
/// ```
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>);

/// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Data passed through the stream will have their
/// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`.
///
/// # Examples
/// ```
/// use timely::dataflow::Scope;
/// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
///
/// timely::example(|scope| {
/// // circulate 0..10 for 100 iterations.
/// let (handle, cycle) = scope.feedback_core::<Vec<_>>(1);
/// (0..10).to_stream(scope)
/// .concat(&cycle)
/// .inspect(|x| println!("seen: {:?}", x))
/// .branch_when(|t| t < &100).1
/// .connect_loop(handle);
/// });
/// ```
fn feedback_core<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, C>, StreamCore<G, C>);
fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>);
}

/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
///
/// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Data passed through the stream will have their
/// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
/// its `Handle` passed as an argument. Containers passed through the stream will have their
/// timestamps advanced by `summary`.
///
/// # Examples
Expand All @@ -87,26 +65,23 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
/// });
/// });
/// ```
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
}

impl<G: Scope> Feedback<G> for G {
fn feedback<D: Data>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, D>, Stream<G, D>) {
self.feedback_core(summary)
}

fn feedback_core<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (HandleCore<G, C>, StreamCore<G, C>) {
fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {

let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
let (output, stream) = builder.new_output();

(HandleCore { builder, summary, output }, stream)
(Handle { builder, summary, output }, stream)
}
}

impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (HandleCore<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
self.feedback_core(Product::new(Default::default(), summary))
fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
self.feedback(Product::new(Default::default(), summary))
}
}

Expand All @@ -129,15 +104,15 @@ pub trait ConnectLoop<G: Scope, C: Container> {
/// .connect_loop(handle);
/// });
/// ```
fn connect_loop(&self, _: HandleCore<G, C>);
fn connect_loop(&self, handle: Handle<G, C>);
}

impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {
fn connect_loop(&self, helper: HandleCore<G, C>) {
fn connect_loop(&self, handle: Handle<G, C>) {

let mut builder = helper.builder;
let summary = helper.summary;
let mut output = helper.output;
let mut builder = handle.builder;
let summary = handle.summary;
let mut output = handle.output;

let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]);

Expand All @@ -159,11 +134,8 @@ impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {

/// A handle used to bind the source of a loop variable.
#[derive(Debug)]
pub struct HandleCore<G: Scope, C: Container> {
pub struct Handle<G: Scope, C: Container> {
builder: OperatorBuilder<G>,
summary: <G::Timestamp as Timestamp>::Summary,
output: OutputWrapper<G::Timestamp, C, Tee<G::Timestamp, C>>,
}

/// A `HandleCore` specialized for using `Vec` as container
pub type Handle<G, D> = HandleCore<G, Vec<D>>;
7 changes: 7 additions & 0 deletions timely/src/dataflow/operators/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
//! Extension traits for `Stream` implementing various operators that
//! are independent of specific container types.

pub mod concat;
pub mod exchange;
pub mod feedback;
pub mod inspect;
pub mod probe;
pub mod rc;
pub mod reclock;

pub use concat::{Concat, Concatenate};
pub use exchange::Exchange;
pub use feedback::{Feedback, LoopVariable, ConnectLoop};
pub use inspect::{Inspect, InspectCore};
pub use probe::Probe;
pub use reclock::Reclock;
File renamed without changes.
File renamed without changes.
11 changes: 4 additions & 7 deletions timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
pub use self::enterleave::{Enter, EnterAt, Leave};
pub use self::input::Input;
pub use self::unordered_input::{UnorderedInput, UnorderedInputCore};
pub use self::feedback::{Feedback, LoopVariable, ConnectLoop};
pub use self::concat::{Concat, Concatenate};
pub use self::partition::Partition;
pub use self::map::Map;
pub use self::inspect::{Inspect, InspectCore};
pub use self::filter::Filter;
pub use self::delay::Delay;
pub use self::exchange::Exchange;
pub use self::broadcast::Broadcast;
pub use self::probe::Probe;
pub use self::to_stream::{ToStream, ToStreamCore};
pub use self::capture::Capture;
pub use self::branch::{Branch, BranchWhen};
Expand All @@ -39,21 +36,21 @@ pub mod enterleave;
pub mod input;
pub mod flow_controlled;
pub mod unordered_input;
pub mod feedback;
pub mod concat;
pub use self::core::feedback::{self, Feedback, LoopVariable, ConnectLoop};
pub use self::core::concat::{self, Concat, Concatenate};
pub mod partition;
pub mod map;
pub use self::core::inspect;
pub mod filter;
pub mod delay;
pub use self::core::exchange;
pub mod broadcast;
pub mod probe;
pub use self::core::probe::{self, Probe};
pub mod to_stream;
pub mod capture;
pub mod branch;
pub mod ok_err;
pub mod rc;
pub use self::core::rc;
pub mod result;

pub mod aggregation;
Expand Down
2 changes: 1 addition & 1 deletion timely/tests/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn barrier_sync_helper(comm_config: ::timely::CommunicationConfig) {
};
timely::execute(config, move |worker| {
worker.dataflow(move |scope| {
let (handle, stream) = scope.feedback::<u64>(1);
let (handle, stream) = scope.feedback::<Vec<usize>>(1);
stream.unary_notify(
Pipeline,
"Barrier",
Expand Down

0 comments on commit e9ad539

Please sign in to comment.