Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement OrdValBatch without retain_from #419

Merged
merged 4 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ impl<G: Scope, R: Monoid+Multiply<Output = R>, P, E> ValidateExtensionMethod<G,
}

// These are all defined here so that users can be assured a common layout.
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
type TraceValHandle<K,V,T,R> = TraceAgent<OrdValSpine<K,V,T,R>>;
type TraceKeyHandle<K,T,R> = TraceAgent<OrdKeySpine<K,T,R>>;
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K,V,T,R>>;
type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K,T,R>>;

pub struct CollectionIndex<K, V, T, R>
where
Expand Down
6 changes: 3 additions & 3 deletions examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ pub struct Query {
pub productions: Vec<Production>,
}

use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};

type TraceKeyHandle<K,T,R> = TraceAgent<OrdKeySpine<K, T, R>>;
type TraceValHandle<K,V,T,R> = TraceAgent<OrdValSpine<K, V, T, R>>;
type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K, T, R>>;
type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K, V, T, R>>;
type Arrange<G,K,V,R> = Arranged<G, TraceValHandle<K, V, <G as ScopeParent>::Timestamp, R>>;

/// An evolving set of edges.
Expand Down
4 changes: 2 additions & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where G::Timestamp: Lattice+Ord {

use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
use differential_dataflow::trace::implementations::KeySpine;


use timely::order::Product;
Expand All @@ -155,7 +155,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,DefaultKeyTrace<_,_,_>>("Reduce", |_key, input, output, updates| {
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
4 changes: 0 additions & 4 deletions experiments/src/bin/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::count::CountTotal;
use differential_dataflow::operators::threshold::ThresholdTotal;

// use differential_dataflow::trace::implementations::ord::OrdKeySpine;

#[derive(Debug)]
enum Comp {
Nothing,
Expand Down Expand Up @@ -66,8 +64,6 @@ fn main() {
Comp::Distinct => data.arrange_by_self().distinct_total().probe(),
};

// OrdKeySpine::<usize, Product<RootTimestamp,u64>,isize>::with_effort(work)

(handle, probe)
});

Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/deals-interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::lattice::Lattice;

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

type Node = u32;

Expand Down
16 changes: 8 additions & 8 deletions experiments/src/bin/deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;

use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::difference::Present;

type EdgeArranged<G, K, V, R> = Arranged<G, TraceAgent<OrdValSpine<K, V, <G as ScopeParent>::Timestamp, R, Offs>>>;
type EdgeArranged<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R, Offs>>>;

type Node = u32;
type Edge = (Node, Node);
Expand Down Expand Up @@ -47,7 +47,7 @@ fn main() {
let (input, graph) = scope.new_collection();

// each edge should exist in both directions.
let graph = graph.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let graph = graph.arrange::<ValSpine<_,_,_,_,Offs>>();

match program.as_str() {
"tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
Expand Down Expand Up @@ -100,10 +100,10 @@ fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
let result =
inner
.map(|(x,y)| (y,x))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&edges, |_y,&x,&z| Some((x, z)))
.concat(&edges.as_collection(|&k,&v| (k,v)))
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand All @@ -127,12 +127,12 @@ fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C

let result =
inner
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.concat(&peers)
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive-alt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn three_hop<G: Scope>(
Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive-neu-zwei.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn three_hop<G: Scope>(
Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive-neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn three_hop<G: Scope>(
Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;


// returns pairs (n, s) indicating node n can be reached from a root in s steps.
Expand Down
40 changes: 20 additions & 20 deletions experiments/src/bin/graspan2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use differential_dataflow::Collection;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
use differential_dataflow::difference::Present;

type Node = u32;
Expand Down Expand Up @@ -52,7 +52,7 @@ fn unoptimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let dereference = dereference.arrange::<ValSpine<_,_,_,_,Offs>>();

let (value_flow, memory_alias, value_alias) =
scope
Expand All @@ -65,14 +65,14 @@ fn unoptimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_,Offs>>();

// VA(a,b) <- VF(x,a),VF(x,b)
// VA(a,b) <- VF(x,a),MA(x,y),VF(y,b)
let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)));
let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&value_alias_next);

Expand All @@ -82,16 +82,16 @@ fn unoptimized() {
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)));

