From b82c3eeef688c390bb9d06101f9fa6a7f4db5364 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 20 Nov 2023 09:14:22 -0500 Subject: [PATCH] Implement `OrdValBatch` without `retain_from` (#419) * Implement OrdValBatch without retain_from * Demonstrate update container * Organize opinions on default spines * Protect against cursor overflow --- dogsdogsdogs/src/lib.rs | 6 +- examples/graspan.rs | 6 +- examples/monoid-bfs.rs | 4 +- experiments/src/bin/arrange.rs | 4 - experiments/src/bin/deals-interactive.rs | 4 +- experiments/src/bin/deals.rs | 16 +- experiments/src/bin/graphs-interactive-alt.rs | 4 +- .../src/bin/graphs-interactive-neu-zwei.rs | 4 +- experiments/src/bin/graphs-interactive-neu.rs | 4 +- experiments/src/bin/graphs-interactive.rs | 4 +- experiments/src/bin/graspan2.rs | 40 +- interactive/src/manager.rs | 6 +- interactive/src/plan/mod.rs | 4 +- src/algorithms/graphs/propagate.rs | 4 +- src/operators/arrange/agent.rs | 3 - src/operators/arrange/arrangement.rs | 19 +- src/operators/arrange/upsert.rs | 4 +- src/operators/consolidate.rs | 4 +- src/operators/join.rs | 2 - src/operators/reduce.rs | 13 +- .../implementations/merge_batcher_col.rs | 2 +- src/trace/implementations/mod.rs | 121 +++++ src/trace/implementations/ord.rs | 109 +--- src/trace/implementations/ord_neu.rs | 492 ++++++++++++++++++ src/trace/layers/mod.rs | 9 + tests/trace.rs | 11 +- tpchlike/src/lib.rs | 4 +- tpchlike/src/queries/query20.rs | 4 +- tpchlike/src/queries/query22.rs | 6 +- 29 files changed, 708 insertions(+), 205 deletions(-) create mode 100644 src/trace/implementations/ord_neu.rs diff --git a/dogsdogsdogs/src/lib.rs b/dogsdogsdogs/src/lib.rs index 3e288c3d1a..f734d4bc91 100644 --- a/dogsdogsdogs/src/lib.rs +++ b/dogsdogsdogs/src/lib.rs @@ -103,9 +103,9 @@ impl, P, E> ValidateExtensionMethod = TraceAgent>; -type TraceKeyHandle = TraceAgent>; +use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; +type TraceValHandle = TraceAgent>; +type TraceKeyHandle = TraceAgent>; pub struct CollectionIndex where diff --git a/examples/graspan.rs b/examples/graspan.rs index d06f223788..f49c48897a 100644 --- a/examples/graspan.rs +++ b/examples/graspan.rs @@ -71,11 +71,11 @@ pub struct Query { pub productions: Vec, } -use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine}; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; -type TraceKeyHandle = TraceAgent>; -type TraceValHandle = TraceAgent>; +type TraceKeyHandle = TraceAgent>; +type TraceValHandle = TraceAgent>; type Arrange = Arranged::Timestamp, R>>; /// An evolving set of edges. 
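A minimal sketch (not part of the patch) of the import change the hunks above make: the crate now re-exports its default ordered spines as ValSpine and KeySpine, so downstream handle aliases keep their shape and only the path changes. The parameter names K, V, T, R (key, value, time, difference) are spelled out here for illustration.

use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::{KeySpine, ValSpine};

// The same handle aliases as in the hunks above, written against the new
// re-exports instead of `trace::implementations::ord::{OrdValSpine, OrdKeySpine}`.
type TraceValHandle<K, V, T, R> = TraceAgent<ValSpine<K, V, T, R>>;
type TraceKeyHandle<K, T, R> = TraceAgent<KeySpine<K, T, R>>;

fn main() {
    // A concrete instantiation, only to show how the parameters line up.
    let _handle: Option<TraceValHandle<u32, u32, u64, isize>> = None;
}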
diff --git a/examples/monoid-bfs.rs b/examples/monoid-bfs.rs index 5f66890c49..01183c4c0f 100644 --- a/examples/monoid-bfs.rs +++ b/examples/monoid-bfs.rs @@ -140,7 +140,7 @@ where G::Timestamp: Lattice+Ord { use differential_dataflow::operators::iterate::SemigroupVariable; use differential_dataflow::operators::reduce::ReduceCore; - use differential_dataflow::trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; + use differential_dataflow::trace::implementations::KeySpine; use timely::order::Product; @@ -155,7 +155,7 @@ where G::Timestamp: Lattice+Ord { .join_map(&edges, |_k,&(),d| *d) .concat(&roots) .map(|x| (x,())) - .reduce_core::<_,DefaultKeyTrace<_,_,_>>("Reduce", |_key, input, output, updates| { + .reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| { if output.is_empty() || input[0].1 < output[0].1 { updates.push(((), input[0].1)); } diff --git a/experiments/src/bin/arrange.rs b/experiments/src/bin/arrange.rs index fad111979b..09acd664c3 100644 --- a/experiments/src/bin/arrange.rs +++ b/experiments/src/bin/arrange.rs @@ -14,8 +14,6 @@ use differential_dataflow::operators::arrange::ArrangeBySelf; use differential_dataflow::operators::count::CountTotal; use differential_dataflow::operators::threshold::ThresholdTotal; -// use differential_dataflow::trace::implementations::ord::OrdKeySpine; - #[derive(Debug)] enum Comp { Nothing, @@ -66,8 +64,6 @@ fn main() { Comp::Distinct => data.arrange_by_self().distinct_total().probe(), }; - // OrdKeySpine::,isize>::with_effort(work) - (handle, probe) }); diff --git a/experiments/src/bin/deals-interactive.rs b/experiments/src/bin/deals-interactive.rs index d6a0e6ab0f..bcbf81f405 100644 --- a/experiments/src/bin/deals-interactive.rs +++ b/experiments/src/bin/deals-interactive.rs @@ -14,11 +14,11 @@ use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::ArrangeByKey; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; type Node = u32; diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index 699abf8334..7f8b2bf2cd 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -11,14 +11,14 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; -use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine}; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::SemigroupVariable; use differential_dataflow::difference::Present; -type EdgeArranged = Arranged::Timestamp, R, Offs>>>; +type EdgeArranged = Arranged::Timestamp, R, Offs>>>; type Node = u32; type Edge = (Node, Node); @@ -47,7 +47,7 @@ fn main() { let (input, graph) = scope.new_collection(); // each edge should exist in both directions. 
- let graph = graph.arrange::>(); + let graph = graph.arrange::>(); match program.as_str() { "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(), @@ -100,10 +100,10 @@ fn tc>(edges: &EdgeArranged) -> C let result = inner .map(|(x,y)| (y,x)) - .arrange::>() + .arrange::>() .join_core(&edges, |_y,&x,&z| Some((x, z))) .concat(&edges.as_collection(|&k,&v| (k,v))) - .arrange::>() + .arrange::>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -127,12 +127,12 @@ fn sg>(edges: &EdgeArranged) -> C let result = inner - .arrange::>() + .arrange::>() .join_core(&edges, |_,&x,&z| Some((x, z))) - .arrange::>() + .arrange::>() .join_core(&edges, |_,&x,&z| Some((x, z))) .concat(&peers) - .arrange::>() + .arrange::>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 2dc4c248fa..967bd32473 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -259,11 +259,11 @@ fn main() { }).unwrap(); } -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive-neu-zwei.rs b/experiments/src/bin/graphs-interactive-neu-zwei.rs index 9b0fe2c23b..30faa6dc40 100644 --- a/experiments/src/bin/graphs-interactive-neu-zwei.rs +++ b/experiments/src/bin/graphs-interactive-neu-zwei.rs @@ -231,11 +231,11 @@ fn main() { }).unwrap(); } -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. fn three_hop( diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index 1fcbe149c0..a1a720cabd 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -293,11 +293,11 @@ fn main() { }).unwrap(); } -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. 
fn three_hop( diff --git a/experiments/src/bin/graphs-interactive.rs b/experiments/src/bin/graphs-interactive.rs index 74d5d43691..942565b159 100644 --- a/experiments/src/bin/graphs-interactive.rs +++ b/experiments/src/bin/graphs-interactive.rs @@ -199,11 +199,11 @@ fn main() { }).unwrap(); } -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; -type Arrange = Arranged::Timestamp, R>>>; +type Arrange = Arranged::Timestamp, R>>>; // returns pairs (n, s) indicating node n can be reached from a root in s steps. diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index c0eeb15ea5..3f3b708370 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -14,7 +14,7 @@ use differential_dataflow::Collection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine}; use differential_dataflow::difference::Present; type Node = u32; @@ -52,7 +52,7 @@ fn unoptimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::>(); + let dereference = dereference.arrange::>(); let (value_flow, memory_alias, value_alias) = scope @@ -65,14 +65,14 @@ fn unoptimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::>(); - let memory_alias_arranged = memory_alias.arrange::>(); + let value_flow_arranged = value_flow.arrange::>(); + let memory_alias_arranged = memory_alias.arrange::>(); // VA(a,b) <- VF(x,a),VF(x,b) // VA(a,b) <- VF(x,a),MA(x,y),VF(y,b) let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))); let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) - .arrange::>() + .arrange::>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&value_alias_next); @@ -82,16 +82,16 @@ fn unoptimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::>() + .arrange::>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::>() + .arrange::>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))); let value_flow_next = value_flow_next - .arrange::>() + .arrange::>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -100,12 +100,12 @@ fn unoptimized() { let memory_alias_next: Collection<_,_,Present> = value_alias_next .join_core(&dereference, |_x,&y,&a| Some((y,a))) - .arrange::>() + .arrange::>() .join_core(&dereference, |_y,&a,&b| Some((a,b))); let memory_alias_next: Collection<_,_,Present> = memory_alias_next - .arrange::>() + .arrange::>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -177,7 +177,7 @@ fn optimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::>(); + let 
dereference = dereference.arrange::>(); let (value_flow, memory_alias) = scope @@ -190,8 +190,8 @@ fn optimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::>(); - let memory_alias_arranged = memory_alias.arrange::>(); + let value_flow_arranged = value_flow.arrange::>(); + let memory_alias_arranged = memory_alias.arrange::>(); // VF(a,a) <- // VF(a,b) <- A(a,x),VF(x,b) @@ -199,13 +199,13 @@ fn optimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::>() + .arrange::>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::>() + .arrange::>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))) - .arrange::>() + .arrange::>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -214,9 +214,9 @@ fn optimized() { let value_flow_deref = value_flow .map(|(a,b)| (b,a)) - .arrange::>() + .arrange::>() .join_core(&dereference, |_x,&a,&b| Some((a,b))) - .arrange::>(); + .arrange::>(); // MA(a,b) <- VFD(x,a),VFD(y,b) // MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b) @@ -227,10 +227,10 @@ fn optimized() { let memory_alias_next = memory_alias_arranged .join_core(&value_flow_deref, |_x,&y,&a| Some((y,a))) - .arrange::>() + .arrange::>() .join_core(&value_flow_deref, |_y,&a,&b| Some((a,b))) .concat(&memory_alias_next) - .arrange::>() + .arrange::>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/interactive/src/manager.rs b/interactive/src/manager.rs index bd28cefdec..164ca42f44 100644 --- a/interactive/src/manager.rs +++ b/interactive/src/manager.rs @@ -12,7 +12,7 @@ use timely::logging::TimelyEvent; // use timely::dataflow::operators::capture::event::EventIterator; use differential_dataflow::ExchangeData; -use differential_dataflow::trace::implementations::ord::{OrdKeySpine, OrdValSpine}; +use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::input::InputSession; @@ -21,9 +21,9 @@ use differential_dataflow::logging::DifferentialEvent; use crate::{Time, Diff, Plan, Datum}; /// A trace handle for key-only data. -pub type TraceKeyHandle = TraceAgent>; +pub type TraceKeyHandle = TraceAgent>; /// A trace handle for key-value data. -pub type TraceValHandle = TraceAgent>; +pub type TraceValHandle = TraceAgent>; /// A key-only trace handle binding `Time` and `Diff` using `Vec` as data. pub type KeysOnlyHandle = TraceKeyHandle, Time, Diff>; /// A key-value trace handle binding `Time` and `Diff` using `Vec` as data. 
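The graspan2.rs hunks above repeat one pattern many times: arrange a collection with an explicit spine type, then reuse that arrangement across joins. A small self-contained sketch of the pattern (toy data and names, not taken from the patch), using the vector-backed ValSpine alias introduced in this PR; the self-join mirrors the VA(a,b) <- VF(x,a),VF(x,b) rule.

use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::operators::join::JoinCore;
use differential_dataflow::trace::implementations::ValSpine;

fn main() {
    ::timely::example(|scope| {
        // A toy edge collection; graspan2 builds its collections from input files instead.
        let edges = scope.new_collection_from(vec![(0u32, 1u32), (1, 2), (1, 3)]).1;
        // Arrange once, with the key/value/time/diff parameters left for inference.
        let arranged = edges.arrange::<ValSpine<_, _, _, _>>();
        // The arrangement can feed several operators; here a self-join that pairs
        // up destinations sharing a source, as in the value-alias rule above.
        arranged
            .join_core(&arranged, |_src, &a, &b| Some((a, b)))
            .inspect(|x| println!("shared source: {:?}", x));
    });
}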
diff --git a/interactive/src/plan/mod.rs b/interactive/src/plan/mod.rs index ca54717600..1d6b6a992c 100644 --- a/interactive/src/plan/mod.rs +++ b/interactive/src/plan/mod.rs @@ -158,7 +158,7 @@ impl Render for Plan { use differential_dataflow::operators::reduce::ReduceCore; use differential_dataflow::operators::arrange::ArrangeBySelf; - use differential_dataflow::trace::implementations::ord::OrdKeySpine; + use differential_dataflow::trace::implementations::KeySpine; let input = if let Some(mut trace) = arrangements.get_unkeyed(&self) { @@ -170,7 +170,7 @@ impl Render for Plan { input_arrangement }; - let output = input.reduce_abelian::<_,OrdKeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1))); + let output = input.reduce_abelian::<_,KeySpine<_,_,_>>("Distinct", move |_,_,t| t.push(((), 1))); arrangements.set_unkeyed(&self, &output.trace); output.as_collection(|k,&()| k.clone()) diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index 62bd922834..b0404c8ee2 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -86,7 +86,7 @@ where use crate::operators::reduce::ReduceCore; use crate::operators::iterate::SemigroupVariable; - use crate::trace::implementations::ord::OrdValSpine as DefaultValTrace; + use crate::trace::implementations::ValSpine; use timely::order::Product; @@ -98,7 +98,7 @@ where let labels = proposals .concat(&nodes) - .reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8)))); + .reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1 as i8)))); let propagate: Collection<_, (N, L), R> = labels diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index cc3b37bdc3..8476474305 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -208,7 +208,6 @@ where /// use differential_dataflow::operators::arrange::ArrangeBySelf; /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; - /// use differential_dataflow::trace::implementations::ord::OrdValSpine; /// /// fn main() { /// ::timely::execute(Config::thread(), |worker| { @@ -267,7 +266,6 @@ where /// use differential_dataflow::operators::arrange::ArrangeBySelf; /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; - /// use differential_dataflow::trace::implementations::ord::OrdValSpine; /// /// fn main() { /// ::timely::execute(Config::thread(), |worker| { @@ -380,7 +378,6 @@ where /// use differential_dataflow::operators::reduce::Reduce; /// use differential_dataflow::trace::Trace; /// use differential_dataflow::trace::TraceReader; - /// use differential_dataflow::trace::implementations::ord::OrdValSpine; /// use differential_dataflow::input::Input; /// /// fn main() { diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 1e94ed1302..bd6c48dcc1 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -30,8 +30,7 @@ use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; use ::difference::Semigroup; use lattice::Lattice; use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; -use trace::implementations::ord::OrdValSpine as DefaultValTrace; -use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; +use trace::implementations::{KeySpine, ValSpine}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use 
trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -706,21 +705,21 @@ where G::Timestamp: Lattice+Ord { /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// This trace is current for all times completed by the output stream, which can be used to /// safely identify the stable times and values in the trace. - fn arrange_by_key(&self) -> Arranged>>; + fn arrange_by_key(&self) -> Arranged>>; /// As `arrange_by_key` but with the ability to name the arrangement. - fn arrange_by_key_named(&self, name: &str) -> Arranged>>; + fn arrange_by_key_named(&self, name: &str) -> Arranged>>; } impl ArrangeByKey for Collection where G::Timestamp: Lattice+Ord { - fn arrange_by_key(&self) -> Arranged>> { + fn arrange_by_key(&self) -> Arranged>> { self.arrange_by_key_named("ArrangeByKey") } - fn arrange_by_key_named(&self, name: &str) -> Arranged>> { + fn arrange_by_key_named(&self, name: &str) -> Arranged>> { self.arrange_named(name) } } @@ -739,10 +738,10 @@ where /// This operator arranges a collection of records into a shared trace, whose contents it maintains. /// This trace is current for all times complete in the output stream, which can be used to safely /// identify the stable times and values in the trace. - fn arrange_by_self(&self) -> Arranged>>; + fn arrange_by_self(&self) -> Arranged>>; /// As `arrange_by_self` but with the ability to name the arrangement. - fn arrange_by_self_named(&self, name: &str) -> Arranged>>; + fn arrange_by_self_named(&self, name: &str) -> Arranged>>; } @@ -750,11 +749,11 @@ impl ArrangeBySel where G::Timestamp: Lattice+Ord { - fn arrange_by_self(&self) -> Arranged>> { + fn arrange_by_self(&self) -> Arranged>> { self.arrange_by_self_named("ArrangeBySelf") } - fn arrange_by_self_named(&self, name: &str) -> Arranged>> { + fn arrange_by_self_named(&self, name: &str) -> Arranged>> { self.map(|k| (k, ())) .arrange_named(name) } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 869947ec3f..9f152e3f07 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -60,11 +60,11 @@ //! worker.dataflow(|scope| { //! //! use timely::dataflow::operators::Input; -//! use differential_dataflow::trace::implementations::ord::OrdValSpine; +//! use differential_dataflow::trace::implementations::ValSpine; //! use differential_dataflow::operators::arrange::upsert; //! //! let stream = scope.input_from(&mut input); -//! let arranged = upsert::arrange_from_upsert::<_, OrdValSpine>(&stream, &"test"); +//! let arranged = upsert::arrange_from_upsert::<_, ValSpine>(&stream, &"test"); //! //! arranged //! .as_collection(|k,v| (k.clone(), v.clone())) diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 32b770c22e..fff61eaad1 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -48,8 +48,8 @@ where /// } /// ``` pub fn consolidate(&self) -> Self { - use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; - self.consolidate_named::>("Consolidate") + use trace::implementations::KeySpine; + self.consolidate_named::>("Consolidate") } /// As `consolidate` but with the ability to name the operator and specify the trace type. 
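Since the consolidate.rs hunk just above only swaps the import to the new alias, a short sketch (toy data, not from the patch) of what callers see: `consolidate` now routes through `KeySpine`, and `consolidate_named` still lets a caller spell out the trace type and operator name.

use differential_dataflow::input::Input;
use differential_dataflow::trace::implementations::KeySpine;

fn main() {
    ::timely::example(|scope| {
        // Two copies of the record 1 and one record 2.
        let data = scope.new_collection_from(vec![1u32, 1, 2]).1;
        // Uses the new default, equivalent to consolidate_named::<KeySpine<_,_,_>>("Consolidate").
        data.consolidate()
            .inspect(|x| println!("default: {:?}", x));
        // The named variant with the trace type written out.
        data.consolidate_named::<KeySpine<_, _, _>>("ConsolidateNamed")
            .inspect(|x| println!("named:   {:?}", x));
    });
}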
diff --git a/src/operators/join.rs b/src/operators/join.rs index 8710ac07b3..f530918dfb 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -236,7 +236,6 @@ pub trait JoinCore where G::Time /// use differential_dataflow::operators::arrange::ArrangeByKey; /// use differential_dataflow::operators::join::JoinCore; /// use differential_dataflow::trace::Trace; - /// use differential_dataflow::trace::implementations::ord::OrdValSpine; /// /// fn main() { /// ::timely::example(|scope| { @@ -286,7 +285,6 @@ pub trait JoinCore where G::Time /// use differential_dataflow::operators::arrange::ArrangeByKey; /// use differential_dataflow::operators::join::JoinCore; /// use differential_dataflow::trace::Trace; - /// use differential_dataflow::trace::implementations::ord::OrdValSpine; /// /// fn main() { /// ::timely::example(|scope| { diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 0715028850..bdf7b6dae4 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -21,8 +21,7 @@ use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; use lattice::Lattice; use trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic}; use trace::cursor::CursorList; -use trace::implementations::ord::OrdValSpine as DefaultValTrace; -use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; +use trace::implementations::{KeySpine, ValSpine}; use trace::TraceReader; @@ -93,7 +92,7 @@ where { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { - self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(name, logic) + self.reduce_abelian::<_,ValSpine<_,_,_,_>>(name, logic) .as_collection(|k,v| (k.clone(), v.clone())) } } @@ -179,7 +178,7 @@ where T1: TraceReader+Clone+'static, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { - self.reduce_abelian::<_,DefaultKeyTrace<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) + self.reduce_abelian::<_,KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) .as_collection(|k,_| k.clone()) } } @@ -234,7 +233,7 @@ where T1: TraceReader+Clone+'static, { fn count_core>(&self) -> Collection { - self.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) + self.reduce_abelian::<_,ValSpine<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) .as_collection(|k,c| (k.clone(), c.clone())) } } @@ -255,7 +254,7 @@ pub trait ReduceCore where G::Timestam /// use differential_dataflow::input::Input; /// use differential_dataflow::operators::reduce::ReduceCore; /// use differential_dataflow::trace::Trace; - /// use differential_dataflow::trace::implementations::ord::OrdValSpine; + /// use differential_dataflow::trace::implementations::ValSpine; /// /// fn main() { /// ::timely::example(|scope| { @@ -263,7 +262,7 @@ pub trait ReduceCore where G::Timestam /// let trace = /// scope.new_collection_from(1 .. 
10u32).1 /// .map(|x| (x, x)) - /// .reduce_abelian::<_,OrdValSpine<_,_,_,_>>( + /// .reduce_abelian::<_,ValSpine<_,_,_,_>>( /// "Example", /// move |_key, src, dst| dst.push((*src[0].0, 1)) /// ) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 8a06fb68c5..8aab385285 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -142,7 +142,7 @@ impl TimelyStackQueue { } } - fn done(mut self) -> TimelyStack { + fn done(self) -> TimelyStack { self.list } diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 8bbe4f311c..59937caf0d 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -46,3 +46,124 @@ pub(crate) mod merge_batcher_col; pub use self::merge_batcher::MergeBatcher as Batcher; pub mod ord; +pub mod ord_neu; + +// Opinionated takes on default spines. +pub use self::ord::OrdValSpine as ValSpine; +pub use self::ord::OrdKeySpine as KeySpine; + +use timely::container::columnation::{Columnation, TimelyStack}; +use lattice::Lattice; +use difference::Semigroup; +use trace::layers::BatchContainer; +use trace::layers::ordered::OrdOffset; + +/// A type that names constituent update types. +pub trait Update { + /// Key by which data are grouped. + type Key: Ord+Clone; + /// Values associated with the key. + type Val: Ord+Clone; + /// Time at which updates occur. + type Time: Ord+Lattice+timely::progress::Timestamp+Clone; + /// Way in which updates occur. + type Diff: Semigroup+Clone; +} + +impl Update for ((K, V), T, R) +where + K: Ord+Clone, + V: Ord+Clone, + T: Ord+Lattice+timely::progress::Timestamp+Clone, + R: Semigroup+Clone, +{ + type Key = K; + type Val = V; + type Time = T; + type Diff = R; +} + +/// A type with opinions on how updates should be laid out. +pub trait Layout { + /// The represented update. + type Target: Update; + /// Offsets to use from keys into vals. + type KeyOffset: OrdOffset; + /// Offsets to use from vals into updates. + type ValOffset: OrdOffset; + /// Container for update keys. + type KeyContainer: + RetainFrom<::Key>+ + BatchContainer::Key>; + /// Container for update vals. + type ValContainer: + RetainFrom<::Val>+ + BatchContainer::Val>; + /// Container for update vals. + type UpdContainer: + BatchContainer::Time, ::Diff)>; +} + +/// A layout that uses vectors +pub struct Vector { + phantom: std::marker::PhantomData<(U, O)>, +} + +impl Layout for Vector { + type Target = U; + type KeyOffset = O; + type ValOffset = O; + type KeyContainer = Vec; + type ValContainer = Vec; + type UpdContainer = Vec<(U::Time, U::Diff)>; +} + +/// A layout based on timely stacks +pub struct TStack { + phantom: std::marker::PhantomData<(U, O)>, +} + +impl Layout for TStack +where + U::Key: Columnation, + U::Val: Columnation, + U::Time: Columnation, + U::Diff: Columnation, +{ + type Target = U; + type KeyOffset = O; + type ValOffset = O; + type KeyContainer = TimelyStack; + type ValContainer = TimelyStack; + type UpdContainer = TimelyStack<(U::Time, U::Diff)>; +} + +/// A container that can retain/discard from some offset onward. +pub trait RetainFrom { + /// Retains elements from an index onwards that satisfy a predicate. + fn retain_frombool>(&mut self, index: usize, predicate: P); +} + +impl RetainFrom for Vec { + fn retain_frombool>(&mut self, index: usize, mut predicate: P) { + let mut write_position = index; + for position in index .. 
self.len() { + if predicate(position, &self[position]) { + self.swap(position, write_position); + write_position += 1; + } + } + self.truncate(write_position); + } +} + +impl RetainFrom for TimelyStack { + fn retain_frombool>(&mut self, index: usize, mut predicate: P) { + let mut position = index; + self.retain_from(index, |item| { + let result = predicate(position, item); + position += 1; + result + }) + } +} diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 7b8c2cede0..0866b5a344 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -17,13 +17,12 @@ use timely::container::columnation::TimelyStack; use timely::container::columnation::Columnation; use timely::progress::{Antichain, frontier::AntichainRef}; -use ::difference::Semigroup; use lattice::Lattice; use trace::layers::{Trie, TupleBuilder, BatchContainer}; use trace::layers::Builder as TrieBuilder; use trace::layers::Cursor as TrieCursor; -use trace::layers::ordered::{OrdOffset, OrderedLayer, OrderedBuilder, OrderedCursor}; +use trace::layers::ordered::{OrderedLayer, OrderedBuilder, OrderedCursor}; use trace::layers::ordered_leaf::{OrderedLeaf, OrderedLeafBuilder}; use trace::{Batch, BatchReader, Builder, Merger, Cursor}; use trace::description::Description; @@ -36,80 +35,9 @@ use super::merge_batcher::MergeBatcher; use abomonation::abomonated::Abomonated; use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; +use trace::implementations::RetainFrom; -/// A type that names constituent update types. -pub trait Update { - /// Key by which data are grouped. - type Key: Ord+Clone; - /// Values associated with the key. - type Val: Ord+Clone; - /// Time at which updates occur. - type Time: Lattice+timely::progress::Timestamp; - /// Way in which updates occur. - type Diff: Semigroup+Clone; -} - -impl Update for ((K, V), T, R) -where - K: Ord+Clone, - V: Ord+Clone, - T: Ord+Lattice+timely::progress::Timestamp+Clone, - R: Semigroup+Clone, -{ - type Key = K; - type Val = V; - type Time = T; - type Diff = R; -} - -/// A type with opinions on how updates should be laid out. -pub trait Layout { - /// The represented update. - type Target: Update; - /// Offsets to use from keys into vals. - type KeyOffset: OrdOffset; - /// Offsets to use from vals into updates. - type ValOffset: OrdOffset; - /// Container for update keys. - type KeyContainer: - BatchContainer::Key> - +RetainFrom<::Key>; - /// Container for update vals. - type ValContainer: - BatchContainer::Val> - +RetainFrom<::Val>; -} - -/// A layout that uses vectors -pub struct Vector { - phantom: std::marker::PhantomData<(U, O)>, -} - -impl Layout for Vector { - type Target = U; - type KeyOffset = O; - type ValOffset = O; - type KeyContainer = Vec; - type ValContainer = Vec; -} - -/// A layout based on timely stacks -pub struct TStack { - phantom: std::marker::PhantomData<(U, O)>, -} - -impl Layout for TStack -where - U::Key: Columnation, - U::Val: Columnation, -{ - type Target = U; - type KeyOffset = O; - type ValOffset = O; - type KeyContainer = TimelyStack; - type ValContainer = TimelyStack; -} - +use super::{Update, Layout, Vector, TStack}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine, Vec<((K,V),T,R)>>>>; @@ -128,37 +56,6 @@ pub type ColValSpine = Spine = Spine, TimelyStack<((K,()),T,R)>>>>; - -/// A container that can retain/discard from some offset onward. -pub trait RetainFrom { - /// Retains elements from an index onwards that satisfy a predicate. 
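For the `RetainFrom` trait relocated into mod.rs above (the existing ord.rs batches still use it, while the new ord_neu batches avoid it, hence the PR title), a tiny sketch of its contract with made-up values: entries before `index` are left alone, and entries at or after `index` are kept only where the positional predicate returns true.

use differential_dataflow::trace::implementations::RetainFrom;

fn main() {
    let mut data = vec![0, 1, 2, 3, 4, 5];
    // Keep the prefix [0, 1] untouched; from index 2 onward keep only even entries.
    data.retain_from(2, |_position, item| *item % 2 == 0);
    assert_eq!(data, vec![0, 1, 2, 4]);
}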
- fn retain_frombool>(&mut self, index: usize, predicate: P); -} - -impl RetainFrom for Vec { - fn retain_frombool>(&mut self, index: usize, mut predicate: P) { - let mut write_position = index; - for position in index .. self.len() { - if predicate(position, &self[position]) { - self.swap(position, write_position); - write_position += 1; - } - } - self.truncate(write_position); - } -} - -impl RetainFrom for TimelyStack { - fn retain_frombool>(&mut self, index: usize, mut predicate: P) { - let mut position = index; - self.retain_from(index, |item| { - let result = predicate(position, item); - position += 1; - result - }) - } -} - /// An immutable collection of update tuples, from a contiguous interval of logical times. /// /// The `L` parameter captures the updates should be laid out, and `C` determines which diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs new file mode 100644 index 0000000000..8aac140bcb --- /dev/null +++ b/src/trace/implementations/ord_neu.rs @@ -0,0 +1,492 @@ +//! Trace and batch implementations based on sorted ranges. +//! +//! The types and type aliases in this module start with either +//! +//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered. +//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered. +//! +//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation +//! and should consume fewer resources (computation and memory) when it applies. + +use std::rc::Rc; + +use trace::implementations::spine_fueled::Spine; + +use super::{Update, Layout, Vector, TStack}; + +use self::val_batch::{OrdValBatch}; + + +/// A trace implementation using a spine of ordered lists. +pub type OrdValSpine = Spine>>>; +// /// A trace implementation for empty values using a spine of ordered lists. +// pub type OrdKeySpine = Spine>>>; + +/// A trace implementation backed by columnar storage. +pub type ColValSpine = Spine>>>; +// /// A trace implementation backed by columnar storage. +// pub type ColKeySpine = Spine>>>; + +mod val_batch { + + use std::convert::TryInto; + use timely::progress::{Antichain, frontier::AntichainRef}; + + use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; + use trace::layers::BatchContainer; + + use super::{Layout, Update}; + use super::super::merge_batcher::MergeBatcher; + + /// An immutable collection of update tuples, from a contiguous interval of logical times. + #[derive(Abomonation, Debug)] + pub struct OrdValStorage { + /// An ordered list of keys, corresponding to entries in `keys_offs`. + pub keys: L::KeyContainer, + /// Offsets used to provide indexes from keys to values. + /// + /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. + pub keys_offs: Vec, + /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`. + pub vals: L::ValContainer, + /// Offsets used to provide indexes from values to updates. + /// + /// This list has a special representation that any empty range indicates the singleton + /// element just before the range, as if the start were decremented by one. The empty + /// range is otherwise an invalid representation, and we borrow it to compactly encode + /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). + /// + /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. 
+ pub vals_offs: Vec, + /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. + pub updates: L::UpdContainer, + } + + impl OrdValStorage { + /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. + fn values_for_key(&self, index: usize) -> (usize, usize) { + (self.keys_offs[index].try_into().ok().unwrap(), self.keys_offs[index+1].try_into().ok().unwrap()) + } + /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. + fn updates_for_value(&self, index: usize) -> (usize, usize) { + (self.vals_offs[index].try_into().ok().unwrap(), self.vals_offs[index+1].try_into().ok().unwrap()) + } + } + + /// An immutable collection of update tuples, from a contiguous interval of logical times. + #[derive(Abomonation)] + pub struct OrdValBatch { + /// The updates themselves. + pub storage: OrdValStorage, + /// Description of the update times this layer represents. + pub description: Description<::Time>, + } + + impl BatchReader for OrdValBatch { + type Key = ::Key; + type Val = ::Val; + type Time = ::Time; + type R = ::Diff; + + type Cursor = OrdValCursor; + fn cursor(&self) -> Self::Cursor { + OrdValCursor { + key_cursor: 0, + val_cursor: 0, + phantom: std::marker::PhantomData, + } + } + fn len(&self) -> usize { + // Normally this would be `self.updates.len()`, but we have a clever compact encoding. + // Perhaps we should count such exceptions to the side, to provide a correct accounting. + self.storage.updates.len() + } + fn description(&self) -> &Description<::Time> { &self.description } + } + + impl Batch for OrdValBatch { + type Batcher = MergeBatcher; + type Builder = OrdValBuilder; + type Merger = OrdValMerger; + + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + OrdValMerger::new(self, other, compaction_frontier) + } + } + + /// State for an in-progress merge. + pub struct OrdValMerger { + /// Key position to merge next in the first batch. + key_cursor1: usize, + /// Key position to merge next in the second batch. + key_cursor2: usize, + /// result that we are currently assembling. + result: OrdValStorage, + /// description + description: Description<::Time>, + + /// Local stash of updates, to use for consolidation. + /// + /// We could emulate a `ChangeBatch` here, with related compaction smarts. + /// A `ChangeBatch` itself needs an `i64` diff type, which we have not. 
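A standalone illustration of the offset scheme documented for `OrdValStorage` above, using plain vectors with made-up contents rather than the real layout containers: `keys_offs[i] .. keys_offs[i+1]` brackets key i's values and `vals_offs[j] .. vals_offs[j+1]` brackets value j's updates, with one trailing entry in each list so no bounds logic is needed. (The "empty range means the preceding singleton" compaction mentioned in the comment is not exercised here.)

fn main() {
    // Two keys: "a" with values 1 and 2, "b" with value 3.
    let keys = vec!["a", "b"];
    let keys_offs: Vec<usize> = vec![0, 2, 3];     // key i owns vals[keys_offs[i] .. keys_offs[i+1]]
    let vals = vec![1, 2, 3];
    let vals_offs: Vec<usize> = vec![0, 1, 3, 4];  // value j owns updates[vals_offs[j] .. vals_offs[j+1]]
    let updates = vec![(0u64, 1i64), (0, 2), (1, -1), (0, 1)];

    for (i, key) in keys.iter().enumerate() {
        for j in keys_offs[i] .. keys_offs[i + 1] {
            let upds = &updates[vals_offs[j] .. vals_offs[j + 1]];
            println!("{} / {} -> {:?}", key, vals[j], upds);
        }
    }
}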
+ update_stash: Vec<(::Time, ::Diff)>, + } + + impl Merger> for OrdValMerger { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option::Time>>) -> Self { + + assert!(batch1.upper() == batch2.lower()); + use lattice::Lattice; + let mut since = batch1.description().since().join(batch2.description().since()); + if let Some(compaction_frontier) = compaction_frontier { + since = since.join(&compaction_frontier.to_owned()); + } + let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since); + + let batch1 = &batch1.storage; + let batch2 = &batch2.storage; + + let mut storage = OrdValStorage { + keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), + keys_offs: Vec::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), + vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), + vals_offs: Vec::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), + updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), + }; + + storage.keys_offs.push(0.try_into().ok().unwrap()); + storage.vals_offs.push(0.try_into().ok().unwrap()); + + OrdValMerger { + key_cursor1: 0, + key_cursor2: 0, + result: storage, + description, + update_stash: Vec::new(), + } + } + fn done(self) -> OrdValBatch { + OrdValBatch { + storage: self.result, + description: self.description, + } + } + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + + // An (incomplete) indication of the amount of work we've done so far. + let starting_updates = self.result.updates.len(); + let mut effort = 0isize; + + // While both mergees are still active, perform single-key merges. + while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { + self.merge_key(&source1.storage, &source2.storage); + // An (incomplete) accounting of the work we've done. + effort = (self.result.updates.len() - starting_updates) as isize; + } + + // Merging is complete, and only copying remains. + // Key-by-key copying allows effort interruption, and compaction. + while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { + self.copy_key(&source1.storage, self.key_cursor1); + self.key_cursor1 += 1; + effort = (self.result.updates.len() - starting_updates) as isize; + } + while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel { + self.copy_key(&source2.storage, self.key_cursor2); + self.key_cursor2 += 1; + effort = (self.result.updates.len() - starting_updates) as isize; + } + + *fuel -= effort; + } + } + + // Helper methods in support of merging batches. + impl OrdValMerger { + /// Copy the next key in `source`. + /// + /// The method extracts the key in `source` at `cursor`, and merges it in to `self`. + /// If the result does not wholly cancel, they key will be present in `self` with the + /// compacted values and updates. + /// + /// The caller should be certain to update the cursor, as this method does not do this. + fn copy_key(&mut self, source: &OrdValStorage, cursor: usize) { + // Capture the initial number of values to determine if the merge was ultimately non-empty. 
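The `work` method above meters itself by the number of result updates produced and stops once that exceeds the fuel it was granted. A toy stand-in (not the patch's Merger trait) showing just that contract, so the fuel accounting is easy to follow in isolation:

// A toy merger: each unit of work "produces" one key's worth of output.
struct ToyMerger { keys_remaining: usize }

impl ToyMerger {
    fn work(&mut self, fuel: &mut isize) {
        let mut effort = 0isize;
        // Stop when the merge finishes or the granted fuel is spent.
        while self.keys_remaining > 0 && effort < *fuel {
            self.keys_remaining -= 1;
            effort += 1;
        }
        // Report consumption back to the caller, as OrdValMerger::work does.
        *fuel -= effort;
    }
}

fn main() {
    let mut merger = ToyMerger { keys_remaining: 10 };
    let mut round = 0;
    while merger.keys_remaining > 0 {
        let mut fuel = 4isize;  // the spine would size this from outstanding batches
        merger.work(&mut fuel);
        round += 1;
        println!("round {}: fuel left over = {}", round, fuel);
    }
}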
+ let init_vals = self.result.vals.len(); + let (mut lower, upper) = source.values_for_key(cursor); + while lower < upper { + self.stash_updates_for_val(source, lower); + if let Some(off) = self.consolidate_updates() { + self.result.vals_offs.push(off); + self.result.vals.copy(source.vals.index(lower)); + } + lower += 1; + } + + // If we have pushed any values, copy the key as well. + if self.result.vals.len() > init_vals { + self.result.keys.copy(source.keys.index(cursor)); + self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); + } + } + /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. + /// + /// This method only merges a single key. It applies all compaction necessary, and may result in no output + /// if the updates cancel either directly or after compaction. + fn merge_key(&mut self, source1: &OrdValStorage, source2: &OrdValStorage) { + use ::std::cmp::Ordering; + match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) { + Ordering::Less => { + self.copy_key(source1, self.key_cursor1); + self.key_cursor1 += 1; + }, + Ordering::Equal => { + // Keys are equal; must merge all values from both sources for this one key. + let (lower1, upper1) = source1.values_for_key(self.key_cursor1); + let (lower2, upper2) = source2.values_for_key(self.key_cursor2); + if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { + self.result.keys.copy(source1.keys.index(self.key_cursor1)); + self.result.keys_offs.push(off); + } + // Increment cursors in either case; the keys are merged. + self.key_cursor1 += 1; + self.key_cursor2 += 1; + }, + Ordering::Greater => { + self.copy_key(source2, self.key_cursor2); + self.key_cursor2 += 1; + }, + } + } + /// Merge two ranges of values into `self`. + /// + /// If the compacted result contains values with non-empty updates, the function returns + /// an offset that should be recorded to indicate the upper extent of the result values. + fn merge_vals( + &mut self, + (source1, mut lower1, upper1): (&OrdValStorage, usize, usize), + (source2, mut lower2, upper2): (&OrdValStorage, usize, usize), + ) -> Option { + // Capture the initial number of values to determine if the merge was ultimately non-empty. + let init_vals = self.result.vals.len(); + while lower1 < upper1 && lower2 < upper2 { + // We compare values, and fold in updates for the lowest values; + // if they are non-empty post-consolidation, we write the value. + // We could multi-way merge and it wouldn't be very complicated. + use ::std::cmp::Ordering; + match source1.vals.index(lower1).cmp(source2.vals.index(lower2)) { + Ordering::Less => { + // Extend stash by updates, with logical compaction applied. + self.stash_updates_for_val(source1, lower1); + if let Some(off) = self.consolidate_updates() { + self.result.vals_offs.push(off); + self.result.vals.copy(source1.vals.index(lower1)); + } + lower1 += 1; + }, + Ordering::Equal => { + self.stash_updates_for_val(source1, lower1); + self.stash_updates_for_val(source2, lower2); + if let Some(off) = self.consolidate_updates() { + self.result.vals_offs.push(off); + self.result.vals.copy(source1.vals.index(lower1)); + } + lower1 += 1; + lower2 += 1; + }, + Ordering::Greater => { + // Extend stash by updates, with logical compaction applied. 
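The `stash_updates_for_val` / `consolidate_updates` calls above (defined further down in this module) implement the logical compaction: each stashed update's time is advanced to the batch's `since` frontier, and equal times are then consolidated so cancelled diffs drop out. A sketch of just that step with made-up updates, using the crate's public `Lattice::advance_by` and `consolidation::consolidate`, both of which the patch itself relies on:

use differential_dataflow::consolidation::consolidate;
use differential_dataflow::lattice::Lattice;
use timely::progress::Antichain;

fn main() {
    // Updates at times 3 and 5; the compaction frontier (since) is {4}.
    let since = Antichain::from_elem(4u64);
    let mut updates = vec![(3u64, 1i64), (5, 1), (3, -1)];
    for (time, _diff) in updates.iter_mut() {
        time.advance_by(since.borrow());
    }
    // Both time-3 updates advance to time 4 and cancel; only (5, +1) survives.
    consolidate(&mut updates);
    assert_eq!(updates, vec![(5u64, 1i64)]);
}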
+ self.stash_updates_for_val(source2, lower2); + if let Some(off) = self.consolidate_updates() { + self.result.vals_offs.push(off); + self.result.vals.copy(source2.vals.index(lower2)); + } + lower2 += 1; + }, + } + } + // Merging is complete, but we may have remaining elements to push. + while lower1 < upper1 { + self.stash_updates_for_val(source1, lower1); + if let Some(off) = self.consolidate_updates() { + self.result.vals_offs.push(off); + self.result.vals.copy(source1.vals.index(lower1)); + } + lower1 += 1; + } + while lower2 < upper2 { + self.stash_updates_for_val(source2, lower2); + if let Some(off) = self.consolidate_updates() { + self.result.vals_offs.push(off); + self.result.vals.copy(source2.vals.index(lower2)); + } + lower2 += 1; + } + + // Values being pushed indicate non-emptiness. + if self.result.vals.len() > init_vals { + Some(self.result.vals.len().try_into().ok().unwrap()) + } else { + None + } + } + + /// Transfer updates for an indexed value in `source` into `self`, with compaction applied. + fn stash_updates_for_val(&mut self, source: &OrdValStorage, index: usize) { + let (lower, upper) = source.updates_for_value(index); + for i in lower .. upper { + // NB: Here is where we would need to look back if `lower == upper`. + let (time, diff) = &source.updates.index(i); + use lattice::Lattice; + let mut new_time = time.clone(); + new_time.advance_by(self.description.since().borrow()); + self.update_stash.push((new_time, diff.clone())); + } + } + + /// Consolidates `self.updates_stash` and produces the offset to record, if any. + fn consolidate_updates(&mut self) -> Option { + use consolidation; + consolidation::consolidate(&mut self.update_stash); + if !self.update_stash.is_empty() { + for item in self.update_stash.drain(..) { + self.result.updates.push(item); + } + Some(self.result.updates.len().try_into().ok().unwrap()) + } else { + None + } + } + } + + /// A cursor for navigating a single layer. + pub struct OrdValCursor { + /// Absolute position of the current key. + key_cursor: usize, + /// Absolute position of the current value. + val_cursor: usize, + /// Phantom marker for Rust happiness. + phantom: std::marker::PhantomData, + } + + impl Cursor for OrdValCursor { + type Key = ::Key; + type Val = ::Val; + type Time = ::Time; + type R = ::Diff; + + type Storage = OrdValBatch; + + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) } + fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) } + fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { + let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); + for index in lower .. 
upper { + let (time, diff) = &storage.storage.updates.index(index); + logic(time, diff); + } + } + fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() } + fn val_valid(&self, storage: &Self::Storage) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 } + fn step_key(&mut self, storage: &Self::Storage){ + self.key_cursor += 1; + if self.key_valid(storage) { + self.rewind_vals(storage); + } + else { + self.key_cursor = storage.storage.keys.len(); + } + } + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { + self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); + if self.key_valid(storage) { + self.rewind_vals(storage); + } + } + fn step_val(&mut self, storage: &Self::Storage) { + self.val_cursor += 1; + if !self.val_valid(storage) { + self.val_cursor = storage.storage.values_for_key(self.key_cursor).1; + } + } + fn seek_val(&mut self, storage: &Self::Storage, val: &Self::Val) { + self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val)); + } + fn rewind_keys(&mut self, storage: &Self::Storage) { + self.key_cursor = 0; + if self.key_valid(storage) { + self.rewind_vals(storage) + } + } + fn rewind_vals(&mut self, storage: &Self::Storage) { + self.val_cursor = storage.storage.values_for_key(self.key_cursor).0; + } + } + + /// A builder for creating layers from unsorted update tuples. + pub struct OrdValBuilder { + result: OrdValStorage, + } + + impl Builder> for OrdValBuilder { + + fn new() -> Self { Self::with_capacity(0) } + fn with_capacity(cap: usize) -> Self { + // We don't introduce zero offsets as they will be introduced by the first `push` call. + Self { + result: OrdValStorage { + keys: L::KeyContainer::with_capacity(cap), + keys_offs: Vec::with_capacity(cap), + vals: L::ValContainer::with_capacity(cap), + vals_offs: Vec::with_capacity(cap), + updates: L::UpdContainer::with_capacity(cap), + } + } + } + + #[inline] + fn push(&mut self, (key, val, time, diff): (::Key, ::Val, ::Time, ::Diff)) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last() == Some(&key) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last() == Some(&val) { + // TODO: here we could look for repetition, and not push the update in that case. + // More logic (and state) would be required to correctly wrangle this. + self.result.updates.push((time, diff)); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); + self.result.updates.push((time, diff)); + self.result.vals.push(val); + } + } else { + // New key; complete representation of prior key. 
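To see concretely what `push` (above, with its new-key case completing just below) writes into the offset vectors, here is a plain-vector mirror of that logic with hypothetical, pre-sorted inputs; it is not the private OrdValBuilder type, but it follows the same branches and the final seals performed by `done`.

fn main() {
    // Pre-sorted (key, val, time, diff) tuples, as the batcher hands them to `push`.
    let tuples = vec![("a", 1, 0u64, 1i64), ("a", 1, 1, 1), ("a", 2, 0, 1), ("b", 1, 0, 1)];

    let mut keys: Vec<&str> = Vec::new();
    let mut keys_offs: Vec<usize> = Vec::new();
    let mut vals: Vec<i32> = Vec::new();
    let mut vals_offs: Vec<usize> = Vec::new();
    let mut updates: Vec<(u64, i64)> = Vec::new();

    for (key, val, time, diff) in tuples {
        if keys.last() == Some(&key) {
            if vals.last() == Some(&val) {
                // Same key and value: the update run just grows.
                updates.push((time, diff));
            } else {
                // New value under the same key: seal the previous value's updates.
                vals_offs.push(updates.len());
                updates.push((time, diff));
                vals.push(val);
            }
        } else {
            // New key: seal the previous key's values and updates
            // (on the very first push this records the leading zeros).
            vals_offs.push(updates.len());
            keys_offs.push(vals.len());
            updates.push((time, diff));
            vals.push(val);
            keys.push(key);
        }
    }
    // `done` seals the final ranges before assembling the batch.
    keys_offs.push(vals.len());
    vals_offs.push(updates.len());

    assert_eq!(keys_offs, vec![0usize, 2, 3]);    // "a" owns vals[0..2], "b" owns vals[2..3]
    assert_eq!(vals_offs, vec![0usize, 2, 3, 4]); // each value's bracket into `updates`
    assert_eq!(updates.len(), 4);
}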
+ self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); + self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); + self.result.updates.push((time, diff)); + self.result.vals.push(val); + self.result.keys.push(key); + } + } + + #[inline(never)] + fn done(mut self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { + // Record the final offsets + self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); + self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); + + OrdValBatch { + storage: self.result, + description: Description::new(lower, upper, since), + } + } + } + +} + +mod key_batch { + + // Copy the above, once it works! + +} \ No newline at end of file diff --git a/src/trace/layers/mod.rs b/src/trace/layers/mod.rs index c074386bbd..aa214e1d04 100644 --- a/src/trace/layers/mod.rs +++ b/src/trace/layers/mod.rs @@ -132,6 +132,15 @@ pub trait BatchContainer: Default { fn index(&self, index: usize) -> &Self::Item; /// Number of contained elements fn len(&self) -> usize; + /// Returns the last item if the container is non-empty. + fn last(&self) -> Option<&Self::Item> { + if self.len() > 0 { + Some(self.index(self.len()-1)) + } + else { + None + } + } /// Reports the number of elements satisfing the predicate. /// diff --git a/tests/trace.rs b/tests/trace.rs index 6cf59b608c..d29999af2b 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -1,21 +1,16 @@ extern crate timely; extern crate differential_dataflow; -use std::rc::Rc; - use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; -use differential_dataflow::trace::implementations::ord::{OrdValBatch, Vector}; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::Cursor; -use differential_dataflow::trace::implementations::spine_fueled::Spine; - -pub type OrdValSpine = Spine, Vec<((K, V), T, R)>>>>; -type IntegerTrace = OrdValSpine; +type IntegerTrace = ValSpine; -fn get_trace() -> Spine, Vec<((u64, u64), usize, i64)>>>> { +fn get_trace() -> ValSpine { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); { diff --git a/tpchlike/src/lib.rs b/tpchlike/src/lib.rs index a798e4236c..b323596bd3 100644 --- a/tpchlike/src/lib.rs +++ b/tpchlike/src/lib.rs @@ -115,11 +115,11 @@ impl Collections { } -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::operators::arrange::{Arranged, TraceAgent}; type ArrangedScope = Arranged>; -type ArrangedIndex = TraceAgent>; +type ArrangedIndex = TraceAgent>; pub struct ArrangementsInScope> { customer: ArrangedScope, diff --git a/tpchlike/src/queries/query20.rs b/tpchlike/src/queries/query20.rs index 845570602c..c127a0396a 100644 --- a/tpchlike/src/queries/query20.rs +++ b/tpchlike/src/queries/query20.rs @@ -4,7 +4,7 @@ use timely::dataflow::operators::probe::Handle as ProbeHandle; use differential_dataflow::operators::*; use differential_dataflow::operators::reduce::ReduceCore; -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use differential_dataflow::lattice::Lattice; use {Arrangements, Experiment, Collections}; @@ -77,7 +77,7 @@ 
where G::Timestamp: Lattice+TotalOrder+Ord { ) .semijoin(&partkeys) .explode(|l| Some(((((l.0 as u64) << 32) + (l.1).0 as u64, ()), (l.1).1 as isize))) - .reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Reduce", |_k,s,t| t.push((s[0].1, 1))); + .reduce_abelian::<_,ValSpine<_,_,_,_>>("Reduce", |_k,s,t| t.push((s[0].1, 1))); let suppliers = collections diff --git a/tpchlike/src/queries/query22.rs b/tpchlike/src/queries/query22.rs index bcd1a2a3ac..4073949865 100644 --- a/tpchlike/src/queries/query22.rs +++ b/tpchlike/src/queries/query22.rs @@ -8,7 +8,7 @@ use differential_dataflow::operators::reduce::ReduceCore; use differential_dataflow::operators::ThresholdTotal; use differential_dataflow::lattice::Lattice; -use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultValTrace; +use differential_dataflow::trace::implementations::ValSpine; use {Arrangements, Experiment, Collections}; @@ -80,7 +80,7 @@ where G::Timestamp: Lattice+TotalOrder+Ord { let averages = customers .explode(|(cc, acctbal, _)| Some(((cc, ()), DiffPair::new(acctbal as isize, 1)))) - .reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Reduce", |_k,s,t| t.push((s[0].1, 1))); + .reduce_abelian::<_,ValSpine<_,_,_,_>>("Reduce", |_k,s,t| t.push((s[0].1, 1))); customers .map(|(cc, acct, key)| (key, (cc, acct))) @@ -133,7 +133,7 @@ where let averages = customers .explode(|(cc, acctbal, _)| Some(((cc, ()), DiffPair::new(acctbal as isize, 1)))) - .reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Reduce", |_k,s,t| t.push((s[0].1, 1))); + .reduce_abelian::<_,ValSpine<_,_,_,_>>("Reduce", |_k,s,t| t.push((s[0].1, 1))); let orders = arrangements