Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trait reorganization #424

Merged
5 changes: 3 additions & 2 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use timely::progress::{Antichain, frontier::AntichainRef};
use timely::dataflow::operators::CapabilitySet;

use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Cursor};
use trace::{Trace, TraceReader, Batch, BatchReader};

use trace::wrappers::rc::TraceBox;

Expand Down Expand Up @@ -53,6 +53,7 @@ where
type R = Tr::R;

type Batch = Tr::Batch;
type Storage = Tr::Storage;
type Cursor = Tr::Cursor;

fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
Expand All @@ -77,7 +78,7 @@ where
/// Returns a borrowed view (`AntichainRef`) of this handle's `physical_compaction` field.
///
/// The body only reads the field; no state is modified here.
fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
self.physical_compaction.borrow()
}
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, <Tr::Cursor as Cursor>::Storage)> {
fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
self.trace.borrow_mut().trace.cursor_through(frontier)
}
/// Forwards `f` to the wrapped trace's `map_batches`, accessing the shared trace through `borrow()`.
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
Expand Down
36 changes: 25 additions & 11 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability;
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
use trace::implementations::{KeySpine, ValSpine};

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
Expand Down Expand Up @@ -454,8 +454,10 @@ where
K: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}
Expand All @@ -470,8 +472,10 @@ where
K: ExchangeData+Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -485,8 +489,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
;
}

Expand All @@ -503,7 +509,9 @@ where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R> + 'static, Tr::Batch: Batch
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}
Expand All @@ -513,7 +521,9 @@ where
K: ExchangeData + Hashable,
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Key=K, Val=V, Time=G::Timestamp, R=R> + 'static, Tr::Batch: Batch
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -522,8 +532,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -557,7 +569,7 @@ where
};

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = <Tr::Batch as Batch>::Batcher::new();
let mut batcher = Tr::Batcher::new();

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down Expand Up @@ -633,7 +645,7 @@ where
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal(upper.clone());
let batch = batcher.seal::<Tr::Builder>(upper.clone());

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand Down Expand Up @@ -661,7 +673,7 @@ where
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal(input.frontier().frontier().to_owned());
let _batch = batcher.seal::<Tr::Builder>(input.frontier().frontier().to_owned());
writer.seal(input.frontier().frontier().to_owned());
}

