Skip to content

Commit

Permalink
Update ToStream
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 22, 2024
1 parent 386cb45 commit ca85b95
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 111 deletions.
2 changes: 2 additions & 0 deletions timely/src/dataflow/operators/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod ok_err;
pub mod probe;
pub mod rc;
pub mod reclock;
pub mod to_stream;
pub mod unordered_input;

pub use concat::{Concat, Concatenate};
Expand All @@ -25,5 +26,6 @@ pub use inspect::{Inspect, InspectCore};
pub use map::Map;
pub use ok_err::OkErr;
pub use probe::Probe;
pub use to_stream::ToStream;
pub use reclock::Reclock;
pub use unordered_input::{UnorderedInput, UnorderedHandle};
58 changes: 58 additions & 0 deletions timely/src/dataflow/operators/core/to_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! Conversion to the `StreamCore` type from iterators.

use crate::container::{PushContainer, PushInto};
use crate::Container;
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::{StreamCore, Scope};

/// Converts to a timely [StreamCore].
pub trait ToStream<C: Container> {
/// Converts to a timely [StreamCore].
///
/// # Examples
///
/// ```
/// use timely::dataflow::operators::core::ToStream;
/// use timely::dataflow::operators::Capture;
/// use timely::dataflow::operators::capture::Extract;
///
/// let (data1, data2) = timely::example(|scope| {
/// let data1 = (0..3).to_stream(scope).capture();
/// let data2 = vec![0,1,2].to_stream(scope).capture();
/// (data1, data2)
/// });
///
/// assert_eq!(data1.extract(), data2.extract());
/// ```
fn to_stream<S: Scope>(self, scope: &mut S) -> StreamCore<S, C>;
}

impl<C: PushContainer, I: IntoIterator+'static> ToStream<C> for I where I::Item: PushInto<C> {
fn to_stream<S: Scope>(self, scope: &mut S) -> StreamCore<S, C> {

source(scope, "ToStream", |capability, info| {

// Acquire an activator, so that the operator can rescheduled itself.
let activator = scope.activator_for(&info.address[..]);

let mut iterator = self.into_iter().fuse();
let mut capability = Some(capability);

move |output| {

if let Some(element) = iterator.next() {
let mut session = output.session(capability.as_ref().unwrap());
session.give(element);
let n = 256 * crate::container::buffer::default_capacity::<I::Item>();
for element in iterator.by_ref().take(n - 1) {
session.give(element);
}
activator.activate();
}
else {
capability = None;
}
}
})
}
}
3 changes: 1 addition & 2 deletions timely/src/dataflow/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ pub use self::filter::Filter;
pub use self::delay::Delay;
pub use self::exchange::Exchange;
pub use self::broadcast::Broadcast;
pub use self::to_stream::{ToStream, ToStreamCore};
pub use self::capture::Capture;
pub use self::branch::{Branch, BranchWhen};
pub use self::result::ResultStream;
Expand All @@ -44,7 +43,7 @@ pub mod delay;
pub use self::core::exchange;
pub mod broadcast;
pub use self::core::probe::{self, Probe};
pub mod to_stream;
pub use self::core::to_stream::ToStream;
pub mod capture;
pub mod branch;
pub use self::core::ok_err::{self, OkErr};
Expand Down
109 changes: 0 additions & 109 deletions timely/src/dataflow/operators/to_stream.rs

This file was deleted.

0 comments on commit ca85b95

Please sign in to comment.