Skip to content

Commit

Permalink
Rename internal Core variants (#551)
Browse files Browse the repository at this point in the history
* Rename BundleCore to Bundle

* Rename BufferCore to Buffer

* Rename ParallelizationContractCore to ParallelizationContract

* Rename TeeCore to Tee

* Rename CounterCore to Counter
  • Loading branch information
frankmcsherry authored Mar 21, 2024
1 parent ded1a39 commit 786326d
Show file tree
Hide file tree
Showing 19 changed files with 171 additions and 188 deletions.
9 changes: 3 additions & 6 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ pub mod pullers;
pub mod pact;

/// The input to and output from timely dataflow communication channels.
pub type BundleCore<T, D> = crate::communication::Message<Message<T, D>>;

/// The input to and output from timely dataflow communication channels specialized to vectors.
pub type Bundle<T, D> = BundleCore<T, Vec<D>>;
pub type Bundle<T, D> = crate::communication::Message<Message<T, D>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Abomonation, Serialize, Deserialize)]
Expand Down Expand Up @@ -46,11 +43,11 @@ impl<T, D: Container> Message<T, D> {
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element.
#[inline]
pub fn push_at<P: Push<BundleCore<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(BundleCore::from_typed(message));
let mut bundle = Some(Bundle::from_typed(message));

pusher.push(&mut bundle);

Expand Down
45 changes: 20 additions & 25 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,32 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull, Data};
use crate::container::PushPartitioned;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use crate::dataflow::channels::{BundleCore, Message};
use crate::dataflow::channels::{Bundle, Message};
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
use crate::worker::AsWorker;

/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContractCore<T, D> {
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, D> {
/// Type implementing `Push` produced by this pact.
type Pusher: Push<BundleCore<T, D>>+'static;
type Pusher: Push<Bundle<T, D>>+'static;
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<BundleCore<T, D>>+'static;
type Puller: Pull<Bundle<T, D>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}

/// A `ParallelizationContractCore` specialized for `Vec` containers
/// TODO: Use trait aliases once stable.
pub trait ParallelizationContract<T, D: Clone>: ParallelizationContractCore<T, Vec<D>> { }
impl<T, D: Clone, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationContract<T, D> for P { }

/// A direct connection
#[derive(Debug)]
pub struct Pipeline;

impl<T: 'static, D: Container> ParallelizationContractCore<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<BundleCore<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<BundleCore<T, D>>>;
impl<T: 'static, D: Container> ParallelizationContract<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
// // ignore `&mut A` and use thread allocator
// let (pusher, puller) = Thread::new::<Bundle<T, D>>();
// let (pusher, puller) = Thread::new::<Bundle<T, Vec<D>>>();
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
}
Expand All @@ -71,13 +66,13 @@ where
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, C, H: 'static> ParallelizationContractCore<T, C> for ExchangeCore<C, H>
impl<T: Timestamp, C, H: 'static> ParallelizationContract<T, C> for ExchangeCore<C, H>
where
C: Data + PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
Expand All @@ -94,7 +89,7 @@ impl<C, F> Debug for ExchangeCore<C, F> {

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
pusher: P,
channel: usize,
counter: usize,
Expand All @@ -104,7 +99,7 @@ pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
logging: Option<Logger>,
}

impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
Expand All @@ -119,9 +114,9 @@ impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
}
}

impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogPusher<T, D, P> {
impl<T, D: Container, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
#[inline]
fn push(&mut self, pair: &mut Option<BundleCore<T, D>>) {
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
if let Some(bundle) = pair {
self.counter += 1;

Expand Down Expand Up @@ -150,15 +145,15 @@ impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogP

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, D, P: Pull<BundleCore<T, D>>> {
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
puller: P,
channel: usize,
index: usize,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
Expand All @@ -171,9 +166,9 @@ impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
}
}

impl<T, D: Container, P: Pull<BundleCore<T, D>>> Pull<BundleCore<T, D>> for LogPuller<T, D, P> {
impl<T, D: Container, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
#[inline]
fn pull(&mut self) -> &mut Option<BundleCore<T,D>> {
fn pull(&mut self) -> &mut Option<Bundle<T,D>> {
let result = self.puller.pull();
if let Some(bundle) = result {
let channel = self.channel;
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::dataflow::channels::BundleCore;
use crate::dataflow::channels::Bundle;
use crate::progress::ChangeBatch;
use crate::communication::Pull;
use crate::Container;

/// A wrapper which accounts records pulled past in a shared count map.
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> {
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
Expand All @@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
}
}

impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, D: Container, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
/// Retrieves the next timestamp and batch of data.
#[inline]
pub fn next(&mut self) -> Option<&mut BundleCore<T, D>> {
pub fn next(&mut self) -> Option<&mut Bundle<T, D>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut BundleCore<T, D>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, D>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
Expand All @@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D,
}
}

impl<T:Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Counter {
Expand Down
31 changes: 14 additions & 17 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::communication::Push;
use crate::container::{PushContainer, PushInto};
use crate::dataflow::channels::{BundleCore, Message};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
use crate::{Container, Data};
Expand All @@ -13,18 +13,15 @@ use crate::{Container, Data};
/// The `Buffer` type should be used by calling `session` with a time, which checks whether
/// data must be flushed and creates a `Session` object which allows sending at the given time.
#[derive(Debug)]
pub struct BufferCore<T, D: Container, P: Push<BundleCore<T, D>>> {
pub struct Buffer<T, D: Container, P: Push<Bundle<T, D>>> {
/// the currently open time, if it is open
time: Option<T>,
/// a buffer for records, to send at self.time
buffer: D,
pusher: P,
}

/// A buffer specialized to vector-based containers.
pub type Buffer<T, D, P> = BufferCore<T, Vec<D>, P>;

impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, C, P> where T: Eq+Clone {

/// Creates a new `Buffer`.
pub fn new(pusher: P) -> Self {
Expand Down Expand Up @@ -82,7 +79,7 @@ impl<T, C: Container, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq
}
}

impl<T, C: PushContainer, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T: Eq+Clone {
impl<T, C: PushContainer, P: Push<Bundle<T, C>>> Buffer<T, C, P> where T: Eq+Clone {
// internal method for use by `Session`.
#[inline]
fn give<D: PushInto<C>>(&mut self, data: D) {
Expand All @@ -97,7 +94,7 @@ impl<T, C: PushContainer, P: Push<BundleCore<T, C>>> BufferCore<T, C, P> where T
}
}

impl<T, D: Data, P: Push<BundleCore<T, Vec<D>>>> Buffer<T, D, P> where T: Eq+Clone {
impl<T, D: Data, P: Push<Bundle<T, Vec<D>>>> Buffer<T, Vec<D>, P> where T: Eq+Clone {
// Gives an entire message at a specific time.
fn give_vec(&mut self, vector: &mut Vec<D>) {
// flush to ensure fifo-ness
Expand All @@ -114,18 +111,18 @@ impl<T, D: Data, P: Push<BundleCore<T, Vec<D>>>> Buffer<T, D, P> where T: Eq+Clo
/// The `Session` struct provides the user-facing interface to an operator output, namely
/// the `Buffer` type. A `Session` wraps a session of output at a specified time, and
/// avoids what would otherwise be a constant cost of checking timestamp equality.
pub struct Session<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> where T: Eq+Clone+'a, C: 'a {
buffer: &'a mut BufferCore<T, C, P>,
pub struct Session<'a, T, C: Container, P: Push<Bundle<T, C>>+'a> where T: Eq+Clone+'a, C: 'a {
buffer: &'a mut Buffer<T, C, P>,
}

impl<'a, T, C: Container, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
impl<'a, T, C: Container, P: Push<Bundle<T, C>>+'a> Session<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut C) {
self.buffer.give_container(container)
}
}

impl<'a, T, C, P: Push<BundleCore<T, C>>+'a> Session<'a, T, C, P>
impl<'a, T, C, P: Push<Bundle<T, C>>+'a> Session<'a, T, C, P>
where
T: Eq+Clone+'a,
C: 'a + PushContainer,
Expand All @@ -144,7 +141,7 @@ where
}
}

impl<'a, T, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
impl<'a, T, D: Data, P: Push<Bundle<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
/// Provides a fully formed `Content<D>` message for senders which can use this type.
///
/// The `Content` type is the backing memory for communication in timely, and it can
Expand All @@ -159,18 +156,18 @@ impl<'a, T, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> Session<'a, T, Vec<D>, P
}

/// A session which will flush itself when dropped.
pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> where
pub struct AutoflushSessionCore<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>+'a> where
T: Eq+Clone+'a, C: 'a {
/// A reference to the underlying buffer.
buffer: &'a mut BufferCore<T, C, P>,
buffer: &'a mut Buffer<T, C, P>,
/// The capability being used to send the data.
_capability: Capability<T>,
}

/// Auto-flush session specialized to vector-based containers.
pub type AutoflushSession<'a, T, D, P> = AutoflushSessionCore<'a, T, Vec<D>, P>;

impl<'a, T: Timestamp, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> AutoflushSessionCore<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
impl<'a, T: Timestamp, D: Data, P: Push<Bundle<T, Vec<D>>>+'a> AutoflushSessionCore<'a, T, Vec<D>, P> where T: Eq+Clone+'a, D: 'a {
/// Transmits a single record.
#[inline]
pub fn give(&mut self, data: D) {
Expand All @@ -192,7 +189,7 @@ impl<'a, T: Timestamp, D: Data, P: Push<BundleCore<T, Vec<D>>>+'a> AutoflushSess
}
}

impl<'a, T: Timestamp, C: Container, P: Push<BundleCore<T, C>>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
impl<'a, T: Timestamp, C: Container, P: Push<Bundle<T, C>>+'a> Drop for AutoflushSessionCore<'a, T, C, P> where T: Eq+Clone+'a, C: 'a {
fn drop(&mut self) {
self.buffer.cease();
}
Expand Down
17 changes: 7 additions & 10 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@ use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::{ChangeBatch, Timestamp};
use crate::dataflow::channels::BundleCore;
use crate::dataflow::channels::Bundle;
use crate::communication::Push;
use crate::Container;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct CounterCore<T, D, P: Push<BundleCore<T, D>>> {
pub struct Counter<T, D, P: Push<Bundle<T, D>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: PhantomData<D>,
}

/// A counter specialized to vector.
pub type Counter<T, D, P> = CounterCore<T, Vec<D>, P>;

impl<T: Timestamp, D: Container, P> Push<BundleCore<T, D>> for CounterCore<T, D, P> where P: Push<BundleCore<T, D>> {
impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for Counter<T, D, P> where P: Push<Bundle<T, D>> {
#[inline]
fn push(&mut self, message: &mut Option<BundleCore<T, D>>) {
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
if let Some(message) = message {
self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
}
Expand All @@ -34,10 +31,10 @@ impl<T: Timestamp, D: Container, P> Push<BundleCore<T, D>> for CounterCore<T, D,
}
}

impl<T, D, P: Push<BundleCore<T, D>>> CounterCore<T, D, P> where T : Ord+Clone+'static {
impl<T, D, P: Push<Bundle<T, D>>> Counter<T, D, P> where T : Ord+Clone+'static {
/// Allocates a new `Counter` from a pushee and shared counts.
pub fn new(pushee: P) -> CounterCore<T, D, P> {
CounterCore {
pub fn new(pushee: P) -> Counter<T, D, P> {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
phantom: PhantomData,
Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

use crate::communication::Push;
use crate::container::PushPartitioned;
use crate::dataflow::channels::{BundleCore, Message};
use crate::dataflow::channels::{Bundle, Message};
use crate::{Container, Data};

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct Exchange<T, C: PushPartitioned, P: Push<BundleCore<T, C>>, H>
pub struct Exchange<T, C: PushPartitioned, P: Push<Bundle<T, C>>, H>
where
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
Expand All @@ -17,7 +17,7 @@ where
hash_func: H,
}

impl<T: Clone, C: PushPartitioned, P: Push<BundleCore<T, C>>, H> Exchange<T, C, P, H>
impl<T: Clone, C: PushPartitioned, P: Push<Bundle<T, C>>, H> Exchange<T, C, P, H>
where
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
Expand All @@ -44,13 +44,13 @@ where
}
}

impl<T: Eq+Data, C: Container, P: Push<BundleCore<T, C>>, H, > Push<BundleCore<T, C>> for Exchange<T, C, P, H>
impl<T: Eq+Data, C: Container, P: Push<Bundle<T, C>>, H, > Push<Bundle<T, C>> for Exchange<T, C, P, H>
where
C: PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
#[inline(never)]
fn push(&mut self, message: &mut Option<BundleCore<T, C>>) {
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
// if only one pusher, no exchange
if self.pushers.len() == 1 {
self.pushers[0].push(message);
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use self::tee::{Tee, TeeCore, TeeHelper};
pub use self::tee::{Tee, TeeHelper};
pub use self::exchange::Exchange;
pub use self::counter::{Counter, CounterCore};
pub use self::counter::Counter;

pub mod tee;
pub mod exchange;
Expand Down
Loading

0 comments on commit 786326d

Please sign in to comment.