Skip to content

Commit

Permalink
Remove EnterAt, migrate enterleave.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 21, 2024
1 parent e9ad539 commit 7b4a1ba
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,14 @@ use crate::logging::{TimelyLogger, MessagesEvent};
use crate::progress::Timestamp;
use crate::progress::timestamp::Refines;
use crate::progress::{Source, Target};
use crate::order::Product;
use crate::{Container, Data};
use crate::communication::Push;
use crate::dataflow::channels::pushers::{Counter, Tee};
use crate::dataflow::channels::{Bundle, Message};

use crate::worker::AsWorker;
use crate::dataflow::{StreamCore, Scope, Stream};
use crate::dataflow::scopes::{Child, ScopeParent};
use crate::dataflow::operators::delay::Delay;
use crate::dataflow::{StreamCore, Scope};
use crate::dataflow::scopes::Child;

/// Extension trait to move a `Stream` into a child of its current `Scope`.
pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
Expand All @@ -55,34 +53,6 @@ pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> {
fn enter<'a>(&self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
}

use crate::dataflow::scopes::child::Iterative;

/// Extension trait to move a `Stream` into a child of its current `Scope` setting the timestamp for each element.
pub trait EnterAt<G: Scope, T: Timestamp, D: Data> {
/// Moves the `Stream` argument into a child of its current `Scope` setting the timestamp for each element by `initial`.
///
/// # Examples
/// ```
/// use timely::dataflow::scopes::Scope;
/// use timely::dataflow::operators::{EnterAt, Leave, ToStream};
///
/// timely::example(|outer| {
/// let stream = (0..9u64).to_stream(outer);
/// let output = outer.iterative(|inner| {
/// stream.enter_at(inner, |x| *x).leave()
/// });
/// });
/// ```
fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream<Iterative<'a, G, T>, D> ;
}

impl<G: Scope, T: Timestamp, D: Data, E: Enter<G, Product<<G as ScopeParent>::Timestamp, T>, Vec<D>>> EnterAt<G, T, D> for E {
fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, mut initial: F) ->
Stream<Iterative<'a, G, T>, D> {
self.enter(scope).delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum)))
}
}

impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Data+Container> Enter<G, T, C> for StreamCore<G, C> {
fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {

Expand Down
2 changes: 2 additions & 0 deletions timely/src/dataflow/operators/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! are independent of specific container types.

pub mod concat;
pub mod enterleave;
pub mod exchange;
pub mod feedback;
pub mod inspect;
Expand All @@ -10,6 +11,7 @@ pub mod rc;
pub mod reclock;

pub use concat::{Concat, Concatenate};
pub use enterleave::{Enter, Leave};
pub use exchange::Exchange;
pub use feedback::{Feedback, LoopVariable, ConnectLoop};
pub use inspect::{Inspect, InspectCore};
Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
//! operators whose behavior can be supplied using closures accepting input and output handles.
//! Most of the operators in this module are defined using these two general operators.

pub use self::enterleave::{Enter, EnterAt, Leave};
pub use self::input::Input;
pub use self::unordered_input::{UnorderedInput, UnorderedInputCore};
pub use self::partition::Partition;
Expand All @@ -32,7 +31,7 @@ pub use self::count::Accumulate;

pub mod core;

pub mod enterleave;
pub use self::core::enterleave::{self, Enter, Leave};
pub mod input;
pub mod flow_controlled;
pub mod unordered_input;
Expand Down

0 comments on commit 7b4a1ba

Please sign in to comment.