diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs
index ef07ba761..efe817698 100644
--- a/src/trace/implementations/merge_batcher_col.rs
+++ b/src/trace/implementations/merge_batcher_col.rs
@@ -40,7 +40,7 @@ impl Batcher for ColumnatedMergeBatcher
         }
     }
 
-    #[inline(never)]
+    #[inline]
     fn push_batch(&mut self, batch: RefOrMut<Vec<((B::Key,B::Val),B::Time,B::R)>>) {
         // `batch` is either a shared reference or an owned allocations.
         match batch {
@@ -59,7 +59,7 @@ impl Batcher for ColumnatedMergeBatcher
     // in `upper`. All updates must have time greater or equal to the previously used `upper`,
     // which we call `lower`, by assumption that after sealing a batcher we receive no more
     // updates with times not greater or equal to `upper`.
-    #[inline(never)]
+    #[inline]
    fn seal(&mut self, upper: Antichain<B::Time>) -> B {
 
         let mut builder = B::Builder::new();
@@ -72,8 +72,7 @@ impl Batcher for ColumnatedMergeBatcher
 
         self.frontier.clear();
 
-        // TODO: Re-use buffer, rather than dropping.
-        for buffer in merged.drain(..) {
+        for mut buffer in merged.drain(..) {
             for datum @ ((key, val), time, diff) in &buffer[..] {
                 if upper.less_equal(time) {
                     self.frontier.insert(time.clone());
@@ -89,9 +88,9 @@ impl Batcher for ColumnatedMergeBatcher
                     builder.push((key.clone(), val.clone(), time.clone(), diff.clone()));
                 }
             }
-            // buffer.clear(); // Recycling buffer.
-            // self.sorter.push(&mut buffer);
+            buffer.clear();
+            self.sorter.recycle(buffer);
         }
 
         // Finish the kept data.
@@ -103,16 +102,7 @@ impl Batcher for ColumnatedMergeBatcher
         }
 
         // Drain buffers (fast reclaimation).
-        // TODO : This isn't obviously the best policy, but "safe" wrt footprint.
-        //        In particular, if we are reading serialized input data, we may
-        //        prefer to keep these buffers around to re-fill, if possible.
-        let mut buffer = Default::default();
-        self.sorter.push(&mut buffer);
-        // We recycle buffers with allocations (capacity, and not zero-sized).
-        while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 {
-            buffer = Default::default();
-            self.sorter.push(&mut buffer);
-        }
+        self.sorter.clear_stash();
 
         let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()));
         self.lower = upper;
@@ -163,13 +153,14 @@ impl TimelyStackQueue {
 
 pub struct MergeSorterColumnation<D: Columnation, T: Columnation, R: Columnation> {
     queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
     stash: Vec<TimelyStack<(D, T, R)>>,
+    pending: Vec<(D, T, R)>,
 }
 
 impl<D: Columnation, T: Columnation, R: Columnation> MergeSorterColumnation<D, T, R> {
-    const BUFFER_SIZE_BYTES: usize = 1 << 13;
+    const BUFFER_SIZE_BYTES: usize = 64 << 10;
 
-    fn buffer_size() -> usize {
+    const fn buffer_size() -> usize {
         let size = ::std::mem::size_of::<(D, T, R)>();
         if size == 0 {
             Self::BUFFER_SIZE_BYTES
@@ -181,25 +172,60 @@ impl MergeSorterColumnation
-    pub fn new() -> Self { MergeSorterColumnation { queue: Vec::new(), stash: Vec::new() } }
+    pub fn new() -> Self {
+        Self {
+            queue: Vec::new(),
+            stash: Vec::new(),
+            pending: Vec::new()
+        }
+    }
 
     #[inline]
     pub fn empty(&mut self) -> TimelyStack<(D, T, R)> {
         self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size()))
     }
 
+    pub fn clear_stash(&mut self) {
+        self.stash.clear();
+    }
+
+    #[inline]
+    pub fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) {
+        assert!(buffer.is_empty());
+        if buffer.capacity() == Self::buffer_size() {
+            self.stash.push(buffer);
+        }
+    }
+
     #[inline]
     pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
+        if self.pending.capacity() == 0 {
+            self.pending.reserve(Self::buffer_size());
+        }
+
+        while batch.len() > 0 {
+            self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len())));
+            if self.pending.len() == self.pending.capacity() {
+                crate::consolidation::consolidate_updates(&mut self.pending);
+                if self.pending.len() > self.pending.capacity() / 2 {
+                    self.flush_pending();
+                }
+            }
+        }
+    }
 
-        if batch.len() > 0 {
-            crate::consolidation::consolidate_updates(batch);
-            let mut stack = TimelyStack::with_capacity(batch.len());
-            stack.reserve_items(batch.iter());
-            for tuple in batch.iter() {
+    #[inline]
+    fn flush_pending(&mut self) {
+        if self.pending.len() > 0 {
+            crate::consolidation::consolidate_updates(&mut self.pending);
+            let mut stack = self.empty();
+            stack.reserve_items(self.pending.iter());
+            for tuple in self.pending.iter() {
                 stack.copy(tuple);
             }
+            self.pending.clear();
             self.queue.push(vec![stack]);
-            while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() - 1) {
+            while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
                 let list1 = self.queue.pop().unwrap();
                 let list2 = self.queue.pop().unwrap();
                 let merged = self.merge_by(list1, list2);
@@ -220,8 +246,9 @@ impl MergeSorterColumnation
     pub fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
+        self.flush_pending();
         while self.queue.len() > 1 {
             let list1 = self.queue.pop().unwrap();
             let list2 = self.queue.pop().unwrap();
@@ -276,25 +303,27 @@ impl MergeSorterColumnation
-            if result.len() > 0 { output.push(result); }
-            else if result.capacity() > 0 { self.stash.push(result); }
+            if result.len() > 0 {
+                output.push(result);
+            } else {
+                self.recycle(result);
+            }
 
             if !head1.is_empty() {
                 let mut result = self.empty();
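
Note on the staging policy introduced above: `push` now copies incoming updates into a fixed-capacity `pending` vector, consolidates it in place whenever it fills, and seals it into a columnar chunk only when consolidation fails to reclaim at least half of the capacity; sealed chunks are merged geometrically (a merge triggers once the newest list reaches half the length of its predecessor), and their backing buffers are returned to the stash via `recycle`. The standalone sketch below mirrors that policy with plain `Vec`s so it can run outside the crate. `Update`, `PENDING_CAPACITY`, `consolidate`, and `Staging` are illustrative stand-ins of my own, not the crate's `consolidate_updates`, `buffer_size()`, or `MergeSorterColumnation`, and the sealed chunk is a `Vec` rather than a `TimelyStack`.

// Illustrative sketch (not part of the patch): mirrors the consolidate-then-flush
// policy of the new `push`/`flush_pending`, using plain `Vec`s instead of the
// crate's `TimelyStack` and `consolidate_updates`.

/// An update ((key, val), time, diff), analogous to the batcher's tuples.
type Update = ((String, String), u64, i64);

/// Stand-in for `Self::buffer_size()`, which the patch derives from a 64 KiB
/// target (`BUFFER_SIZE_BYTES / size_of::<(D, T, R)>()`).
const PENDING_CAPACITY: usize = 1024;

/// Sort updates and merge identical (data, time) pairs by adding their diffs,
/// dropping any that cancel to zero. Simplified stand-in for
/// `crate::consolidation::consolidate_updates`; works in place, keeping capacity.
fn consolidate(updates: &mut Vec<Update>) {
    updates.sort_by(|x, y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
    let mut write = 0;
    for read in 0..updates.len() {
        if write > 0 && updates[write - 1].0 == updates[read].0 && updates[write - 1].1 == updates[read].1 {
            let diff = updates[read].2;
            updates[write - 1].2 += diff;
        } else {
            updates.swap(write, read);
            write += 1;
        }
    }
    updates.truncate(write);
    updates.retain(|x| x.2 != 0);
}

/// Staging area playing the role of the new `pending` buffer plus the chunk queue.
struct Staging {
    pending: Vec<Update>,
    chunks: Vec<Vec<Update>>,
}

impl Staging {
    fn new() -> Self {
        Staging { pending: Vec::with_capacity(PENDING_CAPACITY), chunks: Vec::new() }
    }

    /// Analogue of the new `push`: absorb updates into `pending`, consolidating
    /// whenever it fills, and flush only if consolidation freed less than half.
    fn push(&mut self, batch: &mut Vec<Update>) {
        while !batch.is_empty() {
            let room = self.pending.capacity() - self.pending.len();
            let take = std::cmp::min(batch.len(), room);
            self.pending.extend(batch.drain(..take));
            if self.pending.len() == self.pending.capacity() {
                consolidate(&mut self.pending);
                if self.pending.len() > self.pending.capacity() / 2 {
                    self.flush_pending();
                }
            }
        }
    }

    /// Analogue of `flush_pending`: consolidate once more and seal the result as
    /// a chunk, keeping `pending`'s allocation for reuse (the real code instead
    /// copies into a `TimelyStack` obtained from the recycling stash).
    fn flush_pending(&mut self) {
        if !self.pending.is_empty() {
            consolidate(&mut self.pending);
            let chunk: Vec<Update> = self.pending.drain(..).collect();
            self.chunks.push(chunk);
        }
    }
}

fn main() {
    let mut staging = Staging::new();
    // Heavily duplicated input: consolidation keeps `pending` small, so no chunk
    // is sealed until the final explicit flush.
    let mut batch: Vec<Update> = (0..10_000)
        .map(|i| ((format!("key{}", i % 100), format!("val{}", i % 10)), 0u64, 1i64))
        .collect();
    staging.push(&mut batch);
    staging.flush_pending();
    let total: usize = staging.chunks.iter().map(|c| c.len()).sum();
    println!("{} consolidated updates across {} chunk(s)", total, staging.chunks.len());
}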