Skip to content

Commit

Permalink
Remove Container::Item
Browse files Browse the repository at this point in the history
It's not used, and unclear as to what it should communicate.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 26, 2023
1 parent 64be92b commit efb29a0
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 23 deletions.
3 changes: 1 addition & 2 deletions container/src/columnation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,6 @@ mod container {
use crate::columnation::{Columnation, TimelyStack};

impl<T: Columnation + 'static> Container for TimelyStack<T> {
type Item = T;

fn len(&self) -> usize {
self.local.len()
}
Expand All @@ -316,6 +314,7 @@ mod container {
}

impl<T: Columnation + 'static> PushPartitioned for TimelyStack<T> {
type Item = T;
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
I: FnMut(&Self::Item) -> usize,
Expand Down
13 changes: 4 additions & 9 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ pub mod columnation;
/// is efficient (which is not necessarily the case when deriving `Clone`.)
/// TODO: Don't require `Container: Clone`
pub trait Container: Default + Clone + 'static {
/// The type of elements this container holds.
type Item;

/// The number of elements in this container
///
/// The length of a container must be consistent between sending and receiving it.
Expand All @@ -41,8 +38,6 @@ pub trait Container: Default + Clone + 'static {
}

impl<T: Clone + 'static> Container for Vec<T> {
type Item = T;

fn len(&self) -> usize {
Vec::len(self)
}
Expand All @@ -64,8 +59,6 @@ mod rc {
use crate::Container;

impl<T: Container> Container for Rc<T> {
type Item = T::Item;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}
Expand Down Expand Up @@ -95,8 +88,6 @@ mod arc {
use crate::Container;

impl<T: Container> Container for Arc<T> {
type Item = T::Item;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}
Expand All @@ -122,6 +113,9 @@ mod arc {

/// A container that can partition itself into pieces.
pub trait PushPartitioned: Container {
/// Type of item to distribute among containers.
type Item;

/// Partition and push this container.
///
/// Drain all elements from `self`, and use the function `index` to determine which `buffer` to
Expand All @@ -133,6 +127,7 @@ pub trait PushPartitioned: Container {
}

impl<T: Clone + 'static> PushPartitioned for Vec<T> {
type Item = T;
fn push_partitioned<I, F>(&mut self, buffers: &mut [Self], mut index: I, mut flush: F)
where
I: FnMut(&Self::Item) -> usize,
Expand Down
20 changes: 10 additions & 10 deletions timely/src/dataflow/operators/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::operators::generic::Operator;

/// Methods to inspect records and batches of records on a stream.
pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
pub trait Inspect<G: Scope, C: Container, I>: InspectCore<G, C> + Sized {
/// Runs a supplied closure on each observed data element.
///
/// # Examples
Expand All @@ -21,7 +21,7 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn inspect(&self, mut func: impl FnMut(&C::Item)+'static) -> Self {
fn inspect(&self, mut func: impl FnMut(&I)+'static) -> Self {
self.inspect_batch(move |_, data| {
for datum in data.iter() { func(datum); }
})
Expand All @@ -38,7 +38,7 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
/// });
/// ```
fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &C::Item)+'static) -> Self {
fn inspect_time(&self, mut func: impl FnMut(&G::Timestamp, &I)+'static) -> Self {
self.inspect_batch(move |time, data| {
for datum in data.iter() {
func(&time, &datum);
Expand All @@ -57,7 +57,7 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
/// });
/// ```
fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[C::Item])+'static) -> Self {
fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &[I])+'static) -> Self {
self.inspect_core(move |event| {
if let Ok((time, data)) = event {
func(time, data);
Expand All @@ -84,25 +84,25 @@ pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
/// });
/// });
/// ```
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>)+'static;
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[I]), &[G::Timestamp]>)+'static;
}

impl<G: Scope, D: Data> Inspect<G, Vec<D>> for StreamCore<G, Vec<D>> {
impl<G: Scope, D: Data> Inspect<G, Vec<D>, D> for StreamCore<G, Vec<D>> {
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
}
}

impl<G: Scope, D: Data+Columnation> Inspect<G, TimelyStack<D>> for StreamCore<G, TimelyStack<D>> {
impl<G: Scope, D: Data+Columnation> Inspect<G, TimelyStack<D>, D> for StreamCore<G, TimelyStack<D>> {
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[D]), &[G::Timestamp]>) + 'static {
self.inspect_container(move |r| func(r.map(|(t, c)| (t, &c[..]))))
}
}

impl<G: Scope, C: Container> Inspect<G, Rc<C>> for StreamCore<G, Rc<C>>
where C: AsRef<[C::Item]>
impl<G: Scope, C: Container> Inspect<G, Rc<C>, C> for StreamCore<G, Rc<C>>
where C: AsRef<[C]>
{
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C::Item]), &[G::Timestamp]>) + 'static {
fn inspect_core<F>(&self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &[C]), &[G::Timestamp]>) + 'static {
self.inspect_container(move |r| func(r.map(|(t, c)| (t, c.as_ref().as_ref()))))
}
}
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/reclock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ pub trait Reclock<S: Scope> {
/// assert_eq!(extracted[1], (5, vec![4,5]));
/// assert_eq!(extracted[2], (8, vec![6,7,8]));
/// ```
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> Self;
fn reclock<TC: Container>(&self, clock: &StreamCore<S, TC>) -> Self;
}

impl<S: Scope, C: Container> Reclock<S> for StreamCore<S, C> {
fn reclock<TC: Container<Item=()>>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {
fn reclock<TC: Container>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C> {

let mut stash = vec![];

Expand Down

0 comments on commit efb29a0

Please sign in to comment.