Skip to content

Commit

Permalink
Move consolidate methods to inherent implementations (TimelyDataflow#376
Browse files Browse the repository at this point in the history
)
  • Loading branch information
frankmcsherry authored Feb 1, 2023
1 parent 8a04699 commit e9e1157
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 41 deletions.
1 change: 0 additions & 1 deletion examples/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ extern crate differential_dataflow;
use rand::{Rng, SeedableRng, StdRng};

use differential_dataflow::input::Input;
use differential_dataflow::operators::Consolidate;

fn main() {

Expand Down
1 change: 0 additions & 1 deletion examples/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::operators::Iterate;
use differential_dataflow::operators::Consolidate;

fn main() {

Expand Down
2 changes: 1 addition & 1 deletion examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::{Threshold, JoinCore, Consolidate};
use differential_dataflow::operators::{Threshold, JoinCore};

type Node = usize;
type Edge = (Node, Node);
Expand Down
4 changes: 3 additions & 1 deletion src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
R: ::ExchangeData+Hashable,
G::Timestamp: Lattice+Ord,
{
use operators::consolidate::Consolidate;
self.consolidate()
.inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
}
Expand All @@ -545,6 +544,7 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
use timely::dataflow::scopes::ScopeParent;
use timely::progress::timestamp::Refines;

/// Methods requiring a nested scope.
impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup> Collection<Child<'a, G, T>, D, R>
where
T: Refines<<G as ScopeParent>::Timestamp>,
Expand Down Expand Up @@ -582,6 +582,7 @@ where
}
}

/// Methods requiring a region as the scope.
impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>, D, R>
{
/// Returns the value of a Collection from a nested region to its containing scope.
Expand All @@ -595,6 +596,7 @@ impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>,
}
}

/// Methods requiring an Abelian difference, to support negation.
impl<G: Scope, D: Data, R: Abelian> Collection<G, D, R> where G::Timestamp: Data {
/// Creates a new collection whose counts are the negation of those in the input.
///
Expand Down
56 changes: 23 additions & 33 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@ use timely::dataflow::Scope;

use ::{Collection, ExchangeData, Hashable};
use ::difference::Semigroup;
use operators::arrange::arrangement::Arrange;

/// An extension method for consolidating weighted streams.
pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
use Data;
use lattice::Lattice;

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
where
G: Scope,
G::Timestamp: Data+Lattice,
D: ExchangeData+Hashable,
R: Semigroup+ExchangeData,
{
/// Aggregates the weights of equal records into at most one record.
///
/// This method uses the type `D`'s `hashed()` method to partition the data. The data are
Expand All @@ -26,7 +34,6 @@ pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::Consolidate;
///
/// fn main() {
/// ::timely::example(|scope| {
Expand All @@ -40,30 +47,23 @@ pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
/// });
/// }
/// ```
fn consolidate(&self) -> Self {
self.consolidate_named("Consolidate")
pub fn consolidate(&self) -> Self {
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
self.consolidate_named::<DefaultKeyTrace<_,_,_>>("Consolidate")
}

/// As `consolidate` but with the ability to name the operator.
fn consolidate_named(&self, name: &str) -> Self;
}

impl<G: Scope, D, R> Consolidate<D> for Collection<G, D, R>
where
D: ExchangeData+Hashable,
R: ExchangeData+Semigroup,
G::Timestamp: ::lattice::Lattice+Ord,
{
fn consolidate_named(&self, name: &str) -> Self {
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
/// As `consolidate` but with the ability to name the operator and specify the trace type.
pub fn consolidate_named<Tr>(&self, name: &str) -> Self
where
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
Tr::Batch: crate::trace::Batch,
{
use operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
.arrange_named::<DefaultKeyTrace<_,_,_>>(name)
.arrange_named::<Tr>(name)
.as_collection(|d: &D, _| d.clone())
}
}

/// An extension method for consolidating weighted streams.
pub trait ConsolidateStream<D: ExchangeData+Hashable> {
/// Aggregates the weights of equal records.
///
/// Unlike `consolidate`, this method does not exchange data and does not
Expand All @@ -79,7 +79,6 @@ pub trait ConsolidateStream<D: ExchangeData+Hashable> {
/// extern crate differential_dataflow;
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::consolidate::ConsolidateStream;
///
/// fn main() {
/// ::timely::example(|scope| {
Expand All @@ -93,16 +92,7 @@ pub trait ConsolidateStream<D: ExchangeData+Hashable> {
/// });
/// }
/// ```
fn consolidate_stream(&self) -> Self;
}

impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
where
D: ExchangeData+Hashable,
R: ExchangeData+Semigroup,
G::Timestamp: ::lattice::Lattice+Ord,
{
fn consolidate_stream(&self) -> Self {
pub fn consolidate_stream(&self) -> Self {

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
Expand Down
2 changes: 0 additions & 2 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub trait Iterate<G: Scope, D: Data, R: Semigroup> {
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::Iterate;
/// use differential_dataflow::operators::Consolidate;
///
/// fn main() {
/// ::timely::example(|scope| {
Expand Down Expand Up @@ -145,7 +144,6 @@ impl<G: Scope, D: Ord+Data+Debug, R: Semigroup> Iterate<G, D, R> for G {
///
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::iterate::Variable;
/// use differential_dataflow::operators::Consolidate;
///
/// fn main() {
/// ::timely::example(|scope| {
Expand Down
1 change: 0 additions & 1 deletion src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).

pub use self::reduce::{Reduce, Threshold, Count};
pub use self::consolidate::Consolidate;
pub use self::iterate::Iterate;
pub use self::join::{Join, JoinCore};
pub use self::count::CountTotal;
Expand Down
2 changes: 1 addition & 1 deletion tests/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate differential_dataflow;
use timely::dataflow::operators::{ToStream, Capture, Map};
use timely::dataflow::operators::capture::Extract;
use differential_dataflow::AsCollection;
use differential_dataflow::operators::{Consolidate, Join, Count};
use differential_dataflow::operators::{Join, Count};

#[test]
fn join() {
Expand Down

0 comments on commit e9e1157

Please sign in to comment.