diff --git a/container/Cargo.toml b/container/Cargo.toml index 6b3e2d65d..06abc9123 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -7,4 +7,5 @@ license = "MIT" [dependencies] columnation = { git = "https://github.com/frankmcsherry/columnation" } +flatcontainer = "0.1" serde = { version = "1.0"} diff --git a/container/src/flatcontainer.rs b/container/src/flatcontainer.rs new file mode 100644 index 000000000..026eef5b9 --- /dev/null +++ b/container/src/flatcontainer.rs @@ -0,0 +1,50 @@ +//! Present a [`FlatStack`] as a timely container. + +pub use flatcontainer::*; +use crate::{buffer, Container, PushContainer, PushInto}; + +impl Container for FlatStack { + type ItemRef<'a> = R::ReadItem<'a> where Self: 'a; + type Item<'a> = R::ReadItem<'a> where Self: 'a; + + fn len(&self) -> usize { + self.len() + } + + fn clear(&mut self) { + self.clear() + } + + type Iter<'a> = <&'a Self as IntoIterator>::IntoIter; + + fn iter<'a>(&'a self) -> Self::Iter<'a> { + IntoIterator::into_iter(self) + } + + type DrainIter<'a> = Self::Iter<'a>; + + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + IntoIterator::into_iter(&*self) + } +} + +impl PushContainer for FlatStack { + fn capacity(&self) -> usize { + self.capacity() + } + + fn preferred_capacity() -> usize { + buffer::default_capacity::() + } + + fn reserve(&mut self, additional: usize) { + self.reserve(additional); + } +} + +impl> PushInto> for T { + #[inline] + fn push_into(self, target: &mut FlatStack) { + target.copy(self); + } +} diff --git a/container/src/lib.rs b/container/src/lib.rs index 6a56295f9..61a4b32b9 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -3,6 +3,7 @@ #![forbid(missing_docs)] pub mod columnation; +pub mod flatcontainer; /// A container transferring data through dataflow edges /// diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 00ab0042d..970aa7d2e 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ 
b/mdbook/src/chapter_2/chapter_2_4.md @@ -17,7 +17,7 @@ fn main() { timely::example(|scope| { (0u64..10) .to_stream(scope) - .unary(Pipeline, "increment", |capability, info| { + .unary::, _, _, _>(Pipeline, "increment", |capability, info| { let mut vector = Vec::new(); move |input, output| { @@ -75,7 +75,7 @@ use timely::dataflow::operators::generic::operator::source; fn main() { timely::example(|scope| { - source(scope, "Source", |capability, info| { + source::<_, Vec<_>, _, _>(scope, "Source", |capability, info| { // Acquire a re-activator for this operator. use timely::scheduling::Scheduler; @@ -131,7 +131,7 @@ fn main() { timely::example(|scope| { (0u64..10) .to_stream(scope) - .unary(Pipeline, "increment", |capability, info| { + .unary::, _, _, _>(Pipeline, "increment", |capability, info| { let mut maximum = 0; // define this here; use in the closure let mut vector = Vec::new(); diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index 5b4c75b27..09d73cb9b 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -196,7 +196,7 @@ As before, I'm just going to show you the new code, which now lives just after ` # .map(move |word| (word.to_owned(), diff)) # .collect::>() # ) - .unary_frontier( + .unary_frontier::, _, _, _>( Exchange::new(|x: &(String, i64)| (x.0).len() as u64), "WordCount", |_capability, operator_info| { diff --git a/timely/examples/distinct.rs b/timely/examples/distinct.rs index 28971384a..58d9020dd 100644 --- a/timely/examples/distinct.rs +++ b/timely/examples/distinct.rs @@ -18,7 +18,7 @@ fn main() { worker.dataflow::(|scope| { let mut counts_by_time = HashMap::new(); scope.input_from(&mut input) - .unary(Exchange::new(|x| *x), "Distinct", move |_, _| + .unary::, _, _, _>(Exchange::new(|x| *x), "Distinct", move |_, _| move |input, output| { input.for_each(|time, data| { let counts = diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index 
ad8ef8809..3b75144ac 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -35,7 +35,7 @@ fn main() { let exchange2 = Exchange::new(|x: &(u64, u64)| x.0); stream1 - .binary(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| { + .binary::<_, Vec<_>, _, _, _, _>(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| { let mut map1 = HashMap::>::new(); let mut map2 = HashMap::>::new(); diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs index a92ee3b6d..fe98a2e0c 100644 --- a/timely/examples/wordcount.rs +++ b/timely/examples/wordcount.rs @@ -24,7 +24,7 @@ fn main() { .map(move |word| (word.to_owned(), diff)) .collect::>() ) - .unary_frontier(exchange, "WordCount", |_capability, _info| { + .unary_frontier::, _, _, _>(exchange, "WordCount", |_capability, _info| { let mut queues = HashMap::new(); let mut counts = HashMap::new(); diff --git a/timely/examples/wordcount_flatcontainer.rs b/timely/examples/wordcount_flatcontainer.rs new file mode 100644 index 000000000..5403a980e --- /dev/null +++ b/timely/examples/wordcount_flatcontainer.rs @@ -0,0 +1,93 @@ +//! Wordcount based on flatcontainer. + +#[cfg(feature = "bincode")] +use { + std::collections::HashMap, + timely::container::flatcontainer::{Containerized, FlatStack}, + timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, + timely::dataflow::operators::core::InputHandle, + timely::dataflow::operators::{Inspect, Operator, Probe}, + timely::dataflow::ProbeHandle, +}; + +#[cfg(feature = "bincode")] +fn main() { + // initializes and runs a timely dataflow. 
+ timely::execute_from_args(std::env::args(), |worker| { + let mut input = + ::Region>>>::new(); + let mut probe = ProbeHandle::new(); + + // create a new input, exchange data, and inspect its output + worker.dataflow::(|scope| { + input + .to_stream(scope) + .unary::::Region>, _, _, _>( + Pipeline, + "Split", + |_cap, _info| { + move |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session(&time); + for (text, diff) in data.iter().flat_map(|(text, diff)| { + text.split_whitespace().map(move |s| (s, diff)) + }) { + session.give((text, diff)); + } + } + } + }, + ) + .unary_frontier::::Region>, _, _, _>( + ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64), + "WordCount", + |_capability, _info| { + let mut queues = HashMap::new(); + let mut counts = HashMap::new(); + + move |input, output| { + while let Some((time, data)) = input.next() { + queues + .entry(time.retain()) + .or_insert(Vec::new()) + .push(data.take()); + } + + for (key, val) in queues.iter_mut() { + if !input.frontier().less_equal(key.time()) { + let mut session = output.session(key); + for batch in val.drain(..) { + for (word, diff) in batch.iter() { + let entry = + counts.entry(word.to_string()).or_insert(0i64); + *entry += diff; + session.give((word, *entry)); + } + } + } + } + + queues.retain(|_key, val| !val.is_empty()); + } + }, + ) + .inspect(|x| println!("seen: {:?}", x)) + .probe_with(&mut probe); + }); + + // introduce data and watch! 
+ for round in 0..10 { + input.send(("flat container", 1)); + input.advance_to(round + 1); + while probe.less_than(input.time()) { + worker.step(); + } + } + }) + .unwrap(); +} + +#[cfg(not(feature = "bincode"))] +fn main() { + eprintln!("Example requires feature bincode."); +} diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 84303a039..5bced98cd 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -11,10 +11,7 @@ pub mod pullers; pub mod pact; /// The input to and output from timely dataflow communication channels. -pub type BundleCore = crate::communication::Message>; - -/// The input to and output from timely dataflow communication channels specialized to vectors. -pub type Bundle = BundleCore>; +pub type Bundle = crate::communication::Message>; /// A serializable representation of timestamped data. #[derive(Clone, Abomonation, Serialize, Deserialize)] @@ -46,11 +43,11 @@ impl Message { /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher /// leaves in place, or the container's default element. 
#[inline] - pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); - let mut bundle = Some(BundleCore::from_typed(message)); + let mut bundle = Some(Bundle::from_typed(message)); pusher.push(&mut bundle); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 19648b285..572002934 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -14,7 +14,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull, Data}; use crate::container::PushPartitioned; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; use crate::worker::AsWorker; @@ -22,9 +22,9 @@ use crate::worker::AsWorker; /// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContractCore { /// Type implementing `Push` produced by this pact. - type Pusher: Push>+'static; + type Pusher: Push>+'static; /// Type implementing `Pull` produced by this pact. - type Puller: Pull>+'static; + type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. 
fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller); } @@ -39,8 +39,8 @@ impl>> ParallelizationCont pub struct Pipeline; impl ParallelizationContractCore for Pipeline { - type Pusher = LogPusher>>; - type Puller = LogPuller>>; + type Pusher = LogPusher>>; + type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (pusher, puller) = allocator.pipeline::>(identifier, address); // // ignore `&mut A` and use thread allocator @@ -76,8 +76,8 @@ where C: Data + PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { - type Pusher = ExchangePusher>>>, H>; - type Puller = LogPuller>>>; + type Pusher = ExchangePusher>>>, H>; + type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); @@ -94,7 +94,7 @@ impl Debug for ExchangeCore { /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. #[derive(Debug)] -pub struct LogPusher>> { +pub struct LogPusher>> { pusher: P, channel: usize, counter: usize, @@ -104,7 +104,7 @@ pub struct LogPusher>> { logging: Option, } -impl>> LogPusher { +impl>> LogPusher { /// Allocates a new pusher. pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { LogPusher { @@ -119,9 +119,9 @@ impl>> LogPusher { } } -impl>> Push> for LogPusher { +impl>> Push> for LogPusher { #[inline] - fn push(&mut self, pair: &mut Option>) { + fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; @@ -150,7 +150,7 @@ impl>> Push> for LogP /// Wraps a `Message` puller to provide a `Pull<(T, Content)>`. 
#[derive(Debug)] -pub struct LogPuller>> { +pub struct LogPuller>> { puller: P, channel: usize, index: usize, @@ -158,7 +158,7 @@ pub struct LogPuller>> { logging: Option, } -impl>> LogPuller { +impl>> LogPuller { /// Allocates a new `Puller`. pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { LogPuller { @@ -171,9 +171,9 @@ impl>> LogPuller { } } -impl>> Pull> for LogPuller { +impl>> Pull> for LogPuller { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option> { let result = self.puller.pull(); if let Some(bundle) = result { let channel = self.channel; diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 8f9bbbf08..848b9a8f5 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -3,13 +3,13 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use crate::progress::ChangeBatch; use crate::communication::Pull; use crate::Container; /// A wrapper which accounts records pulled past in a shared count map. -pub struct Counter>> { +pub struct Counter>> { pullable: P, consumed: Rc>>, phantom: ::std::marker::PhantomData, @@ -36,15 +36,15 @@ impl Drop for ConsumedGuard { } } -impl>> Counter { +impl>> Counter { /// Retrieves the next timestamp and batch of data. #[inline] - pub fn next(&mut self) -> Option<&mut BundleCore> { + pub fn next(&mut self) -> Option<&mut Bundle> { self.next_guarded().map(|(_guard, bundle)| bundle) } #[inline] - pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut BundleCore)> { + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut Bundle)> { if let Some(message) = self.pullable.pull() { let guard = ConsumedGuard { consumed: Rc::clone(&self.consumed), @@ -57,7 +57,7 @@ impl>> Counter>> Counter { +impl>> Counter { /// Allocates a new `Counter` from a boxed puller. 
pub fn new(pullable: P) -> Self { Counter { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index e241587df..637c731c2 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -3,7 +3,7 @@ use crate::communication::Push; use crate::container::{PushContainer, PushInto}; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::dataflow::operators::Capability; use crate::progress::Timestamp; use crate::{Container, Data}; @@ -13,18 +13,14 @@ use crate::{Container, Data}; /// The `Buffer` type should be used by calling `session` with a time, which checks whether /// data must be flushed and creates a `Session` object which allows sending at the given time. #[derive(Debug)] -pub struct BufferCore>> { +pub struct Buffer>> { /// the currently open time, if it is open time: Option, /// a buffer for records, to send at self.time - buffer: D, + buffer: C, pusher: P, } - -/// A buffer specialized to vector-based containers. -pub type Buffer = BufferCore, P>; - -impl>> BufferCore where T: Eq+Clone { +impl>> Buffer where T: Eq+Clone { /// Creates a new `Buffer`. pub fn new(pusher: P) -> Self { @@ -82,7 +78,7 @@ impl>> BufferCore where T: Eq } } -impl>> BufferCore where T: Eq+Clone { +impl>> Buffer where T: Eq+Clone { // internal method for use by `Session`. #[inline] fn give>(&mut self, data: D) { @@ -97,7 +93,7 @@ impl>> BufferCore where T } } -impl>>> Buffer where T: Eq+Clone { +impl>>> Buffer, P> where T: Eq+Clone { // Gives an entire message at a specific time. fn give_vec(&mut self, vector: &mut Vec) { // flush to ensure fifo-ness @@ -114,18 +110,18 @@ impl>>> Buffer where T: Eq+Clo /// The `Session` struct provides the user-facing interface to an operator output, namely /// the `Buffer` type. 
A `Session` wraps a session of output at a specified time, and /// avoids what would otherwise be a constant cost of checking timestamp equality. -pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { - buffer: &'a mut BufferCore, +pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { + buffer: &'a mut Buffer, } -impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { /// Provide a container at the time specified by the [Session]. pub fn give_container(&mut self, container: &mut C) { self.buffer.give_container(container) } } -impl<'a, T, C, P: Push>+'a> Session<'a, T, C, P> +impl<'a, T, C, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a + PushContainer, @@ -144,7 +140,7 @@ where } } -impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Provides a fully formed `Content` message for senders which can use this type. /// /// The `Content` type is the backing memory for communication in timely, and it can @@ -159,10 +155,10 @@ impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P } /// A session which will flush itself when dropped. -pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push>+'a> where +pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { /// A reference to the underlying buffer. - buffer: &'a mut BufferCore, + buffer: &'a mut Buffer, /// The capability being used to send the data. 
_capability: Capability, } @@ -170,7 +166,7 @@ pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push = AutoflushSessionCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSessionCore<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSessionCore<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Transmits a single record. #[inline] pub fn give(&mut self, data: D) { @@ -192,7 +188,7 @@ impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSess } } -impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { fn drop(&mut self) { self.buffer.cease(); } diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 59ccacf32..936ee445b 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -5,13 +5,13 @@ use std::rc::Rc; use std::cell::RefCell; use crate::progress::{ChangeBatch, Timestamp}; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use crate::communication::Push; use crate::Container; /// A wrapper which updates shared `produced` based on the number of records pushed. #[derive(Debug)] -pub struct CounterCore>> { +pub struct CounterCore>> { pushee: P, produced: Rc>>, phantom: PhantomData, @@ -20,9 +20,9 @@ pub struct CounterCore>> { /// A counter specialized to vector. 
pub type Counter = CounterCore, P>; -impl Push> for CounterCore where P: Push> { +impl Push> for CounterCore where P: Push> { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); } @@ -34,7 +34,7 @@ impl Push> for CounterCore>> CounterCore where T : Ord+Clone+'static { +impl>> CounterCore where T : Ord+Clone+'static { /// Allocates a new `Counter` from a pushee and shared counts. pub fn new(pushee: P) -> CounterCore { CounterCore { diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index e1b49cbed..403da32cd 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -2,12 +2,12 @@ use crate::communication::Push; use crate::container::PushPartitioned; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::{Container, Data}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. 
-pub struct Exchange>, H> +pub struct Exchange>, H> where for<'a> H: FnMut(&C::Item<'a>) -> u64 { @@ -17,7 +17,7 @@ where hash_func: H, } -impl>, H> Exchange +impl>, H> Exchange where for<'a> H: FnMut(&C::Item<'a>) -> u64 { @@ -44,13 +44,13 @@ where } } -impl>, H, > Push> for Exchange +impl>, H, > Push> for Exchange where C: PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange if self.pushers.len() == 1 { self.pushers[0].push(message); diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 1ec05e4e7..673e4b228 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -4,12 +4,12 @@ use std::cell::RefCell; use std::fmt::{self, Debug}; use std::rc::Rc; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::communication::Push; use crate::{Container, Data}; -type PushList = Rc>>>>>; +type PushList = Rc>>>>>; /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. pub struct TeeCore { @@ -20,9 +20,9 @@ pub struct TeeCore { /// [TeeCore] specialized to `Vec`-based container. pub type Tee = TeeCore>; -impl Push> for TeeCore { +impl Push> for TeeCore { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { @@ -89,7 +89,7 @@ pub struct TeeHelper { impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. 
- pub fn add_pusher>+'static>(&self, pusher: P) { + pub fn add_pusher>+'static>(&self, pusher: P) { self.shared.borrow_mut().push(Box::new(pusher)); } } diff --git a/timely/src/dataflow/mod.rs b/timely/src/dataflow/mod.rs index a6a3c33a9..043317fd8 100644 --- a/timely/src/dataflow/mod.rs +++ b/timely/src/dataflow/mod.rs @@ -16,7 +16,7 @@ pub use self::stream::{StreamCore, Stream}; pub use self::scopes::{Scope, ScopeParent}; -pub use self::operators::input::HandleCore as InputHandleCore; +pub use self::operators::core::input::Handle as InputHandleCore; pub use self::operators::input::Handle as InputHandle; pub use self::operators::probe::Handle as ProbeHandle; diff --git a/timely/src/dataflow/operators/broadcast.rs b/timely/src/dataflow/operators/broadcast.rs index 21d80d6da..bb63f5dd2 100644 --- a/timely/src/dataflow/operators/broadcast.rs +++ b/timely/src/dataflow/operators/broadcast.rs @@ -1,11 +1,11 @@ //! Broadcast records to all workers. use crate::ExchangeData; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::{Map, Exchange}; /// Broadcast records to all workers. -pub trait Broadcast { +pub trait Broadcast { /// Broadcast records to all workers. /// /// # Examples @@ -21,15 +21,59 @@ pub trait Broadcast { fn broadcast(&self) -> Self; } -impl Broadcast for Stream { +impl Broadcast> for Stream { fn broadcast(&self) -> Stream { // NOTE: Simplified implementation due to underlying motion // in timely dataflow internals. Optimize once they have // settled down. let peers = self.scope().peers() as u64; - self.flat_map(move |x| (0 .. peers).map(move |i| (i,x.clone()))) + self.flat_map(move |x| (0 .. 
peers).map(move |i| (i, x.clone()))) .exchange(|ix| ix.0) .map(|(_i,x)| x) } } + +#[cfg(feature = "bincode")] +mod flatcontainer { + use crate::container::flatcontainer::MirrorRegion; + use crate::container::flatcontainer::FlatStack; + use crate::container::flatcontainer::impls::tuple::TupleABRegion; + use crate::container::flatcontainer::Region; + use crate::dataflow::operators::{Broadcast, Operator}; + use crate::dataflow::{Scope, StreamCore}; + use crate::dataflow::channels::pact::{ExchangeCore, Pipeline}; + use crate::ExchangeData; + + impl Broadcast> for StreamCore> + where + FlatStack: ExchangeData, + R::Index: ExchangeData, + { + fn broadcast(&self) -> Self { + let peers = self.scope().peers() as u64; + self.unary::, R>>, _, _, _>(Pipeline, "Broadcast send", |_cap, _info| { + move |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session(&time); + for i in 0..peers { + for item in &*data { + session.give((i, item)) + } + } + } + } + }) + .unary(ExchangeCore::new(|(i, _)| *i), "Broadcast recv", |_cap, _info| { + |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session(&time); + for (_, item) in &*data { + session.give(item) + } + } + } + }) + } + } +} diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 5caa9bce8..50a88e48d 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -413,7 +413,7 @@ impl CapabilitySet { /// /// timely::example(|scope| { /// vec![()].into_iter().to_stream(scope) - /// .unary_frontier(Pipeline, "example", |default_cap, _info| { + /// .unary_frontier::, _, _, _>(Pipeline, "example", |default_cap, _info| { /// let mut cap = CapabilitySet::from_elem(default_cap); /// let mut vector = Vec::new(); /// move |input, output| { diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/capture/replay.rs index 
2cc325419..03ea4e581 100644 --- a/timely/src/dataflow/operators/capture/replay.rs +++ b/timely/src/dataflow/operators/capture/replay.rs @@ -40,7 +40,7 @@ use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; @@ -50,7 +50,7 @@ use crate::Container; /// Replay a capture stream into a scope with the same timestamp. pub trait Replay : Sized { - /// Replays `self` into the provided scope, as a `Stream`. + /// Replays `self` into the provided scope, as a `StreamCore`. fn replay_into>(self, scope: &mut S) -> StreamCore { self.replay_core(scope, Some(std::time::Duration::new(0, 0))) } diff --git a/timely/src/dataflow/operators/concat.rs b/timely/src/dataflow/operators/core/concat.rs similarity index 100% rename from timely/src/dataflow/operators/concat.rs rename to timely/src/dataflow/operators/core/concat.rs diff --git a/timely/src/dataflow/operators/core/delay.rs b/timely/src/dataflow/operators/core/delay.rs new file mode 100644 index 000000000..e1e7f227d --- /dev/null +++ b/timely/src/dataflow/operators/core/delay.rs @@ -0,0 +1,163 @@ +//! Operators acting on timestamps to logically delay records + +use std::collections::HashMap; +use timely_container::{Container, PushContainer, PushInto}; + +use crate::order::{PartialOrder, TotalOrder}; +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::operators::generic::operator::Operator; + +/// Methods to advance the timestamps of records or batches of records. +pub trait Delay { + + /// Advances the timestamp of records using a supplied function. 
+ /// + /// The function *must* advance the timestamp; the operator will test that the + /// new timestamp is greater or equal to the old timestamp, and will assert if + /// it is not. + /// + /// # Examples + /// + /// The following example takes the sequence `0..10` at time `0` + /// and delays each element `i` to time `i`. + /// + /// ``` + /// use timely::dataflow::operators::{ToStream, Delay, Operator}; + /// use timely::dataflow::channels::pact::Pipeline; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .delay(|data, time| *data) + /// .sink(Pipeline, "example", |input| { + /// input.for_each(|time, data| { + /// println!("data at time: {:?}", time); + /// }); + /// }); + /// }); + /// ``` + fn delay(&self, func: L) -> Self + where + for<'a> L: FnMut(&C::Item<'a>, &G::Timestamp)->G::Timestamp+'static; + + /// Advances the timestamp of records using a supplied function. + /// + /// This method is a specialization of `delay` for when the timestamp is totally + /// ordered. In this case, we can use a priority queue rather than an unsorted + /// list to manage the potentially available timestamps. + /// + /// # Examples + /// + /// The following example takes the sequence `0..10` at time `0` + /// and delays each element `i` to time `i`. + /// + /// ``` + /// use timely::dataflow::operators::{ToStream, Delay, Operator}; + /// use timely::dataflow::channels::pact::Pipeline; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .delay_total(|data, time| *data) + /// .sink(Pipeline, "example", |input| { + /// input.for_each(|time, data| { + /// println!("data at time: {:?}", time); + /// }); + /// }); + /// }); + /// ``` + fn delay_total(&self, func: L) -> Self + where + G::Timestamp: TotalOrder, + for<'a> L: FnMut(&C::Item<'a>, &G::Timestamp)->G::Timestamp+'static; + + /// Advances the timestamp of batches of records using a supplied function.
+ /// + /// The operator will test that the new timestamp is greater or equal to the + /// old timestamp, and will assert if it is not. The batch version does not + /// consult the data, and may only view the timestamp itself. + /// + /// # Examples + /// + /// The following example takes the sequence `0..10` at time `0` + /// and delays each batch (there is just one) to time `1`. + /// + /// ``` + /// use timely::dataflow::operators::{ToStream, Delay, Operator}; + /// use timely::dataflow::channels::pact::Pipeline; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .delay_batch(|time| time + 1) + /// .sink(Pipeline, "example", |input| { + /// input.for_each(|time, data| { + /// println!("data at time: {:?}", time); + /// }); + /// }); + /// }); + /// ``` + fn delay_batch(&self, func: L) -> Self + where + L: FnMut(&G::Timestamp)->G::Timestamp+'static; +} + +impl Delay for StreamCore +where + for<'a> C::Item<'a>: PushInto, +{ + fn delay(&self, mut func: L) -> Self + where + for<'a> L: FnMut(&C::Item<'a>, &G::Timestamp)->G::Timestamp+'static + { + let mut elements = HashMap::new(); + let mut vector = C::default(); + self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { + input.for_each(|time, data| { + data.swap(&mut vector); + for datum in vector.drain() { + let new_time = func(&datum, &time); + assert!(time.time().less_equal(&new_time)); + elements.entry(new_time.clone()) + .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); C::default() }) + .push(datum); + } + }); + + // for each available notification, send corresponding set + notificator.for_each(|time,_,_| { + if let Some(mut data) = elements.remove(&time) { + output.session(&time).give_iterator(data.drain()); + } + }); + }) + } + + fn delay_total(&self, func: L) -> Self + where + G::Timestamp: TotalOrder, + for<'a> L: FnMut(&C::Item<'a>, &G::Timestamp)->G::Timestamp+'static + { + self.delay(func) + } + + fn 
delay_batchG::Timestamp+'static>(&self, mut func: L) -> Self { + let mut elements = HashMap::new(); + self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { + input.for_each(|time, data| { + let new_time = func(&time); + assert!(time.time().less_equal(&new_time)); + elements.entry(new_time.clone()) + .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() }) + .push(data.replace(C::default())); + }); + + // for each available notification, send corresponding set + notificator.for_each(|time,_,_| { + if let Some(mut datas) = elements.remove(&time) { + for mut data in datas.drain(..) { + output.session(&time).give_container(&mut data); + } + } + }); + }) + } +} diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs new file mode 100644 index 000000000..0827aa34c --- /dev/null +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -0,0 +1,140 @@ +//! Create cycles in a timely dataflow graph. + +use crate::Container; + +use crate::progress::{Timestamp, PathSummary}; +use crate::progress::frontier::Antichain; +use crate::order::Product; + +use crate::dataflow::channels::pushers::TeeCore; +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::scopes::child::Iterative; +use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::operators::generic::OutputWrapper; + +/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. +pub trait Feedback { + /// Creates a [StreamCore] and a [Handle] to later bind the source of that `Stream`. + /// + /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with + /// its `Handle` passed as an argument. Data passed through the stream will have their + /// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`. 
+ /// + /// # Examples + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::core::{Feedback, ConnectLoop}; + /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen}; + /// + /// timely::example(|scope| { + /// // circulate 0..10 for 100 iterations. + /// let (handle, cycle) = scope.feedback::>(1); + /// (0..10).to_stream(scope) + /// .concat(&cycle) + /// .inspect(|x| println!("seen: {:?}", x)) + /// .branch_when(|t| t < &100).1 + /// .connect_loop(handle); + /// }); + /// ``` + fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore); +} + +/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. +pub trait LoopVariable<'a, G: Scope, T: Timestamp> { + /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. + /// + /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with + /// its `Handle` passed as an argument. Data passed through the stream will have their + /// timestamps advanced by `summary`. + /// + /// # Examples + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; + /// + /// timely::example(|scope| { + /// // circulate 0..10 for 100 iterations. 
+ /// scope.iterative::(|inner| { + /// let (handle, cycle) = inner.loop_variable(1); + /// (0..10).to_stream(inner) + /// .concat(&cycle) + /// .inspect(|x| println!("seen: {:?}", x)) + /// .branch_when(|t| t.inner < 100).1 + /// .connect_loop(handle); + /// }); + /// }); + /// ``` + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, D>, StreamCore, D>); +} + +impl Feedback for G { + fn feedback(&mut self, summary: ::Summary) -> (Handle, StreamCore) { + + let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); + let (output, stream) = builder.new_output(); + + (Handle { builder, summary, output }, stream) + } +} + +impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { + fn loop_variable(&mut self, summary: T::Summary) -> (Handle, D>, StreamCore, D>) { + self.feedback(Product::new(Default::default(), summary)) + } +} + +/// Connect a `Stream` to the input of a loop variable. +pub trait ConnectLoop { + /// Connect a `Stream` to be the input of a loop variable. + /// + /// # Examples + /// ``` + /// use timely::dataflow::Scope; + /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; + /// + /// timely::example(|scope| { + /// // circulate 0..10 for 100 iterations. 
+ /// let (handle, cycle) = scope.feedback(1); + /// (0..10).to_stream(scope) + /// .concat(&cycle) + /// .inspect(|x| println!("seen: {:?}", x)) + /// .branch_when(|t| t < &100).1 + /// .connect_loop(handle); + /// }); + /// ``` + fn connect_loop(&self, _: Handle); +} + +impl ConnectLoop for StreamCore { + fn connect_loop(&self, helper: Handle) { + + let mut builder = helper.builder; + let summary = helper.summary; + let mut output = helper.output; + + let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]); + + let mut vector = Default::default(); + builder.build(move |_capability| move |_frontier| { + let mut output = output.activate(); + input.for_each(|cap, data| { + data.swap(&mut vector); + if let Some(new_time) = summary.results_in(cap.time()) { + let new_cap = cap.delayed(&new_time); + output + .session(&new_cap) + .give_container(&mut vector); + } + }); + }); + } +} + +/// A handle used to bind the source of a loop variable. +#[derive(Debug)] +pub struct Handle { + builder: OperatorBuilder, + summary: ::Summary, + output: OutputWrapper>, +} diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/core/filter.rs similarity index 57% rename from timely/src/dataflow/operators/filter.rs rename to timely/src/dataflow/operators/core/filter.rs index 7034a9df0..727c13cbf 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/core/filter.rs @@ -1,12 +1,12 @@ //! Filters a stream by a predicate. -use crate::Data; +use timely_container::{Container, PushContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. -pub trait Filter { +pub trait Filter { /// Returns a new instance of `self` containing only records satisfying `predicate`. 
/// /// # Examples @@ -19,18 +19,25 @@ pub trait Filter { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn filterbool+'static>(&self, predicate: P) -> Self; + fn filter(&self, predicate: P) -> Self + where + for<'a> P: FnMut(&C::Item<'a>)->bool; } -impl Filter for Stream { - fn filterbool+'static>(&self, mut predicate: P) -> Stream { - let mut vector = Vec::new(); +impl Filter for StreamCore +where + for<'a> C::Item<'a>: PushInto, +{ + fn filter

(&self, mut predicate: P) -> StreamCore + where + for<'a> P: FnMut(&C::Item<'a>)->bool+'static + { + let mut vector = Default::default(); self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each(|time, data| { data.swap(&mut vector); - vector.retain(|x| predicate(x)); if !vector.is_empty() { - output.session(&time).give_vec(&mut vector); + output.session(&time).give_iterator(vector.drain().filter(|x| predicate(&x))); } }); }) diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs new file mode 100644 index 000000000..473942af8 --- /dev/null +++ b/timely/src/dataflow/operators/core/input.rs @@ -0,0 +1,442 @@ +//! Create new `Streams` connected to external inputs. + +use std::rc::Rc; +use std::cell::RefCell; +use timely_container::{PushContainer, PushInto}; + +use crate::scheduling::{Schedule, Activator}; + +use crate::progress::frontier::Antichain; +use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; +use crate::progress::Source; + +use crate::{Container}; +use crate::communication::Push; +use crate::dataflow::{ScopeParent, Scope, StreamCore}; +use crate::dataflow::channels::pushers::{TeeCore, CounterCore}; +use crate::dataflow::channels::Message; + + +// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something +// TODO : more like a harness, with direct access to its inputs. + +// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird. +// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long. +// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a. + +/// Create a new `Stream` and `Handle` through which to supply input. +pub trait Input : Scope { + /// Create a new [StreamCore] and [Handle] through which to supply input. 
+ /// + /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used + /// immediately for timely dataflow construction, and the `Handle` is later used to introduce + /// data into the timely dataflow computation. + /// + /// The `Handle` also provides a means to indicate + /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely + /// to issue progress notifications. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::Input; + /// use timely::dataflow::operators::Inspect; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = worker.dataflow(|scope| { + /// let (input, stream) = scope.new_input::>(); + /// stream.inspect(|x| println!("hello {:?}", x)); + /// input + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + fn new_input(&mut self) -> (Handle<::Timestamp, D>, StreamCore); + + /// Create a new stream from a supplied interactive handle. + /// + /// This method creates a new timely stream whose data are supplied interactively through the `handle` + /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate + /// if it is attached to more than one stream. 
+ /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::Input; + /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::operators::Inspect; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = >>::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> StreamCore; +} + +use crate::order::TotalOrder; +impl Input for G where ::Timestamp: TotalOrder { + fn new_input(&mut self) -> (Handle<::Timestamp, D>, StreamCore) { + let mut handle = Handle::new(); + let stream = self.input_from(&mut handle); + (handle, stream) + } + + fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> StreamCore { + let (output, registrar) = TeeCore::<::Timestamp, D>::new(); + let counter = CounterCore::new(output); + let produced = counter.produced().clone(); + + let index = self.allocate_operator_index(); + let mut address = self.addr(); + address.push(index); + + handle.activate.push(self.activator_for(&address[..])); + + let progress = Rc::new(RefCell::new(ChangeBatch::new())); + + handle.register(counter, progress.clone()); + + let copies = self.peers(); + + self.add_operator_with_index(Box::new(Operator { + name: "Input".to_owned(), + address, + shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), + progress, + messages: produced, + copies, + }), index); + + StreamCore::new(Source::new(index, 0), registrar, self.clone()) + } +} + +#[derive(Debug)] +struct Operator { + name: String, + address: Vec, + shared_progress: Rc>>, + progress: 
Rc>>, // times closed since last asked + messages: Rc>>, // messages sent since last asked + copies: usize, +} + +impl Schedule for Operator { + + fn name(&self) -> &str { &self.name } + + fn path(&self) -> &[usize] { &self.address[..] } + + fn schedule(&mut self) -> bool { + let shared_progress = &mut *self.shared_progress.borrow_mut(); + self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]); + self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]); + false + } +} + +impl Operate for Operator { + + fn inputs(&self) -> usize { 0 } + fn outputs(&self) -> usize { 1 } + + fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); + (Vec::new(), self.shared_progress.clone()) + } + + fn notify_me(&self) -> bool { false } +} + + +/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. +#[derive(Debug)] +pub struct Handle { + activate: Vec, + progress: Vec>>>, + pushers: Vec>>, + buffer1: C, + buffer2: C, + now_at: T, +} + +impl Handle { + /// Allocates a new input handle, from which one can create timely streams. 
+ /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::Input; + /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::operators::Inspect; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = >>::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn new() -> Self { + Self { + activate: Vec::new(), + progress: Vec::new(), + pushers: Vec::new(), + buffer1: Default::default(), + buffer2: Default::default(), + now_at: T::minimum(), + } + } + + /// Creates an input stream from the handle in the supplied scope. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::Input; + /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::operators::Inspect; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = >>::new(); + /// worker.dataflow(|scope| { + /// input.to_stream(scope) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn to_stream(&mut self, scope: &mut G) -> StreamCore + where + T: TotalOrder, + G: ScopeParent, + { + scope.input_from(self) + } + + fn register( + &mut self, + pusher: CounterCore>, + progress: Rc>>, + ) { + // flush current contents, so new registrant does not see existing data. 
+ if !self.buffer1.is_empty() { self.flush(); } + + // we need to produce an appropriate update to the capabilities for `progress`, in case a + // user has decided to drive the handle around a bit before registering it. + progress.borrow_mut().update(T::minimum(), -1); + progress.borrow_mut().update(self.now_at.clone(), 1); + + self.progress.push(progress); + self.pushers.push(pusher); + } + + // flushes our buffer at each of the destinations. there can be more than one; clone if needed. + #[inline(never)] + fn flush(&mut self) { + for index in 0 .. self.pushers.len() { + if index < self.pushers.len() - 1 { + self.buffer2.clone_from(&self.buffer1); + Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); + debug_assert!(self.buffer2.is_empty()); + } + else { + Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]); + debug_assert!(self.buffer1.is_empty()); + } + } + self.buffer1.clear(); + } + + // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. + fn close_epoch(&mut self) { + if !self.buffer1.is_empty() { self.flush(); } + for pusher in self.pushers.iter_mut() { + pusher.done(); + } + for progress in self.progress.iter() { + progress.borrow_mut().update(self.now_at.clone(), -1); + } + // Alert worker of each active input operator. + for activate in self.activate.iter() { + activate.activate(); + } + } + + /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch. + /// + /// This method flushes single elements previously sent with `send`, to keep the insertion order. 
+ /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::Input; + /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::operators::InspectCore; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = >>::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect_container(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send_batch(&mut vec![format!("{}", round)]); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn send_batch(&mut self, buffer: &mut D) { + + if !buffer.is_empty() { + // flush buffered elements to ensure local fifo. + if !self.buffer1.is_empty() { self.flush(); } + + // push buffer (or clone of buffer) at each destination. + for index in 0 .. self.pushers.len() { + if index < self.pushers.len() - 1 { + self.buffer2.clone_from(&buffer); + Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); + assert!(self.buffer2.is_empty()); + } + else { + Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]); + assert!(buffer.is_empty()); + } + } + buffer.clear(); + } + } + + /// Advances the current epoch to `next`. + /// + /// This method allows timely dataflow to issue progress notifications as it can now determine + /// that this input can no longer produce data at earlier timestamps. + pub fn advance_to(&mut self, next: T) { + // Assert that we do not rewind time. + assert!(self.now_at.less_equal(&next)); + // Flush buffers if time has actually changed. + if !self.now_at.eq(&next) { + self.close_epoch(); + self.now_at = next; + for progress in self.progress.iter() { + progress.borrow_mut().update(self.now_at.clone(), 1); + } + } + } + + /// Closes the input. 
+ /// + /// This method allows timely dataflow to issue all progress notifications blocked by this input + /// and to begin to shut down operators, as this input can no longer produce data. + pub fn close(self) { } + + /// Reports the current epoch. + pub fn epoch(&self) -> &T { + &self.now_at + } + + /// Reports the current timestamp. + pub fn time(&self) -> &T { + &self.now_at + } +} + +impl Handle { + #[inline] + /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. + /// + /// # Examples + /// ``` + /// use timely::*; + /// use timely::dataflow::operators::core::Input; + /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::operators::Inspect; + /// + /// // construct and execute a timely dataflow + /// timely::execute(Config::thread(), |worker| { + /// + /// // add an input and base computation off of it + /// let mut input = >>::new(); + /// worker.dataflow(|scope| { + /// scope.input_from(&mut input) + /// .inspect(|x| println!("hello {:?}", x)); + /// }); + /// + /// // introduce input, advance computation + /// for round in 0..10 { + /// input.send(round); + /// input.advance_to(round + 1); + /// worker.step(); + /// } + /// }); + /// ``` + pub fn send>(&mut self, data: D) { + // assert!(self.buffer1.capacity() == Message::::default_length()); + self.buffer1.push(data); + if self.buffer1.len() == self.buffer1.capacity() { + self.flush(); + } + } +} + +impl Default for Handle { + fn default() -> Self { + Self::new() + } +} + +impl Drop for Handle { + fn drop(&mut self) { + self.close_epoch(); + } +} diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs new file mode 100644 index 000000000..b4cf6b798 --- /dev/null +++ b/timely/src/dataflow/operators/core/map.rs @@ -0,0 +1,74 @@ +//! Extension methods for [`StreamCore`] based on record-by-record transformation. 
+ +use timely_container::{Container, PushContainer, PushInto}; +use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::generic::operator::Operator; + +/// Extension trait for `Stream`. +pub trait Map { + /// Consumes each element of the stream and yields a new element. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::core::Map; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .map::, _, _>(|x| x + 1) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn map, L: 'static>(&self, logic: L) -> StreamCore + where + for<'a> L: FnMut(C::Item<'a>)->D2; + /// Consumes each element of the stream and yields some number of new elements. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::core::Map; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// + /// timely::example(|scope| { + /// (0..10).to_stream(scope) + /// .flat_map::, _, _>(|x| (0..x)) + /// .inspect(|x| println!("seen: {:?}", x)); + /// }); + /// ``` + fn flat_map(&self, logic: L) -> StreamCore + where + I::Item: PushInto, + for<'a> L: FnMut(C::Item<'a>)->I; +} + +impl Map for StreamCore { + fn map, L: 'static>(&self, mut logic: L) -> StreamCore + where + for<'a> L: FnMut(C::Item<'a>)->D2, + { + let mut vector = Default::default(); + self.unary(Pipeline, "Map", move |_,_| move |input, output| { + input.for_each(|time, data| { + data.swap(&mut vector); + output.session(&time).give_iterator(vector.drain().map(&mut logic)); + }); + }) + } + // TODO : This would be more robust if it captured an iterator and then pulled an appropriate + // TODO : number of elements from the iterator. This would allow iterators that produce many + // TODO : records without taking arbitrarily long and arbitrarily much memory. 
+ fn flat_map(&self, mut logic: L) -> StreamCore + where + I::Item: PushInto, + for<'a> L: FnMut(C::Item<'a>)->I, + { + let mut vector = Default::default(); + self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| { + input.for_each(|time, data| { + data.swap(&mut vector); + output.session(&time).give_iterator(vector.drain().flat_map(|x| logic(x).into_iter())); + }); + }) + } +} diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index c70c5d945..71d009a02 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -1,10 +1,32 @@ //! Extension traits for `Stream` implementing various operators that //! are independent of specific container types. +pub mod concat; +pub mod delay; pub mod exchange; +pub mod feedback; +pub mod filter; +pub mod input; pub mod inspect; +pub mod map; +pub mod ok_err; +pub mod probe; +pub mod rc; pub mod reclock; +pub mod to_stream; +pub mod unordered_input; +pub use concat::{Concat, Concatenate}; +pub use delay::Delay; pub use exchange::Exchange; +pub use self::feedback::{Feedback, LoopVariable, ConnectLoop}; +pub use filter::Filter; +pub use input::{Input, Handle as InputHandle}; pub use inspect::{Inspect, InspectCore}; +pub use map::Map; +pub use ok_err::OkErr; +pub use probe::{Probe, Handle}; +pub use rc::SharedStream; pub use reclock::Reclock; +pub use to_stream::ToStream; +pub use unordered_input::{UnorderedHandle, UnorderedInput}; diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs new file mode 100644 index 000000000..86a2eb374 --- /dev/null +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -0,0 +1,89 @@ +//! 
Operators that separate one stream into two streams based on some condition + +use timely_container::{Container, PushContainer, PushInto}; +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::{Scope, StreamCore}; + +/// Extension trait for `Stream`. +pub trait OkErr { + /// Takes one input stream and splits it into two output streams. + /// For each record, the supplied closure is called with the data. + /// If it returns `Ok(x)`, then `x` will be sent + /// to the first returned stream; otherwise, if it returns `Err(e)`, + /// then `e` will be sent to the second. + /// + /// If the result of the closure only depends on the time, not the data, + /// `branch_when` should be used instead. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::core::OkErr; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::Stream; + /// + /// timely::example(|scope| { + /// let (even, odd): (Stream<_, _>, Stream<_, _>) = (0..10) + /// .to_stream(scope) + /// .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) }); + /// + /// even.inspect(|x| println!("even numbers: {:?}", x)); + /// odd.inspect(|x| println!("odd numbers: {:?}", x)); + /// }); + /// ``` + fn ok_err( + &self, + logic: L, + ) -> (StreamCore, StreamCore) + + where + D1: PushInto, + C1: PushContainer, + D2: PushInto, + C2: PushContainer, + for<'a> L: FnMut(C::Item<'a>) -> Result+'static + ; +} + +impl OkErr for StreamCore { + fn ok_err( + &self, + mut logic: L, + ) -> (StreamCore, StreamCore) + + where + D1: PushInto, + C1: PushContainer, + D2: PushInto, + C2: PushContainer, + for<'a> L: FnMut(C::Item<'a>) -> Result+'static + { + let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope()); + + let mut input = builder.new_input(self, Pipeline); + let (mut output1, stream1) = builder.new_output(); + let (mut output2, stream2) = builder.new_output(); + + builder.build(move 
|_| { + let mut vector = C::default(); + move |_frontiers| { + let mut output1_handle = output1.activate(); + let mut output2_handle = output2.activate(); + + input.for_each(|time, data| { + data.swap(&mut vector); + let mut out1 = output1_handle.session(&time); + let mut out2 = output2_handle.session(&time); + for datum in vector.drain() { + match logic(datum) { + Ok(datum) => out1.give(datum), + Err(datum) => out2.give(datum), + } + } + }); + } + }); + + (stream1, stream2) + } +} diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/core/probe.rs similarity index 99% rename from timely/src/dataflow/operators/probe.rs rename to timely/src/dataflow/operators/core/probe.rs index ad990cb79..ced729ea9 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -6,7 +6,7 @@ use std::cell::RefCell; use crate::progress::Timestamp; use crate::progress::frontier::{AntichainRef, MutableAntichain}; use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; diff --git a/timely/src/dataflow/operators/rc.rs b/timely/src/dataflow/operators/core/rc.rs similarity index 100% rename from timely/src/dataflow/operators/rc.rs rename to timely/src/dataflow/operators/core/rc.rs diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs new file mode 100644 index 000000000..bfd10a181 --- /dev/null +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -0,0 +1,58 @@ +//! Conversion to the `Stream` type from iterators. 
+ +use crate::container::{PushContainer, PushInto}; +use crate::Container; +use crate::dataflow::operators::generic::operator::source; +use crate::dataflow::{StreamCore, Scope}; + +/// Converts to a timely [StreamCore]. +pub trait ToStream { + /// Converts to a timely [StreamCore]. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::operators::core::ToStream; + /// use timely::dataflow::operators::Capture; + /// use timely::dataflow::operators::capture::Extract; + /// + /// let (data1, data2) = timely::example(|scope| { + /// let data1 = Some((0..3).collect::>()).to_stream(scope).capture(); + /// let data2 = Some(vec![0,1,2]).to_stream(scope).capture(); + /// (data1, data2) + /// }); + /// + /// assert_eq!(data1.extract(), data2.extract()); + /// ``` + fn to_stream(self, scope: &mut S) -> StreamCore; +} + +impl ToStream for I where I::Item: PushInto { + fn to_stream(self, scope: &mut S) -> StreamCore { + + source(scope, "ToStream", |capability, info| { + + // Acquire an activator, so that the operator can reschedule itself. + let activator = scope.activator_for(&info.address[..]); + + let mut iterator = self.into_iter().fuse(); + let mut capability = Some(capability); + + move |output| { + + if let Some(element) = iterator.next() { + let mut session = output.session(capability.as_ref().unwrap()); + session.give(element); + let n = 256 * crate::container::buffer::default_capacity::(); + for element in iterator.by_ref().take(n - 1) { + session.give(element); + } + activator.activate(); + } + else { + capability = None; + } + } + }) + } +} diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs new file mode 100644 index 000000000..30941b766 --- /dev/null +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -0,0 +1,165 @@ +//! Create new `Streams` connected to external inputs. 
+ +use std::rc::Rc; +use std::cell::RefCell; +use crate::Container; + +use crate::scheduling::{Schedule, ActivateOnDrop}; + +use crate::progress::frontier::Antichain; +use crate::progress::{Operate, operate::SharedProgress, Timestamp}; +use crate::progress::Source; +use crate::progress::ChangeBatch; + +use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore}; +use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSessionCore}; + +use crate::dataflow::operators::{ActivateCapability, Capability}; + +use crate::dataflow::{Scope, StreamCore}; + +/// Create a new `Stream` and `Handle` through which to supply input. +pub trait UnorderedInput { + /// Create a new capability-based [StreamCore] and [UnorderedHandle] through which to supply input. This + /// input supports multiple open epochs (timestamps) at the same time. + /// + /// The `new_unordered_input` method returns `((UnorderedHandle, ActivateCapability), StreamCore)` where the `StreamCore` can be used + /// immediately for timely dataflow construction, and the `UnorderedHandle` and `ActivateCapability` are later used to introduce + /// data into the timely dataflow computation. + /// + /// The `ActivateCapability` returned is for the minimum value of the timestamp type in use. The + /// capability can be dropped to inform the system that the input has advanced beyond the + /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp + /// should be obtained first, via the `delayed` function for `ActivateCapability`. + /// + /// To communicate the end of input, drop all available capabilities. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Mutex}; + /// + /// use timely::*; + /// use timely::dataflow::operators::*; + /// use timely::dataflow::operators::core::*; + /// use timely::dataflow::operators::capture::Extract; + /// use timely::dataflow::Stream; + /// + /// // get send and recv endpoints, wrap send to share + /// let (send, recv) = ::std::sync::mpsc::channel(); + /// let send = Arc::new(Mutex::new(send)); + /// + /// timely::execute(Config::thread(), move |worker| { + /// + /// // this is only to validate the output. + /// let send = send.lock().unwrap().clone(); + /// + /// // create and capture the unordered input. + /// let (mut input, mut cap) = worker.dataflow::(|scope| { + /// let (input, stream) = scope.new_unordered_input(); + /// stream.capture_into(send); + /// input + /// }); + /// + /// // feed values 0..10 at times 0..10. + /// for round in 0..10 { + /// input.session(cap.clone()).give(round); + /// cap = cap.delayed(&(round + 1)); + /// worker.step(); + /// } + /// }).unwrap(); + /// + /// let extract = recv.extract(); + /// for i in 0..10 { + /// assert_eq!(extract[i], (i, vec![i])); + /// } + /// ``` + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore); +} + + +impl UnorderedInput for G { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore) { + + let (output, registrar) = TeeCore::::new(); + let internal = Rc::new(RefCell::new(ChangeBatch::new())); + // let produced = Rc::new(RefCell::new(ChangeBatch::new())); + let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); + let counter = PushCounter::new(output); + let produced = counter.produced().clone(); + let peers = self.peers(); + + let index = self.allocate_operator_index(); + let mut address = self.addr(); + address.push(index); + + let cap = ActivateCapability::new(cap, &address, self.activations()); + + let helper = UnorderedHandle::new(counter); + + 
self.add_operator_with_index(Box::new(UnorderedOperator { + name: "UnorderedInput".to_owned(), + address, + shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), + internal, + produced, + peers, + }), index); + + ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) + } +} + +struct UnorderedOperator { + name: String, + address: Vec, + shared_progress: Rc>>, + internal: Rc>>, + produced: Rc>>, + peers: usize, +} + +impl Schedule for UnorderedOperator { + fn name(&self) -> &str { &self.name } + fn path(&self) -> &[usize] { &self.address[..] } + fn schedule(&mut self) -> bool { + let shared_progress = &mut *self.shared_progress.borrow_mut(); + self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]); + self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]); + false + } +} + +impl Operate for UnorderedOperator { + fn inputs(&self) -> usize { 0 } + fn outputs(&self) -> usize { 1 } + + fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { + let mut borrow = self.internal.borrow_mut(); + for (time, count) in borrow.drain() { + self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); + } + (Vec::new(), self.shared_progress.clone()) + } + + fn notify_me(&self) -> bool { false } +} + +/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. +#[derive(Debug)] +pub struct UnorderedHandle { + buffer: PushBuffer>>, +} + +impl UnorderedHandle { + fn new(pusher: PushCounter>) -> UnorderedHandle { + UnorderedHandle { + buffer: PushBuffer::new(pusher), + } + } + + /// Allocates a new automatically flushing session based on the supplied capability. 
+ pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) + } +} diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/delay.rs index ddda2faaf..3a214e7ec 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/delay.rs @@ -1,12 +1,9 @@ //! Operators acting on timestamps to logically delay records -use std::collections::HashMap; - use crate::Data; -use crate::order::{PartialOrder, TotalOrder}; -use crate::dataflow::channels::pact::Pipeline; +use crate::order::TotalOrder; use crate::dataflow::{Stream, Scope}; -use crate::dataflow::operators::generic::operator::Operator; +use crate::dataflow::operators::core::{Delay as DelayCore}; /// Methods to advance the timestamps of records or batches of records. pub trait Delay { @@ -95,55 +92,17 @@ pub trait Delay { } impl Delay for Stream { - fn delayG::Timestamp+'static>(&self, mut func: L) -> Self { - let mut elements = HashMap::new(); - let mut vector = Vec::new(); - self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { - input.for_each(|time, data| { - data.swap(&mut vector); - for datum in vector.drain(..) 
{ - let new_time = func(&datum, &time); - assert!(time.time().less_equal(&new_time)); - elements.entry(new_time.clone()) - .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() }) - .push(datum); - } - }); - - // for each available notification, send corresponding set - notificator.for_each(|time,_,_| { - if let Some(mut data) = elements.remove(&time) { - output.session(&time).give_iterator(data.drain(..)); - } - }); - }) + fn delayG::Timestamp+'static>(&self, func: L) -> Self { + DelayCore::delay(self, func) } fn delay_totalG::Timestamp+'static>(&self, func: L) -> Self where G::Timestamp: TotalOrder { - self.delay(func) + Delay::delay(self, func) } - fn delay_batchG::Timestamp+'static>(&self, mut func: L) -> Self { - let mut elements = HashMap::new(); - self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { - input.for_each(|time, data| { - let new_time = func(&time); - assert!(time.time().less_equal(&new_time)); - elements.entry(new_time.clone()) - .or_insert_with(|| { notificator.notify_at(time.delayed(&new_time)); Vec::new() }) - .push(data.replace(Vec::new())); - }); - - // for each available notification, send corresponding set - notificator.for_each(|time,_,_| { - if let Some(mut datas) = elements.remove(&time) { - for mut data in datas.drain(..) 
{ - output.session(&time).give_vec(&mut data); - } - } - }); - }) + fn delay_batchG::Timestamp+'static>(&self, func: L) -> Self { + DelayCore::delay_batch(self, func) } } diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index 24c8fa12e..92728d438 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -21,20 +21,20 @@ use std::marker::PhantomData; +use crate::communication::Push; +use crate::container::{PushContainer, PushInto}; +use crate::dataflow::channels::pushers::{CounterCore, TeeCore}; +use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::operators::core::delay::Delay; +use crate::dataflow::scopes::{Child, ScopeParent}; +use crate::dataflow::{StreamCore, Scope}; use crate::logging::{TimelyLogger, MessagesEvent}; +use crate::order::Product; use crate::progress::Timestamp; use crate::progress::timestamp::Refines; use crate::progress::{Source, Target}; -use crate::order::Product; -use crate::{Container, Data}; -use crate::communication::Push; -use crate::dataflow::channels::pushers::{CounterCore, TeeCore}; -use crate::dataflow::channels::{BundleCore, Message}; - use crate::worker::AsWorker; -use crate::dataflow::{StreamCore, Scope, Stream}; -use crate::dataflow::scopes::{Child, ScopeParent}; -use crate::dataflow::operators::delay::Delay; +use crate::Container; /// Extension trait to move a `Stream` into a child of its current `Scope`. pub trait Enter, C: Container> { @@ -58,7 +58,7 @@ pub trait Enter, C: Container> { use crate::dataflow::scopes::child::Iterative; /// Extension trait to move a `Stream` into a child of its current `Scope` setting the timestamp for each element. -pub trait EnterAt { +pub trait EnterAt { /// Moves the `Stream` argument into a child of its current `Scope` setting the timestamp for each element by `initial`. 
/// /// # Examples @@ -73,17 +73,26 @@ pub trait EnterAt { /// }); /// }); /// ``` - fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, initial: F) -> Stream, D> ; + fn enter_at<'a, F>(&self, scope: &Iterative<'a, G, T>, initial: F) -> StreamCore, C> + where + for<'c> F: FnMut(&C::Item<'c>) -> T + 'static; } -impl::Timestamp, T>, Vec>> EnterAt for E { - fn enter_at<'a, F:FnMut(&D)->T+'static>(&self, scope: &Iterative<'a, G, T>, mut initial: F) -> - Stream, D> { - self.enter(scope).delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum))) +impl::Timestamp, T>, C>> EnterAt for E +where + for<'a> C::Item<'a>: PushInto, +{ + fn enter_at<'a, F>(&self, scope: &Iterative<'a, G, T>, mut initial: F) -> StreamCore, C> + where + for<'c> F: FnMut(&C::Item<'c>) -> T + 'static + { + self + .enter(scope) + .delay(move |datum, time| Product::new(time.clone().to_outer(), initial(datum))) } } -impl, C: Data+Container> Enter for StreamCore { +impl, C: Container> Enter for StreamCore { fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore, C> { use crate::scheduling::Scheduler; @@ -167,12 +176,12 @@ struct IngressNub, TData: C active: bool, } -impl, TData: Container> Push> for IngressNub { - fn push(&mut self, element: &mut Option>) { +impl, TData: Container> Push> for IngressNub { + fn push(&mut self, element: &mut Option>) { if let Some(message) = element { let outer_message = message.as_mut(); let data = ::std::mem::take(&mut outer_message.data); - let mut inner_message = Some(BundleCore::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); + let mut inner_message = Some(Bundle::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); self.targets.push(&mut inner_message); if let Some(inner_message) = inner_message { if let Some(inner_message) = inner_message.if_typed() { @@ -192,18 +201,18 @@ impl, TData: Container> Pus } -struct EgressNub, TData: Data> { +struct 
EgressNub, TData> { targets: TeeCore, phantom: PhantomData, } -impl Push> for EgressNub -where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { - fn push(&mut self, message: &mut Option>) { +impl Push> for EgressNub +where TOuter: Timestamp, TInner: Timestamp+Refines, { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { let inner_message = message.as_mut(); let data = ::std::mem::take(&mut inner_message.data); - let mut outer_message = Some(BundleCore::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); + let mut outer_message = Some(Bundle::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); self.targets.push(&mut outer_message); if let Some(outer_message) = outer_message { if let Some(outer_message) = outer_message.if_typed() { @@ -241,12 +250,12 @@ impl

LogPusher

{ } } -impl Push> for LogPusher

+impl Push> for LogPusher

where D: Container, - P: Push>, + P: Push>, { - fn push(&mut self, element: &mut Option>) { + fn push(&mut self, element: &mut Option>) { if let Some(bundle) = element { let send_event = MessagesEvent { is_send: true, diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/feedback.rs index a7eb90c65..5b71f60a1 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/feedback.rs @@ -1,17 +1,10 @@ //! Create cycles in a timely dataflow graph. -use crate::{Container, Data}; +use crate::Data; -use crate::progress::{Timestamp, PathSummary}; -use crate::progress::frontier::Antichain; -use crate::order::Product; - -use crate::dataflow::channels::pushers::TeeCore; -use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{StreamCore, Scope, Stream}; -use crate::dataflow::scopes::child::Iterative; -use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::operators::generic::OutputWrapper; +use crate::progress::Timestamp; +use crate::dataflow::{Scope, Stream}; +use crate::dataflow::operators::core::{Feedback as FeedbackCore}; /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. pub trait Feedback { @@ -37,133 +30,13 @@ pub trait Feedback { /// }); /// ``` fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream); - - /// Creates a [StreamCore] and a [HandleCore] to later bind the source of that `Stream`. - /// - /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with - /// its `Handle` passed as an argument. Data passed through the stream will have their - /// timestamps advanced by `summary`, and will be dropped if the result exceeds `limit`. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; - /// - /// timely::example(|scope| { - /// // circulate 0..10 for 100 iterations. 
- /// let (handle, cycle) = scope.feedback_core::>(1); - /// (0..10).to_stream(scope) - /// .concat(&cycle) - /// .inspect(|x| println!("seen: {:?}", x)) - /// .branch_when(|t| t < &100).1 - /// .connect_loop(handle); - /// }); - /// ``` - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore); -} - -/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. -pub trait LoopVariable<'a, G: Scope, T: Timestamp> { - /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`. - /// - /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with - /// its `Handle` passed as an argument. Data passed through the stream will have their - /// timestamps advanced by `summary`. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; - /// - /// timely::example(|scope| { - /// // circulate 0..10 for 100 iterations. 
- /// scope.iterative::(|inner| { - /// let (handle, cycle) = inner.loop_variable(1); - /// (0..10).to_stream(inner) - /// .concat(&cycle) - /// .inspect(|x| println!("seen: {:?}", x)) - /// .branch_when(|t| t.inner < 100).1 - /// .connect_loop(handle); - /// }); - /// }); - /// ``` - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>); } impl Feedback for G { fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { - self.feedback_core(summary) - } - - fn feedback_core(&mut self, summary: ::Summary) -> (HandleCore, StreamCore) { - - let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); - let (output, stream) = builder.new_output(); - - (HandleCore { builder, summary, output }, stream) - } -} - -impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> { - fn loop_variable(&mut self, summary: T::Summary) -> (HandleCore, D>, StreamCore, D>) { - self.feedback_core(Product::new(Default::default(), summary)) + FeedbackCore::feedback(self, summary) } } -/// Connect a `Stream` to the input of a loop variable. -pub trait ConnectLoop { - /// Connect a `Stream` to be the input of a loop variable. - /// - /// # Examples - /// ``` - /// use timely::dataflow::Scope; - /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen}; - /// - /// timely::example(|scope| { - /// // circulate 0..10 for 100 iterations. 
- /// let (handle, cycle) = scope.feedback(1); - /// (0..10).to_stream(scope) - /// .concat(&cycle) - /// .inspect(|x| println!("seen: {:?}", x)) - /// .branch_when(|t| t < &100).1 - /// .connect_loop(handle); - /// }); - /// ``` - fn connect_loop(&self, _: HandleCore); -} - -impl ConnectLoop for StreamCore { - fn connect_loop(&self, helper: HandleCore) { - - let mut builder = helper.builder; - let summary = helper.summary; - let mut output = helper.output; - - let mut input = builder.new_input_connection(self, Pipeline, vec![Antichain::from_elem(summary.clone())]); - - let mut vector = Default::default(); - builder.build(move |_capability| move |_frontier| { - let mut output = output.activate(); - input.for_each(|cap, data| { - data.swap(&mut vector); - if let Some(new_time) = summary.results_in(cap.time()) { - let new_cap = cap.delayed(&new_time); - output - .session(&new_cap) - .give_container(&mut vector); - } - }); - }); - } -} - -/// A handle used to bind the source of a loop variable. 
-#[derive(Debug)] -pub struct HandleCore { - builder: OperatorBuilder, - summary: ::Summary, - output: OutputWrapper>, -} - -/// A `HandleCore` specialized for using `Vec` as container -pub type Handle = HandleCore>; +/// A `Handle` specialized for using `Vec` as container +pub type Handle = crate::dataflow::operators::core::feedback::Handle>; diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index ca80e3182..c4d064109 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -12,7 +12,7 @@ use crate::Container; use crate::dataflow::{Scope, StreamCore}; use crate::dataflow::channels::pushers::TeeCore; use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::channels::pact::ParallelizationContractCore; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::capability::Capability; diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index eeddf5295..935ad1e2e 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -12,8 +12,8 @@ use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::{BufferCore, Session}; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; +use crate::dataflow::channels::Bundle; use crate::communication::{Push, Pull, message::RefOrMut}; use crate::Container; use crate::logging::TimelyLogger as Logger; @@ -22,7 +22,7 
@@ use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. -pub struct InputHandleCore>> { +pub struct InputHandleCore>> { pull_counter: PullCounter, internal: Rc>>>>>, /// Timestamp summaries from this input to each output. @@ -37,7 +37,7 @@ pub struct InputHandleCore> pub type InputHandle = InputHandleCore, P>; /// Handle to an operator's input stream and frontier. -pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { +pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { /// The underlying input handle. pub handle: &'a mut InputHandleCore, /// The frontier as reported by timely progress tracking. @@ -47,7 +47,7 @@ pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull< /// Handle to an operator's input stream and frontier, specialized to vectors. pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore { +impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore { /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. @@ -99,7 +99,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< } -impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, D, P> { +impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, D, P> { /// Allocate a new frontiered input handle. 
pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { FrontieredInputHandleCore { @@ -146,13 +146,13 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInp } } -pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { +pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { &mut input.pull_counter } /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>( +pub fn new_input_handle>>( pull_counter: PullCounter, internal: Rc>>>>>, summaries: Rc>>>, @@ -172,14 +172,14 @@ pub fn new_input_handle>>( /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. #[derive(Debug)] -pub struct OutputWrapper>> { - push_buffer: BufferCore>, +pub struct OutputWrapper>> { + push_buffer: Buffer>, internal_buffer: Rc>>, } -impl>> OutputWrapper { +impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. - pub fn new(push_buffer: BufferCore>, internal_buffer: Rc>>) -> Self { + pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { OutputWrapper { push_buffer, internal_buffer, @@ -199,15 +199,15 @@ impl>> OutputWrapper>+'a> { - push_buffer: &'a mut BufferCore>, +pub struct OutputHandleCore<'a, T: Timestamp, C: Container+'a, P: Push>+'a> { + push_buffer: &'a mut Buffer>, internal_buffer: &'a Rc>>, } /// Handle specialized to `Vec`-based container. pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, C, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. 
/// /// In order to send data at a future timestamp, obtain a capability for the new timestamp @@ -241,7 +241,7 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore } } -impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { fn drop(&mut self) { self.push_buffer.cease(); } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 6c8dd8ea9..336b6aebf 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -217,7 +217,7 @@ fn notificator_delivers_notifications_in_topo_order() { /// } /// }); /// } -/// }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); +/// }).inspect_batch(|t, x: &Vec<_>| println!("{:?} -> {:?}", t, x)); /// /// (in1_handle, in2_handle) /// }); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index afb7a25d1..5bb24421d 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -30,7 +30,7 @@ pub trait Operator { /// fn main() { /// timely::example(|scope| { /// (0u64..10).to_stream(scope) - /// .unary_frontier(Pipeline, "example", |default_cap, _info| { + /// .unary_frontier::, _, _, _>(Pipeline, "example", |default_cap, _info| { /// let mut cap = Some(default_cap.delayed(&12)); /// let mut notificator = FrontierNotificator::new(); /// let mut stash = HashMap::new(); @@ -172,7 +172,7 @@ pub trait Operator { /// } /// }); /// } - /// }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); + /// }).inspect_batch(|t, x: &Vec<_>| println!("{:?} -> {:?}", t, x)); /// /// (in1_handle, in2_handle) /// }); @@ -529,7 +529,7 @@ impl Operator for StreamCore { /// /// timely::example(|scope| { /// -/// source(scope, "Source", |capability, info| { +/// 
source::<_, Vec<_>, _, _>(scope, "Source", |capability, info| { /// /// let activator = scope.activator_for(&info.address[..]); /// diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 72719c5f8..b5f5fbe45 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -1,19 +1,8 @@ //! Create new `Streams` connected to external inputs. -use std::rc::Rc; -use std::cell::RefCell; - -use crate::scheduling::{Schedule, Activator}; - -use crate::progress::frontier::Antichain; -use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch}; -use crate::progress::Source; - -use crate::{Container, Data}; -use crate::communication::Push; -use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore}; -use crate::dataflow::channels::pushers::{TeeCore, CounterCore}; -use crate::dataflow::channels::Message; +use crate::Data; +use crate::dataflow::{Stream, ScopeParent, Scope}; +use crate::dataflow::operators::core::{Input as InputCore}; // TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something @@ -60,41 +49,6 @@ pub trait Input : Scope { /// ``` fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream); - /// Create a new [StreamCore] and [HandleCore] through which to supply input. - /// - /// The `new_input_core` method returns a pair `(HandleCore, StreamCore)` where the [StreamCore] can be used - /// immediately for timely dataflow construction, and the `HandleCore` is later used to introduce - /// data into the timely dataflow computation. - /// - /// The `HandleCore` also provides a means to indicate - /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely - /// to issue progress notifications. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = worker.dataflow(|scope| { - /// let (input, stream) = scope.new_input_core::>(); - /// stream.inspect(|x| println!("hello {:?}", x)); - /// input - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore); - /// Create a new stream from a supplied interactive handle. /// /// This method creates a new timely stream whose data are supplied interactively through the `handle` @@ -126,388 +80,18 @@ pub trait Input : Scope { /// }); /// ``` fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream; - - /// Create a new stream from a supplied interactive handle. - /// - /// This method creates a new timely stream whose data are supplied interactively through the `handle` - /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate - /// if it as attached to more than one stream. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// scope.input_from_core(&mut input) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore; } use crate::order::TotalOrder; impl Input for G where ::Timestamp: TotalOrder { fn new_input(&mut self) -> (Handle<::Timestamp, D>, Stream) { - self.new_input_core() + InputCore::new_input(self) } fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> Stream { - self.input_from_core(handle) - } - - fn new_input_core(&mut self) -> (HandleCore<::Timestamp, D>, StreamCore) { - let mut handle = HandleCore::new(); - let stream = self.input_from_core(&mut handle); - (handle, stream) - } - - fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore { - let (output, registrar) = TeeCore::<::Timestamp, D>::new(); - let counter = CounterCore::new(output); - let produced = counter.produced().clone(); - - let index = self.allocate_operator_index(); - let mut address = self.addr(); - address.push(index); - - handle.activate.push(self.activator_for(&address[..])); - - let progress = Rc::new(RefCell::new(ChangeBatch::new())); - - handle.register(counter, progress.clone()); - - let copies = self.peers(); - - self.add_operator_with_index(Box::new(Operator { - name: "Input".to_owned(), - address, - shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), - progress, - messages: 
produced, - copies, - }), index); - - StreamCore::new(Source::new(index, 0), registrar, self.clone()) - } -} - -#[derive(Debug)] -struct Operator { - name: String, - address: Vec, - shared_progress: Rc>>, - progress: Rc>>, // times closed since last asked - messages: Rc>>, // messages sent since last asked - copies: usize, -} - -impl Schedule for Operator { - - fn name(&self) -> &str { &self.name } - - fn path(&self) -> &[usize] { &self.address[..] } - - fn schedule(&mut self) -> bool { - let shared_progress = &mut *self.shared_progress.borrow_mut(); - self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]); - self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]); - false + InputCore::input_from(self, handle) } } -impl Operate for Operator { - - fn inputs(&self) -> usize { 0 } - fn outputs(&self) -> usize { 1 } - - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { - self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64); - (Vec::new(), self.shared_progress.clone()) - } - - fn notify_me(&self) -> bool { false } -} - - -/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. -#[derive(Debug)] -pub struct HandleCore { - activate: Vec, - progress: Vec>>>, - pushers: Vec>>, - buffer1: C, - buffer2: C, - now_at: T, -} - /// A handle specialized to vector-based containers. -pub type Handle = HandleCore>; - -impl HandleCore { - /// Allocates a new input handle, from which one can create timely streams. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// scope.input_from(&mut input) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn new() -> Self { - Self { - activate: Vec::new(), - progress: Vec::new(), - pushers: Vec::new(), - buffer1: Default::default(), - buffer2: Default::default(), - now_at: T::minimum(), - } - } - - /// Creates an input stream from the handle in the supplied scope. - /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// input.to_stream(scope) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> StreamCore - where - T: TotalOrder, - G: ScopeParent, - { - scope.input_from_core(self) - } - - fn register( - &mut self, - pusher: CounterCore>, - progress: Rc>>, - ) { - // flush current contents, so new registrant does not see existing data. 
- if !self.buffer1.is_empty() { self.flush(); } - - // we need to produce an appropriate update to the capabilities for `progress`, in case a - // user has decided to drive the handle around a bit before registering it. - progress.borrow_mut().update(T::minimum(), -1); - progress.borrow_mut().update(self.now_at.clone(), 1); - - self.progress.push(progress); - self.pushers.push(pusher); - } - - // flushes our buffer at each of the destinations. there can be more than one; clone if needed. - #[inline(never)] - fn flush(&mut self) { - for index in 0 .. self.pushers.len() { - if index < self.pushers.len() - 1 { - self.buffer2.clone_from(&self.buffer1); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); - debug_assert!(self.buffer2.is_empty()); - } - else { - Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]); - debug_assert!(self.buffer1.is_empty()); - } - } - self.buffer1.clear(); - } - - // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier. - fn close_epoch(&mut self) { - if !self.buffer1.is_empty() { self.flush(); } - for pusher in self.pushers.iter_mut() { - pusher.done(); - } - for progress in self.progress.iter() { - progress.borrow_mut().update(self.now_at.clone(), -1); - } - // Alert worker of each active input operator. - for activate in self.activate.iter() { - activate.activate(); - } - } - - /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch. - /// - /// This method flushes single elements previously sent with `send`, to keep the insertion order. 
- /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, InspectCore}; - /// use timely::dataflow::operators::input::HandleCore; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = HandleCore::new(); - /// worker.dataflow(|scope| { - /// scope.input_from_core(&mut input) - /// .inspect_container(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send_batch(&mut vec![format!("{}", round)]); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn send_batch(&mut self, buffer: &mut D) { - - if !buffer.is_empty() { - // flush buffered elements to ensure local fifo. - if !self.buffer1.is_empty() { self.flush(); } - - // push buffer (or clone of buffer) at each destination. - for index in 0 .. self.pushers.len() { - if index < self.pushers.len() - 1 { - self.buffer2.clone_from(&buffer); - Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]); - assert!(self.buffer2.is_empty()); - } - else { - Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]); - assert!(buffer.is_empty()); - } - } - buffer.clear(); - } - } - - /// Advances the current epoch to `next`. - /// - /// This method allows timely dataflow to issue progress notifications as it can now determine - /// that this input can no longer produce data at earlier timestamps. - pub fn advance_to(&mut self, next: T) { - // Assert that we do not rewind time. - assert!(self.now_at.less_equal(&next)); - // Flush buffers if time has actually changed. - if !self.now_at.eq(&next) { - self.close_epoch(); - self.now_at = next; - for progress in self.progress.iter() { - progress.borrow_mut().update(self.now_at.clone(), 1); - } - } - } - - /// Closes the input. 
- /// - /// This method allows timely dataflow to issue all progress notifications blocked by this input - /// and to begin to shut down operators, as this input can no longer produce data. - pub fn close(self) { } - - /// Reports the current epoch. - pub fn epoch(&self) -> &T { - &self.now_at - } - - /// Reports the current timestamp. - pub fn time(&self) -> &T { - &self.now_at - } -} - -impl Handle { - #[inline] - /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch. - /// - /// # Examples - /// ``` - /// use timely::*; - /// use timely::dataflow::operators::{Input, Inspect}; - /// use timely::dataflow::operators::input::Handle; - /// - /// // construct and execute a timely dataflow - /// timely::execute(Config::thread(), |worker| { - /// - /// // add an input and base computation off of it - /// let mut input = Handle::new(); - /// worker.dataflow(|scope| { - /// scope.input_from(&mut input) - /// .inspect(|x| println!("hello {:?}", x)); - /// }); - /// - /// // introduce input, advance computation - /// for round in 0..10 { - /// input.send(round); - /// input.advance_to(round + 1); - /// worker.step(); - /// } - /// }); - /// ``` - pub fn send(&mut self, data: D) { - // assert!(self.buffer1.capacity() == Message::::default_length()); - self.buffer1.push(data); - if self.buffer1.len() == self.buffer1.capacity() { - self.flush(); - } - } -} - -impl Default for Handle { - fn default() -> Self { - Self::new() - } -} - -impl Drop for HandleCore { - fn drop(&mut self) { - self.close_epoch(); - } -} +pub type Handle = crate::dataflow::operators::core::input::Handle>; diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/map.rs index 77eced3f6..4e4cb787e 100644 --- a/timely/src/dataflow/operators/map.rs +++ b/timely/src/dataflow/operators/map.rs @@ -4,6 +4,7 @@ use crate::Data; use crate::dataflow::{Stream, Scope}; use crate::dataflow::channels::pact::Pipeline; use 
crate::dataflow::operators::generic::operator::Operator; +use crate::dataflow::operators::core::{Map as MapCore}; /// Extension trait for `Stream`. pub trait Map { @@ -49,14 +50,8 @@ pub trait Map { } impl Map for Stream { - fn mapD2+'static>(&self, mut logic: L) -> Stream { - let mut vector = Vec::new(); - self.unary(Pipeline, "Map", move |_,_| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut vector); - output.session(&time).give_iterator(vector.drain(..).map(|x| logic(x))); - }); - }) + fn mapD2+'static>(&self, logic: L) -> Stream { + MapCore::map(self, logic) } fn map_in_place(&self, mut logic: L) -> Stream { let mut vector = Vec::new(); @@ -71,13 +66,7 @@ impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_mapI+'static>(&self, mut logic: L) -> Stream where I::Item: Data { - let mut vector = Vec::new(); - self.unary(Pipeline, "FlatMap", move |_,_| move |input, output| { - input.for_each(|time, data| { - data.swap(&mut vector); - output.session(&time).give_iterator(vector.drain(..).flat_map(|x| logic(x).into_iter())); - }); - }) + fn flat_mapI+'static>(&self, logic: L) -> Stream where I::Item: Data { + MapCore::flat_map(self, logic) } } diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 5650f6e97..ce6a34bb0 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -10,8 +10,9 @@ pub use self::enterleave::{Enter, EnterAt, Leave}; pub use self::input::Input; -pub use self::unordered_input::{UnorderedInput, UnorderedInputCore}; -pub use self::feedback::{Feedback, LoopVariable, ConnectLoop}; +pub use self::unordered_input::UnorderedInput; +pub use self::feedback::{Feedback}; +pub use self::core::{LoopVariable, 
ConnectLoop}; pub use self::concat::{Concat, Concatenate}; pub use self::partition::Partition; pub use self::map::Map; @@ -21,7 +22,7 @@ pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; pub use self::probe::Probe; -pub use self::to_stream::{ToStream, ToStreamCore}; +pub use self::to_stream::ToStream; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::ok_err::OkErr; @@ -30,7 +31,7 @@ pub use self::result::ResultStream; pub use self::generic::Operator; pub use self::generic::{Notificator, FrontierNotificator}; -pub use self::reclock::Reclock; +pub use self::core::reclock::Reclock; pub use self::count::Accumulate; pub mod core; @@ -40,20 +41,20 @@ pub mod input; pub mod flow_controlled; pub mod unordered_input; pub mod feedback; -pub mod concat; +pub use self::core::concat; pub mod partition; pub mod map; pub use self::core::inspect; -pub mod filter; +pub use self::core::filter; pub mod delay; pub use self::core::exchange; pub mod broadcast; -pub mod probe; +pub use self::core::probe; pub mod to_stream; pub mod capture; pub mod branch; pub mod ok_err; -pub mod rc; +pub use self::core::rc; pub mod result; pub mod aggregation; @@ -64,4 +65,5 @@ pub mod count; // keep "mint" module-private mod capability; + pub use self::capability::{ActivateCapability, Capability, InputCapability, CapabilitySet, DowngradeError}; diff --git a/timely/src/dataflow/operators/ok_err.rs b/timely/src/dataflow/operators/ok_err.rs index 36d794681..23c314195 100644 --- a/timely/src/dataflow/operators/ok_err.rs +++ b/timely/src/dataflow/operators/ok_err.rs @@ -1,7 +1,6 @@ //! 
Operators that separate one stream into two streams based on some condition -use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::operators::core::{OkErr as OkErrCore}; use crate::dataflow::{Scope, Stream}; use crate::Data; @@ -44,7 +43,7 @@ pub trait OkErr { impl OkErr for Stream { fn ok_err( &self, - mut logic: L, + logic: L, ) -> (Stream, Stream) where @@ -52,32 +51,6 @@ impl OkErr for Stream { D2: Data, L: FnMut(D) -> Result+'static { - let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope()); - - let mut input = builder.new_input(self, Pipeline); - let (mut output1, stream1) = builder.new_output(); - let (mut output2, stream2) = builder.new_output(); - - builder.build(move |_| { - let mut vector = Vec::new(); - move |_frontiers| { - let mut output1_handle = output1.activate(); - let mut output2_handle = output2.activate(); - - input.for_each(|time, data| { - data.swap(&mut vector); - let mut out1 = output1_handle.session(&time); - let mut out2 = output2_handle.session(&time); - for datum in vector.drain(..) { - match logic(datum) { - Ok(datum) => out1.give(datum), - Err(datum) => out2.give(datum), - } - } - }); - } - }); - - (stream1, stream2) + OkErrCore::ok_err(self, logic) } } diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 555a540b0..a85677dc0 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -1,13 +1,11 @@ //! Conversion to the `Stream` type from iterators. -use crate::Container; -use crate::progress::Timestamp; use crate::Data; -use crate::dataflow::operators::generic::operator::source; -use crate::dataflow::{StreamCore, Stream, Scope}; +use crate::dataflow::{Stream, Scope}; +use crate::dataflow::operators::core::{ToStream as ToStreamCore}; /// Converts to a timely `Stream`. 
-pub trait ToStream { +pub trait ToStream { /// Converts to a timely `Stream`. /// /// # Examples @@ -24,86 +22,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream>(self, scope: &mut S) -> Stream; + fn to_stream(self, scope: &mut S) -> Stream; } -impl ToStream for I where I::Item: Data { - fn to_stream>(self, scope: &mut S) -> Stream { - - source(scope, "ToStream", |capability, info| { - - // Acquire an activator, so that the operator can rescheduled itself. - let activator = scope.activator_for(&info.address[..]); - - let mut iterator = self.into_iter().fuse(); - let mut capability = Some(capability); - - move |output| { - - if let Some(element) = iterator.next() { - let mut session = output.session(capability.as_ref().unwrap()); - session.give(element); - let n = 256 * crate::container::buffer::default_capacity::(); - for element in iterator.by_ref().take(n - 1) { - session.give(element); - } - activator.activate(); - } - else { - capability = None; - } - } - }) - } -} - -/// Converts to a timely [StreamCore]. -pub trait ToStreamCore { - /// Converts to a timely [StreamCore]. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::operators::{ToStreamCore, Capture}; - /// use timely::dataflow::operators::capture::Extract; - /// - /// let (data1, data2) = timely::example(|scope| { - /// let data1 = Some((0..3).collect::>()).to_stream_core(scope).capture(); - /// let data2 = Some(vec![0,1,2]).to_stream_core(scope).capture(); - /// (data1, data2) - /// }); - /// - /// assert_eq!(data1.extract(), data2.extract()); - /// ``` - fn to_stream_core>(self, scope: &mut S) -> StreamCore; -} - -impl ToStreamCore for I where I::Item: Container { - fn to_stream_core>(self, scope: &mut S) -> StreamCore { - - source(scope, "ToStreamCore", |capability, info| { - - // Acquire an activator, so that the operator can rescheduled itself. 
- let activator = scope.activator_for(&info.address[..]); - - let mut iterator = self.into_iter().fuse(); - let mut capability = Some(capability); - - move |output| { - - if let Some(mut element) = iterator.next() { - let mut session = output.session(capability.as_ref().unwrap()); - session.give_container(&mut element); - let n = 256; - for mut element in iterator.by_ref().take(n - 1) { - session.give_container(&mut element); - } - activator.activate(); - } - else { - capability = None; - } - } - }) +impl ToStream for I where I::Item: Data { + fn to_stream(self, scope: &mut S) -> Stream { + ToStreamCore::to_stream(self, scope) } } diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index c7e600234..33ba0deb0 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -1,23 +1,9 @@ //! Create new `Streams` connected to external inputs. -use std::rc::Rc; -use std::cell::RefCell; -use crate::Container; - -use crate::scheduling::{Schedule, ActivateOnDrop}; - -use crate::progress::frontier::Antichain; -use crate::progress::{Operate, operate::SharedProgress, Timestamp}; -use crate::progress::Source; -use crate::progress::ChangeBatch; - use crate::Data; -use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore}; -use crate::dataflow::channels::pushers::buffer::{BufferCore as PushBuffer, AutoflushSessionCore}; - -use crate::dataflow::operators::{ActivateCapability, Capability}; - -use crate::dataflow::{Stream, Scope, StreamCore}; +use crate::dataflow::operators::{ActivateCapability}; +use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore}; +use crate::dataflow::{Stream, Scope}; /// Create a new `Stream` and `Handle` through which to supply input. 
pub trait UnorderedInput { @@ -80,154 +66,9 @@ pub trait UnorderedInput { impl UnorderedInput for G { fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { - self.new_unordered_input_core() + UnorderedInputCore::new_unordered_input(self) } } /// An unordered handle specialized to vectors. pub type UnorderedHandle = UnorderedHandleCore>; - -/// Create a new `Stream` and `Handle` through which to supply input. -pub trait UnorderedInputCore { - /// Create a new capability-based [StreamCore] and [UnorderedHandleCore] through which to supply input. This - /// input supports multiple open epochs (timestamps) at the same time. - /// - /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used - /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce - /// data into the timely dataflow computation. - /// - /// The `Capability` returned is for the default value of the timestamp type in use. The - /// capability can be dropped to inform the system that the input has advanced beyond the - /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp - /// should be obtained first, via the `delayed` function for `Capability`. - /// - /// To communicate the end-of-input drop all available capabilities. - /// - /// # Examples - /// - /// ``` - /// use std::sync::{Arc, Mutex}; - /// - /// use timely::*; - /// use timely::dataflow::operators::*; - /// use timely::dataflow::operators::capture::Extract; - /// use timely::dataflow::Stream; - /// - /// // get send and recv endpoints, wrap send to share - /// let (send, recv) = ::std::sync::mpsc::channel(); - /// let send = Arc::new(Mutex::new(send)); - /// - /// timely::execute(Config::thread(), move |worker| { - /// - /// // this is only to validate the output. 
- /// let send = send.lock().unwrap().clone(); - /// - /// // create and capture the unordered input. - /// let (mut input, mut cap) = worker.dataflow::(|scope| { - /// let (input, stream) = scope.new_unordered_input_core(); - /// stream.capture_into(send); - /// input - /// }); - /// - /// // feed values 0..10 at times 0..10. - /// for round in 0..10 { - /// input.session(cap.clone()).give(round); - /// cap = cap.delayed(&(round + 1)); - /// worker.step(); - /// } - /// }).unwrap(); - /// - /// let extract = recv.extract(); - /// for i in 0..10 { - /// assert_eq!(extract[i], (i, vec![i])); - /// } - /// ``` - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore); -} - - -impl UnorderedInputCore for G { - fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { - - let (output, registrar) = TeeCore::::new(); - let internal = Rc::new(RefCell::new(ChangeBatch::new())); - // let produced = Rc::new(RefCell::new(ChangeBatch::new())); - let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); - let counter = PushCounter::new(output); - let produced = counter.produced().clone(); - let peers = self.peers(); - - let index = self.allocate_operator_index(); - let mut address = self.addr(); - address.push(index); - - let cap = ActivateCapability::new(cap, &address, self.activations()); - - let helper = UnorderedHandleCore::new(counter); - - self.add_operator_with_index(Box::new(UnorderedOperator { - name: "UnorderedInput".to_owned(), - address, - shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))), - internal, - produced, - peers, - }), index); - - ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) - } -} - -struct UnorderedOperator { - name: String, - address: Vec, - shared_progress: Rc>>, - internal: Rc>>, - produced: Rc>>, - peers: usize, -} - -impl Schedule for UnorderedOperator { - fn name(&self) -> &str { &self.name } - fn 
path(&self) -> &[usize] { &self.address[..] } - fn schedule(&mut self) -> bool { - let shared_progress = &mut *self.shared_progress.borrow_mut(); - self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]); - self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]); - false - } -} - -impl Operate for UnorderedOperator { - fn inputs(&self) -> usize { 0 } - fn outputs(&self) -> usize { 1 } - - fn get_internal_summary(&mut self) -> (Vec::Summary>>>, Rc>>) { - let mut borrow = self.internal.borrow_mut(); - for (time, count) in borrow.drain() { - self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64)); - } - (Vec::new(), self.shared_progress.clone()) - } - - fn notify_me(&self) -> bool { false } -} - -/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. -#[derive(Debug)] -pub struct UnorderedHandleCore { - buffer: PushBuffer>>, -} - -impl UnorderedHandleCore { - fn new(pusher: PushCounter>) -> UnorderedHandleCore { - UnorderedHandleCore { - buffer: PushBuffer::new(pusher), - } - } - - /// Allocates a new automatically flushing session based on the supplied capability. 
- pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { - ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) - } -} diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index aa5e48601..1f8791214 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -9,7 +9,7 @@ use crate::progress::{Source, Target}; use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use std::fmt::{self, Debug}; use crate::Container; @@ -37,7 +37,7 @@ impl StreamCore { /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent { diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index 320a3dcb8..d1758460f 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -233,4 +233,4 @@ impl Drop for Sequencer { .expect("Sequencer.activator unavailable") .activate() } -} \ No newline at end of file +}