From 786326de36c989272ec34cf229532ba1081df719 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 21 Mar 2024 12:42:14 -0400 Subject: [PATCH] Rename internal `Core` variants (#551) * Rename BundleCore to Bundle * Rename BufferCore to Buffer * Rename ParallelizationContractCore to ParallelizationContract * Rename TeeCore to Tee * Rename CounterCore to Counter --- timely/src/dataflow/channels/mod.rs | 9 +-- timely/src/dataflow/channels/pact.rs | 45 ++++++------ .../src/dataflow/channels/pullers/counter.rs | 12 ++-- .../src/dataflow/channels/pushers/buffer.rs | 31 ++++---- .../src/dataflow/channels/pushers/counter.rs | 17 ++--- .../src/dataflow/channels/pushers/exchange.rs | 10 +-- timely/src/dataflow/channels/pushers/mod.rs | 4 +- timely/src/dataflow/channels/pushers/tee.rs | 25 +++---- .../src/dataflow/operators/capture/replay.rs | 4 +- timely/src/dataflow/operators/enterleave.rs | 32 ++++----- timely/src/dataflow/operators/feedback.rs | 4 +- .../dataflow/operators/generic/builder_raw.rs | 14 ++-- .../dataflow/operators/generic/builder_rc.rs | 16 ++--- .../src/dataflow/operators/generic/handles.rs | 34 ++++----- .../dataflow/operators/generic/operator.rs | 70 +++++++++---------- timely/src/dataflow/operators/input.rs | 10 +-- timely/src/dataflow/operators/probe.rs | 4 +- .../src/dataflow/operators/unordered_input.rs | 12 ++-- timely/src/dataflow/stream.rs | 6 +- 19 files changed, 171 insertions(+), 188 deletions(-) diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 84303a0391..5bced98cd1 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -11,10 +11,7 @@ pub mod pullers; pub mod pact; /// The input to and output from timely dataflow communication channels. -pub type BundleCore = crate::communication::Message>; - -/// The input to and output from timely dataflow communication channels specialized to vectors. -pub type Bundle = BundleCore>; +pub type Bundle = crate::communication::Message>; /// A serializable representation of timestamped data. #[derive(Clone, Abomonation, Serialize, Deserialize)] @@ -46,11 +43,11 @@ impl Message { /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher /// leaves in place, or the container's default element. #[inline] - pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut D, time: T, pusher: &mut P) { let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); - let mut bundle = Some(BundleCore::from_typed(message)); + let mut bundle = Some(Bundle::from_typed(message)); pusher.push(&mut bundle); diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 19648b2859..3e0ba21f75 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -14,37 +14,32 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull, Data}; use crate::container::PushPartitioned; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; use crate::worker::AsWorker; -/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors. -pub trait ParallelizationContractCore { +/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. +pub trait ParallelizationContract { /// Type implementing `Push` produced by this pact. - type Pusher: Push>+'static; + type Pusher: Push>+'static; /// Type implementing `Pull` produced by this pact. - type Puller: Pull>+'static; + type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller); } -/// A `ParallelizationContractCore` specialized for `Vec` containers -/// TODO: Use trait aliases once stable. -pub trait ParallelizationContract: ParallelizationContractCore> { } -impl>> ParallelizationContract for P { } - /// A direct connection #[derive(Debug)] pub struct Pipeline; -impl ParallelizationContractCore for Pipeline { - type Pusher = LogPusher>>; - type Puller = LogPuller>>; +impl ParallelizationContract for Pipeline { + type Pusher = LogPusher>>; + type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (pusher, puller) = allocator.pipeline::>(identifier, address); // // ignore `&mut A` and use thread allocator - // let (pusher, puller) = Thread::new::>(); + // let (pusher, puller) = Thread::new::>>(); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), LogPuller::new(puller, allocator.index(), identifier, logging)) } @@ -71,13 +66,13 @@ where } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -impl ParallelizationContractCore for ExchangeCore +impl ParallelizationContract for ExchangeCore where C: Data + PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { - type Pusher = ExchangePusher>>>, H>; - type Puller = LogPuller>>>; + type Pusher = ExchangePusher>>>, H>; + type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); @@ -94,7 +89,7 @@ impl Debug for ExchangeCore { /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. #[derive(Debug)] -pub struct LogPusher>> { +pub struct LogPusher>> { pusher: P, channel: usize, counter: usize, @@ -104,7 +99,7 @@ pub struct LogPusher>> { logging: Option, } -impl>> LogPusher { +impl>> LogPusher { /// Allocates a new pusher. pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { LogPusher { @@ -119,9 +114,9 @@ impl>> LogPusher { } } -impl>> Push> for LogPusher { +impl>> Push> for LogPusher { #[inline] - fn push(&mut self, pair: &mut Option>) { + fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; @@ -150,7 +145,7 @@ impl>> Push> for LogP /// Wraps a `Message` puller to provide a `Pull<(T, Content)>`. #[derive(Debug)] -pub struct LogPuller>> { +pub struct LogPuller>> { puller: P, channel: usize, index: usize, @@ -158,7 +153,7 @@ pub struct LogPuller>> { logging: Option, } -impl>> LogPuller { +impl>> LogPuller { /// Allocates a new `Puller`. pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { LogPuller { @@ -171,9 +166,9 @@ impl>> LogPuller { } } -impl>> Pull> for LogPuller { +impl>> Pull> for LogPuller { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option> { let result = self.puller.pull(); if let Some(bundle) = result { let channel = self.channel; diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 8f9bbbf088..848b9a8f50 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -3,13 +3,13 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use crate::progress::ChangeBatch; use crate::communication::Pull; use crate::Container; /// A wrapper which accounts records pulled past in a shared count map. -pub struct Counter>> { +pub struct Counter>> { pullable: P, consumed: Rc>>, phantom: ::std::marker::PhantomData, @@ -36,15 +36,15 @@ impl Drop for ConsumedGuard { } } -impl>> Counter { +impl>> Counter { /// Retrieves the next timestamp and batch of data. #[inline] - pub fn next(&mut self) -> Option<&mut BundleCore> { + pub fn next(&mut self) -> Option<&mut Bundle> { self.next_guarded().map(|(_guard, bundle)| bundle) } #[inline] - pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut BundleCore)> { + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut Bundle)> { if let Some(message) = self.pullable.pull() { let guard = ConsumedGuard { consumed: Rc::clone(&self.consumed), @@ -57,7 +57,7 @@ impl>> Counter>> Counter { +impl>> Counter { /// Allocates a new `Counter` from a boxed puller. pub fn new(pullable: P) -> Self { Counter { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index e241587dfd..27e95377f2 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -3,7 +3,7 @@ use crate::communication::Push; use crate::container::{PushContainer, PushInto}; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::dataflow::operators::Capability; use crate::progress::Timestamp; use crate::{Container, Data}; @@ -13,7 +13,7 @@ use crate::{Container, Data}; /// The `Buffer` type should be used by calling `session` with a time, which checks whether /// data must be flushed and creates a `Session` object which allows sending at the given time. #[derive(Debug)] -pub struct BufferCore>> { +pub struct Buffer>> { /// the currently open time, if it is open time: Option, /// a buffer for records, to send at self.time @@ -21,10 +21,7 @@ pub struct BufferCore>> { pusher: P, } -/// A buffer specialized to vector-based containers. -pub type Buffer = BufferCore, P>; - -impl>> BufferCore where T: Eq+Clone { +impl>> Buffer where T: Eq+Clone { /// Creates a new `Buffer`. pub fn new(pusher: P) -> Self { @@ -82,7 +79,7 @@ impl>> BufferCore where T: Eq } } -impl>> BufferCore where T: Eq+Clone { +impl>> Buffer where T: Eq+Clone { // internal method for use by `Session`. #[inline] fn give>(&mut self, data: D) { @@ -97,7 +94,7 @@ impl>> BufferCore where T } } -impl>>> Buffer where T: Eq+Clone { +impl>>> Buffer, P> where T: Eq+Clone { // Gives an entire message at a specific time. fn give_vec(&mut self, vector: &mut Vec) { // flush to ensure fifo-ness @@ -114,18 +111,18 @@ impl>>> Buffer where T: Eq+Clo /// The `Session` struct provides the user-facing interface to an operator output, namely /// the `Buffer` type. A `Session` wraps a session of output at a specified time, and /// avoids what would otherwise be a constant cost of checking timestamp equality. -pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { - buffer: &'a mut BufferCore, +pub struct Session<'a, T, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { + buffer: &'a mut Buffer, } -impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T, C: Container, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { /// Provide a container at the time specified by the [Session]. pub fn give_container(&mut self, container: &mut C) { self.buffer.give_container(container) } } -impl<'a, T, C, P: Push>+'a> Session<'a, T, C, P> +impl<'a, T, C, P: Push>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a + PushContainer, @@ -144,7 +141,7 @@ where } } -impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Provides a fully formed `Content` message for senders which can use this type. /// /// The `Content` type is the backing memory for communication in timely, and it can @@ -159,10 +156,10 @@ impl<'a, T, D: Data, P: Push>>+'a> Session<'a, T, Vec, P } /// A session which will flush itself when dropped. -pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push>+'a> where +pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push>+'a> where T: Eq+Clone+'a, C: 'a { /// A reference to the underlying buffer. - buffer: &'a mut BufferCore, + buffer: &'a mut Buffer, /// The capability being used to send the data. _capability: Capability, } @@ -170,7 +167,7 @@ pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push = AutoflushSessionCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSessionCore<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { +impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSessionCore<'a, T, Vec, P> where T: Eq+Clone+'a, D: 'a { /// Transmits a single record. #[inline] pub fn give(&mut self, data: D) { @@ -192,7 +189,7 @@ impl<'a, T: Timestamp, D: Data, P: Push>>+'a> AutoflushSess } } -impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { +impl<'a, T: Timestamp, C: Container, P: Push>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a { fn drop(&mut self) { self.buffer.cease(); } diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 59ccacf321..c40e3ddec3 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -5,24 +5,21 @@ use std::rc::Rc; use std::cell::RefCell; use crate::progress::{ChangeBatch, Timestamp}; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use crate::communication::Push; use crate::Container; /// A wrapper which updates shared `produced` based on the number of records pushed. #[derive(Debug)] -pub struct CounterCore>> { +pub struct Counter>> { pushee: P, produced: Rc>>, phantom: PhantomData, } -/// A counter specialized to vector. -pub type Counter = CounterCore, P>; - -impl Push> for CounterCore where P: Push> { +impl Push> for Counter where P: Push> { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); } @@ -34,10 +31,10 @@ impl Push> for CounterCore>> CounterCore where T : Ord+Clone+'static { +impl>> Counter where T : Ord+Clone+'static { /// Allocates a new `Counter` from a pushee and shared counts. - pub fn new(pushee: P) -> CounterCore { - CounterCore { + pub fn new(pushee: P) -> Counter { + Counter { pushee, produced: Rc::new(RefCell::new(ChangeBatch::new())), phantom: PhantomData, diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index e1b49cbedf..403da32cd6 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -2,12 +2,12 @@ use crate::communication::Push; use crate::container::PushPartitioned; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::{Container, Data}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H> +pub struct Exchange>, H> where for<'a> H: FnMut(&C::Item<'a>) -> u64 { @@ -17,7 +17,7 @@ where hash_func: H, } -impl>, H> Exchange +impl>, H> Exchange where for<'a> H: FnMut(&C::Item<'a>) -> u64 { @@ -44,13 +44,13 @@ where } } -impl>, H, > Push> for Exchange +impl>, H, > Push> for Exchange where C: PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange if self.pushers.len() == 1 { self.pushers[0].push(message); diff --git a/timely/src/dataflow/channels/pushers/mod.rs b/timely/src/dataflow/channels/pushers/mod.rs index 2ab6d4f9f6..295d033cab 100644 --- a/timely/src/dataflow/channels/pushers/mod.rs +++ b/timely/src/dataflow/channels/pushers/mod.rs @@ -1,6 +1,6 @@ -pub use self::tee::{Tee, TeeCore, TeeHelper}; +pub use self::tee::{Tee, TeeHelper}; pub use self::exchange::Exchange; -pub use self::counter::{Counter, CounterCore}; +pub use self::counter::Counter; pub mod tee; pub mod exchange; diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 1ec05e4e75..210c9d1490 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -4,25 +4,22 @@ use std::cell::RefCell; use std::fmt::{self, Debug}; use std::rc::Rc; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::{Bundle, Message}; use crate::communication::Push; use crate::{Container, Data}; -type PushList = Rc>>>>>; +type PushList = Rc>>>>>; /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. -pub struct TeeCore { +pub struct Tee { buffer: D, shared: PushList, } -/// [TeeCore] specialized to `Vec`-based container. -pub type Tee = TeeCore>; - -impl Push> for TeeCore { +impl Push> for Tee { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { @@ -42,11 +39,11 @@ impl Push> for TeeCore { } } -impl TeeCore { +impl Tee { /// Allocates a new pair of `Tee` and `TeeHelper`. - pub fn new() -> (TeeCore, TeeHelper) { + pub fn new() -> (Tee, TeeHelper) { let shared = Rc::new(RefCell::new(Vec::new())); - let port = TeeCore { + let port = Tee { buffer: Default::default(), shared: shared.clone(), }; @@ -55,7 +52,7 @@ impl TeeCore { } } -impl Clone for TeeCore { +impl Clone for Tee { fn clone(&self) -> Self { Self { buffer: Default::default(), @@ -64,7 +61,7 @@ impl Clone for TeeCore { } } -impl Debug for TeeCore +impl Debug for Tee where D: Debug, { @@ -89,7 +86,7 @@ pub struct TeeHelper { impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. - pub fn add_pusher>+'static>(&self, pusher: P) { + pub fn add_pusher>+'static>(&self, pusher: P) { self.shared.borrow_mut().push(Box::new(pusher)); } } diff --git a/timely/src/dataflow/operators/capture/replay.rs b/timely/src/dataflow/operators/capture/replay.rs index 2cc3254196..5fdb237040 100644 --- a/timely/src/dataflow/operators/capture/replay.rs +++ b/timely/src/dataflow/operators/capture/replay.rs @@ -39,8 +39,8 @@ //! than that in which the stream was captured. use crate::dataflow::{Scope, StreamCore}; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; use crate::progress::Timestamp; diff --git a/timely/src/dataflow/operators/enterleave.rs b/timely/src/dataflow/operators/enterleave.rs index 24c8fa12eb..af2eaf00f1 100644 --- a/timely/src/dataflow/operators/enterleave.rs +++ b/timely/src/dataflow/operators/enterleave.rs @@ -28,8 +28,8 @@ use crate::progress::{Source, Target}; use crate::order::Product; use crate::{Container, Data}; use crate::communication::Push; -use crate::dataflow::channels::pushers::{CounterCore, TeeCore}; -use crate::dataflow::channels::{BundleCore, Message}; +use crate::dataflow::channels::pushers::{Counter, Tee}; +use crate::dataflow::channels::{Bundle, Message}; use crate::worker::AsWorker; use crate::dataflow::{StreamCore, Scope, Stream}; @@ -88,9 +88,9 @@ impl, C: Data+Container> Enter::new(); + let (targets, registrar) = Tee::::new(); let ingress = IngressNub { - targets: CounterCore::new(targets), + targets: Counter::new(targets), phantom: PhantomData, activator: scope.activator_for(&scope.addr()), active: false, @@ -140,7 +140,7 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave let output = scope.subgraph.borrow_mut().new_output(); let target = Target::new(0, output.port); - let (targets, registrar) = TeeCore::::new(); + let (targets, registrar) = Tee::::new(); let egress = EgressNub { targets, phantom: PhantomData }; let channel_id = scope.clone().new_identifier(); @@ -161,18 +161,18 @@ impl<'a, G: Scope, D: Clone+Container, T: Timestamp+Refines> Leave struct IngressNub, TData: Container> { - targets: CounterCore>, + targets: Counter>, phantom: ::std::marker::PhantomData, activator: crate::scheduling::Activator, active: bool, } -impl, TData: Container> Push> for IngressNub { - fn push(&mut self, element: &mut Option>) { +impl, TData: Container> Push> for IngressNub { + fn push(&mut self, element: &mut Option>) { if let Some(message) = element { let outer_message = message.as_mut(); let data = ::std::mem::take(&mut outer_message.data); - let mut inner_message = Some(BundleCore::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); + let mut inner_message = Some(Bundle::from_typed(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); self.targets.push(&mut inner_message); if let Some(inner_message) = inner_message { if let Some(inner_message) = inner_message.if_typed() { @@ -193,17 +193,17 @@ impl, TData: Container> Pus struct EgressNub, TData: Data> { - targets: TeeCore, + targets: Tee, phantom: PhantomData, } -impl Push> for EgressNub +impl Push> for EgressNub where TOuter: Timestamp, TInner: Timestamp+Refines, TData: Data { - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { let inner_message = message.as_mut(); let data = ::std::mem::take(&mut inner_message.data); - let mut outer_message = Some(BundleCore::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); + let mut outer_message = Some(Bundle::from_typed(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); self.targets.push(&mut outer_message); if let Some(outer_message) = outer_message { if let Some(outer_message) = outer_message.if_typed() { @@ -241,12 +241,12 @@ impl

LogPusher

{ } } -impl Push> for LogPusher

