Revert to old allocation strategy in merge batcher col
This uses constant-sized buffers instead of variably-sized ones, which seems to be better
from a performance point of view. It also increases the buffer size, so we'd expect
reasonably large regions to be formed.

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Nov 16, 2023
1 parent e405414 commit e232749
1 changed file: src/trace/implementations/merge_batcher_col.rs (61 additions, 32 deletions)
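
Before the diff itself, here is a minimal standalone sketch of the allocation pattern it adopts, using a plain `Vec<T>` stash instead of the crate's `TimelyStack` (the type and method names below are illustrative, not the crate's API): buffers get a constant element capacity derived from a byte budget, and only buffers that still have exactly that capacity return to the stash, so spare allocations stay interchangeable and reasonably large.

```rust
/// Illustrative stash of fixed-capacity buffers (not part of this commit).
struct BufferStash<T> {
    stash: Vec<Vec<T>>,
}

impl<T> BufferStash<T> {
    /// Byte budget per buffer; the commit bumps the real constant to `64 << 10`.
    const BUFFER_SIZE_BYTES: usize = 64 << 10;

    /// Elements per buffer, guarding against zero-sized and oversized types.
    const fn buffer_size() -> usize {
        let size = std::mem::size_of::<T>();
        if size == 0 {
            Self::BUFFER_SIZE_BYTES
        } else if size <= Self::BUFFER_SIZE_BYTES {
            Self::BUFFER_SIZE_BYTES / size
        } else {
            1
        }
    }

    /// Hand out an empty buffer, reusing a stashed allocation when possible.
    fn empty(&mut self) -> Vec<T> {
        self.stash
            .pop()
            .unwrap_or_else(|| Vec::with_capacity(Self::buffer_size()))
    }

    /// Take a drained buffer back, keeping only standard-sized allocations.
    fn recycle(&mut self, mut buffer: Vec<T>) {
        buffer.clear();
        if buffer.capacity() == Self::buffer_size() {
            self.stash.push(buffer);
        }
    }
}
```

Because every recycled buffer has the same capacity, handing one out never requires guessing how large a reused allocation is, which is the property the diff below relies on.
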
@@ -40,7 +40,7 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
}
}

-#[inline(never)]
+#[inline]
fn push_batch(&mut self, batch: RefOrMut<Vec<((B::Key, B::Val), B::Time, B::R)>>) {
// `batch` is either a shared reference or an owned allocations.
match batch {
@@ -59,7 +59,7 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
-#[inline(never)]
+#[inline]
fn seal(&mut self, upper: Antichain<B::Time>) -> B {

let mut builder = B::Builder::new();
@@ -72,8 +72,7 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>

self.frontier.clear();

-// TODO: Re-use buffer, rather than dropping.
-for buffer in merged.drain(..) {
+for mut buffer in merged.drain(..) {
for datum @ ((key, val), time, diff) in &buffer[..] {
if upper.less_equal(time) {
self.frontier.insert(time.clone());
@@ -89,9 +88,9 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
builder.push((key.clone(), val.clone(), time.clone(), diff.clone()));
}
}
-// buffer.clear();
// Recycling buffer.
-// self.sorter.push(&mut buffer);
+buffer.clear();
+self.sorter.recycle(buffer);
}

// Finish the kept data.
@@ -103,16 +102,7 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
}

// Drain buffers (fast reclaimation).
-// TODO : This isn't obviously the best policy, but "safe" wrt footprint.
-// In particular, if we are reading serialized input data, we may
-// prefer to keep these buffers around to re-fill, if possible.
-let mut buffer = Default::default();
-self.sorter.push(&mut buffer);
-// We recycle buffers with allocations (capacity, and not zero-sized).
-while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 {
-buffer = Default::default();
-self.sorter.push(&mut buffer);
-}
+self.sorter.clear_stash();

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()));
self.lower = upper;
@@ -163,13 +153,14 @@ impl<T: Columnation + 'static> TimelyStackQueue<T> {
pub struct MergeSorterColumnation<D: Ord+Columnation, T: Ord+Columnation, R: Semigroup+Columnation> {
queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<TimelyStack<(D, T, R)>>,
+pending: Vec<(D, T, R)>,
}

impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {

-const BUFFER_SIZE_BYTES: usize = 1 << 13;
+const BUFFER_SIZE_BYTES: usize = 64 << 10;

-fn buffer_size() -> usize {
+const fn buffer_size() -> usize {
let size = ::std::mem::size_of::<(D, T, R)>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
@@ -181,25 +172,60 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
}

#[inline]
-pub fn new() -> Self { MergeSorterColumnation { queue: Vec::new(), stash: Vec::new() } }
+pub fn new() -> Self {
+Self {
+queue: Vec::new(),
+stash: Vec::new(),
+pending: Vec::new()
+}
+}

#[inline]
pub fn empty(&mut self) -> TimelyStack<(D, T, R)> {
self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size()))
}

+pub fn clear_stash(&mut self) {
+self.stash.clear();
+}
+
+#[inline]
+pub fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) {
+assert!(buffer.is_empty());
+if buffer.capacity() == Self::buffer_size() {
+self.stash.push(buffer);
+}
+}
+
#[inline]
pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
+if self.pending.capacity() == 0 {
+self.pending.reserve(Self::buffer_size());
+}
+
+while batch.len() > 0 {
+self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len())));
+if self.pending.len() == self.pending.capacity() {
+crate::consolidation::consolidate_updates(&mut self.pending);
+if self.pending.len() > self.pending.capacity() / 2 {
+self.flush_pending();
+}
+}
+}
+}

-if batch.len() > 0 {
-crate::consolidation::consolidate_updates(batch);
-let mut stack = TimelyStack::with_capacity(batch.len());
-stack.reserve_items(batch.iter());
-for tuple in batch.iter() {
+#[inline]
+fn flush_pending(&mut self) {
+if self.pending.len() > 0 {
+crate::consolidation::consolidate_updates(&mut self.pending);
+let mut stack = self.empty();
+stack.reserve_items(self.pending.iter());
+for tuple in self.pending.iter() {
stack.copy(tuple);
}
+self.pending.clear();
self.queue.push(vec![stack]);
-while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() - 1) {
+while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let merged = self.merge_by(list1, list2);
@@ -220,8 +246,9 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
self.queue.push(list);
}

-#[inline(never)]
+#[inline]
pub fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
+self.flush_pending();
while self.queue.len() > 1 {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
@@ -276,25 +303,27 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
}

if result.capacity() == result.len() {
-let len = result.len();
output.push(result);
-result = TimelyStack::with_capacity(len * 2);
+result = self.empty();
}

if head1.is_empty() {
let done1 = head1.done();
-if done1.capacity() == Self::buffer_size() { self.stash.push(done1); }
+self.recycle(done1);
head1 = if list1.peek().is_some() { TimelyStackQueue::from(list1.next().unwrap()) } else { TimelyStackQueue::new() };
}
if head2.is_empty() {
let done2 = head2.done();
-if done2.capacity() == Self::buffer_size() { self.stash.push(done2); }
+self.recycle(done2);
head2 = if list2.peek().is_some() { TimelyStackQueue::from(list2.next().unwrap()) } else { TimelyStackQueue::new() };
}
}

-if result.len() > 0 { output.push(result); }
-else if result.capacity() > 0 { self.stash.push(result); }
+if result.len() > 0 {
+output.push(result);
+} else {
+self.recycle(result);
+}

if !head1.is_empty() {
let mut result = self.empty();
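
Beyond the buffer handling, the hunks above also relax the merge trigger on the queue of sorted runs from `len() - 1` to `len() / 2`. A rough standalone sketch of that policy follows (hypothetical helper names, plain sorted `Vec<u64>` runs instead of the batcher's `TimelyStack` lists): a new run is merged into the one beneath it whenever it reaches at least half that run's length, which keeps the queue roughly geometrically sized.

```rust
/// Illustrative geometric merge policy (not part of this commit): keep a
/// queue of sorted runs and merge the newest two whenever the newer run is
/// at least half as long as the one beneath it.
fn push_run(queue: &mut Vec<Vec<u64>>, run: Vec<u64>) {
    queue.push(run);
    while queue.len() > 1
        && queue[queue.len() - 1].len() >= queue[queue.len() - 2].len() / 2
    {
        let newer = queue.pop().unwrap();
        let older = queue.pop().unwrap();
        queue.push(merge_sorted(older, newer));
    }
}

/// Plain two-way merge of sorted runs.
fn merge_sorted(a: Vec<u64>, b: Vec<u64>) -> Vec<u64> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut a, mut b) = (a.into_iter().peekable(), b.into_iter().peekable());
    while let (Some(x), Some(y)) = (a.peek(), b.peek()) {
        if x <= y {
            out.push(a.next().unwrap());
        } else {
            out.push(b.next().unwrap());
        }
    }
    out.extend(a);
    out.extend(b);
    out
}
```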