diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 2805ff773..d6160df92 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,5 +1,6 @@ //! A general purpose `Batcher` implementation based on radix sort for TimelyStack. +use std::marker::PhantomData; use timely::Container; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; @@ -21,7 +22,7 @@ pub struct ColumnatedMergeBatcher sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>, lower: Antichain, frontier: Antichain, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, } impl Batcher for ColumnatedMergeBatcher @@ -36,7 +37,7 @@ impl Batcher for ColumnatedMergeBatcher sorter: MergeSorterColumnation::new(), frontier: Antichain::new(), lower: Antichain::from_elem(::minimum()), - phantom: ::std::marker::PhantomData, + phantom: PhantomData, } } @@ -77,11 +78,9 @@ impl Batcher for ColumnatedMergeBatcher for datum @ ((key, val), time, diff) in &buffer[..] { if upper.less_equal(time) { self.frontier.insert(time.clone()); - if keep.len() == keep.capacity() { - if keep.len() > 0 { - kept.push(keep); - keep = self.sorter.empty(); - } + if !keep.is_empty() && keep.len() == keep.capacity() { + kept.push(keep); + keep = self.sorter.empty(); } keep.copy(datum); } @@ -99,14 +98,14 @@ impl Batcher for ColumnatedMergeBatcher } // Finish the kept data. - if keep.len() > 0 { + if !keep.is_empty() { kept.push(keep); } - if kept.len() > 0 { + if !kept.is_empty() { self.sorter.push_list(kept); } - // Drain buffers (fast reclaimation). + // Drain buffers (fast reclamation). self.sorter.clear_stash(); let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); @@ -120,46 +119,47 @@ impl Batcher for ColumnatedMergeBatcher } } -pub struct TimelyStackQueue { +struct TimelyStackQueue { list: TimelyStack, head: usize, } impl TimelyStackQueue { - #[inline] - pub fn new() -> Self { TimelyStackQueue::from(Default::default()) } - #[inline] - pub fn pop(&mut self) -> &T { + + fn new() -> Self { TimelyStackQueue::from(Default::default()) } + + fn pop(&mut self) -> &T { self.head += 1; &self.list[self.head - 1] } - #[inline] - pub fn peek(&self) -> &T { + + fn peek(&self) -> &T { &self.list[self.head] } - #[inline] - pub fn from(list: TimelyStack) -> Self { + + fn from(list: TimelyStack) -> Self { TimelyStackQueue { list, head: 0, } } - #[inline] - pub fn done(mut self) -> TimelyStack { + + fn done(mut self) -> TimelyStack { self.list.clear(); self.list } - #[inline] - pub fn len(&self) -> usize { self.list.len() - self.head } - #[inline] - pub fn is_empty(&self) -> bool { self.head == self.list.len() } + fn len(&self) -> usize { self.list.len() - self.head } + + fn is_empty(&self) -> bool { self.head == self.list.len() } + + /// Return an iterator over the remaining elements. fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { self.list.iter().skip(self.head) } } -pub struct MergeSorterColumnation { +struct MergeSorterColumnation { queue: Vec>>, // each power-of-two length list of allocations. stash: Vec>, pending: Vec<(D, T, R)>, @@ -169,8 +169,9 @@ impl usize { - let size = ::std::mem::size_of::<(D, T, R)>(); + let size = std::mem::size_of::<(D, T, R)>(); if size == 0 { Self::BUFFER_SIZE_BYTES } else if size <= Self::BUFFER_SIZE_BYTES { @@ -180,8 +181,7 @@ impl Self { + fn new() -> Self { Self { queue: Vec::new(), stash: Vec::new(), @@ -189,30 +189,29 @@ impl TimelyStack<(D, T, R)> { + fn empty(&mut self) -> TimelyStack<(D, T, R)> { self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) } - pub fn clear_stash(&mut self) { + /// Remove all elements from the stash. + fn clear_stash(&mut self) { self.stash.clear(); } - #[inline] - pub fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) { + /// Insert an empty buffer into the stash. Panics if the buffer is not empty. + fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) { assert!(buffer.is_empty()); if buffer.capacity() == Self::buffer_size() { self.stash.push(buffer); } } - #[inline] - pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) { + fn push(&mut self, batch: &mut Vec<(D, T, R)>) { if self.pending.capacity() == 0 { self.pending.reserve(Self::buffer_size()); } - while batch.len() > 0 { + while !batch.is_empty() { self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); if self.pending.len() == self.pending.capacity() { crate::consolidation::consolidate_updates(&mut self.pending); @@ -223,9 +222,10 @@ impl 0 { + if !self.pending.is_empty() { let mut stack = self.empty(); stack.reserve_items(self.pending.iter()); for tuple in self.pending.iter() { @@ -244,7 +244,7 @@ impl>) { + fn push_list(&mut self, list: Vec>) { while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { let list1 = self.queue.pop().unwrap(); let list2 = self.queue.pop().unwrap(); @@ -254,8 +254,7 @@ impl>) { + fn finish_into(&mut self, target: &mut Vec>) { crate::consolidation::consolidate_updates(&mut self.pending); self.flush_pending(); while self.queue.len() > 1 { @@ -266,12 +265,11 @@ impl>, list2: Vec>) -> Vec> { use std::cmp::Ordering; @@ -289,7 +287,7 @@ impl 0 && head1.len() > 0 && head2.len() > 0 { + while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { let cmp = { let x = head1.peek(); @@ -303,7 +301,7 @@ impl