+impl Push> for LogPusher

where D: Container, - P: Push>, + P: Push>, { - fn push(&mut self, element: &mut Option>) { + fn push(&mut self, element: &mut Option>) { if let Some(bundle) = element { let send_event = MessagesEvent { is_send: true, diff --git a/timely/src/dataflow/operators/feedback.rs b/timely/src/dataflow/operators/feedback.rs index a7eb90c658..038ca7e09d 100644 --- a/timely/src/dataflow/operators/feedback.rs +++ b/timely/src/dataflow/operators/feedback.rs @@ -6,7 +6,7 @@ use crate::progress::{Timestamp, PathSummary}; use crate::progress::frontier::Antichain; use crate::order::Product; -use crate::dataflow::channels::pushers::TeeCore; +use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::{StreamCore, Scope, Stream}; use crate::dataflow::scopes::child::Iterative; @@ -162,7 +162,7 @@ impl ConnectLoop for StreamCore { pub struct HandleCore { builder: OperatorBuilder, summary: ::Summary, - output: OutputWrapper>, + output: OutputWrapper>, } /// A `HandleCore` specialized for using `Vec` as container diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8e97492afd..10899b9be4 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -15,8 +15,8 @@ use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; use crate::Container; use crate::dataflow::{StreamCore, Scope}; -use crate::dataflow::channels::pushers::TeeCore; -use crate::dataflow::channels::pact::ParallelizationContractCore; +use crate::dataflow::channels::pushers::Tee; +use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::operators::generic::operator_info::OperatorInfo; /// Contains type-free information about the operator properties. @@ -107,7 +107,7 @@ impl OperatorBuilder { /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> P::Puller where - P: ParallelizationContractCore { + P: ParallelizationContract { let connection = vec![Antichain::from_elem(Default::default()); self.shape.outputs]; self.new_input_connection(stream, pact, connection) } @@ -115,7 +115,7 @@ impl OperatorBuilder { /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> P::Puller where - P: ParallelizationContractCore { + P: ParallelizationContract { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); @@ -131,16 +131,16 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (TeeCore, StreamCore) { + pub fn new_output(&mut self) -> (Tee, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.shape.inputs]; self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (TeeCore, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (Tee, StreamCore) { - let (targets, registrar) = TeeCore::::new(); + let (targets, registrar) = Tee::::new(); let source = Source::new(self.index, self.shape.outputs); let stream = StreamCore::new(source, registrar, self.scope.clone()); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index ca80e31828..f753ef09ac 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -10,10 +10,10 @@ use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::Container; use crate::dataflow::{Scope, StreamCore}; -use crate::dataflow::channels::pushers::TeeCore; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; -use crate::dataflow::channels::pact::ParallelizationContractCore; +use crate::dataflow::channels::pushers::Tee; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; +use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::capability::Capability; use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper}; @@ -61,7 +61,7 @@ impl OperatorBuilder { /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: &StreamCore, pact: P) -> InputHandleCore where - P: ParallelizationContractCore { + P: ParallelizationContract { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().outputs()]; self.new_input_connection(stream, pact, connection) @@ -77,7 +77,7 @@ impl OperatorBuilder { /// antichain indicating that there is no connection from the input to the output. pub fn new_input_connection(&mut self, stream: &StreamCore, pact: P, connection: Vec::Summary>>) -> InputHandleCore where - P: ParallelizationContractCore { + P: ParallelizationContract { let puller = self.builder.new_input_connection(stream, pact, connection.clone()); @@ -92,7 +92,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { + pub fn new_output(&mut self) -> (OutputWrapper>, StreamCore) { let connection = vec![Antichain::from_elem(Default::default()); self.builder.shape().inputs()]; self.new_output_connection(connection) } @@ -105,7 +105,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { + pub fn new_output_connection(&mut self, connection: Vec::Summary>>) -> (OutputWrapper>, StreamCore) { let (tee, stream) = self.builder.new_output_connection(connection.clone()); diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index eeddf52952..b5157f4ce2 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -11,9 +11,9 @@ use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pullers::Counter as PullCounter; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::{BufferCore, Session}; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; +use crate::dataflow::channels::Bundle; use crate::communication::{Push, Pull, message::RefOrMut}; use crate::Container; use crate::logging::TimelyLogger as Logger; @@ -22,7 +22,7 @@ use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. -pub struct InputHandleCore>> { +pub struct InputHandleCore>> { pull_counter: PullCounter, internal: Rc>>>>>, /// Timestamp summaries from this input to each output. @@ -37,7 +37,7 @@ pub struct InputHandleCore> pub type InputHandle = InputHandleCore, P>; /// Handle to an operator's input stream and frontier. -pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { +pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull>+'a> { /// The underlying input handle. pub handle: &'a mut InputHandleCore, /// The frontier as reported by timely progress tracking. @@ -47,7 +47,7 @@ pub struct FrontieredInputHandleCore<'a, T: Timestamp, D: Container+'a, P: Pull< /// Handle to an operator's input stream and frontier, specialized to vectors. pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore { +impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore { /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. @@ -99,7 +99,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< } -impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, D, P> { +impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, D, P> { /// Allocate a new frontiered input handle. pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { FrontieredInputHandleCore { @@ -146,13 +146,13 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInp } } -pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { +pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { &mut input.pull_counter } /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>( +pub fn new_input_handle>>( pull_counter: PullCounter, internal: Rc>>>>>, summaries: Rc>>>, @@ -172,14 +172,14 @@ pub fn new_input_handle>>( /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. #[derive(Debug)] -pub struct OutputWrapper>> { - push_buffer: BufferCore>, +pub struct OutputWrapper>> { + push_buffer: Buffer>, internal_buffer: Rc>>, } -impl>> OutputWrapper { +impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. - pub fn new(push_buffer: BufferCore>, internal_buffer: Rc>>) -> Self { + pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { OutputWrapper { push_buffer, internal_buffer, @@ -199,15 +199,15 @@ impl>> OutputWrapper>+'a> { - push_buffer: &'a mut BufferCore>, +pub struct OutputHandleCore<'a, T: Timestamp, C: Container+'a, P: Push>+'a> { + push_buffer: &'a mut Buffer>, internal_buffer: &'a Rc>>, } /// Handle specialized to `Vec`-based container. pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, C, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. /// /// In order to send data at a future timestamp, obtain a capability for the new timestamp @@ -241,7 +241,7 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore } } -impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> Drop for OutputHandleCore<'a, T, C, P> { fn drop(&mut self) { self.push_buffer.cease(); } diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index afb7a25d11..5ba6a211f6 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -1,8 +1,8 @@ //! Methods to construct generic streaming and blocking unary operators. -use crate::dataflow::channels::pushers::TeeCore; -use crate::dataflow::channels::pact::ParallelizationContractCore; +use crate::dataflow::channels::pushers::Tee; +use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore}; use crate::dataflow::operators::capability::Capability; @@ -60,8 +60,8 @@ pub trait Operator { D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore; + &mut OutputHandleCore>)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -94,9 +94,9 @@ pub trait Operator { /// ``` fn unary_notify, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, - P: ParallelizationContractCore> + P: ParallelizationContract> (&self, pact: P, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input stream by a parallelization @@ -132,8 +132,8 @@ pub trait Operator { D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore; + &mut OutputHandleCore>)+'static, + P: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -192,9 +192,9 @@ pub trait Operator { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore; + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input streams by a parallelization /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`. @@ -245,10 +245,10 @@ pub trait Operator { D3: Container, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore> + P1: ParallelizationContract, + P2: ParallelizationContract> (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, logic: L) -> StreamCore; /// Creates a new dataflow operator that partitions its input streams by a parallelization @@ -292,9 +292,9 @@ pub trait Operator { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore; + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract; /// Creates a new dataflow operator that partitions its input stream by a parallelization /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream @@ -322,7 +322,7 @@ pub trait Operator { fn sink(&self, pact: P, name: &str, logic: L) where L: FnMut(&mut FrontieredInputHandleCore)+'static, - P: ParallelizationContractCore; + P: ParallelizationContract; } impl Operator for StreamCore { @@ -332,8 +332,8 @@ impl Operator for StreamCore { D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore { + &mut OutputHandleCore>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -357,9 +357,9 @@ impl Operator for StreamCore { fn unary_notify, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, - P: ParallelizationContractCore> + P: ParallelizationContract> (&self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.unary_frontier(pact, name, move |capability, _info| { @@ -382,8 +382,8 @@ impl Operator for StreamCore { D2: Container, B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P: ParallelizationContractCore { + &mut OutputHandleCore>)+'static, + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -412,9 +412,9 @@ impl Operator for StreamCore { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore, &mut FrontieredInputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore { + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -442,10 +442,10 @@ impl Operator for StreamCore { D3: Container, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, - &mut OutputHandleCore>, + &mut OutputHandleCore>, &mut Notificator)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore> + P1: ParallelizationContract, + P2: ParallelizationContract> (&self, other: &StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { @@ -472,9 +472,9 @@ impl Operator for StreamCore { B: FnOnce(Capability, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore, &mut InputHandleCore, - &mut OutputHandleCore>)+'static, - P1: ParallelizationContractCore, - P2: ParallelizationContractCore { + &mut OutputHandleCore>)+'static, + P1: ParallelizationContract, + P2: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let operator_info = builder.operator_info(); @@ -500,7 +500,7 @@ impl Operator for StreamCore { fn sink(&self, pact: P, name: &str, mut logic: L) where L: FnMut(&mut FrontieredInputHandleCore)+'static, - P: ParallelizationContractCore { + P: ParallelizationContract { let mut builder = OperatorBuilder::new(name.to_owned(), self.scope()); let mut input = builder.new_input(self, pact); @@ -559,7 +559,7 @@ pub fn source(scope: &G, name: &str, constructor: B) -> Strea where D: Container, B: FnOnce(Capability, OperatorInfo) -> L, - L: FnMut(&mut OutputHandleCore>)+'static { + L: FnMut(&mut OutputHandleCore>)+'static { let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone()); let operator_info = builder.operator_info(); diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/input.rs index 72719c5f85..3be59db318 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/input.rs @@ -12,7 +12,7 @@ use crate::progress::Source; use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore}; -use crate::dataflow::channels::pushers::{TeeCore, CounterCore}; +use crate::dataflow::channels::pushers::{Tee, Counter}; use crate::dataflow::channels::Message; @@ -177,8 +177,8 @@ impl Input for G where ::Timestamp: TotalOrder { } fn input_from_core(&mut self, handle: &mut HandleCore<::Timestamp, D>) -> StreamCore { - let (output, registrar) = TeeCore::<::Timestamp, D>::new(); - let counter = CounterCore::new(output); + let (output, registrar) = Tee::<::Timestamp, D>::new(); + let counter = Counter::new(output); let produced = counter.produced().clone(); let index = self.allocate_operator_index(); @@ -249,7 +249,7 @@ impl Operate for Operator { pub struct HandleCore { activate: Vec, progress: Vec>>>, - pushers: Vec>>, + pushers: Vec>>, buffer1: C, buffer2: C, now_at: T, @@ -332,7 +332,7 @@ impl HandleCore { fn register( &mut self, - pusher: CounterCore>, + pusher: Counter>, progress: Rc>>, ) { // flush current contents, so new registrant does not see existing data. diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index ad990cb79c..982303a595 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -5,8 +5,8 @@ use std::cell::RefCell; use crate::progress::Timestamp; use crate::progress::frontier::{AntichainRef, MutableAntichain}; -use crate::dataflow::channels::pushers::CounterCore as PushCounter; -use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer; +use crate::dataflow::channels::pushers::Counter as PushCounter; +use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/unordered_input.rs index c7e6002341..ea60847705 100644 --- a/timely/src/dataflow/operators/unordered_input.rs +++ b/timely/src/dataflow/operators/unordered_input.rs @@ -12,8 +12,8 @@ use crate::progress::Source; use crate::progress::ChangeBatch; use crate::Data; -use crate::dataflow::channels::pushers::{CounterCore as PushCounter, TeeCore}; -use crate::dataflow::channels::pushers::buffer::{BufferCore as PushBuffer, AutoflushSessionCore}; +use crate::dataflow::channels::pushers::{Counter as PushCounter, Tee}; +use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSessionCore}; use crate::dataflow::operators::{ActivateCapability, Capability}; @@ -149,7 +149,7 @@ pub trait UnorderedInputCore { impl UnorderedInputCore for G { fn new_unordered_input_core(&mut self) -> ((UnorderedHandleCore, ActivateCapability), StreamCore) { - let (output, registrar) = TeeCore::::new(); + let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); // let produced = Rc::new(RefCell::new(ChangeBatch::new())); let cap = Capability::new(G::Timestamp::minimum(), internal.clone()); @@ -216,18 +216,18 @@ impl Operate for UnorderedOperator { /// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. #[derive(Debug)] pub struct UnorderedHandleCore { - buffer: PushBuffer>>, + buffer: PushBuffer>>, } impl UnorderedHandleCore { - fn new(pusher: PushCounter>) -> UnorderedHandleCore { + fn new(pusher: PushCounter>) -> UnorderedHandleCore { UnorderedHandleCore { buffer: PushBuffer::new(pusher), } } /// Allocates a new automatically flushing session based on the supplied capability. - pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { + pub fn session<'b>(&'b mut self, cap: ActivateCapability) -> ActivateOnDrop>>> { ActivateOnDrop::new(self.buffer.autoflush_session(cap.capability.clone()), cap.address.clone(), cap.activations.clone()) } } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index aa5e48601f..e695207293 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -9,7 +9,7 @@ use crate::progress::{Source, Target}; use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; -use crate::dataflow::channels::BundleCore; +use crate::dataflow::channels::Bundle; use std::fmt::{self, Debug}; use crate::Container; @@ -25,7 +25,7 @@ pub struct StreamCore { name: Source, /// The `Scope` containing the stream. scope: S, - /// Maintains a list of Push> interested in the stream's output. + /// Maintains a list of Push>> interested in the stream's output. ports: TeeHelper, } @@ -37,7 +37,7 @@ impl StreamCore { /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {