Skip to content

Commit

Permalink
Implement OrdValBatch without retain_from (#419)
Browse files Browse the repository at this point in the history
* Implement OrdValBatch without retain_from

* Demonstrate update container

* Organize opinions on default spines

* Protect against cursor overflow
  • Loading branch information
frankmcsherry authored Nov 20, 2023
1 parent 9163a29 commit b82c3ee
Show file tree
Hide file tree
Showing 29 changed files with 708 additions and 205 deletions.
6 changes: 3 additions & 3 deletions dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ impl<G: Scope, R: Monoid+Multiply<Output = R>, P, E> ValidateExtensionMethod<G,
}

// These are all defined here so that users can be assured a common layout.
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
type TraceValHandle<K,V,T,R> = TraceAgent<OrdValSpine<K,V,T,R>>;
type TraceKeyHandle<K,T,R> = TraceAgent<OrdKeySpine<K,T,R>>;
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K,V,T,R>>;
type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K,T,R>>;

pub struct CollectionIndex<K, V, T, R>
where
Expand Down
6 changes: 3 additions & 3 deletions examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ pub struct Query {
pub productions: Vec<Production>,
}

use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};

type TraceKeyHandle<K,T,R> = TraceAgent<OrdKeySpine<K, T, R>>;
type TraceValHandle<K,V,T,R> = TraceAgent<OrdValSpine<K, V, T, R>>;
type TraceKeyHandle<K,T,R> = TraceAgent<KeySpine<K, T, R>>;
type TraceValHandle<K,V,T,R> = TraceAgent<ValSpine<K, V, T, R>>;
type Arrange<G,K,V,R> = Arranged<G, TraceValHandle<K, V, <G as ScopeParent>::Timestamp, R>>;

/// An evolving set of edges.
Expand Down
4 changes: 2 additions & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where G::Timestamp: Lattice+Ord {

use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
use differential_dataflow::trace::implementations::KeySpine;


use timely::order::Product;
Expand All @@ -155,7 +155,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,DefaultKeyTrace<_,_,_>>("Reduce", |_key, input, output, updates| {
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
4 changes: 0 additions & 4 deletions experiments/src/bin/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::count::CountTotal;
use differential_dataflow::operators::threshold::ThresholdTotal;

// use differential_dataflow::trace::implementations::ord::OrdKeySpine;

#[derive(Debug)]
enum Comp {
Nothing,
Expand Down Expand Up @@ -66,8 +64,6 @@ fn main() {
Comp::Distinct => data.arrange_by_self().distinct_total().probe(),
};

// OrdKeySpine::<usize, Product<RootTimestamp,u64>,isize>::with_effort(work)

(handle, probe)
});

Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/deals-interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::lattice::Lattice;

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

type Node = u32;

Expand Down
16 changes: 8 additions & 8 deletions experiments/src/bin/deals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;

use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::difference::Present;

type EdgeArranged<G, K, V, R> = Arranged<G, TraceAgent<OrdValSpine<K, V, <G as ScopeParent>::Timestamp, R, Offs>>>;
type EdgeArranged<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R, Offs>>>;

type Node = u32;
type Edge = (Node, Node);
Expand Down Expand Up @@ -47,7 +47,7 @@ fn main() {
let (input, graph) = scope.new_collection();

// each edge should exist in both directions.
let graph = graph.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let graph = graph.arrange::<ValSpine<_,_,_,_,Offs>>();

match program.as_str() {
"tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
Expand Down Expand Up @@ -100,10 +100,10 @@ fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
let result =
inner
.map(|(x,y)| (y,x))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&edges, |_y,&x,&z| Some((x, z)))
.concat(&edges.as_collection(|&k,&v| (k,v)))
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand All @@ -127,12 +127,12 @@ fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C

let result =
inner
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&edges, |_,&x,&z| Some((x, z)))
.concat(&peers)
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;

Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive-alt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn three_hop<G: Scope>(
Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive-neu-zwei.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn three_hop<G: Scope>(
Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive-neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;

// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn three_hop<G: Scope>(
Expand Down
4 changes: 2 additions & 2 deletions experiments/src/bin/graphs-interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ fn main() {
}).unwrap();
}

use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::operators::arrange::Arranged;

type Arrange<G, K, V, R> = Arranged<G, TraceAgent<DefaultValTrace<K, V, <G as ScopeParent>::Timestamp, R>>>;
type Arrange<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;


// returns pairs (n, s) indicating node n can be reached from a root in s steps.
Expand Down
40 changes: 20 additions & 20 deletions experiments/src/bin/graspan2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use differential_dataflow::Collection;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};
use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
use differential_dataflow::difference::Present;

type Node = u32;
Expand Down Expand Up @@ -52,7 +52,7 @@ fn unoptimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let dereference = dereference.arrange::<ValSpine<_,_,_,_,Offs>>();

let (value_flow, memory_alias, value_alias) =
scope
Expand All @@ -65,14 +65,14 @@ fn unoptimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_,Offs>>();

// VA(a,b) <- VF(x,a),VF(x,b)
// VA(a,b) <- VF(x,a),MA(x,y),VF(y,b)
let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)));
let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&value_alias_next);

