Skip to content

Commit

Permalink
Remove batcher dependence on updates (#431)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Nov 28, 2023
1 parent df03c4e commit 165d215
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 55 deletions.
37 changes: 20 additions & 17 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,34 @@
use std::collections::VecDeque;

use timely::communication::message::RefOrMut;
use timely::progress::frontier::Antichain;
use timely::progress::{frontier::Antichain, Timestamp};

use ::difference::Semigroup;

use trace::{Batcher, Builder};
use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<U: Update> {
sorter: MergeSorter<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: ::std::marker::PhantomData<U>,
pub struct MergeBatcher<K, V, T, D> {
sorter: MergeSorter<(K, V), T, D>,
lower: Antichain<T>,
frontier: Antichain<T>,
}

impl<U: Update> Batcher for MergeBatcher<U> {
type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff);
type Time = U::Time;
impl<K, V, T, D> Batcher for MergeBatcher<K, V, T, D>
where
K: Ord + Clone,
V: Ord + Clone,
T: Timestamp,
D: Semigroup,
{
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
MergeBatcher {
sorter: MergeSorter::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()),
phantom: ::std::marker::PhantomData,
lower: Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()),
}
}

Expand All @@ -53,7 +56,7 @@ impl<U: Update> Batcher for MergeBatcher<U> {
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater than or equal to `upper`.
#[inline(never)]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Vec::new();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -126,23 +129,23 @@ impl<U: Update> Batcher for MergeBatcher<U> {
let mut buffer = Vec::new();
self.sorter.push(&mut buffer);
// We recycle buffers with allocations (capacity, and not zero-sized).
while buffer.capacity() > 0 && std::mem::size_of::<((U::KeyOwned,U::ValOwned),U::Time,U::Diff)>() > 0 {
while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 {
buffer = Vec::new();
self.sorter.push(&mut buffer);
}

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
self.lower = upper;
seal
}

// The frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<U::Time> {
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
self.frontier.borrow()
}
}

struct MergeSorter<D: Ord, T: Ord, R: Semigroup> {
struct MergeSorter<D, T, R> {
queue: Vec<Vec<Vec<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<Vec<(D, T, R)>>,
}
Expand Down
50 changes: 23 additions & 27 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,42 @@
//! A general purpose `Batcher` implementation based on merge sort for `TimelyStack`.

use std::marker::PhantomData;
use timely::Container;
use timely::communication::message::RefOrMut;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::progress::frontier::Antichain;
use timely::progress::{frontier::Antichain, Timestamp};

use ::difference::Semigroup;

use trace::{Batcher, Builder};
use trace::implementations::Update;

/// Creates batches from unordered tuples.
pub struct ColumnatedMergeBatcher<U: Update>
pub struct ColumnatedMergeBatcher<K, V, T, D>
where
U::KeyOwned: Columnation,
U::ValOwned: Columnation,
U::Time: Columnation,
U::Diff: Columnation,
K: Columnation,
V: Columnation,
T: Columnation,
D: Columnation,
{
sorter: MergeSorterColumnation<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>,
lower: Antichain<U::Time>,
frontier: Antichain<U::Time>,
phantom: PhantomData<U>,
sorter: MergeSorterColumnation<(K, V), T, D>,
lower: Antichain<T>,
frontier: Antichain<T>,
}

impl<U: Update> Batcher for ColumnatedMergeBatcher<U>
impl<K, V, T, D> Batcher for ColumnatedMergeBatcher<K, V, T, D>
where
U::KeyOwned: Columnation + 'static,
U::ValOwned: Columnation + 'static,
U::Time: Columnation + 'static,
U::Diff: Columnation + 'static,
K: Columnation + Ord + Clone + 'static,
V: Columnation + Ord + Clone + 'static,
T: Columnation + Timestamp + 'static,
D: Columnation + Semigroup + 'static,
{
type Item = ((U::KeyOwned,U::ValOwned),U::Time,U::Diff);
type Time = U::Time;
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
ColumnatedMergeBatcher {
sorter: MergeSorterColumnation::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()),
phantom: PhantomData,
lower: Antichain::from_elem(<T as Timestamp>::minimum()),
}
}

Expand All @@ -64,7 +60,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater than or equal to `upper`.
#[inline]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Default::default();
self.sorter.finish_into(&mut merged);
Expand Down Expand Up @@ -106,7 +102,7 @@ where
if upper.less_equal(time) {
self.frontier.insert(time.clone());
if keep.is_empty() {
if keep.capacity() != MergeSorterColumnation::<(U::KeyOwned, U::ValOwned), U::Time, U::Diff>::buffer_size() {
if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() {
keep = self.sorter.empty();
}
} else if keep.len() == keep.capacity() {
Expand Down Expand Up @@ -134,13 +130,13 @@ where
// Drain buffers (fast reclamation).
self.sorter.clear_stash();

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<U::Time as timely::progress::Timestamp>::minimum()));
let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as timely::progress::Timestamp>::minimum()));
self.lower = upper;
seal
}

