Skip to content

Commit

Permalink
Reorganize arrangement methods (#442)
Browse files Browse the repository at this point in the history
* Make inherent join methods

* Make inherent reduce methods
  • Loading branch information
frankmcsherry authored Dec 6, 2023
1 parent 41fc7a8 commit 008680f
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 75 deletions.
2 changes: 1 addition & 1 deletion examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use differential_dataflow::lattice::Lattice;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::{Threshold, JoinCore};
use differential_dataflow::operators::Threshold;

type Node = usize;
type Edge = (Node, Node);
Expand Down
13 changes: 6 additions & 7 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
let mut probe = Handle::new();
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {

use differential_dataflow::operators::{arrange::Arrange, JoinCore, join::join_traces};
use differential_dataflow::operators::{arrange::Arrange};

let (data_input, data) = scope.new_collection::<String, isize>();
let (keys_input, keys) = scope.new_collection::<String, isize>();
Expand Down Expand Up @@ -54,18 +54,17 @@ fn main() {
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
use differential_dataflow::operators::reduce::ReduceCore;

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>();
// .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

join_traces(&keys, &data, |k,v1,v2,t,r1,r2| {
keys.join_core(&data, |k,_v1,_v2| {
println!("{:?}", k.text);
Option::<((),isize,isize)>::None
})
Expand Down
2 changes: 0 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ where

use timely::order::Product;

use operators::join::JoinCore;

let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

Expand Down
85 changes: 85 additions & 0 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,91 @@ where
}
}


use difference::Multiply;
// Direct join implementations.
impl<G: Scope, Tr> Arranged<G, Tr>
where
G::Timestamp: Lattice+Ord,
Tr: TraceReader<Time=G::Timestamp> + Clone + 'static,
Tr::Diff: Semigroup,
{
/// A direct implementation of the `JoinCore::join_core` method.
pub fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<Tr::Diff as Multiply<Tr2::Diff>>::Output>
where
Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>,Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
Tr::Diff: Multiply<Tr2::Diff>,
<Tr::Diff as Multiply<Tr2::Diff>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(Tr::Key<'_>,Tr::Val<'_>,Tr2::Val<'_>)->I+'static
{
let result = move |k: Tr::Key<'_>, v1: Tr::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &Tr::Diff, r2: &Tr2::Diff| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
self.join_core_internal_unsafe(other, result)
}
/// A direct implementation of the `JoinCore::join_core_internal_unsafe` method.
pub fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: for<'a> TraceReader<Key<'a>=Tr::Key<'a>, Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(Tr::Key<'_>, Tr::Val<'_>,Tr2::Val<'_>,&G::Timestamp,&Tr::Diff,&Tr2::Diff)->I+'static,
{
use operators::join::join_traces;
join_traces(self, other, result)
}
}

// Direct reduce implementations.
use difference::Abelian;
impl<G: Scope, T1> Arranged<G, T1>
where
G::Timestamp: Lattice+Ord,
T1: TraceReader<Time=G::Timestamp>+Clone+'static,
T1::Diff: Semigroup,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
crate::consolidation::consolidate(change);
})
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
use operators::reduce::reduce_trace;
reduce_trace(self, name, logic)
}
}


impl<'a, G: Scope, Tr> Arranged<Child<'a, G, G::Timestamp>, Tr>
where
G::Timestamp: Lattice+Ord,
Expand Down
41 changes: 0 additions & 41 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,47 +349,6 @@ where
{
self.arrange_by_key().join_core_internal_unsafe(stream2, result)
}

}

impl<G, K, V, T1> JoinCore<G, K, V, T1::Diff> for Arranged<G,T1>
where
G: Scope,
G::Timestamp: Lattice+Ord,
T1: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V, Time=G::Timestamp>+Clone+'static,
K: Ord+'static,
V: Ord+'static,
T1::Diff: Semigroup,
{
fn join_core<Tr2,I,L>(&self, other: &Arranged<G,Tr2>, mut result: L) -> Collection<G,I::Item,<T1::Diff as Multiply<Tr2::Diff>>::Output>
where
Tr2: for<'a> TraceReader<Key<'a>=T1::Key<'a>,Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
T1::Diff: Multiply<Tr2::Diff>,
<T1::Diff as Multiply<Tr2::Diff>>::Output: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(T1::Key<'_>,T1::Val<'_>,Tr2::Val<'_>)->I+'static
{
let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| {
let t = t.clone();
let r = (r1.clone()).multiply(r2);
result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone()))
};
self.join_core_internal_unsafe(other, result)
}

fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, other: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
where
Tr2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+Clone+'static,
Tr2::Diff: Semigroup,
D: Data,
ROut: Semigroup,
I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
L: FnMut(&K, &V,Tr2::Val<'_>,&G::Timestamp,&T1::Diff,&Tr2::Diff)->I+'static,
{
join_traces(self, other, result)
}
}

/// An equijoin of two traces, sharing a common key type.
Expand Down
30 changes: 6 additions & 24 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use trace::implementations::{KeySpine, ValSpine};
use trace::TraceReader;

/// Extension trait for the `reduce` differential dataflow method.
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> : ReduceCore<G, K, V, R> where G::Timestamp: Lattice+Ord {
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
/// Applies a reduction function on records grouped by key.
///
/// Input data must be structured as `(key, val)` pairs.
Expand Down Expand Up @@ -327,28 +327,10 @@ where
}
}

impl<G: Scope, K, V, T1, R: Semigroup> ReduceCore<G, K, V, R> for Arranged<G, T1>
where
K: ToOwned + Ord + ?Sized,
K::Owned: Data,
V: ToOwned + Ord + ?Sized,
G::Timestamp: Lattice+Ord,
T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwned = <K as ToOwned>::Owned, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static,
{
fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
reduce_trace(self, name, logic)
}
}

fn reduce_trace<G, T1, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
pub fn reduce_trace<G, T1, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
G: Scope,
G::Timestamp: Lattice+Ord,
Expand All @@ -359,7 +341,7 @@ where
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: for<'a> FnMut(T1::Key<'a>, &[(T1::Val<'a>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
{
let mut result_trace = None;

Expand Down

0 comments on commit 008680f

Please sign in to comment.