diff --git a/examples/cursors.rs b/examples/cursors.rs
index 952f54bd8..b1db68699 100644
--- a/examples/cursors.rs
+++ b/examples/cursors.rs
@@ -135,8 +135,8 @@ fn main() {
fn dump_cursor
(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
- Tr::Key: Debug + Clone,
- Tr::Val: Debug + Clone,
+ Tr::KeyOwned: Debug + Clone,
+ Tr::ValOwned: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::Diff: Debug + Clone,
::ValOwned: Debug,
diff --git a/examples/spines.rs b/examples/spines.rs
index d148d610f..e359cd696 100644
--- a/examples/spines.rs
+++ b/examples/spines.rs
@@ -24,33 +24,33 @@ fn main() {
let mut probe = Handle::new();
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {
- use differential_dataflow::operators::{arrange::Arrange, JoinCore};
+ use differential_dataflow::operators::{arrange::Arrange, JoinCore, join::join_traces};
let (data_input, data) = scope.new_collection::();
let (keys_input, keys) = scope.new_collection::();
match mode.as_str() {
- "new" => {
- use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
- let data = data.arrange::>();
- let keys = keys.arrange::>();
- keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
- .probe_with(&mut probe);
- },
- "old" => {
- use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine;
- let data = data.arrange::>();
- let keys = keys.arrange::>();
- keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
- .probe_with(&mut probe);
- },
- "rhh" => {
- use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
- let data = data.map(|x| HashWrapper { inner: x }).arrange::>();
- let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>();
- keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
- .probe_with(&mut probe);
- },
+ // "new" => {
+ // use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
+ // let data = data.arrange::>();
+ // let keys = keys.arrange::>();
+ // keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
+ // .probe_with(&mut probe);
+ // },
+ // "old" => {
+ // use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine;
+ // let data = data.arrange::>();
+ // let keys = keys.arrange::>();
+ // keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
+ // .probe_with(&mut probe);
+ // },
+ // "rhh" => {
+ // use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
+ // let data = data.map(|x| HashWrapper { inner: x }).arrange::>();
+ // let keys = keys.map(|x| HashWrapper { inner: x }).arrange::>();
+ // keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
+ // .probe_with(&mut probe);
+ // },
"slc" => {
use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
@@ -58,14 +58,17 @@ fn main() {
let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
- .arrange::>()
- .reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
+ .arrange::>();
+ // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
- .arrange::>()
- .reduce_abelian::<_, PreferredSpine<_,_,_,_>>("distinct", |_,_,output| output.push(((), 1)));
+ .arrange::>();
+ // .reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
- keys.join_core(&data, |_k,&(),&()| Option::<()>::None)
+ join_traces(&keys, &data, |k,v1,v2,t,r1,r2| {
+ println!("{:?}", k.text);
+ Option::<((),isize,isize)>::None
+ })
.probe_with(&mut probe);
},
_ => {
diff --git a/src/algorithms/graphs/bfs.rs b/src/algorithms/graphs/bfs.rs
index 8f42cb47d..756b1e631 100644
--- a/src/algorithms/graphs/bfs.rs
+++ b/src/algorithms/graphs/bfs.rs
@@ -29,7 +29,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
- Tr: TraceReader+Clone+'static,
+ Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
diff --git a/src/algorithms/graphs/bijkstra.rs b/src/algorithms/graphs/bijkstra.rs
index f8a4662be..8a416318d 100644
--- a/src/algorithms/graphs/bijkstra.rs
+++ b/src/algorithms/graphs/bijkstra.rs
@@ -45,7 +45,7 @@ where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
- Tr: TraceReader+Clone+'static,
+ Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=isize>+Clone+'static,
{
forward
.stream
diff --git a/src/algorithms/graphs/propagate.rs b/src/algorithms/graphs/propagate.rs
index 32947d4c7..42f9b58b8 100644
--- a/src/algorithms/graphs/propagate.rs
+++ b/src/algorithms/graphs/propagate.rs
@@ -5,7 +5,6 @@ use std::hash::Hash;
use timely::dataflow::*;
use ::{Collection, ExchangeData};
-use ::operators::*;
use ::lattice::Lattice;
use ::difference::{Abelian, Multiply};
use ::operators::arrange::arrangement::ArrangeByKey;
@@ -64,7 +63,7 @@ where
R: Multiply,
R: From,
L: ExchangeData,
- Tr: TraceReader+Clone+'static,
+ Tr: for<'a> TraceReader=&'a N, Val<'a>=&'a N, Time=G::Timestamp, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
@@ -90,6 +89,8 @@ where
use timely::order::Product;
+ use operators::join::JoinCore;
+
let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs
index 03ea5af1b..088693c87 100644
--- a/src/operators/arrange/agent.rs
+++ b/src/operators/arrange/agent.rs
@@ -47,8 +47,10 @@ where
Tr: TraceReader,
Tr::Time: Lattice+Ord+Clone+'static,
{
- type Key = Tr::Key;
- type Val = Tr::Val;
+ type Key<'a> = Tr::Key<'a>;
+ type KeyOwned = Tr::KeyOwned;
+ type Val<'a> = Tr::Val<'a>;
+ type ValOwned = Tr::ValOwned;
type Time = Tr::Time;
type Diff = Tr::Diff;
diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs
index 42bf28f87..6e246bc4e 100644
--- a/src/operators/arrange/arrangement.rs
+++ b/src/operators/arrange/arrangement.rs
@@ -37,6 +37,8 @@ use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use trace::wrappers::filter::{TraceFilter, BatchFilter};
+use trace::cursor::MyTrait;
+
use super::TraceAgent;
/// An arranged collection of `(K,V)` values.
@@ -89,8 +91,6 @@ where
pub fn enter<'a, TInner>(&self, child: &Child<'a, G, TInner>)
-> Arranged, TraceEnter>
where
- Tr::Key: 'static,
- Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
TInner: Refines+Lattice+Timestamp+Clone+'static,
@@ -108,8 +108,6 @@ where
pub fn enter_region<'a>(&self, child: &Child<'a, G, G::Timestamp>)
-> Arranged, Tr>
where
- Tr::Key: 'static,
- Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
{
@@ -127,12 +125,10 @@ where
pub fn enter_at<'a, TInner, F, P>(&self, child: &Child<'a, G, TInner>, logic: F, prior: P)
-> Arranged, TraceEnterAt>
where
- Tr::Key: 'static,
- Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
TInner: Refines+Lattice+Timestamp+Clone+'static,
- F: FnMut(&Tr::Key, &Tr::Val, &G::Timestamp)->TInner+Clone+'static,
+ F: for <'b> FnMut(Tr::Key<'b>, Tr::Val<'b>, &G::Timestamp)->TInner+Clone+'static,
P: FnMut(&TInner)->Tr::Time+Clone+'static,
{
let logic1 = logic.clone();
@@ -177,11 +173,9 @@ where
pub fn filter(&self, logic: F)
-> Arranged>
where
- Tr::Key: 'static,
- Tr::Val: 'static,
Tr::Diff: 'static,
G::Timestamp: Clone+'static,
- F: FnMut(&Tr::Key, &Tr::Val)->bool+Clone+'static,
+ F: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>)->bool+Clone+'static,
{
let logic1 = logic.clone();
let logic2 = logic.clone();
@@ -198,7 +192,7 @@ where
pub fn as_collection(&self, mut logic: L) -> Collection
where
Tr::Diff: Semigroup,
- L: FnMut(&Tr::Key, &Tr::Val) -> D+'static,
+ L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> D+'static,
{
self.flat_map_ref(move |key, val| Some(logic(key,val)))
}
@@ -212,7 +206,7 @@ where
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
- L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
+ L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static,
{
Self::flat_map_batches(&self.stream, logic)
}
@@ -229,7 +223,7 @@ where
Tr::Diff: Semigroup,
I: IntoIterator,
I::Item: Data,
- L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
+ L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>) -> I+'static,
{
stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
input.for_each(|time, data| {
@@ -258,16 +252,16 @@ where
///
/// This method consumes a stream of (key, time) queries and reports the corresponding stream of
/// (key, value, time, diff) accumulations in the `self` trace.
- pub fn lookup(&self, queries: &Stream) -> Stream
+ pub fn lookup(&self, queries: &Stream) -> Stream
where
G::Timestamp: Data+Lattice+Ord+TotalOrder,
- Tr::Key: ExchangeData+Hashable,
- Tr::Val: ExchangeData,
+ Tr::KeyOwned: ExchangeData+Hashable,
+ Tr::ValOwned: ExchangeData,
Tr::Diff: ExchangeData+Semigroup,
Tr: 'static,
{
// while the arrangement is already correctly distributed, the query stream may not be.
- let exchange = Exchange::new(move |update: &(Tr::Key,G::Timestamp)| update.0.hashed().into());
+ let exchange = Exchange::new(move |update: &(Tr::KeyOwned,G::Timestamp)| update.0.hashed().into());
queries.binary_frontier(&self.stream, exchange, Pipeline, "TraceQuery", move |_capability, _info| {
let mut trace = Some(self.trace.clone());
@@ -280,8 +274,8 @@ where
let mut active = Vec::new();
let mut retain = Vec::new();
- let mut working: Vec<(G::Timestamp, Tr::Val, Tr::Diff)> = Vec::new();
- let mut working2: Vec<(Tr::Val, Tr::Diff)> = Vec::new();
+ let mut working: Vec<(G::Timestamp, Tr::ValOwned, Tr::Diff)> = Vec::new();
+ let mut working2: Vec<(Tr::ValOwned, Tr::Diff)> = Vec::new();
move |input1, input2, output| {
@@ -346,13 +340,13 @@ where
same_key += 1;
}
- cursor.seek_key(&storage, key);
- if cursor.get_key(&storage) == Some(key) {
+ cursor.seek_key_owned(&storage, key);
+ if cursor.get_key(&storage).map(|k| k.equals(key)).unwrap_or(false) {
let mut active = &active[active_finger .. same_key];
while let Some(val) = cursor.get_val(&storage) {
- cursor.map_times(&storage, |t,d| working.push((t.clone(), val.clone(), d.clone())));
+ cursor.map_times(&storage, |t,d| working.push((t.clone(), val.into_owned(), d.clone())));
cursor.step_val(&storage);
}
diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index b5b4618bc..d5a5cbcb0 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -136,17 +136,17 @@ use super::TraceAgent;
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert(
- stream: &Stream, G::Timestamp)>,
+ stream: &Stream, G::Timestamp)>,
name: &str,
) -> Arranged>
where
G: Scope,
G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData,
- Tr::Key: ExchangeData+Hashable+std::hash::Hash,
- Tr::Val: ExchangeData,
+ Tr::KeyOwned: ExchangeData+Hashable+std::hash::Hash,
+ Tr::ValOwned: ExchangeData,
Tr: Trace+TraceReader