// The frontier of elements remaining after the most recent call to `self.seal`.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<U::Time> {
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
self.frontier.borrow()
}
}
Expand Down Expand Up @@ -186,13 +182,13 @@ impl<T: Columnation> TimelyStackQueue<T> {
}
}

struct MergeSorterColumnation<D: Ord+Columnation, T: Ord+Columnation, R: Semigroup+Columnation> {
struct MergeSorterColumnation<D: Columnation, T: Columnation, R: Columnation> {
queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<TimelyStack<(D, T, R)>>,
pending: Vec<(D, T, R)>,
}

impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {

const BUFFER_SIZE_BYTES: usize = 64 << 10;

Expand Down
12 changes: 6 additions & 6 deletions src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,41 @@ use trace::abomonated_blanket_impls::AbomonatedBuilder;
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<Vector<((K,V),T,R), O>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<Vector<((K,V),T,R), O>>>,
>;

/// A trace implementation using a spine of abomonated ordered lists.
pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<
Rc<Abomonated<OrdValBatch<Vector<((K,V),T,R), O>>, Vec<u8>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
AbomonatedBuilder<OrdValBuilder<Vector<((K,V),T,R), O>>>,
>;

/// A trace implementation for empty values using a spine of ordered lists.
pub type OrdKeySpine<K, T, R, O=usize> = Spine<
Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>,
MergeBatcher<((K,()),T,R)>,
MergeBatcher<K,(),T,R>,
RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R), O>>>,
>;

/// A trace implementation for empty values using a spine of abomonated ordered lists.
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<
Rc<Abomonated<OrdKeyBatch<Vector<((K,()),T,R), O>>, Vec<u8>>>,
MergeBatcher<((K,()),T,R)>,
MergeBatcher<K,(),T,R>,
AbomonatedBuilder<OrdKeyBuilder<Vector<((K,()),T,R), O>>>,
>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<TStack<((K,V),T,R), O>>>,
ColumnatedMergeBatcher<((K,V),T,R)>,
ColumnatedMergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<TStack<((K,V),T,R), O>>>,
>;
/// A trace implementation for empty values backed by columnar storage.
pub type ColKeySpine<K, T, R, O=usize> = Spine<
Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>,
ColumnatedMergeBatcher<((K,()),T,R)>,
ColumnatedMergeBatcher<K,(),T,R>,
RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R), O>>>,
>;

Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use self::val_batch::{OrdValBatch, OrdValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<Vector<((K,V),T,R), O>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<Vector<((K,V),T,R), O>>>,
>;
// /// A trace implementation for empty values using a spine of ordered lists.
Expand All @@ -31,14 +31,14 @@ pub type OrdValSpine<K, V, T, R, O=usize> = Spine<
/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<TStack<((K,V),T,R), O>>>,
ColumnatedMergeBatcher<((K,V),T,R)>,
ColumnatedMergeBatcher<K,V,T,R>,
RcBuilder<OrdValBuilder<TStack<((K,V),T,R), O>>>,
>;

/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R, O=usize> = Spine<
Rc<OrdValBatch<Preferred<K,V,T,R,O>>>,
ColumnatedMergeBatcher<Preferred<K,V,T,R,O>>,
ColumnatedMergeBatcher<<K as ToOwned>::Owned,<V as ToOwned>::Owned,T,R>,
RcBuilder<OrdValBuilder<Preferred<K,V,T,R,O>>>,
>;

Expand Down
4 changes: 2 additions & 2 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R, O=usize> = Spine<
Rc<RhhValBatch<Vector<((K,V),T,R), O>>>,
MergeBatcher<((K,V),T,R)>,
MergeBatcher<K,V,T,R>,
RcBuilder<RhhValBuilder<Vector<((K,V),T,R), O>>>,
>;
// /// A trace implementation for empty values using a spine of ordered lists.
Expand All @@ -29,7 +29,7 @@ pub type VecSpine<K, V, T, R, O=usize> = Spine<
/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R, O=usize> = Spine<
Rc<RhhValBatch<TStack<((K,V),T,R), O>>>,
ColumnatedMergeBatcher<((K,V),T,R)>,
ColumnatedMergeBatcher<K,V,T,R>,
RcBuilder<RhhValBuilder<TStack<((K,V),T,R), O>>>,
>;
// /// A trace implementation backed by columnar storage.
Expand Down

0 comments on commit 165d215

Please sign in to comment.