From e9e1157b7ab7d2548df699c9b95b578e0e64772a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 1 Feb 2023 09:41:21 -0500 Subject: [PATCH] Move consolidate methods to inherent implementations (#376) --- examples/accumulate.rs | 1 - examples/arrange.rs | 1 - examples/graspan.rs | 2 +- src/collection.rs | 4 ++- src/operators/consolidate.rs | 56 +++++++++++++++--------------------- src/operators/iterate.rs | 2 -- src/operators/mod.rs | 1 - tests/join.rs | 2 +- 8 files changed, 28 insertions(+), 41 deletions(-) diff --git a/examples/accumulate.rs b/examples/accumulate.rs index cfb37078f6..efd1dfc8e5 100644 --- a/examples/accumulate.rs +++ b/examples/accumulate.rs @@ -5,7 +5,6 @@ extern crate differential_dataflow; use rand::{Rng, SeedableRng, StdRng}; use differential_dataflow::input::Input; -use differential_dataflow::operators::Consolidate; fn main() { diff --git a/examples/arrange.rs b/examples/arrange.rs index 8fbda1cba7..f69240944e 100644 --- a/examples/arrange.rs +++ b/examples/arrange.rs @@ -14,7 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::operators::join::JoinCore; use differential_dataflow::operators::Iterate; -use differential_dataflow::operators::Consolidate; fn main() { diff --git a/examples/graspan.rs b/examples/graspan.rs index c031695537..d06f223788 100644 --- a/examples/graspan.rs +++ b/examples/graspan.rs @@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::input::{Input, InputSession}; use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf}; use differential_dataflow::operators::iterate::Variable; -use differential_dataflow::operators::{Threshold, JoinCore, Consolidate}; +use differential_dataflow::operators::{Threshold, JoinCore}; type Node = usize; type Edge = (Node, Node); diff --git a/src/collection.rs b/src/collection.rs index 23c5f0dad6..eb82c5d0d9 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -531,7 +531,6 @@ impl Collection where G::Timestamp: Da R: ::ExchangeData+Hashable, G::Timestamp: Lattice+Ord, { - use operators::consolidate::Consolidate; self.consolidate() .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x)); } @@ -545,6 +544,7 @@ impl Collection where G::Timestamp: Da use timely::dataflow::scopes::ScopeParent; use timely::progress::timestamp::Refines; +/// Methods requiring a nested scope. impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup> Collection, D, R> where T: Refines<::Timestamp>, @@ -582,6 +582,7 @@ where } } +/// Methods requiring a region as the scope. impl<'a, G: Scope, D: Data, R: Semigroup> Collection, D, R> { /// Returns the value of a Collection from a nested region to its containing scope. @@ -595,6 +596,7 @@ impl<'a, G: Scope, D: Data, R: Semigroup> Collection, } } +/// Methods requiring an Abelian difference, to support negation. impl Collection where G::Timestamp: Data { /// Creates a new collection whose counts are the negation of those in the input. /// diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index d36f04905a..32b770c22e 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -10,10 +10,18 @@ use timely::dataflow::Scope; use ::{Collection, ExchangeData, Hashable}; use ::difference::Semigroup; -use operators::arrange::arrangement::Arrange; -/// An extension method for consolidating weighted streams. -pub trait Consolidate : Sized { +use Data; +use lattice::Lattice; + +/// Methods which require data be arrangeable. +impl Collection +where + G: Scope, + G::Timestamp: Data+Lattice, + D: ExchangeData+Hashable, + R: Semigroup+ExchangeData, +{ /// Aggregates the weights of equal records into at most one record. /// /// This method uses the type `D`'s `hashed()` method to partition the data. The data are @@ -26,7 +34,6 @@ pub trait Consolidate : Sized { /// extern crate differential_dataflow; /// /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::Consolidate; /// /// fn main() { /// ::timely::example(|scope| { @@ -40,30 +47,23 @@ pub trait Consolidate : Sized { /// }); /// } /// ``` - fn consolidate(&self) -> Self { - self.consolidate_named("Consolidate") + pub fn consolidate(&self) -> Self { + use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; + self.consolidate_named::>("Consolidate") } - /// As `consolidate` but with the ability to name the operator. - fn consolidate_named(&self, name: &str) -> Self; -} - -impl Consolidate for Collection -where - D: ExchangeData+Hashable, - R: ExchangeData+Semigroup, - G::Timestamp: ::lattice::Lattice+Ord, - { - fn consolidate_named(&self, name: &str) -> Self { - use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; + /// As `consolidate` but with the ability to name the operator and specify the trace type. + pub fn consolidate_named(&self, name: &str) -> Self + where + Tr: crate::trace::Trace+crate::trace::TraceReader+'static, + Tr::Batch: crate::trace::Batch, + { + use operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) - .arrange_named::>(name) + .arrange_named::(name) .as_collection(|d: &D, _| d.clone()) } -} -/// An extension method for consolidating weighted streams. -pub trait ConsolidateStream { /// Aggregates the weights of equal records. /// /// Unlike `consolidate`, this method does not exchange data and does not @@ -79,7 +79,6 @@ pub trait ConsolidateStream { /// extern crate differential_dataflow; /// /// use differential_dataflow::input::Input; - /// use differential_dataflow::operators::consolidate::ConsolidateStream; /// /// fn main() { /// ::timely::example(|scope| { @@ -93,16 +92,7 @@ pub trait ConsolidateStream { /// }); /// } /// ``` - fn consolidate_stream(&self) -> Self; -} - -impl ConsolidateStream for Collection -where - D: ExchangeData+Hashable, - R: ExchangeData+Semigroup, - G::Timestamp: ::lattice::Lattice+Ord, - { - fn consolidate_stream(&self) -> Self { + pub fn consolidate_stream(&self) -> Self { use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; diff --git a/src/operators/iterate.rs b/src/operators/iterate.rs index 062d02255e..ac3636703c 100644 --- a/src/operators/iterate.rs +++ b/src/operators/iterate.rs @@ -63,7 +63,6 @@ pub trait Iterate { /// /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::Iterate; - /// use differential_dataflow::operators::Consolidate; /// /// fn main() { /// ::timely::example(|scope| { @@ -145,7 +144,6 @@ impl Iterate for G { /// /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::iterate::Variable; -/// use differential_dataflow::operators::Consolidate; /// /// fn main() { /// ::timely::example(|scope| { diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 96f0acc937..9f048cd5db 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -5,7 +5,6 @@ //! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`). pub use self::reduce::{Reduce, Threshold, Count}; -pub use self::consolidate::Consolidate; pub use self::iterate::Iterate; pub use self::join::{Join, JoinCore}; pub use self::count::CountTotal; diff --git a/tests/join.rs b/tests/join.rs index a2d9285379..2c956a4b19 100644 --- a/tests/join.rs +++ b/tests/join.rs @@ -4,7 +4,7 @@ extern crate differential_dataflow; use timely::dataflow::operators::{ToStream, Capture, Map}; use timely::dataflow::operators::capture::Extract; use differential_dataflow::AsCollection; -use differential_dataflow::operators::{Consolidate, Join, Count}; +use differential_dataflow::operators::{Join, Count}; #[test] fn join() {