Skip to content

Commit

Permalink
Allow Batcher::seal to be generic in Builder
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 22, 2023
1 parent f4c3dd1 commit 1bd6c4e
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 165 deletions.
34 changes: 17 additions & 17 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use timely::dataflow::operators::Capability;
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
use trace::implementations::{KeySpine, ValSpine};

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
Expand Down Expand Up @@ -456,8 +456,8 @@ where
R: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}
Expand All @@ -474,8 +474,8 @@ where
R: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -491,8 +491,8 @@ where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
;
}

Expand All @@ -510,8 +510,8 @@ where
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.arrange_named("Arrange")
}
Expand All @@ -522,8 +522,8 @@ where
V: ExchangeData,
R: ExchangeData,
Tr: Trace + TraceReader<Time=G::Timestamp> + 'static, Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
self.arrange_core(exchange, name)
Expand All @@ -534,8 +534,8 @@ where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,V),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,V),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -645,7 +645,7 @@ where
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal(upper.clone());
let batch = batcher.seal::<<Tr::Batch as Batch>::Builder>(upper.clone());

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand Down Expand Up @@ -673,7 +673,7 @@ where
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal(input.frontier().frontier().to_owned());
let _batch = batcher.seal::<<Tr::Batch as Batch>::Builder>(input.frontier().frontier().to_owned());
writer.seal(input.frontier().frontier().to_owned());
}

Expand All @@ -699,8 +699,8 @@ where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
Tr: Trace+TraceReader<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((K,()),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((K,()),G::Timestamp,R), Time = G::Timestamp, Output = Tr::Batch>,
{
self.map(|k| (k, ()))
.arrange_core(pact, name)
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
Tr::Val: ExchangeData,
Tr: Trace+TraceReader<Time=G::Timestamp,R=isize>+'static,
Tr::Batch: Batch,
<Tr::Batch as Batch>::Builder: Builder<Tr::Batch, Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((Tr::Key, Tr::Val), Tr::Time, Tr::R)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down
6 changes: 3 additions & 3 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use ::difference::Semigroup;

use Data;
use lattice::Lattice;
use trace::{Batch, Batcher};
use trace::{Batch, Batcher, Builder};

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
Expand Down Expand Up @@ -58,8 +58,8 @@ where
where
Tr: crate::trace::Trace+crate::trace::TraceReader<Key=D,Val=(),Time=G::Timestamp,R=R>+'static,
Tr::Batch: crate::trace::Batch,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Item = ((D,()),G::Timestamp,R)>,
<Tr::Batch as Batch>::Batcher: Batcher<Tr::Batch, Time = G::Timestamp>,
<Tr::Batch as Batch>::Batcher: Batcher<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
<Tr::Batch as Batch>::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
{
use operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
Expand Down
8 changes: 4 additions & 4 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Abelian,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -299,7 +299,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
;
}
Expand All @@ -318,7 +318,7 @@ where
T2::R: Semigroup,
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -337,7 +337,7 @@ where
T2::Val: Data,
T2::R: Semigroup,
T2::Batch: Batch,
<T2::Batch as Batch>::Builder: Builder<T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
<T2::Batch as Batch>::Builder: Builder<Output=T2::Batch, Item = ((T2::Key, T2::Val), T2::Time, T2::R)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {

let mut result_trace = None;
Expand Down
40 changes: 16 additions & 24 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,26 @@ use timely::progress::frontier::Antichain;

use ::difference::Semigroup;

use lattice::Lattice;
use trace::{Batch, Batcher, Builder};
use trace::{Batcher, Builder};
use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<B: Batch> where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup {
sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>,
lower: Antichain<B::Time>,
frontier: Antichain<B::Time>,
phantom: ::std::marker::PhantomData<B>,
pub struct MergeBatcher<U: Update> {
sorter: MergeSorter<(U::Key, U::Val), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: ::std::marker::PhantomData<U>,
}

impl<B> Batcher<B> for MergeBatcher<B>
where
B: Batch,
B::Key: Ord+Clone,
B::Val: Ord+Clone,
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone,
B::R: Semigroup,
B::Builder: Builder<B, Item = ((B::Key, B::Val), B::Time, B::R)>,
{
type Item = ((B::Key,B::Val),B::Time,B::R);
type Time = B::Time;
impl<U: Update> Batcher for MergeBatcher<U> {
type Item = ((U::Key,U::Val),U::Time,U::Diff);
type Time = U::Time;

fn new() -> Self {
MergeBatcher {
sorter: MergeSorter::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
lower: Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()),
phantom: ::std::marker::PhantomData,
}
}
Expand All @@ -61,9 +53,9 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline(never)]
fn seal(&mut self, upper: Antichain<B::Time>) -> B {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {

let mut builder = B::Builder::new();
let mut builder = B::new();

let mut merged = Vec::new();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -109,18 +101,18 @@ where
let mut buffer = Vec::new();
self.sorter.push(&mut buffer);
// We recycle buffers with allocations (capacity, and not zero-sized).
while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 {
while buffer.capacity() > 0 && std::mem::size_of::<((U::Key,U::Val),U::Time,U::Diff)>() > 0 {
buffer = Vec::new();
self.sorter.push(&mut buffer);
}

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()));
self.lower = upper;
seal
}

// the frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<B::Time> {
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<U::Time> {
self.frontier.borrow()
}
}
Expand Down
51 changes: 25 additions & 26 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,38 @@ use timely::progress::frontier::Antichain;

use ::difference::Semigroup;

use lattice::Lattice;
use trace::{Batch, Batcher, Builder};
use trace::{Batcher, Builder};
use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct ColumnatedMergeBatcher<B: Batch>
where
B::Key: Ord+Clone+Columnation,
B::Val: Ord+Clone+Columnation,
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation,
B::R: Semigroup+Columnation,
pub struct ColumnatedMergeBatcher<U: Update>
where
U::Key: Columnation,
U::Val: Columnation,
U::Time: Columnation,
U::Diff: Columnation,
{
sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>,
lower: Antichain<B::Time>,
frontier: Antichain<B::Time>,
phantom: PhantomData<B>,
sorter: MergeSorterColumnation<(U::Key, U::Val), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: PhantomData<U>,
}

impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
where
B::Key: Ord+Clone+Columnation+'static,
B::Val: Ord+Clone+Columnation+'static,
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static,
B::R: Semigroup+Columnation+'static,
B::Builder: Builder<B, Item = ((B::Key, B::Val), B::Time, B::R)>,
impl<U: Update> Batcher for ColumnatedMergeBatcher<U>
where
U::Key: Columnation + 'static,
U::Val: Columnation + 'static,
U::Time: Columnation + 'static,
U::Diff: Columnation + 'static,
{
type Item = ((B::Key,B::Val),B::Time,B::R);
type Time = B::Time;
type Item = ((U::Key,U::Val),U::Time,U::Diff);
type Time = U::Time;

fn new() -> Self {
ColumnatedMergeBatcher {
sorter: MergeSorterColumnation::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
lower: Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()),
phantom: PhantomData,
}
}
Expand All @@ -65,9 +64,9 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline]
fn seal(&mut self, upper: Antichain<B::Time>) -> B {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {

let mut builder = B::Builder::new();
let mut builder = B::new();

let mut merged = Default::default();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -106,13 +105,13 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
// Drain buffers (fast reclamation).
self.sorter.clear_stash();

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()));
self.lower = upper;
seal
}

// the frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<B::Time> {
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<U::Time> {
self.frontier.borrow()
}
}
Expand Down
Loading

0 comments on commit 1bd6c4e

Please sign in to comment.