Expand All @@ -82,16 +82,16 @@ fn unoptimized() {
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)));

let value_flow_next =
value_flow_next
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -100,12 +100,12 @@ fn unoptimized() {
let memory_alias_next: Collection<_,_,Present> =
value_alias_next
.join_core(&dereference, |_x,&y,&a| Some((y,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&dereference, |_y,&a,&b| Some((a,b)));

let memory_alias_next: Collection<_,_,Present> =
memory_alias_next
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down Expand Up @@ -177,7 +177,7 @@ fn optimized() {
.flat_map(|(a,b)| vec![a,b])
.concat(&dereference.flat_map(|(a,b)| vec![a,b]));

let dereference = dereference.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let dereference = dereference.arrange::<ValSpine<_,_,_,_,Offs>>();

let (value_flow, memory_alias) =
scope
Expand All @@ -190,22 +190,22 @@ fn optimized() {
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));

let value_flow_arranged = value_flow.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<OrdValSpine<_,_,_,_,Offs>>();
let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_,Offs>>();
let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_,Offs>>();

// VF(a,a) <-
// VF(a,b) <- A(a,x),VF(x,b)
// VF(a,b) <- A(a,x),MA(x,y),VF(y,b)
let value_flow_next =
assignment
.map(|(a,b)| (b,a))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
.concat(&assignment.map(|(a,b)| (b,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
.concat(&nodes.map(|n| (n,n)))
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand All @@ -214,9 +214,9 @@ fn optimized() {
let value_flow_deref =
value_flow
.map(|(a,b)| (b,a))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&dereference, |_x,&a,&b| Some((a,b)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>();
.arrange::<ValSpine<_,_,_,_,Offs>>();

// MA(a,b) <- VFD(x,a),VFD(y,b)
// MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b)
Expand All @@ -227,10 +227,10 @@ fn optimized() {
let memory_alias_next =
memory_alias_arranged
.join_core(&value_flow_deref, |_x,&y,&a| Some((y,a)))
.arrange::<OrdValSpine<_,_,_,_,Offs>>()
.arrange::<ValSpine<_,_,_,_,Offs>>()
.join_core(&value_flow_deref, |_y,&a,&b| Some((a,b)))
.concat(&memory_alias_next)
.arrange::<OrdKeySpine<_,_,_,Offs>>()
.arrange::<KeySpine<_,_,_,Offs>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down
6 changes: 3 additions & 3 deletions interactive/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use timely::logging::TimelyEvent;
// use timely::dataflow::operators::capture::event::EventIterator;

use differential_dataflow::ExchangeData;
use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine};
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::input::InputSession;

Expand All @@ -21,9 +21,9 @@ use differential_dataflow::logging::DifferentialEvent;
use crate::{Time, Diff, Plan, Datum};

/// A trace handle for key-only data.
pub type TraceKeyHandle<K, T, R> = TraceAgent<OrdKeySpine<K, T, R>>;
pub type TraceKeyHandle<K, T, R> = TraceAgent<KeySpine<K, T, R>>;
/// A trace handle for key-value data.
pub type TraceValHandle<K, V, T, R> = TraceAgent<OrdValSpine<K, V, T, R>>;
pub type TraceValHandle<K, V, T, R> = TraceAgent<ValSpine<K, V, T, R>>;
/// A key-only trace handle binding `Time` and `Diff` using `Vec<V>` as data.
pub type KeysOnlyHandle<V> = TraceKeyHandle<Vec<V>, Time, Diff>;
/// A key-value trace handle binding `Time` and `Diff` using `Vec<V>` as data.
Expand Down
4 changes: 2 additions & 2 deletions interactive/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {

use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::trace::implementations::ord::OrdKeySpine;
use differential_dataflow::trace::implementations::KeySpine;

let input =
if let Some(mut trace) = arrangements.get_unkeyed(&self) {
Expand All @@ -170,7 +170,7 @@ impl<V: ExchangeData+Hash+Datum> Render for Plan<V> {
input_arrangement
};

let output = input.reduce_abelian::<_,OrdKeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));
let output = input.reduce_abelian::<_,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1)));

arrangements.set_unkeyed(&self, &output.trace);
output.as_collection(|k,&()| k.clone())
Expand Down
4 changes: 2 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where

use crate::operators::reduce::ReduceCore;
use crate::operators::iterate::SemigroupVariable;
use crate::trace::implementations::ord::OrdValSpine as DefaultValTrace;
use crate::trace::implementations::ValSpine;

use timely::order::Product;

Expand All @@ -98,7 +98,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
Loading

0 comments on commit b82c3ee

Please sign in to comment.