Skip to content

Commit

Permalink
Change generic variable for containers from D to C (TimelyDataflow#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Mar 21, 2024
1 parent 786326d commit 6d3f13e
Show file tree
Hide file tree
Showing 19 changed files with 288 additions and 290 deletions.
16 changes: 8 additions & 8 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,39 @@ pub mod pullers;
pub mod pact;

/// The input to and output from timely dataflow communication channels.
pub type Bundle<T, D> = crate::communication::Message<Message<T, D>>;
pub type Bundle<T, C> = crate::communication::Message<Message<T, C>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Abomonation, Serialize, Deserialize)]
pub struct Message<T, D> {
pub struct Message<T, C> {
/// The timestamp associated with the message.
pub time: T,
/// The data in the message.
pub data: D,
pub data: C,
/// The source worker.
pub from: usize,
/// A sequence number for this worker-to-worker stream.
pub seq: usize,
}

impl<T, D> Message<T, D> {
impl<T, C> Message<T, C> {
/// Default buffer size.
#[deprecated = "Use timely::buffer::default_capacity instead"]
pub fn default_length() -> usize {
crate::container::buffer::default_capacity::<D>()
crate::container::buffer::default_capacity::<C>()
}
}

impl<T, D: Container> Message<T, D> {
impl<T, C: Container> Message<T, C> {
/// Creates a new message instance from arguments.
pub fn new(time: T, data: D, from: usize, seq: usize) -> Self {
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
Message { time, data, from, seq }
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element.
#[inline]
pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Bundle<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
Expand Down
36 changes: 17 additions & 19 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use crate::progress::Timestamp;
use crate::worker::AsWorker;

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, D> {
pub trait ParallelizationContract<T, C> {
/// Type implementing `Push` produced by this pact.
type Pusher: Push<Bundle<T, D>>+'static;
type Pusher: Push<Bundle<T, C>>+'static;
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, D>>+'static;
type Puller: Pull<Bundle<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}
Expand All @@ -33,13 +33,11 @@ pub trait ParallelizationContract<T, D> {
#[derive(Debug)]
pub struct Pipeline;

impl<T: 'static, D: Container> ParallelizationContract<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
// // ignore `&mut A` and use thread allocator
// let (pusher, puller) = Thread::new::<Bundle<T, Vec<D>>>();
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
}
Expand Down Expand Up @@ -89,17 +87,17 @@ impl<C, F> Debug for ExchangeCore<C, F> {

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
pusher: P,
channel: usize,
counter: usize,
source: usize,
target: usize,
phantom: PhantomData<(T, D)>,
phantom: PhantomData<(T, C)>,
logging: Option<Logger>,
}

impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
Expand All @@ -114,9 +112,9 @@ impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
}
}

impl<T, D: Container, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T, C, P> {
#[inline]
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
fn push(&mut self, pair: &mut Option<Bundle<T, C>>) {
if let Some(bundle) = pair {
self.counter += 1;

Expand Down Expand Up @@ -145,15 +143,15 @@ impl<T, D: Container, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T,

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
pub struct LogPuller<T, C, P: Pull<Bundle<T, C>>> {
puller: P,
channel: usize,
index: usize,
phantom: PhantomData<(T, D)>,
phantom: PhantomData<(T, C)>,
logging: Option<Logger>,
}

impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
Expand All @@ -166,9 +164,9 @@ impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
}
}

impl<T, D: Container, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
impl<T, C: Container, P: Pull<Bundle<T, C>>> Pull<Bundle<T, C>> for LogPuller<T, C, P> {
#[inline]
fn pull(&mut self) -> &mut Option<Bundle<T,D>> {
fn pull(&mut self) -> &mut Option<Bundle<T, C>> {
let result = self.puller.pull();
if let Some(bundle) = result {
let channel = self.channel;
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use crate::communication::Pull;
use crate::Container;

/// A wrapper which accounts records pulled past in a shared count map.
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> {
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
phantom: ::std::marker::PhantomData<C>,
}

/// A guard type that updates the change batch counts on drop
Expand All @@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
}
}

impl<T:Ord+Clone+'static, D: Container, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
/// Retrieves the next timestamp and batch of data.
#[inline]
pub fn next(&mut self) -> Option<&mut Bundle<T, D>> {
pub fn next(&mut self) -> Option<&mut Bundle<T, C>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, D>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, C>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
Expand All @@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<Bundle<T, D>>> Counter<T, D, P>
}
}

impl<T:Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Counter {
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use crate::{Container, Data};
/// The `Buffer` type should be used by calling `session` with a time, which checks whether
/// data must be flushed and creates a `Session` object which allows sending at the given time.
#[derive(Debug)]
pub struct Buffer<T, D: Container, P: Push<Bundle<T, D>>> {
pub struct Buffer<T, C: Container, P: Push<Bundle<T, C>>> {
/// the currently open time, if it is open
time: Option<T>,
/// a buffer for records, to send at self.time
buffer: D,
buffer: C,
pusher: P,
}

Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use crate::Container;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct Counter<T, D, P: Push<Bundle<T, D>>> {
pub struct Counter<T, C, P: Push<Bundle<T, C>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: PhantomData<D>,
phantom: PhantomData<C>,
}

impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for Counter<T, D, P> where P: Push<Bundle<T, D>> {
impl<T: Timestamp, C: Container, P> Push<Bundle<T, C>> for Counter<T, C, P> where P: Push<Bundle<T, C>> {
#[inline]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
if let Some(message) = message {
self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
}
Expand All @@ -31,9 +31,9 @@ impl<T: Timestamp, D: Container, P> Push<Bundle<T, D>> for Counter<T, D, P> wher
}
}

impl<T, D, P: Push<Bundle<T, D>>> Counter<T, D, P> where T : Ord+Clone+'static {
impl<T, C, P: Push<Bundle<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
/// Allocates a new `Counter` from a pushee and shared counts.
pub fn new(pushee: P) -> Counter<T, D, P> {
pub fn new(pushee: P) -> Counter<T, C, P> {
Counter {
pushee,
produced: Rc::new(RefCell::new(ChangeBatch::new())),
Expand Down
34 changes: 17 additions & 17 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use crate::dataflow::channels::{Bundle, Message};
use crate::communication::Push;
use crate::{Container, Data};

type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>;
type PushList<T, C> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, C>>>>>>;

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct Tee<T, D> {
buffer: D,
shared: PushList<T, D>,
pub struct Tee<T, C> {
buffer: C,
shared: PushList<T, C>,
}

impl<T: Data, D: Container> Push<Bundle<T, D>> for Tee<T, D> {
impl<T: Data, C: Container> Push<Bundle<T, C>> for Tee<T, C> {
#[inline]
fn push(&mut self, message: &mut Option<Bundle<T, D>>) {
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
let mut pushers = self.shared.borrow_mut();
if let Some(message) = message {
for index in 1..pushers.len() {
Expand All @@ -39,9 +39,9 @@ impl<T: Data, D: Container> Push<Bundle<T, D>> for Tee<T, D> {
}
}

impl<T, D: Container> Tee<T, D> {
impl<T, C: Container> Tee<T, C> {
/// Allocates a new pair of `Tee` and `TeeHelper`.
pub fn new() -> (Tee<T, D>, TeeHelper<T, D>) {
pub fn new() -> (Tee<T, C>, TeeHelper<T, C>) {
let shared = Rc::new(RefCell::new(Vec::new()));
let port = Tee {
buffer: Default::default(),
Expand All @@ -52,7 +52,7 @@ impl<T, D: Container> Tee<T, D> {
}
}

impl<T, D: Container> Clone for Tee<T, D> {
impl<T, C: Container> Clone for Tee<T, C> {
fn clone(&self) -> Self {
Self {
buffer: Default::default(),
Expand All @@ -61,9 +61,9 @@ impl<T, D: Container> Clone for Tee<T, D> {
}
}

impl<T, D> Debug for Tee<T, D>
impl<T, C> Debug for Tee<T, C>
where
D: Debug,
C: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("Tee");
Expand All @@ -80,26 +80,26 @@ where
}

/// A shared list of `Box<Push>` used to add `Push` implementors.
pub struct TeeHelper<T, D> {
shared: PushList<T, D>,
pub struct TeeHelper<T, C> {
shared: PushList<T, C>,
}

impl<T, D> TeeHelper<T, D> {
impl<T, C> TeeHelper<T, C> {
/// Adds a new `Push` implementor to the list of recipients shared with a `Stream`.
pub fn add_pusher<P: Push<Bundle<T, D>>+'static>(&self, pusher: P) {
pub fn add_pusher<P: Push<Bundle<T, C>>+'static>(&self, pusher: P) {
self.shared.borrow_mut().push(Box::new(pusher));
}
}

impl<T, D> Clone for TeeHelper<T, D> {
impl<T, C> Clone for TeeHelper<T, C> {
fn clone(&self) -> Self {
TeeHelper {
shared: self.shared.clone(),
}
}
}

impl<T, D> Debug for TeeHelper<T, D> {
impl<T, C> Debug for TeeHelper<T, C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut debug = f.debug_struct("TeeHelper");

Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/operators/capture/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::progress::Timestamp;
use super::{EventCore, EventPusherCore};

/// Capture a stream of timestamped data for later replay.
pub trait Capture<T: Timestamp, D: Container> {
pub trait Capture<T: Timestamp, C: Container> {
/// Captures a stream of timestamped data for later replay.
///
/// # Examples
Expand Down Expand Up @@ -103,18 +103,18 @@ pub trait Capture<T: Timestamp, D: Container> {
///
/// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
/// ```
fn capture_into<P: EventPusherCore<T, D>+'static>(&self, pusher: P);
fn capture_into<P: EventPusherCore<T, C>+'static>(&self, pusher: P);

/// Captures a stream using Rust's MPSC channels.
fn capture(&self) -> ::std::sync::mpsc::Receiver<EventCore<T, D>> {
fn capture(&self) -> ::std::sync::mpsc::Receiver<EventCore<T, C>> {
let (send, recv) = ::std::sync::mpsc::channel();
self.capture_into(send);
recv
}
}

impl<S: Scope, D: Container> Capture<S::Timestamp, D> for StreamCore<S, D> {
fn capture_into<P: EventPusherCore<S::Timestamp, D>+'static>(&self, mut event_pusher: P) {
impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
fn capture_into<P: EventPusherCore<S::Timestamp, C>+'static>(&self, mut event_pusher: P) {

let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
let mut input = PullCounter::new(builder.new_input(self, Pipeline));
Expand Down
Loading

0 comments on commit 6d3f13e

Please sign in to comment.