Expand All @@ -685,8 +697,10 @@ where
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
Expand Down
7 changes: 4 additions & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ where
Tr::Val: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -248,7 +249,7 @@ where
// Prepare a cursor to the existing arrangement, and a batch builder for
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = <Tr::Batch as Batch>::Builder::new();
let mut builder = Tr::Builder::new();
for (key, mut list) in to_process.drain(..) {

// The prior value associated with the key.
Expand Down Expand Up @@ -277,10 +278,10 @@ where
for (time, std::cmp::Reverse(next)) in list {
if prev_value != next {
if let Some(prev) = prev_value {
updates.push((key.clone(), prev, time.clone(), -1));
updates.push(((key.clone(), prev), time.clone(), -1));
}
if let Some(next) = next.as_ref() {
updates.push((key.clone(), next.clone(), time.clone(), 1));
updates.push(((key.clone(), next.clone()), time.clone(), 1));
}
prev_value = next;
}
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
if self.upper != upper {
use trace::Builder;
let builder = <Tr::Batch as Batch>::Builder::new();
let builder = Tr::Builder::new();
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
self.insert(batch, None);
}
Expand Down
3 changes: 3 additions & 0 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use ::difference::Semigroup;

use Data;
use lattice::Lattice;
use trace::{Batcher, Builder};

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
Expand Down Expand Up @@ -57,6 +58,8 @@ where
where
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
Tr::Batch: crate::trace::Batch,
Tr::Batcher: Batcher<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
{
use operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
Expand Down
18 changes: 9 additions & 9 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,12 @@ impl<G, T1> JoinCore<G, T1::Key, T1::Val, T1::R> for Arranged<G,T1>
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This lets us avoid producing and buffering massive amounts of data before the timely dataflow
/// system has had a chance to run operators that can consume and aggregate that data.
struct Deferred<K, T, R, C1, C2, D>
struct Deferred<K, T, R, S1, S2, C1, C2, D>
where
T: Timestamp+Lattice+Ord+Debug,
R: Semigroup,
C1: Cursor<Key=K, Time=T>,
C2: Cursor<Key=K, Time=T>,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone,
C2::Val: Ord+Clone,
C1::R: Semigroup,
Expand All @@ -647,19 +647,19 @@ where
{
phant: ::std::marker::PhantomData<K>,
trace: C1,
trace_storage: C1::Storage,
trace_storage: S1,
batch: C2,
batch_storage: C2::Storage,
batch_storage: S2,
capability: Capability<T>,
done: bool,
temp: Vec<((D, T), R)>,
}

impl<K, T, R, C1, C2, D> Deferred<K, T, R, C1, C2, D>
impl<K, T, R, S1, S2, C1, C2, D> Deferred<K, T, R, S1, S2, C1, C2, D>
where
K: Ord+Debug+Eq,
C1: Cursor<Key=K, Time=T>,
C2: Cursor<Key=K, Time=T>,
C1: Cursor<S1, Key=K, Time=T>,
C2: Cursor<S2, Key=K, Time=T>,
C1::Val: Ord+Clone+Debug,
C2::Val: Ord+Clone+Debug,
C1::R: Semigroup,
Expand All @@ -668,7 +668,7 @@ where
R: Semigroup,
D: Clone+Data,
{
fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
fn new(trace: C1, trace_storage: S1, batch: C2, batch_storage: S2, capability: Capability<T>) -> Self {
Deferred {
phant: ::std::marker::PhantomData,
trace,
Expand Down
14 changes: 7 additions & 7 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ impl<'a, V:'a, T, R> EditList<'a, V, T, R> where T: Ord+Clone, R: Semigroup {
}
}
/// Loads the contents of a cursor.
fn load<C, L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
where V: Clone, C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
fn load<S, C, L>(&mut self, cursor: &mut C, storage: &'a S, logic: L)
where V: Clone, C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
self.clear();
while cursor.val_valid(storage) {
cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone()));
Expand Down Expand Up @@ -101,22 +101,22 @@ impl<'storage, V: Ord+Clone+'storage, T: Lattice+Ord+Clone, R: Semigroup> ValueH
self.history.clear();
self.buffer.clear();
}
fn load<C, L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
where C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
fn load<S, C, L>(&mut self, cursor: &mut C, storage: &'storage S, logic: L)
where C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T {
self.edits.load(cursor, storage, logic);
}

/// Loads and replays a specified key.
///
/// If the key is absent, the replayed history will be empty.
fn replay_key<'history, C, L>(
fn replay_key<'history, S, C, L>(
&'history mut self,
cursor: &mut C,
storage: &'storage C::Storage,
storage: &'storage S,
key: &C::Key,
logic: L
) -> HistoryReplay<'storage, 'history, V, T, R>
where C: Cursor<Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T
where C: Cursor<S, Val=V, Time=T, R=R>, C::Key: Eq, L: Fn(&T)->T
{
self.clear();
cursor.seek_key(storage, key);
Expand Down
36 changes: 20 additions & 16 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -298,6 +299,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
;
}
Expand All @@ -316,6 +318,7 @@ where
T2::R: Semigroup,
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -334,6 +337,7 @@ where
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {

let mut result_trace = None;
Expand Down Expand Up @@ -472,7 +476,7 @@ where
let mut builders = Vec::new();
for i in 0 .. capabilities.len() {
buffers.push((capabilities[i].time().clone(), Vec::new()));
builders.push(<T2::Batch as Batch>::Builder::new());
builders.push(T2::Builder::new());
}

// cursors for navigating input and output traces.
Expand Down Expand Up @@ -548,7 +552,7 @@ where
for index in 0 .. buffers.len() {
buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
for (val, time, diff) in buffers[index].1.drain(..) {
builders[index].push((key.clone(), val, time, diff));
builders[index].push(((key.clone(), val), time, diff));
}
}
}
Expand Down Expand Up @@ -651,22 +655,22 @@ where
R2: Semigroup,
{
fn new() -> Self;
fn compute<K, C1, C2, C3, L>(
fn compute<K, S1, S2, S3, C1, C2, C3, L>(
&mut self,
key: &K,
source_cursor: (&mut C1, &'a C1::Storage),
output_cursor: (&mut C2, &'a C2::Storage),
batch_cursor: (&mut C3, &'a C3::Storage),
source_cursor: (&mut C1, &'a S1),
output_cursor: (&mut C2, &'a S2),
batch_cursor: (&mut C3, &'a S3),
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq+Clone,
C1: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>);
}

Expand Down Expand Up @@ -729,22 +733,22 @@ mod history_replay {
}
}
#[inline(never)]
fn compute<K, C1, C2, C3, L>(
fn compute<K, S1, S2, S3, C1, C2, C3, L>(
&mut self,
key: &K,
(source_cursor, source_storage): (&mut C1, &'a C1::Storage),
(output_cursor, output_storage): (&mut C2, &'a C2::Storage),
(batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
(source_cursor, source_storage): (&mut C1, &'a S1),
(output_cursor, output_storage): (&mut C2, &'a S2),
(batch_cursor, batch_storage): (&mut C3, &'a S3),
times: &mut Vec<T>,
logic: &mut L,
upper_limit: &Antichain<T>,
outputs: &mut [(T, Vec<(V2, T, R2)>)],
new_interesting: &mut Vec<T>) -> (usize, usize)
where
K: Eq+Clone,
C1: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<Key = K, Val = V1, Time = T, R = R1>,
C1: Cursor<S1, Key = K, Val = V1, Time = T, R = R1>,
C2: Cursor<S2, Key = K, Val = V2, Time = T, R = R2>,
C3: Cursor<S3, Key = K, Val = V1, Time = T, R = R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>)
{

Expand Down
Loading