From 44d110aba9cf92acbecc963861249c50ff7f12cf Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Nov 2023 16:35:25 -0500 Subject: [PATCH] Non-working commit --- examples/cursors.rs | 4 +- examples/spines.rs | 57 ++-- src/algorithms/graphs/bfs.rs | 2 +- src/algorithms/graphs/bijkstra.rs | 2 +- src/algorithms/graphs/propagate.rs | 5 +- src/operators/arrange/agent.rs | 6 +- src/operators/arrange/arrangement.rs | 38 +-- src/operators/arrange/upsert.rs | 22 +- src/operators/consolidate.rs | 2 +- src/operators/count.rs | 8 +- src/operators/join.rs | 83 +++--- src/operators/mod.rs | 3 +- src/operators/reduce.rs | 96 +++--- src/operators/threshold.rs | 10 +- src/trace/cursor/cursor_list.rs | 18 +- src/trace/cursor/mod.rs | 56 +++- src/trace/implementations/mod.rs | 338 ++++++++++++++++------ src/trace/implementations/ord.rs | 16 +- src/trace/implementations/ord_neu.rs | 102 +++---- src/trace/implementations/rhh.rs | 118 ++++---- src/trace/implementations/spine_fueled.rs | 12 +- src/trace/mod.rs | 50 ++-- src/trace/wrappers/enter.rs | 28 +- src/trace/wrappers/enter_at.rs | 36 ++- src/trace/wrappers/filter.rs | 36 ++- src/trace/wrappers/freeze.rs | 32 +- src/trace/wrappers/frontier.rs | 28 +- src/trace/wrappers/rc.rs | 6 +- 28 files changed, 703 insertions(+), 511 deletions(-) diff --git a/examples/cursors.rs b/examples/cursors.rs index 952f54bd8..b1db68699 100644 --- a/examples/cursors.rs +++ b/examples/cursors.rs @@ -135,8 +135,8 @@ fn main() { fn dump_cursor(round: u32, index: usize, trace: &mut Tr) where Tr: TraceReader, - Tr::Key: Debug + Clone, - Tr::Val: Debug + Clone, + Tr::KeyOwned: Debug + Clone, + Tr::ValOwned: Debug + Clone, Tr::Time: Debug + Clone, Tr::Diff: Debug + Clone, ::ValOwned: Debug, diff --git a/examples/spines.rs b/examples/spines.rs index d148d610f..e359cd696 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -24,33 +24,33 @@ fn main() { let mut probe = Handle::new(); let (mut data_input, mut keys_input) = worker.dataflow(|scope| { - use differential_dataflow::operators::{arrange::Arrange, JoinCore}; + use differential_dataflow::operators::{arrange::Arrange, JoinCore, join::join_traces}; let (data_input, data) = scope.new_collection::(); let (keys_input, keys) = scope.new_collection::(); match mode.as_str() { - "new" => { - use differential_dataflow::trace::implementations::ord_neu::ColKeySpine; - let data = data.arrange::>(); - let keys = keys.arrange::>(); - keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); - }, - "old" => { - use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine; - let data = data.arrange::>(); - let keys = keys.arrange::>(); - keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); - }, - "rhh" => { - use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>(); - keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); - }, + // "new" => { + // use differential_dataflow::trace::implementations::ord_neu::ColKeySpine; + // let data = data.arrange::>(); + // let keys = keys.arrange::>(); + // keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + // .probe_with(&mut probe); + // }, + // "old" => { + // use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine; + // let data = data.arrange::>(); + // let keys = keys.arrange::>(); + // keys.join_core(&data, |_k, &(), 
&()| Option::<()>::None) + // .probe_with(&mut probe); + // }, + // "rhh" => { + // use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine}; + // let data = data.map(|x| HashWrapper { inner: x }).arrange::>(); + // let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>(); + // keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + // .probe_with(&mut probe); + // }, "slc" => { use differential_dataflow::trace::implementations::ord_neu::PreferredSpine; @@ -58,14 +58,17 @@ fn main() { let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) - .arrange::>() - .reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::>(); + // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); let keys = keys.map(|x| (x.clone().into_bytes(), 7)) - .arrange::>() - .reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1))); + .arrange::>(); + // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); - keys.join_core(&data, |_k,&(),&()| Option::<()>::None) + join_traces(&keys, &data, |k,v1,v2,t,r1,r2| { + println!("{:?}", k.text); + Option::<((),isize,isize)>::None + }) .probe_with(&mut probe); }, _ => { diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs index 8f42cb47d..756b1e631 100644 --- a/src/algorithms/graphs/bfs.rs +++ b/src/algorithms/graphs/bfs.rs @@ -29,7 +29,7 @@ where G: Scope, G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, - Tr: TraceReader+Clone+'static, + Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static, { // initialize roots as reaching themselves at distance 0 let nodes = roots.map(|x| (x, 0)); diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs index f8a4662be..8a416318d 100644 --- a/src/algorithms/graphs/bijkstra.rs +++ b/src/algorithms/graphs/bijkstra.rs @@ -45,7 +45,7 @@ where G: Scope, G::Timestamp: Lattice+Ord, N: ExchangeData+Hash, - Tr: TraceReader+Clone+'static, + Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static, { forward .stream diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs index 32947d4c7..42f9b58b8 100644 --- a/src/algorithms/graphs/propagate.rs +++ b/src/algorithms/graphs/propagate.rs @@ -5,7 +5,6 @@ use std::hash::Hash; use timely::dataflow::*; use ::{Collection, ExchangeData}; -use ::operators::*; use ::lattice::Lattice; use ::difference::{Abelian, Multiply}; use ::operators::arrange::arrangement::ArrangeByKey; @@ -64,7 +63,7 @@ where R: Multiply, R: From, L: ExchangeData, - Tr: TraceReader+Clone+'static, + Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static, F: Fn(&L)->u64+Clone+'static, { // Morally the code performs the following iterative computation. 
However, in the interest of a simplified @@ -90,6 +89,8 @@ where use timely::order::Product; + use operators::join::JoinCore; + let edges = edges.enter(scope); let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize)); diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 03ea5af1b..088693c87 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -47,8 +47,10 @@ where Tr: TraceReader, Tr::Time: Lattice+Ord+Clone+'static, { - type Key = Tr::Key; - type Val = Tr::Val; + type Key<'a> = Tr::Key<'a>; + type KeyOwned = Tr::KeyOwned; + type Val<'a> = Tr::Val<'a>; + type ValOwned = Tr::ValOwned; type Time = Tr::Time; type Diff = Tr::Diff; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 42bf28f87..6e246bc4e 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -37,6 +37,8 @@ use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; use trace::wrappers::filter::{TraceFilter, BatchFilter}; +use trace::cursor::MyTrait; + use super::TraceAgent; /// An arranged collection of `(K,V)` values. @@ -89,8 +91,6 @@ where pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>) -> Arranged, TraceEnter> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, TInner: Refines+Lattice+Timestamp+Clone+'static, @@ -108,8 +108,6 @@ where pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>) -> Arranged, Tr> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, { @@ -127,12 +125,10 @@ where pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P) -> Arranged, TraceEnterAt> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, TInner: Refines+Lattice+Timestamp+Clone+'static, - F: FnMut(&Tr::Key, &Tr::Val, &G::Timestamp)->TInner+Clone+'static, + F: for <'b> FnMut(Tr::Key<'b>, Tr::Val<'b>, &G::Timestamp)->TInner+Clone+'static, P: FnMut(&TInner)->Tr::Time+Clone+'static, { let logic1 = logic.clone(); @@ -177,11 +173,9 @@ where pub fn filter(&self, logic: F) -> Arranged> where - Tr::Key: 'static, - Tr::Val: 'static, Tr::Diff: 'static, G::Timestamp: Clone+'static, - F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static, + F: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>)->bool+Clone+'static, { let logic1 = logic.clone(); let logic2 = logic.clone(); @@ -198,7 +192,7 @@ where pub fn as_collection(&self, mut logic: L) -> Collection where Tr::Diff: Semigroup, - L: FnMut(&Tr::Key, &Tr::Val) -> D+'static, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> D+'static, { self.flat_map_ref(move |key, val| Some(logic(key,val))) } @@ -212,7 +206,7 @@ where Tr::Diff: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static, { Self::flat_map_batches(&self.stream, logic) } @@ -229,7 +223,7 @@ where Tr::Diff: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&Tr::Key, &Tr::Val) -> I+'static, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static, { stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| { input.for_each(|time, data| { @@ -258,16 +252,16 @@ where /// /// This method consumes a stream of (key, time) queries and reports the corresponding stream of /// (key, value, time, diff) accumulations in the `self` trace. 
- pub fn lookup(&self, queries: &Stream) -> Stream + pub fn lookup(&self, queries: &Stream) -> Stream where G::Timestamp: Data+Lattice+Ord+TotalOrder, - Tr::Key: ExchangeData+Hashable, - Tr::Val: ExchangeData, + Tr::KeyOwned: ExchangeData+Hashable, + Tr::ValOwned: ExchangeData, Tr::Diff: ExchangeData+Semigroup, Tr: 'static, { // while the arrangement is already correctly distributed, the query stream may not be. - let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().into()); + let exchange = Exchange::new(move |update: &(Tr::KeyOwned,G::Timestamp)| update.0.hashed().into()); queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| { let mut trace = Some(self.trace.clone()); @@ -280,8 +274,8 @@ where let mut active = Vec::new(); let mut retain = Vec::new(); - let mut working: Vec<(G::Timestamp, Tr::Val, Tr::Diff)> = Vec::new(); - let mut working2: Vec<(Tr::Val, Tr::Diff)> = Vec::new(); + let mut working: Vec<(G::Timestamp, Tr::ValOwned, Tr::Diff)> = Vec::new(); + let mut working2: Vec<(Tr::ValOwned, Tr::Diff)> = Vec::new(); move |input1, input2, output| { @@ -346,13 +340,13 @@ where same_key += 1; } - cursor.seek_key(&storage, key); - if cursor.get_key(&storage) == Some(key) { + cursor.seek_key_owned(&storage, key); + if cursor.get_key(&storage).map(|k| k.equals(key)).unwrap_or(false) { let mut active = &active[active_finger .. same_key]; while let Some(val) = cursor.get_val(&storage) { - cursor.map_times(&storage, |t,d| working.push((t.clone(), val.clone(), d.clone()))); + cursor.map_times(&storage, |t,d| working.push((t.clone(), val.into_owned(), d.clone()))); cursor.step_val(&storage); } diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index b5b4618bc..d5a5cbcb0 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -136,17 +136,17 @@ use super::TraceAgent; /// understand what a "sequence" of upserts would mean for partially ordered /// timestamps. pub fn arrange_from_upsert( - stream: &Stream, G::Timestamp)>, + stream: &Stream, G::Timestamp)>, name: &str, ) -> Arranged> where G: Scope, G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData, - Tr::Key: ExchangeData+Hashable+std::hash::Hash, - Tr::Val: ExchangeData, + Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash, + Tr::ValOwned: ExchangeData, Tr: Trace+TraceReader+'static, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder, { let mut reader: Option> = None; @@ -155,7 +155,7 @@ where let reader = &mut reader; - let exchange = Exchange::new(move |update: &(Tr::Key,Option,G::Timestamp)| (update.0).hashed().into()); + let exchange = Exchange::new(move |update: &(Tr::KeyOwned,Option,G::Timestamp)| (update.0).hashed().into()); stream.unary_frontier(exchange, name, move |_capability, info| { @@ -185,7 +185,7 @@ where let mut prev_frontier = Antichain::from_elem(::minimum()); // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap). - let mut priority_queue = BinaryHeap::)>>::new(); + let mut priority_queue = BinaryHeap::)>>::new(); let mut updates = Vec::new(); move |input, output| { @@ -252,12 +252,14 @@ where let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { + use trace::cursor::MyTrait; + // The prior value associated with the key. - let mut prev_value: Option = None; + let mut prev_value: Option = None; // Attempt to find the key in the trace. 
- trace_cursor.seek_key(&trace_storage, &key); - if trace_cursor.get_key(&trace_storage) == Some(&key) { + trace_cursor.seek_key_owned(&trace_storage, &key); + if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; @@ -265,7 +267,7 @@ where assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); - prev_value = Some(val.clone()); + prev_value = Some(val.into_owned()); } trace_cursor.step_val(&trace_storage); } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 7345a3ef9..6240b4b0c 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -56,7 +56,7 @@ where /// As `consolidate` but with the ability to name the operator and specify the trace type. pub fn consolidate_named(&self, name: &str) -> Self where - Tr: crate::trace::Trace+crate::trace::TraceReader+'static, + Tr: for<'a> crate::trace::Trace=&'a D,Val<'a>=&'a (),Time=G::Timestamp,Diff=R>+'static, Tr::Batch: crate::trace::Batch, Tr::Batcher: Batcher, Tr::Builder: Builder, diff --git a/src/operators/count.rs b/src/operators/count.rs index 5d8a6dbeb..77c084da9 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -55,14 +55,14 @@ where G::Timestamp: TotalOrder+Lattice+Ord { } } -impl CountTotal for Arranged +impl CountTotal for Arranged where G::Timestamp: TotalOrder+Lattice+Ord, - T1: TraceReader+Clone+'static, - T1::Key: ExchangeData, + T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static, + K: ExchangeData, T1::Diff: ExchangeData+Semigroup, { - fn count_total_core>(&self) -> Collection { + fn count_total_core>(&self) -> Collection { let mut trace = self.trace.clone(); let mut buffer = Vec::new(); diff --git a/src/operators/join.rs b/src/operators/join.rs index 67b7ebfb4..ed372e423 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -181,29 +181,33 @@ where } } -impl Join for Arranged +impl Join for Arranged where G: Scope, G::Timestamp: Lattice+Ord, - Tr: TraceReader+Clone+'static, - Tr::Key: Data+Hashable, - Tr::Val: Data, + Tr: for<'a> TraceReader = &'a K, Val<'a> = &'a V>+Clone+'static, + K: ExchangeData+Hashable, + V: Data + 'static, Tr::Diff: Semigroup, { - fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> - where Tr::Key: ExchangeData, Tr::Diff: Multiply, >::Output: Semigroup, L: FnMut(&Tr::Key, &Tr::Val, &V2)->D+'static { + fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> + where + Tr::Diff: Multiply, + >::Output: Semigroup, + L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static, + { let arranged2 = other.arrange_by_key(); self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2))) } - fn semijoin(&self, other: &Collection) -> Collection>::Output> - where Tr::Key: ExchangeData, Tr::Diff: Multiply, >::Output: Semigroup { + fn semijoin(&self, other: &Collection) -> Collection>::Output> + where Tr::Diff: Multiply, >::Output: Semigroup { let arranged2 = other.arrange_by_self(); self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone()))) } - fn antijoin(&self, other: &Collection) -> Collection - where Tr::Key: ExchangeData, Tr::Diff: Multiply, Tr::Diff: Abelian { + fn antijoin(&self, other: &Collection) -> Collection + where Tr::Diff: Multiply, Tr::Diff: Abelian { self.as_collection(|k,v| (k.clone(), v.clone())) .concat(&self.semijoin(other).negate()) } @@ -253,14 +257,13 @@ 
pub trait JoinCore (&self, stream2: &Arranged, result: L) -> Collection>::Output> where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, Tr2::Diff: Semigroup, R: Multiply, >::Output: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&K,&V,&Tr2::Val)->I+'static, + L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, ; /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and @@ -303,13 +306,12 @@ pub trait JoinCore (&self, stream2: &Arranged, result: L) -> Collection where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, Tr2::Diff: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, - L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::Diff)->I+'static, + L: for<'a> FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static, ; } @@ -324,14 +326,13 @@ where { fn join_core (&self, stream2: &Arranged, result: L) -> Collection>::Output> where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, Tr2::Diff: Semigroup, R: Multiply, >::Output: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&K,&V,&Tr2::Val)->I+'static, + L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static, { self.arrange_by_key() .join_core(stream2, result) @@ -339,41 +340,38 @@ where fn join_core_internal_unsafe (&self, stream2: &Arranged, result: L) -> Collection where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=&'a K, Time=G::Timestamp>+Clone+'static, Tr2::Diff: Semigroup, - R: Semigroup, + I: IntoIterator, + L: FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static, D: Data, ROut: Semigroup, - I: IntoIterator, - L: FnMut(&K,&V,&Tr2::Val,&G::Timestamp,&R,&Tr2::Diff)->I+'static, { self.arrange_by_key().join_core_internal_unsafe(stream2, result) } } -impl JoinCore for Arranged +impl JoinCore for Arranged where G: Scope, G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, - T1::Key: Ord+'static, - T1::Val: Ord+'static, + T1: for<'a> TraceReader = &'a K, Val<'a> = &'a V, Time=G::Timestamp>+Clone+'static, + K: Ord+'static + ?Sized, + V: Ord+'static + ?Sized, T1::Diff: Semigroup, { fn join_core(&self, other: &Arranged, mut result: L) -> Collection>::Output> where - Tr2::Val: Ord+'static, - Tr2: TraceReader+Clone+'static, + Tr2: for<'a> TraceReader=T1::Key<'a>,Time=G::Timestamp>+Clone+'static, Tr2::Diff: Semigroup, T1::Diff: Multiply, >::Output: Semigroup, I: IntoIterator, I::Item: Data, - L: FnMut(&T1::Key,&T1::Val,&Tr2::Val)->I+'static + L: FnMut(T1::Key<'_>,T1::Val<'_>,Tr2::Val<'_>)->I+'static { - let result = move |k: &T1::Key, v1: &T1::Val, v2: &Tr2::Val, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| { + let result = move |k: T1::Key<'_>, v1: T1::Val<'_>, v2: Tr2::Val<'_>, t: &G::Timestamp, r1: &T1::Diff, r2: &Tr2::Diff| { let t = t.clone(); let r = (r1.clone()).multiply(r2); result(k, v1, v2).into_iter().map(move |d| (d, t.clone(), r.clone())) @@ -383,13 +381,12 @@ impl JoinCore for Arranged fn join_core_internal_unsafe (&self, other: &Arranged, result: L) -> Collection where - Tr2: TraceReader+Clone+'static, - Tr2::Val: Ord+'static, + Tr2: for<'a> TraceReader=T1::Key<'a>, Time=G::Timestamp>+Clone+'static, Tr2::Diff: Semigroup, D: Data, ROut: Semigroup, I: IntoIterator, - L: FnMut(&T1::Key,&T1::Val,&Tr2::Val,&G::Timestamp,&T1::Diff,&Tr2::Diff)->I+'static, + L: FnMut(&K, 
&V,Tr2::Val<'_>,&G::Timestamp,&T1::Diff,&Tr2::Diff)->I+'static, { join_traces(self, other, result) } @@ -408,16 +405,13 @@ where G: Scope, G::Timestamp: Lattice+Ord, T1: TraceReader+Clone+'static, - T1::Key: Ord, - T1::Val: Ord, T1::Diff: Semigroup, - T2: TraceReader+Clone+'static, - T2::Val: Ord, + T2: for<'a> TraceReader=T1::Key<'a>, Time=G::Timestamp>+Clone+'static, T2::Diff: Semigroup, D: Data, R: Semigroup, I: IntoIterator, - L: FnMut(&T1::Key,&T1::Val,&T2::Val,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, + L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static, { // Rename traces for symmetry from here on out. let mut trace1 = arranged1.trace.clone(); @@ -665,7 +659,7 @@ where T: Timestamp+Lattice+Ord, R: Semigroup, C1: Cursor, - C2: Cursor, + C2: for<'a> Cursor=C1::Key<'a>, Time=T>, C1::Diff: Semigroup, C2::Diff: Semigroup, D: Ord+Clone+Data, @@ -681,9 +675,8 @@ where impl Deferred where - C1::Key: Ord+Eq, C1: Cursor, - C2: Cursor, + C2: for<'a> Cursor=C1::Key<'a>, Time=T>, C1::Diff: Semigroup, C2::Diff: Semigroup, T: Timestamp+Lattice+Ord, @@ -711,7 +704,7 @@ where fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) where I: IntoIterator, - L: for<'a> FnMut(&C1::Key, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, + L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, { let meet = self.capability.time(); @@ -730,7 +723,7 @@ where while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel { - match trace.key(trace_storage).cmp(batch.key(batch_storage)) { + match trace.key(trace_storage).cmp(&batch.key(batch_storage)) { Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)), Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), Ordering::Equal => { diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 84a658b33..53ddb1d6b 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -121,11 +121,10 @@ where &'history mut self, cursor: &mut C, storage: &'storage C::Storage, - key: &C::Key, + key: C::Key<'storage>, logic: L ) -> HistoryReplay<'storage, 'history, C> where - C::Key: Eq, L: Fn(&C::Time)->C::Time, { self.clear(); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 8a0303b54..4c2d609f6 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -26,7 +26,7 @@ use trace::implementations::{KeySpine, ValSpine}; use trace::TraceReader; /// Extension trait for the `reduce` differential dataflow method. -pub trait Reduce where G::Timestamp: Lattice+Ord { +pub trait Reduce : ReduceCore where G::Timestamp: Lattice+Ord { /// Applies a reduction function on records grouped by key. /// /// Input data must be structured as `(key, val)` pairs. 
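// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: how a `join_core` call site is
// meant to look once `TraceReader` exposes GAT-style `Key<'a>` / `Val<'a>`.
// The closure now receives `T1::Key<'_>`, `T1::Val<'_>` and `Tr2::Val<'_>`;
// for the vector-backed `ValSpine` arrangements these are plain references,
// so call sites keep their old shape. The import paths follow this repo's
// examples (e.g. `differential_dataflow::operators::JoinCore` in spines.rs);
// everything else is an assumption about intent, since this commit is
// explicitly non-working.
use differential_dataflow::input::Input;
use differential_dataflow::operators::JoinCore;
use differential_dataflow::operators::arrange::ArrangeByKey;

fn join_sketch() {
    timely::execute_directly(|worker| {
        worker.dataflow::<u32, _, _>(|scope| {
            let (_h1, left) = scope.new_collection_from(vec![(1u32, "a".to_string()), (2, "b".to_string())]);
            let (_h2, right) = scope.new_collection_from(vec![(1u32, 10i32), (2, 20)]);
            let left = left.arrange_by_key();
            let right = right.arrange_by_key();
            // With `Key<'a> = &'a u32` and `Val<'a> = &'a String` / `&'a i32`,
            // the closure arguments are ordinary references.
            left.join_core(&right, |key, val1, val2| Some((*key, (val1.clone(), *val2))))
                .inspect(|x| println!("{:?}", x));
        });
    });
}
// ---------------------------------------------------------------------------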
@@ -88,7 +88,7 @@ impl Reduce for Collection impl Reduce for Arranged where G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static, { fn reduce_named(&self, name: &str, logic: L) -> Collection where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static { @@ -175,7 +175,7 @@ where G::Timestamp: Lattice+Ord { impl Threshold for Arranged where G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Time=G::Timestamp, Diff=R1>+Clone+'static, { fn threshold_namedR2+'static>(&self, name: &str, mut thresh: F) -> Collection { self.reduce_abelian::<_,KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1)))) @@ -230,7 +230,7 @@ where impl Count for Arranged where G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned=K, Val<'a>=&'a (), Time=G::Timestamp, Diff=R>+Clone+'static, { fn count_core>(&self) -> Collection { self.reduce_abelian::<_,ValSpine<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8)))) @@ -272,13 +272,11 @@ pub trait ReduceCore(&self, name: &str, mut logic: L) -> Arranged> where - T2: Trace+TraceReader+'static, - T2::Val: Data, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace= &'a K, Time=G::Timestamp>+'static, + T2::ValOwned: Data, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned, T2::Diff)>)+'static, { self.reduce_core::<_,T2>(name, move |key, input, output, change| { @@ -297,13 +295,11 @@ pub trait ReduceCore(&self, name: &str, logic: L) -> Arranged> where - T2: Trace+TraceReader+'static, - T2::Val: Data, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, + T2::ValOwned: Data, T2::Diff: Semigroup, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, ; } @@ -319,13 +315,11 @@ where { fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - T2::Val: Data, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2::ValOwned: Data, T2::Diff: Semigroup, - T2: Trace+TraceReader+'static, + T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -339,16 +333,15 @@ where K::Owned: Data, V: ToOwned + Ord + ?Sized, G::Timestamp: Lattice+Ord, - T1: TraceReader+Clone+'static, + T1: for<'a> TraceReader=&'a K, KeyOwned = ::Owned, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static, { fn reduce_core(&self, name: &str, logic: L) -> Arranged> where - T2: Trace+TraceReader+'static, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, + T2::ValOwned: Data, T2::Diff: Semigroup, T2::Batch: Batch, - T2::Builder: Builder::Owned), T2::Time, T2::Diff)>, + T2::Builder: Builder, L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, { reduce_trace(self, name, logic) @@ -360,17 +353,13 @@ where G: Scope, G::Timestamp: Lattice+Ord, T1: TraceReader + Clone + 'static, - 
T1::Key: Ord + ToOwned, - ::Owned: Ord, - T1::Val: Ord, T1::Diff: Semigroup, - T2: Trace+TraceReader + 'static, - T2::Val: Ord + ToOwned::ValOwned>, - ::Owned: Data, + T2: for<'a> Trace=T1::Key<'a>, Time=G::Timestamp> + 'static, + T2::ValOwned: Data, T2::Diff: Semigroup, T2::Batch: Batch, - T2::Builder: Builder::Owned, ::Owned), T2::Time, T2::Diff)>, - L: FnMut(&T1::Key, &[(&T1::Val, T1::Diff)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static, + T2::Builder: Builder, + L: for<'a> FnMut(T1::Key<'a>, &[(T1::Val<'a>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static, { let mut result_trace = None; @@ -407,7 +396,7 @@ where // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times, // as well as capabilities for these times (or their lower envelope, at least). - let mut interesting = Vec::<(::Owned, G::Timestamp)>::new(); + let mut interesting = Vec::<(T1::KeyOwned, G::Timestamp)>::new(); let mut capabilities = Vec::>::new(); // buffers and logic for computing per-key interesting times "efficiently". @@ -530,13 +519,14 @@ where while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { use std::borrow::Borrow; - + use trace::cursor::MyTrait; + // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| &x.0); + let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0)); let key2 = batch_cursor.get_key(&batch_storage); let key = match (key1, key2) { - (Some(key1), Some(key2)) => ::std::cmp::min(key1.borrow(), key2), - (Some(key1), None) => key1.borrow(), + (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), + (Some(key1), None) => key1, (None, Some(key2)) => key2, (None, None) => unreachable!(), }; @@ -548,7 +538,7 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()) == Some(key) { + while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.equals(k)).unwrap_or(false) { interesting_times.push(exposed[exposed_position].1.clone()); exposed_position += 1; } @@ -576,7 +566,7 @@ where // Record future warnings about interesting times (and assert they should be "future"). for time in new_interesting_times.drain(..) { debug_assert!(upper_limit.less_equal(&time)); - interesting.push((key.to_owned(), time)); + interesting.push((key.into_owned(), time)); } // Sort each buffer by value and move into the corresponding builder. @@ -586,7 +576,7 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) 
{ - builders[index].push(((key.to_owned(), val), time, diff)); + builders[index].push(((key.into_owned(), val), time, diff)); } } } @@ -683,8 +673,8 @@ fn sort_dedup(list: &mut Vec) { trait PerKeyCompute<'a, C1, C2, C3> where C1: Cursor, - C2: Cursor, - C3: Cursor = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, C2::ValOwned: Ord + Clone, C1::Time: Lattice+Ord+Clone, C1::Diff: Semigroup, @@ -693,7 +683,7 @@ where fn new() -> Self; fn compute( &mut self, - key: &C1::Key, + key: C1::Key<'a>, source_cursor: (&mut C1, &'a C1::Storage), output_cursor: (&mut C2, &'a C2::Storage), batch_cursor: (&mut C3, &'a C3::Storage), @@ -703,9 +693,9 @@ where outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where - C1::Key: Eq, L: FnMut( - &C1::Key, &[(C1::Val<'a>, C1::Diff)], + C1::Key<'a>, + &[(C1::Val<'a>, C1::Diff)], &mut Vec<(C2::ValOwned, C2::Diff)>, &mut Vec<(C2::ValOwned, C2::Diff)>, ); @@ -730,8 +720,8 @@ mod history_replay { pub struct HistoryReplayer<'a, C1, C2, C3>//V1, V2, T, R1, R2> where C1: Cursor, - C2: Cursor, - C3: Cursor = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, C2::ValOwned: Ord + Clone, C1::Time: Lattice+Ord+Clone, C1::Diff: Semigroup, @@ -753,8 +743,8 @@ mod history_replay { impl<'a, C1, C2, C3> PerKeyCompute<'a, C1, C2, C3> for HistoryReplayer<'a, C1, C2, C3> where C1: Cursor, - C2: Cursor, - C3: Cursor = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, + C2: Cursor = C1::Key<'a>, Time = C1::Time>, + C3: Cursor = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>, C2::ValOwned: Ord + Clone, C1::Time: Lattice+Ord+Clone, C1::Diff: Semigroup, @@ -778,7 +768,7 @@ mod history_replay { #[inline(never)] fn compute( &mut self, - key: &C1::Key, + key: C1::Key<'a>, (source_cursor, source_storage): (&mut C1, &'a C1::Storage), (output_cursor, output_storage): (&mut C2, &'a C2::Storage), (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage), @@ -788,9 +778,9 @@ mod history_replay { outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)], new_interesting: &mut Vec) -> (usize, usize) where - C1::Key: Eq, L: FnMut( - &C1::Key, &[(C1::Val<'a>, C1::Diff)], + C1::Key<'a>, + &[(C1::Val<'a>, C1::Diff)], &mut Vec<(C2::ValOwned, C2::Diff)>, &mut Vec<(C2::ValOwned, C2::Diff)>, ) @@ -956,7 +946,7 @@ mod history_replay { for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { use trace::cursor::MyTrait; - self.output_buffer.push((<_ as MyTrait>::to_owned(value), diff.clone())); + self.output_buffer.push((<_ as MyTrait>::into_owned(value), diff.clone())); } else { self.temporary.push(next_time.join(time)); diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index d17078b12..9feb2386f 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -102,17 +102,17 @@ where G::Timestamp: TotalOrder+Lattice+Ord { } } -impl ThresholdTotal for Arranged +impl ThresholdTotal for Arranged where G::Timestamp: TotalOrder+Lattice+Ord, - T1: TraceReader+Clone+'static, - T1::Key: ExchangeData, + T1: for<'a> TraceReader=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static, + K: ExchangeData, T1::Diff: ExchangeData+Semigroup, { - fn threshold_semigroup(&self, mut thresh: F) -> 
Collection + fn threshold_semigroup(&self, mut thresh: F) -> Collection where R2: Semigroup, - F: FnMut(&T1::Key,&T1::Diff,Option<&T1::Diff>)->Option+'static, + F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option+'static, { let mut trace = self.trace.clone(); diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index 3b0bcd623..6192ce66a 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -13,11 +13,7 @@ pub struct CursorList { min_val: Vec, } -impl CursorList -where - C: Cursor, - C::Key: Ord, -{ +impl CursorList { /// Creates a new cursor list from pre-existing cursors. pub fn new(cursors: Vec, storage: &[C::Storage]) -> Self { let mut result = CursorList { @@ -88,11 +84,9 @@ where } } -impl Cursor for CursorList -where - C::Key: Ord, -{ - type Key = C::Key; +impl Cursor for CursorList { + type Key<'a> = C::Key<'a>; + type KeyOwned = C::KeyOwned; type Val<'a> = C::Val<'a>; type ValOwned = C::ValOwned; type Time = C::Time; @@ -108,7 +102,7 @@ where // accessors #[inline] - fn key<'a>(&self, storage: &'a Vec) -> &'a Self::Key { + fn key<'a>(&self, storage: &'a Vec) -> Self::Key<'a> { debug_assert!(self.key_valid(storage)); debug_assert!(self.cursors[self.min_key[0]].key_valid(&storage[self.min_key[0]])); self.cursors[self.min_key[0]].key(&storage[self.min_key[0]]) @@ -136,7 +130,7 @@ where self.minimize_keys(storage); } #[inline] - fn seek_key(&mut self, storage: &Vec, key: &Self::Key) { + fn seek_key<'a>(&mut self, storage: &Vec, key: Self::Key<'a>) { for index in 0 .. self.cursors.len() { self.cursors[index].seek_key(&storage[index], key); } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 431247b5b..2da033386 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -12,33 +12,56 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; use std::borrow::Borrow; +use std::cmp::Ordering; + /// A type that may be converted into and compared with another type. /// /// The type must also be comparable with itself, and follow the same /// order as if converting instances to `T` and comparing the results. -pub trait MyTrait : Ord { +pub trait MyTrait<'a> : Ord { /// Owned type into which this type can be converted. type Owned; /// Conversion from an instance of this type to the owned type. - fn to_owned(self) -> Self::Owned; + fn into_owned(self) -> Self::Owned; + /// + fn clone_onto(&self, other: &mut Self::Owned); /// Indicates that `self <= other`; used for sorting. - fn less_equal(&self, other: &Self::Owned) -> bool; + fn compare(&self, other: &Self::Owned) -> Ordering; + /// `self <= other` + fn less_equals(&self, other: &Self::Owned) -> bool { + self.compare(other) != Ordering::Greater + } + /// `self == other` + fn equals(&self, other: &Self::Owned) -> bool { + self.compare(other) == Ordering::Equal + } + /// `self < other` + fn less_than(&self, other: &Self::Owned) -> bool { + self.compare(other) == Ordering::Less + } + /// Borrows an owned instance as onesself. 
+ fn borrow_as(other: &'a Self::Owned) -> Self; } -impl<'a, T: Ord+ToOwned+?Sized> MyTrait for &'a T { +impl<'a, T: Ord+ToOwned+?Sized> MyTrait<'a> for &'a T { type Owned = T::Owned; - fn to_owned(self) -> Self::Owned { self.to_owned() } - fn less_equal(&self, other: &Self::Owned) -> bool { self.le(&other.borrow()) } + fn into_owned(self) -> Self::Owned { self.to_owned() } + fn clone_onto(&self, other: &mut Self::Owned) { ::clone_into(self, other) } + fn compare(&self, other: &Self::Owned) -> Ordering { self.cmp(&other.borrow()) } + fn borrow_as(other: &'a Self::Owned) -> Self { + other.borrow() + } } - /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor { /// Key by which updates are indexed. - type Key: ?Sized; + type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + /// Owned version of the above. + type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait; + type Val<'a>: Copy + Clone + MyTrait<'a, Owned = Self::ValOwned> + for<'b> PartialOrd>; /// Owned version of the above. type ValOwned: Ord + Clone; /// Timestamps associated with updates @@ -59,12 +82,12 @@ pub trait Cursor { fn val_valid(&self, storage: &Self::Storage) -> bool; /// A reference to the current key. Asserts if invalid. - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key; + fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a>; /// A reference to the current value. Asserts if invalid. fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a>; /// Returns a reference to the current key, if valid. - fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<&'a Self::Key> { + fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option> { if self.key_valid(storage) { Some(self.key(storage)) } else { None } } /// Returns a reference to the current value, if valid. @@ -79,7 +102,11 @@ pub trait Cursor { /// Advances the cursor to the next key. fn step_key(&mut self, storage: &Self::Storage); /// Advances the cursor to the specified key. - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key); + fn seek_key<'a>(&mut self, storage: &Self::Storage, key: Self::Key<'a>); + /// Convenience method to get access by reference to an owned key. + fn seek_key_owned<'a>(&mut self, storage: &Self::Storage, key: &'a Self::KeyOwned) { + self.seek_key(storage, as MyTrait<'a>>::borrow_as(key)); + } /// Advances the cursor to the next value. 
fn step_val(&mut self, storage: &Self::Storage); @@ -92,9 +119,8 @@ pub trait Cursor { fn rewind_vals(&mut self, storage: &Self::Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::Key, Self::ValOwned), Vec<(Self::Time, Self::Diff)>)> + fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((Self::KeyOwned, Self::ValOwned), Vec<(Self::Time, Self::Diff)>)> where - Self::Key: Clone, Self::Time: Clone, Self::Diff: Clone, { @@ -107,7 +133,7 @@ pub trait Cursor { self.map_times(storage, |ts, r| { kv_out.push((ts.clone(), r.clone())); }); - out.push(((self.key(storage).clone(), self.val(storage).to_owned()), kv_out)); + out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out)); self.step_val(storage); } self.step_key(storage); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 5617c40eb..b5e2f40a2 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -53,7 +53,7 @@ pub mod rhh; pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdKeySpine as KeySpine; -use std::borrow::{Borrow, ToOwned}; +use std::borrow::{ToOwned}; use timely::container::columnation::{Columnation, TimelyStack}; use lattice::Lattice; @@ -61,14 +61,10 @@ use difference::Semigroup; /// A type that names constituent update types. pub trait Update { - /// We will be able to read out references to this type, and must supply `Key::Owned` as input. - type Key: Ord + ToOwned + ?Sized; /// Key by which data are grouped. - type KeyOwned: Ord+Clone + Borrow; + type Key: Ord + Clone + 'static; /// Values associated with the key. - type Val: Ord + ToOwned + ?Sized + 'static; - /// Values associated with the key, in owned form - type ValOwned: Ord+Clone + Borrow; + type Val: Ord + Clone + 'static; /// Time at which updates occur. type Time: Ord+Lattice+timely::progress::Timestamp+Clone; /// Way in which updates occur. @@ -77,15 +73,13 @@ pub trait Update { impl Update for ((K, V), T, R) where - K: Ord+Clone, + K: Ord+Clone+'static, V: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { type Key = K; - type KeyOwned = K; type Val = V; - type ValOwned = V; type Time = T; type Diff = R; } @@ -96,15 +90,13 @@ pub trait Layout { type Target: Update + ?Sized; /// Container for update keys. type KeyContainer: - RetainFrom<::Key>+ - BatchContainer::Key>; + BatchContainer::Key>; /// Container for update vals. type ValContainer: - RetainFrom<::Val>+ - BatchContainer::Val>; + BatchContainer::Val>; /// Container for update vals. 
type UpdContainer: - BatchContainer::Time, ::Diff)>; + for<'a> BatchContainer::Time, ::Diff), ReadItem<'a> = &'a (::Time, ::Diff)>; } /// A layout that uses vectors @@ -114,8 +106,11 @@ pub struct Vector { impl Layout for Vector where - U::Key: ToOwned + Sized + Clone, - U::Val: ToOwned + Sized + Clone, + U::Key: 'static, + U::Val: 'static, +// where +// U::Key: ToOwned + Sized + Clone + 'static, +// U::Val: ToOwned + Sized + Clone + 'static, { type Target = U; type KeyContainer = Vec; @@ -128,10 +123,10 @@ pub struct TStack { phantom: std::marker::PhantomData, } -impl Layout for TStack +impl Layout for TStack where - U::Key: Columnation + ToOwned, - U::Val: Columnation + ToOwned, + U::Key: Columnation + 'static, + U::Val: Columnation + 'static, U::Time: Columnation, U::Diff: Columnation, { @@ -146,15 +141,15 @@ where /// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. - type Container: BatchContainer + RetainFrom; + type Container: BatchContainer; } -impl PreferredContainer for T { +impl PreferredContainer for T { type Container = Vec; } -impl PreferredContainer for [T] { - type Container = SliceContainer; +impl PreferredContainer for [T] { + type Container = SliceContainer2; } /// An update and layout description based on preferred containers. @@ -164,17 +159,15 @@ pub struct Preferred { impl Update for Preferred where - K: Ord+ToOwned + ?Sized, - K::Owned: Ord+Clone, - V: Ord+ToOwned + ?Sized + 'static, + K: ToOwned + ?Sized, + K::Owned: Ord+Clone+'static, + V: ToOwned + ?Sized + 'static, V::Owned: Ord+Clone, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { - type Key = K; - type KeyOwned = K::Owned; - type Val = V; - type ValOwned = V::Owned; + type Key = K::Owned; + type Val = V::Owned; type Time = T; type Diff = R; } @@ -182,7 +175,8 @@ where impl Layout for Preferred where K: Ord+ToOwned+PreferredContainer + ?Sized, - K::Owned: Ord+Clone, + K::Owned: Ord+Clone+'static, + // for<'a> K::Container: BatchContainer = &'a K>, V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, V::Owned: Ord+Clone, T: Ord+Lattice+timely::progress::Timestamp+Clone, @@ -195,35 +189,35 @@ where } -/// A container that can retain/discard from some offset onward. -pub trait RetainFrom { - /// Retains elements from an index onwards that satisfy a predicate. - fn retain_frombool>(&mut self, index: usize, predicate: P); -} - -impl RetainFrom for Vec { - fn retain_frombool>(&mut self, index: usize, mut predicate: P) { - let mut write_position = index; - for position in index .. self.len() { - if predicate(position, &self[position]) { - self.swap(position, write_position); - write_position += 1; - } - } - self.truncate(write_position); - } -} - -impl RetainFrom for TimelyStack { - fn retain_frombool>(&mut self, index: usize, mut predicate: P) { - let mut position = index; - self.retain_from(index, |item| { - let result = predicate(position, item); - position += 1; - result - }) - } -} +// /// A container that can retain/discard from some offset onward. +// pub trait RetainFrom { +// /// Retains elements from an index onwards that satisfy a predicate. +// fn retain_frombool>(&mut self, index: usize, predicate: P); +// } + +// impl RetainFrom for Vec { +// fn retain_frombool>(&mut self, index: usize, mut predicate: P) { +// let mut write_position = index; +// for position in index .. 
self.len() { +// if predicate(position, &self[position]) { +// self.swap(position, write_position); +// write_position += 1; +// } +// } +// self.truncate(write_position); +// } +// } + +// impl RetainFrom for TimelyStack { +// fn retain_frombool>(&mut self, index: usize, mut predicate: P) { +// let mut position = index; +// self.retain_from(index, |item| { +// let result = predicate(position, item); +// position += 1; +// result +// }) +// } +// } use std::convert::TryInto; @@ -310,7 +304,7 @@ impl OffsetList { } } -pub use self::containers::{BatchContainer, SliceContainer}; +pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { @@ -318,19 +312,24 @@ pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; use std::borrow::{Borrow, ToOwned}; + use trace::MyTrait; /// A general-purpose container resembling `Vec`. - pub trait BatchContainer: Default { + pub trait BatchContainer: Default + 'static { /// The type of contained item. /// /// The container only supplies references to the item, so it needn't be sized. - type Item: ?Sized; + type PushItem; + /// The type that can be read back out of the container. + type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd>; + /// Inserts an owned item. + fn push(&mut self, item: Self::PushItem); /// Inserts an owned item. - fn push(&mut self, item: ::Owned) where Self::Item: ToOwned; + fn copy_push(&mut self, item: &Self::PushItem); /// Inserts a borrowed item. - fn copy(&mut self, item: &Self::Item); + fn copy<'a>(&mut self, item: Self::ReadItem<'a>); /// Extends from a slice of items. - fn copy_slice(&mut self, slice: &[::Owned]) where Self::Item: ToOwned; + fn copy_slice(&mut self, slice: &[Self::PushItem]); /// Extends from a range of items in another`Self`. fn copy_range(&mut self, other: &Self, start: usize, end: usize); /// Creates a new container with sufficient capacity. @@ -341,11 +340,11 @@ pub mod containers { fn merge_capacity(cont1: &Self, cont2: &Self) -> Self; /// Reference to the element at this position. - fn index(&self, index: usize) -> &Self::Item; + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a>; /// Number of contained elements fn len(&self) -> usize; /// Returns the last item if the container is non-empty. - fn last(&self) -> Option<&Self::Item> { + fn last<'a>(&'a self) -> Option> { if self.len() > 0 { Some(self.index(self.len()-1)) } @@ -360,7 +359,7 @@ pub mod containers { /// stays false once it becomes false, a joint property of the predicate /// and the layout of `Self. This allows `advance` to use exponential search to /// count the number of elements in time logarithmic in the result. - fn advancebool>(&self, start: usize, end: usize, function: F) -> usize { + fn advance Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize { let small_limit = 8; @@ -401,15 +400,20 @@ pub mod containers { // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). 
- impl> BatchContainer for Vec { - type Item = T; + impl BatchContainer for Vec { + type PushItem = T; + type ReadItem<'a> = &'a Self::PushItem; + fn push(&mut self, item: T) { self.push(item); } + fn copy_push(&mut self, item: &T) { + self.copy(item); + } fn copy(&mut self, item: &T) { self.push(item.clone()); } - fn copy_slice(&mut self, slice: &[T]) where T: Sized { + fn copy_slice(&mut self, slice: &[T]) { self.extend_from_slice(slice); } fn copy_range(&mut self, other: &Self, start: usize, end: usize) { @@ -424,7 +428,7 @@ pub mod containers { fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { Vec::with_capacity(cont1.len() + cont2.len()) } - fn index(&self, index: usize) -> &Self::Item { + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a> { &self[index] } fn len(&self) -> usize { @@ -434,15 +438,20 @@ pub mod containers { // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now // be presented with the actual contained type, rather than a type that borrows into it. - impl> BatchContainer for TimelyStack { - type Item = T; - fn push(&mut self, item: ::Owned) where Self::Item: ToOwned { + impl + 'static> BatchContainer for TimelyStack { + type PushItem = T; + type ReadItem<'a> = &'a Self::PushItem; + + fn push(&mut self, item: Self::PushItem) { self.copy(item.borrow()); } + fn copy_push(&mut self, item: &Self::PushItem) { + self.copy(item); + } fn copy(&mut self, item: &T) { self.copy(item); } - fn copy_slice(&mut self, slice: &[::Owned]) where Self::Item: ToOwned { + fn copy_slice(&mut self, slice: &[Self::PushItem]) { self.reserve_items(slice.iter()); for item in slice.iter() { self.copy(item); @@ -465,7 +474,7 @@ pub mod containers { new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); new } - fn index(&self, index: usize) -> &Self::Item { + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a> { &self[index] } fn len(&self) -> usize { @@ -486,23 +495,26 @@ pub mod containers { impl BatchContainer for SliceContainer where - B: Clone + Sized, - [B]: ToOwned>, + B: Ord + Clone + Sized + 'static, { - type Item = [B]; - fn push(&mut self, item: Vec) where Self::Item: ToOwned { + type PushItem = Vec; + type ReadItem<'a> = &'a [B]; + fn push(&mut self, item: Vec) { for x in item.into_iter() { self.inner.push(x); } self.offsets.push(self.inner.len()); } - fn copy(&mut self, item: &Self::Item) { + fn copy_push(&mut self, item: &Vec) { + self.copy(&item[..]); + } + fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { for x in item.iter() { self.inner.copy(x); } self.offsets.push(self.inner.len()); } - fn copy_slice(&mut self, slice: &[Vec]) where Self::Item: ToOwned { + fn copy_slice(&mut self, slice: &[Vec]) { for item in slice { self.copy(item); } @@ -530,7 +542,7 @@ pub mod containers { inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), } } - fn index(&self, index: usize) -> &Self::Item { + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a> { let lower = self.offsets[index]; let upper = self.offsets[index+1]; &self.inner[lower .. upper] @@ -550,12 +562,154 @@ pub mod containers { } } - use trace::implementations::RetainFrom; - /// A container that can retain/discard from some offset onward. - impl RetainFrom<[B]> for SliceContainer { - /// Retains elements from an index onwards that satisfy a predicate. - fn retain_frombool>(&mut self, _index: usize, _predicate: P) { - unimplemented!() + /// A container that accepts slices `[B::Item]`. 
+ pub struct SliceContainer2 { + text: String, + /// Offsets that bound each contained slice. + /// + /// The length will be one greater than the number of contained slices, + /// starting with zero and ending with `self.inner.len()`. + offsets: Vec, + /// An inner container for sequences of `B` that dereferences to a slice. + inner: Vec, + } + + /// Welcome to GATs! + pub struct Greetings<'a, B> { + /// Text that decorates the data. + pub text: Option<&'a str>, + /// The data itself. + pub slice: &'a [B], + } + + impl<'a, B> Copy for Greetings<'a, B> { } + impl<'a, B> Clone for Greetings<'a, B> { + fn clone(&self) -> Self { + Self { + text: self.text.clone(), + slice: self.slice, + } + } + } + + use std::cmp::Ordering; + impl<'a, 'b, B: Ord> PartialEq> for Greetings<'b, B> { + fn eq(&self, other: &Greetings<'a, B>) -> bool { + self.slice.eq(&other.slice[..]) + } + } + impl<'a, B: Ord> Eq for Greetings<'a, B> { } + impl<'a, 'b, B: Ord> PartialOrd> for Greetings<'b, B> { + fn partial_cmp(&self, other: &Greetings<'a, B>) -> Option { + self.slice.partial_cmp(&other.slice[..]) + } + } + impl<'a, B: Ord> Ord for Greetings<'a, B> { + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap() + } + } + + impl<'a, B: Ord + Clone> MyTrait<'a> for Greetings<'a, B> { + type Owned = Vec; + fn into_owned(self) -> Self::Owned { self.slice.to_vec() } + fn clone_onto(&self, other: &mut Self::Owned) { + self.slice.clone_into(other); + } + fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { + self.slice.cmp(&other[..]) + } + fn borrow_as(other: &'a Self::Owned) -> Self { + Self { + text: None, + slice: &other[..], + } + } + } + + + + impl BatchContainer for SliceContainer2 + where + B: Ord + Clone + Sized + 'static, + { + type PushItem = Vec; + type ReadItem<'a> = Greetings<'a, B>; + fn push(&mut self, item: Vec) { + for x in item.into_iter() { + self.inner.push(x); + } + self.offsets.push(self.inner.len()); + } + fn copy_push(&mut self, item: &Vec) { + self.copy(<_ as MyTrait>::borrow_as(item)); + } + fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { + for x in item.slice.iter() { + self.inner.copy(x); + } + self.offsets.push(self.inner.len()); + } + fn copy_slice(&mut self, slice: &[Vec]) { + for item in slice { + self.copy_push(item); + } + } + fn copy_range(&mut self, other: &Self, start: usize, end: usize) { + for index in start .. end { + self.copy(other.index(index)); + } + } + fn with_capacity(size: usize) -> Self { + let mut offsets = Vec::with_capacity(size + 1); + offsets.push(0); + Self { + text: format!("Hello!"), + offsets, + inner: Vec::with_capacity(size), + } + } + fn reserve(&mut self, _additional: usize) { + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1); + offsets.push(0); + Self { + text: format!("Hello!"), + offsets, + inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()), + } + } + fn index<'a>(&'a self, index: usize) -> Self::ReadItem<'a> { + let lower = self.offsets[index]; + let upper = self.offsets[index+1]; + Greetings { + text: Some(&self.text), + slice: &self.inner[lower .. upper], + } + } + fn len(&self) -> usize { + self.offsets.len() - 1 + } + } + + /// Default implementation introduces a first offset. 
+ impl Default for SliceContainer2 { + fn default() -> Self { + Self { + text: format!("Hello!"), + offsets: vec![0], + inner: Default::default(), + } } } + + // use trace::implementations::RetainFrom; + // /// A container that can retain/discard from some offset onward. + // impl RetainFrom<[B]> for SliceContainer { + // /// Retains elements from an index onwards that satisfy a predicate. + // fn retain_frombool>(&mut self, _index: usize, _predicate: P) { + // unimplemented!() + // } + // } } diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index a6b90000c..cd4be6851 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -97,11 +97,11 @@ where // Type aliases to make certain types readable. type TDLayer = OrderedLeaf<<::Target as Update>::Time, <::Target as Update>::Diff>; -type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValContainer>; +type VTDLayer = OrderedLayer<<::Target as Update>::ValOwned, TDLayer, ::ValContainer>; type KTDLayer = OrderedLayer<<::Target as Update>::Key, TDLayer, ::KeyContainer>; type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyContainer>; type TDBuilder = OrderedLeafBuilder<<::Target as Update>::Time, <::Target as Update>::Diff>; -type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValContainer>; +type VTDBuilder = OrderedBuilder<<::Target as Update>::ValOwned, TDBuilder, ::ValContainer>; type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyContainer>; type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyContainer>; @@ -111,7 +111,7 @@ where ::Val: Sized + Clone, { type Key = ::Key; - type Val = ::Val; + type Val<'a> = ::Val<'a>; type Time = ::Time; type Diff = ::Diff; @@ -342,7 +342,7 @@ where ::Val: Sized + Clone, { type Key = ::Key; - type Val<'a> = &'a ::Val; + type Val<'a> = ::Val<'a>; type ValOwned = ::ValOwned; type Time = ::Time; type Diff = ::Diff; @@ -382,9 +382,9 @@ impl Builder for OrdValBuilder where ::Key: Sized + Clone, ::Val: Sized + Clone, - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff> + // OrdValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, { - type Item = ((::Key, ::Val), ::Time, ::Diff); + type Item = ((::Key, ::ValOwned), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -450,7 +450,7 @@ where impl Batch for OrdKeyBatch where ::Key: Sized + Clone, - L::Target: Update, + L::Target: Update, { type Merger = OrdKeyMerger; @@ -679,7 +679,7 @@ where impl Builder for OrdKeyBuilder where ::Key: Sized + Clone, - OrdKeyBatch: Batch::Key, Val=(), Time=::Time, Diff=::Diff> + // OrdKeyBatch: Batch::Key, Val=(), Time=::Time, Diff=::Diff>, { type Item = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 6e9875e7d..f2fe20088 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -65,14 +65,14 @@ pub type PreferredSpine = Spine< mod val_batch { - use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use timely::progress::{Antichain, frontier::AntichainRef}; use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use trace::implementations::{BatchContainer, OffsetList}; - + use trace::cursor::MyTrait; + use super::{Layout, Update}; /// An immutable collection of update tuples, from a contiguous interval of logical times. 
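// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the owned/borrowed round trip
// that `MyTrait<'a>` (src/trace/cursor/mod.rs above) encodes, shown with the
// blanket impl for `&'a T`. `borrow_as` recovers the cursor-facing borrowed
// form from an owned key, and `compare` and its helpers let callers test
// equality without cloning; this is the pattern behind `seek_key_owned` and
// the `get_key(..).map(|k| k.equals(&key))` checks used throughout the
// commit. The external import path is an assumption; inside the crate the
// patch writes `use trace::cursor::MyTrait;`.
use differential_dataflow::trace::cursor::MyTrait;

fn my_trait_sketch() {
    let owned: String = "some key".to_string();
    // Borrow the owned value in the form a cursor would hand out (`&str` here).
    let read = <&str as MyTrait>::borrow_as(&owned);
    // Comparisons against the owned form go through `compare` and its helpers.
    assert!(read.equals(&owned));
    assert!(!read.less_than(&owned));
    // `clone_onto` reuses an existing allocation; `into_owned` allocates anew.
    let mut buffer = String::new();
    read.clone_onto(&mut buffer);
    assert_eq!(read.into_owned(), buffer);
}
// ---------------------------------------------------------------------------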
@@ -137,8 +137,10 @@ mod val_batch {
     }

     impl<L: Layout> BatchReader for OrdValBatch<L> {
-        type Key = <L::Target as Update>::Key;
-        type Val = <L::Target as Update>::Val;
+        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
+        type KeyOwned = <L::Target as Update>::Key;
+        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
+        type ValOwned = <L::Target as Update>::Val;
         type Time = <L::Target as Update>::Time;
         type Diff = <L::Target as Update>::Diff;

@@ -293,8 +295,8 @@
         /// if the updates cancel either directly or after compaction.
         fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
             use ::std::cmp::Ordering;
-            match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) {
-                Ordering::Less => {
+            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
+                Ordering::Less => {
                     self.copy_key(source1, self.key_cursor1);
                     self.key_cursor1 += 1;
                 },
@@ -332,7 +334,7 @@
                 // if they are non-empty post-consolidation, we write the value.
                 // We could multi-way merge and it wouldn't be very complicated.
                 use ::std::cmp::Ordering;
-                match source1.vals.index(lower1).cmp(source2.vals.index(lower2)) {
+                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                     Ordering::Less => {
                         // Extend stash by updates, with logical compaction applied.
                         self.stash_updates_for_val(source1, lower1);
@@ -394,7 +396,7 @@
             let (lower, upper) = source.updates_for_value(index);
             for i in lower .. upper {
                 // NB: Here is where we would need to look back if `lower == upper`.
-                let (time, diff) = &source.updates.index(i);
+                let (time, diff) = &source.updates.index(i).into_owned();
                 use lattice::Lattice;
                 let mut new_time = time.clone();
                 new_time.advance_by(self.description.since().borrow());
@@ -409,8 +411,8 @@
             if !self.update_stash.is_empty() {
                 // If there is a single element, equal to a just-prior recorded update,
                 // we push nothing and report an unincremented offset to encode this case.
-                if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() {
-                    // Just clear out update_stash, as we won't drain it here.
+                if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().equals(ud)).unwrap_or(false) {
+                    // Just clear out update_stash, as we won't drain it here.
self.update_stash.clear(); self.singletons += 1; } @@ -438,15 +440,17 @@ mod val_batch { } impl Cursor for OrdValCursor { - type Key = ::Key; - type Val<'a> = &'a ::Val; - type ValOwned = ::ValOwned; + + type Key<'a> = ::ReadItem<'a>; + type KeyOwned = ::Key; + type Val<'a> = ::ReadItem<'a>; + type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; type Storage = OrdValBatch; - fn key<'a>(&self, storage: &'a OrdValBatch) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) } + fn key<'a>(&self, storage: &'a OrdValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a OrdValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } fn map_times(&mut self, storage: &OrdValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); @@ -466,8 +470,8 @@ mod val_batch { self.key_cursor = storage.storage.keys.len(); } } - fn seek_key(&mut self, storage: &OrdValBatch, key: &Self::Key) { - self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); + fn seek_key<'a>(&mut self, storage: &OrdValBatch, key: Self::Key<'a>) { + self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key)); if self.key_valid(storage) { self.rewind_vals(storage); } @@ -479,7 +483,7 @@ mod val_batch { } } fn seek_val<'a>(&mut self, storage: &OrdValBatch, val: Self::Val<'a>) { - self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val)); + self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val)); } fn rewind_keys(&mut self, storage: &OrdValBatch) { self.key_cursor = 0; @@ -531,12 +535,9 @@ mod val_batch { } } - impl Builder for OrdValBuilder - where - OrdValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, - ::KeyOwned: Borrow<::Key>, - { - type Item = ((::KeyOwned, ::ValOwned), ::Time, ::Diff); + impl Builder for OrdValBuilder { + + type Item = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = OrdValBatch; @@ -559,9 +560,9 @@ mod val_batch { fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val.borrow()) { + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { self.push_update(time, diff); } else { // New value; complete representation of prior value. @@ -585,9 +586,9 @@ mod val_batch { fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val.borrow()) { + if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { // TODO: here we could look for repetition, and not push the update in that case. // More logic (and state) would be required to correctly wrangle this. self.push_update(time.clone(), diff.clone()); @@ -597,7 +598,7 @@ mod val_batch { // Remove any pending singleton, and if it was set increment our count. 
if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val.borrow()); + self.result.vals.copy_push(val); } } else { // New key; complete representation of prior key. @@ -606,8 +607,8 @@ mod val_batch { if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val.borrow()); - self.result.keys.copy(key.borrow()); + self.result.vals.copy_push(val); + self.result.keys.copy_push(key); } } @@ -630,13 +631,13 @@ mod val_batch { mod key_batch { - use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use timely::progress::{Antichain, frontier::AntichainRef}; use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use trace::implementations::{BatchContainer, OffsetList}; + use trace::cursor::MyTrait; use super::{Layout, Update}; @@ -692,8 +693,11 @@ mod key_batch { } impl BatchReader for OrdKeyBatch { - type Key = ::Key; - type Val = (); + + type Key<'a> = ::ReadItem<'a>; + type KeyOwned = ::Key; + type Val<'a> = &'a (); + type ValOwned = (); type Time = ::Time; type Diff = ::Diff; @@ -833,7 +837,7 @@ mod key_batch { /// if the updates cancel either directly or after compaction. fn merge_key(&mut self, source1: &OrdKeyStorage, source2: &OrdKeyStorage) { use ::std::cmp::Ordering; - match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) { + match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) { Ordering::Less => { self.copy_key(source1, self.key_cursor1); self.key_cursor1 += 1; @@ -906,14 +910,17 @@ mod key_batch { } impl Cursor for OrdKeyCursor { - type Key = ::Key; - type Val = (); + + type Key<'a> = ::ReadItem<'a>; + type KeyOwned = ::Key; + type Val<'a> = &'a (); + type ValOwned = (); type Time = ::Time; type Diff = ::Diff; type Storage = OrdKeyBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) } + fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_key(self.key_cursor); @@ -933,8 +940,8 @@ mod key_batch { self.key_cursor = storage.storage.keys.len(); } } - fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { - self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); + fn seek_key<'a>(&mut self, storage: &Self::Storage, key: Self::Key<'a>) { + self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(&key)); if self.key_valid(storage) { self.rewind_vals(storage); } @@ -942,7 +949,7 @@ mod key_batch { fn step_val(&mut self, _storage: &Self::Storage) { self.val_stepped = true; } - fn seek_val(&mut self, _storage: &Self::Storage, _val: &Self::Val) { } + fn seek_val<'a>(&mut self, _storage: &Self::Storage, _val: Self::Val<'a>) { } fn rewind_keys(&mut self, storage: &Self::Storage) { self.key_cursor = 0; if self.key_valid(storage) { @@ -993,12 +1000,9 @@ mod key_batch { } } - impl Builder for OrdKeyBuilder - where - OrdKeyBatch: Batch::Key, Val=(), Time=::Time, Diff=::Diff>, - ::KeyOwned: Borrow<::Key>, - { - type Item = ((::KeyOwned, ()), ::Time, ::Diff); + impl 
Builder for OrdKeyBuilder { + + type Item = ((::Key, ()), ::Time, ::Diff); type Time = ::Time; type Output = OrdKeyBatch; @@ -1019,7 +1023,7 @@ mod key_batch { fn push(&mut self, ((key, ()), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { self.push_update(time, diff); } else { // New key; complete representation of prior key. @@ -1035,7 +1039,7 @@ mod key_batch { fn copy(&mut self, ((key, ()), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { self.push_update(time.clone(), diff.clone()); } else { // New key; complete representation of prior key. @@ -1043,7 +1047,7 @@ mod key_batch { // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time.clone(), diff.clone()); - self.result.keys.copy(key.borrow()); + self.result.keys.copy_push(key); } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index d21ed94b2..e3d44e164 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -38,6 +38,8 @@ pub type ColSpine = Spine< /// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`. pub trait HashOrdered: Hashable { } +impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { } + /// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`. #[derive(Copy, Clone, Eq, PartialEq, Debug, Abomonation, Default)] pub struct HashWrapper { @@ -76,9 +78,12 @@ mod val_batch { use std::marker::PhantomData; use timely::progress::{Antichain, frontier::AntichainRef}; + use hashable::Hashable; + use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use trace::implementations::{BatchContainer, OffsetList}; - + use trace::cursor::MyTrait; + use super::{Layout, Update, HashOrdered}; /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`. @@ -95,11 +100,10 @@ mod val_batch { /// We will use the `Hashable` trait here, but any consistent hash function should work out ok. /// We specifically want to use the highest bits of the result (we will) because the low bits have /// likely been spent shuffling the data between workers (by key), and are likely low entropy. - #[derive(Abomonation, Debug)] + #[derive(Abomonation)] pub struct RhhValStorage where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { /// The requested capacity for `keys`. We use this when determining where a key with a certain hash @@ -133,8 +137,7 @@ mod val_batch { impl RhhValStorage where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { @@ -179,7 +182,7 @@ mod val_batch { // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. 
- self.keys.copy(key); + self.keys.copy_push(key); if let Some(offset) = offset { self.keys_offs.push(offset); } @@ -187,16 +190,15 @@ mod val_batch { } /// Indicates both the desired location and the hash signature of the key. - fn desired_location(&self, key: &::Key) -> usize { - use hashable::Hashable; + fn desired_location(&self, key: &K) -> usize { let hash: usize = key.hashed().into().try_into().unwrap(); hash / self.divisor } /// Returns true if one should advance one's index in the search for `key`. - fn advance_key(&self, index: usize, key: &::Key) -> bool { + fn advance_key<'a>(&self, index: usize, key: ::ReadItem<'a>) -> bool { // Ideally this short-circuits, as `self.keys[index]` is bogus data. - !self.live_key(index) || self.keys.index(index).lt(key) + !self.live_key(index) || self.keys.index(index).lt(&key) } /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range. @@ -229,8 +231,7 @@ mod val_batch { #[derive(Abomonation)] pub struct RhhValBatch where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { /// The updates themselves. pub storage: RhhValStorage, @@ -246,11 +247,13 @@ mod val_batch { impl BatchReader for RhhValBatch where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, + for<'a> ::ReadItem<'a>: HashOrdered, { - type Key = ::Key; - type Val = ::Val; + type Key<'a> = ::ReadItem<'a>; + type KeyOwned = ::Key; + type Val<'a> = ::ReadItem<'a>; + type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; @@ -274,8 +277,8 @@ mod val_batch { impl Batch for RhhValBatch where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, + for<'a> ::ReadItem<'a>: HashOrdered, { type Merger = RhhValMerger; @@ -287,8 +290,7 @@ mod val_batch { /// State for an in-progress merge. pub struct RhhValMerger where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { /// Key position to merge next in the first batch. key_cursor1: usize, @@ -299,6 +301,8 @@ mod val_batch { /// description description: Description<::Time>, + /// Owned key for copying into. + key_owned: <::Key as ToOwned>::Owned, /// Local stash of updates, to use for consolidation. /// /// We could emulate a `ChangeBatch` here, with related compaction smarts. @@ -310,8 +314,7 @@ mod val_batch { impl Merger> for RhhValMerger where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, RhhValBatch: Batch::Time>, { fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -350,6 +353,7 @@ mod val_batch { key_cursor2: 0, result: storage, description, + key_owned: Default::default(), update_stash: Vec::new(), singletons: 0, } @@ -401,10 +405,8 @@ mod val_batch { // Helper methods in support of merging batches. impl RhhValMerger where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { - /// Copy the next key in `source`. /// /// The method extracts the key in `source` at `cursor`, and merges it in to `self`. @@ -427,7 +429,8 @@ mod val_batch { // If we have pushed any values, copy the key as well. 
if self.result.vals.len() > init_vals { - self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len().try_into().ok().unwrap())); + source.keys.index(cursor).clone_onto(&mut self.key_owned); + self.result.insert_key(&self.key_owned, Some(self.result.vals.len().try_into().ok().unwrap())); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -437,7 +440,7 @@ mod val_batch { fn merge_key(&mut self, source1: &RhhValStorage, source2: &RhhValStorage) { use ::std::cmp::Ordering; - match source1.keys.index(self.key_cursor1).cmp(source2.keys.index(self.key_cursor2)) { + match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) { Ordering::Less => { self.copy_key(source1, self.key_cursor1); self.key_cursor1 += 1; @@ -447,7 +450,8 @@ mod val_batch { let (lower1, upper1) = source1.values_for_key(self.key_cursor1); let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off)); + source1.keys.index(self.key_cursor1).clone_onto(&mut self.key_owned); + self.result.insert_key(&self.key_owned, Some(off)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -475,7 +479,7 @@ mod val_batch { // if they are non-empty post-consolidation, we write the value. // We could multi-way merge and it wouldn't be very complicated. use ::std::cmp::Ordering; - match source1.vals.index(lower1).cmp(source2.vals.index(lower2)) { + match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) { Ordering::Less => { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); @@ -538,8 +542,8 @@ mod val_batch { for i in lower .. upper { // NB: Here is where we would need to look back if `lower == upper`. let (time, diff) = &source.updates.index(i); - use lattice::Lattice; let mut new_time = time.clone(); + use lattice::Lattice; new_time.advance_by(self.description.since().borrow()); self.update_stash.push((new_time, diff.clone())); } @@ -552,7 +556,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.update_stash.last() == self.result.updates.last() { + if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.equals(self.update_stash.last().unwrap())).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; @@ -580,8 +584,7 @@ mod val_batch { /// the cursor, rather than internal state. pub struct RhhValCursor where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { /// Absolute position of the current key. 
key_cursor: usize, @@ -593,18 +596,19 @@ mod val_batch { impl Cursor for RhhValCursor where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, + for<'a> ::ReadItem<'a>: HashOrdered, { - type Key = ::Key; - type Val<'a> = &'a ::Val; - type ValOwned = ::ValOwned; + type Key<'a> = ::ReadItem<'a>; + type KeyOwned = ::Key; + type Val<'a> = ::ReadItem<'a>; + type ValOwned = ::Val; type Time = ::Time; type Diff = ::Diff; type Storage = RhhValBatch; - fn key<'a>(&self, storage: &'a RhhValBatch) -> &'a Self::Key { + fn key<'a>(&self, storage: &'a RhhValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a RhhValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } @@ -629,9 +633,9 @@ mod val_batch { self.key_cursor = storage.storage.keys.len(); } } - fn seek_key(&mut self, storage: &RhhValBatch, key: &Self::Key) { + fn seek_key<'a>(&mut self, storage: &RhhValBatch, key: Self::Key<'a>) { // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key)); - let desired = storage.storage.desired_location(key); + let desired = storage.storage.desired_location(&key); // Advance the cursor, if `desired` is ahead of it. if self.key_cursor < desired { self.key_cursor = desired; @@ -656,7 +660,7 @@ mod val_batch { } } fn seek_val<'a>(&mut self, storage: &RhhValBatch, val: Self::Val<'a>) { - self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(val)); + self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| x.lt(&val)); } fn rewind_keys(&mut self, storage: &RhhValBatch) { self.key_cursor = 0; @@ -674,8 +678,7 @@ mod val_batch { /// A builder for creating layers from unsorted update tuples. pub struct RhhValBuilder where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { result: RhhValStorage, singleton: Option<(::Time, ::Diff)>, @@ -688,8 +691,7 @@ mod val_batch { impl RhhValBuilder where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, + ::Key: Default + HashOrdered, { /// Pushes a single update, which may set `self.singleton` rather than push. /// @@ -720,12 +722,10 @@ mod val_batch { impl Builder for RhhValBuilder where - ::Key: HashOrdered, - ::KeyOwned: Default + HashOrdered, - ::KeyOwned: Borrow<::Key>, - RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff> + ::Key: Default + HashOrdered, + // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, { - type Item = ((::KeyOwned, ::ValOwned), ::Time, ::Diff); + type Item = ((::Key, ::Val), ::Time, ::Diff); type Time = ::Time; type Output = RhhValBatch; @@ -760,9 +760,9 @@ mod val_batch { fn push(&mut self, ((key, val), time, diff): Self::Item) { // Perhaps this is a continuation of an already received key. - if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val.borrow()) { + if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { self.push_update(time, diff); } else { // New value; complete representation of prior value. @@ -787,9 +787,9 @@ mod val_batch { fn copy(&mut self, ((key, val), time, diff): &Self::Item) { // Perhaps this is a continuation of an already received key. 
- if self.result.keys.last() == Some(key.borrow()) { + if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. - if self.result.vals.last() == Some(val.borrow()) { + if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { // TODO: here we could look for repetition, and not push the update in that case. // More logic (and state) would be required to correctly wrangle this. self.push_update(time.clone(), diff.clone()); @@ -799,7 +799,7 @@ mod val_batch { // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val.borrow()); + self.result.vals.copy_push(val); } } else { // New key; complete representation of prior key. @@ -808,9 +808,9 @@ mod val_batch { if self.singleton.take().is_some() { self.singletons += 1; } self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); self.push_update(time.clone(), diff.clone()); - self.result.vals.copy(val.borrow()); + self.result.vals.copy_push(val); // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); + self.result.insert_key(key, None); } } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index adab0d121..9988f9ada 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -112,13 +112,13 @@ where impl TraceReader for Spine where B: Batch+Clone+'static, - B::Key: Ord, // Clone is required by `batch::advance_*` (in-place could remove). - B::Val: Ord, // Clone is required by `batch::advance_*` (in-place could remove). B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::Diff: Semigroup, { - type Key = B::Key; - type Val = B::Val; + type Key<'a> = B::Key<'a>; + type KeyOwned = B::KeyOwned; + type Val<'a> = B::Val<'a>; + type ValOwned = B::ValOwned; type Time = B::Time; type Diff = B::Diff; @@ -260,8 +260,6 @@ where impl Trace for Spine where B: Batch+Clone+'static, - B::Key: Ord, - B::Val: Ord, B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, B::Diff: Semigroup, BA: Batcher