Make collections generic over containers
Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Sep 15, 2023
1 parent 2b9ac68 commit 5e81e2b
Showing 14 changed files with 823 additions and 251 deletions.
10 changes: 5 additions & 5 deletions dogsdogsdogs/src/lib.rs
@@ -14,7 +14,7 @@ use timely::progress::Timestamp;
use timely::dataflow::operators::Partition;
use timely::dataflow::operators::Concatenate;

-use differential_dataflow::{ExchangeData, Collection, AsCollection};
+use differential_dataflow::{Data, ExchangeData, Collection, AsCollection};
use differential_dataflow::operators::Threshold;
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
@@ -33,9 +33,9 @@ pub mod operators;
**/
pub trait PrefixExtender<G: Scope, R: Monoid+Multiply<Output = R>> {
/// The required type of prefix to extend.
-type Prefix;
+type Prefix: Data;
/// The type to be produced as extension.
-type Extension;
+type Extension: Data;
/// Annotates prefixes with the number of extensions the relation would propose.
fn count(&mut self, prefixes: &Collection<G, (Self::Prefix, usize, usize), R>, index: usize) -> Collection<G, (Self::Prefix, usize, usize), R>;
/// Extends each prefix with corresponding extensions.
@@ -92,11 +92,11 @@ where
}
}

-pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Multiply<Output = R>, P, E> {
+pub trait ValidateExtensionMethod<G: Scope, R: Monoid+Multiply<Output = R>, P: Data, E: Data> {
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R>;
}

-impl<G: Scope, R: Monoid+Multiply<Output = R>, P, E> ValidateExtensionMethod<G, R, P, E> for Collection<G, (P, E), R> {
+impl<G: Scope, R: Monoid+Multiply<Output = R>, P: Data, E: Data> ValidateExtensionMethod<G, R, P, E> for Collection<G, (P, E), R> {
fn validate_using<PE: PrefixExtender<G, R, Prefix=P, Extension=E>>(&self, extender: &mut PE) -> Collection<G, (P, E), R> {
extender.validate(self)
}
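
The upshot of the dogsdogsdogs changes above is that prefix and extension types must now implement timely's `Data` trait (`Clone + 'static`). A minimal sketch of calling `validate_using` under the new bounds, using only items visible in this diff; the function name and the concrete `u32`/`String` types are illustrative assumptions, not part of the commit:

use timely::dataflow::Scope;
use differential_dataflow::Collection;
use differential_dataflow::difference::{Monoid, Multiply};
use dogsdogsdogs::{PrefixExtender, ValidateExtensionMethod};

// Filters (prefix, extension) candidates through any matching extender;
// `u32` and `String` satisfy the new `Data` bounds on `P` and `E`.
fn validate_candidates<G, R, PE>(
    candidates: &Collection<G, (u32, String), R>,
    extender: &mut PE,
) -> Collection<G, (u32, String), R>
where
    G: Scope,
    R: Monoid + Multiply<Output = R>,
    PE: PrefixExtender<G, R, Prefix = u32, Extension = String>,
{
    candidates.validate_using(extender)
}
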
6 changes: 3 additions & 3 deletions src/algorithms/prefix_sum.rs
@@ -2,12 +2,12 @@

use timely::dataflow::Scope;

-use ::{Collection, ExchangeData};
+use ::{Collection, Data, ExchangeData};
use ::lattice::Lattice;
use ::operators::*;

/// Extension trait for the prefix_sum method.
-pub trait PrefixSum<G: Scope, K, D> {
+pub trait PrefixSum<G: Scope, K: Data, D> {
/// Computes the prefix sum for each element in the collection.
///
/// The prefix sum is data-parallel, in the sense that the sums are computed independently for
@@ -150,4 +150,4 @@ where
.distinct()
})
.semijoin(&queries)
-}
+}
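
The body of `prefix_sum` is collapsed in this view, so as a reference for the doc comment above, here is a minimal sequential model of the data-parallel semantics: sums are computed independently per key, and each element receives the sum of the values at strictly smaller positions. The model is an illustrative assumption about the intended semantics and does not use the library API:

use std::collections::HashMap;
use std::hash::Hash;

// (key, position, value) -> (key, position, sum of values at positions < position)
fn prefix_sum_model<K: Eq + Hash + Clone>(mut input: Vec<(K, usize, u64)>) -> Vec<(K, usize, u64)> {
    input.sort_by_key(|x| x.1);
    let mut totals: HashMap<K, u64> = HashMap::new();
    input
        .into_iter()
        .map(|(key, pos, val)| {
            let total = totals.entry(key.clone()).or_insert(0);
            let out = (key, pos, *total);
            *total += val;
            out
        })
        .collect()
}
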
240 changes: 134 additions & 106 deletions src/collection.rs
@@ -14,14 +14,15 @@ use timely::Data;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::scopes::{Child, child::Iterative};
-use timely::dataflow::{Scope, Stream};
+use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::operators::*;

use ::difference::{Semigroup, Abelian, Multiply};
use lattice::Lattice;
use hashable::Hashable;
+use TimelyContainer;

-/// A mutable collection of values of type `D`
+/// A mutable collection of values of type `D` within a container `C`
///
/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
/// differential dataflow computation, you write as if the collection is a static dataset to which you
@@ -30,31 +31,140 @@ use hashable::Hashable;
/// propagate changes through your functional computation and report the corresponding changes to the
/// output collections.
///
-/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
+/// Each collection has four generic parameters. The parameter `G` is for the scope in which the
/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
-/// defaults to) `isize`, representing changes to the occurrence count of each record.
+/// defaults to) `isize`, representing changes to the occurrence count of each record. The `C`
+/// parameter specifies the container type of the collection.
+///
+/// Note that the default container type is `Vec<_>`.
#[derive(Clone)]
-pub struct Collection<G: Scope, D, R: Semigroup = isize> {
+pub struct Collection<G, D, R = isize, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
+where
+G: Scope,
+R: Semigroup,
+C: TimelyContainer<Item=(D, G::Timestamp, R)>,
+{
/// The underlying timely dataflow stream.
///
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
/// not intended to be the idiomatic way to work with the collection.
-pub inner: Stream<G, (D, G::Timestamp, R)>
+pub inner: StreamCore<G, C>
}
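
To make the new fourth parameter concrete: with the defaults in place the type is exactly the old `Collection`. A hedged sketch — the aliases are illustrative, not from the commit:

use differential_dataflow::Collection;
use timely::dataflow::scopes::ScopeParent;

// Spelling out the default: updates stored as a Vec of (data, time, diff) triples.
type ClassicCollection<G, D> = Collection<G, D, isize, Vec<(D, <G as ScopeParent>::Timestamp, isize)>>;

// Equivalent shorthand, relying on the defaults for `R` and `C`.
type AlsoClassic<G, D> = Collection<G, D>;
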

-impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
+impl<G: Scope, C, D, R: Semigroup> Collection<G, D, R, C>
+where
+C: TimelyContainer<Item=(D, G::Timestamp, R)>,
+{
/// Creates a new Collection from a timely dataflow stream.
///
/// This method seems to be rarely used, with the `as_collection` method on streams being a more
/// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
/// provides a `new_collection` method which will create a new collection for you without exposing
/// the underlying timely stream at all.
-pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
-Collection { inner: stream }
+pub fn new(stream: StreamCore<G, C>) -> Collection<G, D, R, C> {
+Self { inner: stream }
}
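
As a companion to the doc comment above, a hedged sketch of the more idiomatic `as_collection` route: build a raw timely stream of (data, time, diff) triples — the layout of the default container — and view it as a collection. The specific data, timestamps, and diffs are illustrative:

extern crate timely;
extern crate differential_dataflow;

use timely::dataflow::operators::ToStream;
use differential_dataflow::AsCollection;

fn main() {
    timely::example(|scope| {
        (0 .. 10)
            .map(|x| (x, Default::default(), 1isize)) // (data, timestamp, diff)
            .to_stream(scope)
            .as_collection()
            .inspect(|x| println!("seen: {:?}", x));
    });
}
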
+
+/// Creates a new collection accumulating the contents of the two collections.
+///
+/// Despite the name, differential dataflow collections are unordered. This method is so named because the
+/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
+/// two collections.
+///
+/// # Examples
+///
+/// ```
+/// extern crate timely;
+/// extern crate differential_dataflow;
+///
+/// use differential_dataflow::input::Input;
+///
+/// fn main() {
+/// ::timely::example(|scope| {
+///
+/// let data = scope.new_collection_from(1 .. 10).1;
+///
+/// let odds = data.filter(|x| x % 2 == 1);
+/// let evens = data.filter(|x| x % 2 == 0);
+///
+/// odds.concat(&evens)
+/// .assert_eq(&data);
+/// });
+/// }
+/// ```
+pub fn concat(&self, other: &Collection<G, D, R, C>) -> Collection<G, D, R, C> {
+self.inner
+.concat(&other.inner)
+.as_collection()
+}
+
+/// Creates a new collection accumulating the contents of the two collections.
+///
+/// Despite the name, differential dataflow collections are unordered. This method is so named because the
+/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
+/// two collections.
+///
+/// # Examples
+///
+/// ```
+/// extern crate timely;
+/// extern crate differential_dataflow;
+///
+/// use differential_dataflow::input::Input;
+///
+/// fn main() {
+/// ::timely::example(|scope| {
+///
+/// let data = scope.new_collection_from(1 .. 10).1;
+///
+/// let odds = data.filter(|x| x % 2 == 1);
+/// let evens = data.filter(|x| x % 2 == 0);
+///
+/// odds.concatenate(Some(evens))
+/// .assert_eq(&data);
+/// });
+/// }
+/// ```
+pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R, C>
+where
+I: IntoIterator<Item=Collection<G, D, R, C>>
+{
+self.inner
+.concatenate(sources.into_iter().map(|x| x.inner))
+.as_collection()
+}
+
+/// Attaches a timely dataflow probe to the output of a Collection.
+///
+/// This probe is used to determine when the state of the Collection has stabilized and can
+/// be read out.
+pub fn probe(&self) -> probe::Handle<G::Timestamp> {
+self.inner
+.probe()
+}
+
+/// Attaches a timely dataflow probe to the output of a Collection.
+///
+/// This probe is used to determine when the state of the Collection has stabilized and all updates have been observed.
+/// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
+/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
+/// avoid swamping the system.
+pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R, C> {
+self.inner
+.probe_with(handle)
+.as_collection()
+}
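
A hedged sketch of the flow-control pattern this comment describes: hold back new rounds of input until the probe shows the dataflow has caught up. The session setup and round structure are illustrative assumptions:

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::InputSession;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut input = InputSession::<_, u64, isize>::new();
        let probe = worker.dataflow(|scope| {
            input.to_collection(scope)
                 .map(|x| x * 2)
                 .probe()
        });
        for round in 0 .. 10 {
            input.insert(round);
            input.advance_to(round + 1);
            input.flush();
            // Limit rounds in flight: step the worker until this round is complete.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}
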
+
+/// The scope containing the underlying timely dataflow stream.
+pub fn scope(&self) -> G {
+self.inner.scope()
+}
+}
+
+impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection by applying the supplied function to each input element.
///
/// # Examples
@@ -166,73 +276,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
.filter(move |&(ref data, _, _)| logic(data))
.as_collection()
}
-/// Creates a new collection accumulating the contents of the two collections.
-///
-/// Despite the name, differential dataflow collections are unordered. This method is so named because the
-/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
-/// two collections.
-///
-/// # Examples
-///
-/// ```
-/// extern crate timely;
-/// extern crate differential_dataflow;
-///
-/// use differential_dataflow::input::Input;
-///
-/// fn main() {
-/// ::timely::example(|scope| {
-///
-/// let data = scope.new_collection_from(1 .. 10).1;
-///
-/// let odds = data.filter(|x| x % 2 == 1);
-/// let evens = data.filter(|x| x % 2 == 0);
-///
-/// odds.concat(&evens)
-/// .assert_eq(&data);
-/// });
-/// }
-/// ```
-pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
-self.inner
-.concat(&other.inner)
-.as_collection()
-}
-/// Creates a new collection accumulating the contents of the two collections.
-///
-/// Despite the name, differential dataflow collections are unordered. This method is so named because the
-/// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
-/// two collections.
-///
-/// # Examples
-///
-/// ```
-/// extern crate timely;
-/// extern crate differential_dataflow;
-///
-/// use differential_dataflow::input::Input;
-///
-/// fn main() {
-/// ::timely::example(|scope| {
-///
-/// let data = scope.new_collection_from(1 .. 10).1;
-///
-/// let odds = data.filter(|x| x % 2 == 1);
-/// let evens = data.filter(|x| x % 2 == 0);
-///
-/// odds.concatenate(Some(evens))
-/// .assert_eq(&data);
-/// });
-/// }
-/// ```
-pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
-where
-I: IntoIterator<Item=Collection<G, D, R>>
-{
-self.inner
-.concatenate(sources.into_iter().map(|x| x.inner))
-.as_collection()
-}
/// Replaces each record with another, with a new difference type.
///
/// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
@@ -482,25 +525,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
.inspect_batch(func)
.as_collection()
}
-/// Attaches a timely dataflow probe to the output of a Collection.
-///
-/// This probe is used to determine when the state of the Collection has stabilized and can
-/// be read out.
-pub fn probe(&self) -> probe::Handle<G::Timestamp> {
-self.inner
-.probe()
-}
-/// Attaches a timely dataflow probe to the output of a Collection.
-///
-/// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
-/// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
-/// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
-/// avoid swamping the system.
-pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
-self.inner
-.probe_with(handle)
-.as_collection()
-}

/// Assert if the collection is ever non-empty.
///
@@ -534,11 +558,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
self.consolidate()
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
}
-
-/// The scope containing the underlying timely dataflow stream.
-pub fn scope(&self) -> G {
-self.inner.scope()
-}
}

use timely::dataflow::scopes::ScopeParent;
@@ -673,13 +692,21 @@ impl<G: Scope, D: Data, R: Abelian> Collection<G, D, R> where G::Timestamp: Data
}

/// Conversion to a differential dataflow Collection.
-pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
+pub trait AsCollection<G, D, R, C = Vec<(D, <G as ScopeParent>::Timestamp, R)>>
+where
+G: Scope,
+R: Semigroup,
+C: TimelyContainer<Item=(D, G::Timestamp, R)>,
+{
/// Converts the type to a differential dataflow collection.
-fn as_collection(&self) -> Collection<G, D, R>;
+fn as_collection(&self) -> Collection<G, D, R, C>;
}

-impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
-fn as_collection(&self) -> Collection<G, D, R> {
+impl<G: Scope, D, R: Semigroup, C> AsCollection<G, D, R, C> for StreamCore<G, C>
+where
+C: TimelyContainer<Item=(D, G::Timestamp, R)>
+{
+fn as_collection(&self) -> Collection<G, D, R, C> {
Collection::new(self.clone())
}
}
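
The blanket implementation above means any `StreamCore` whose container holds (data, time, diff) triples can now be viewed as a collection, not just `Vec`-backed streams. A sketch of code written against that generality, assuming `TimelyContainer` is a public re-export at the crate root (this diff only shows the crate-internal `use TimelyContainer;`); the helper name is illustrative:

use differential_dataflow::{AsCollection, Collection, TimelyContainer};
use differential_dataflow::difference::Semigroup;
use timely::dataflow::{Scope, StreamCore};

// Works for any container of update triples, not only Vec<(D, T, R)>.
fn view_as_collection<G, D, R, C>(stream: &StreamCore<G, C>) -> Collection<G, D, R, C>
where
    G: Scope,
    R: Semigroup,
    C: TimelyContainer<Item = (D, G::Timestamp, R)>, // assumption: public re-export
{
    stream.as_collection()
}
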
@@ -710,12 +737,13 @@ impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G:
/// });
/// }
/// ```
-pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
+pub fn concatenate<G, D, R, I, C>(scope: &mut G, iterator: I) -> Collection<G, D, R, C>
where
G: Scope,
D: Data,
R: Semigroup,
-I: IntoIterator<Item=Collection<G, D, R>>,
+I: IntoIterator<Item=Collection<G, D, R, C>>,
+C: TimelyContainer<Item=(D, G::Timestamp, R)>,
{
scope
.concatenate(iterator.into_iter().map(|x| x.inner))
(The remaining 11 of the 14 changed files are not rendered on this page.)