let value_flow_next =
value_flow_next
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -100,12 +100,12 @@ fn unoptimized() {
let memory_alias_next: Collection<_,_,Present> =
value_alias_next
.join_core(&dereference, |_x,&y,&a| Some((y,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&dereference, |_y,&a,&b| Some((a,b)));

let memory_alias_next: Collection<_,_,Present> =
memory_alias_next
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down Expand Up @@ -177,7 +177,7 @@ fn optimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let dereference = dereference.arrange::<ValSpine<_,_,_,_,Offs>>();

let (value_flow, memory_alias) =
scope
Expand All @@ -190,22 +190,22 @@ fn optimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_,Offs>>();

// VF(a,a) <-
// VF(a,b) <- A(a,x),VF(x,b)
// VF(a,b) <- A(a,x),MA(x,y),VF(y,b)
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)))
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -214,9 +214,9 @@ fn optimized() {
let value_flow_deref =
value_flow
.map(|(a,b)| (b,a))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&dereference, |_x,&a,&b| Some((a,b)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>();
.arrange::<ValSpine<_,_,_,_,Offs>>();

// MA(a,b) <- VFD(x,a),VFD(y,b)
// MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b)
Expand All @@ -227,10 +227,10 @@ fn optimized() {
let memory_alias_next =
memory_alias_arranged
.join_core(&value_flow_deref, |_x,&y,&a| Some((y,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_deref, |_y,&a,&b| Some((a,b)))
.concat(&memory_alias_next)
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down
6 changes: 3 additions & 3 deletions interactive/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use timely::logging::TimelyEvent;
// use timely::dataflow::operators::capture::event::EventIterator;

use differential_dataflow::ExchangeData;
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::input::InputSession;

Expand All @@ -21,9 +21,9 @@ use differential_dataflow::logging::DifferentialEvent;
use crate::{Time, Diff, Plan, Datum};

/// A trace handle for key-only data.
pub type TraceKeyHandle<K, T, R> = TraceAgent<OrdKeySpine<K, T, R>>;
pub type TraceKeyHandle<K, T, R> = TraceAgent<KeySpine<K, T, R>>;
/// A trace handle for key-value data.
pub type TraceValHandle<K, V, T, R> = TraceAgent<OrdValSpine<K, V, T, R>>;
pub type TraceValHandle<K, V, T, R> = TraceAgent<ValSpine<K, V, T, R>>;
/// A key-only trace handle binding `Time` and `Diff` using `Vec<V>` as data.
pub type KeysOnlyHandle<V> = TraceKeyHandle<Vec<V>, Time, Diff>;
/// A key-value trace handle binding `Time` and `Diff` using `Vec<V>` as data.
Expand Down
4 changes: 2 additions & 2 deletions interactive/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {

use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::trace::implementations::ord::OrdKeySpine;
use differential_dataflow::trace::implementations::KeySpine;

let input =
if let Some(mut trace) = arrangements.get_unkeyed(&self) {
Expand All @@ -170,7 +170,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
input_arrangement
};

let output = input.reduce_abelian::<_,OrdKeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));
let output = input.reduce_abelian::<_,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));

arrangements.set_unkeyed(&self, &output.trace);
output.as_collection(|k,&()| k.clone())
Expand Down
4 changes: 2 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where

use crate::operators::reduce::ReduceCore;
use crate::operators::iterate::SemigroupVariable;
use crate::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use crate::trace::implementations::ValSpine;

use timely::order::Product;

Expand All @@ -98,7 +98,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
Loading