From 546c0beb9cc0a8f6cff75a158283c29d9cd33929 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 14 Nov 2023 10:49:55 -0500 Subject: [PATCH 1/8] Copy merge_batcher Signed-off-by: Moritz Hoffmann --- .../implementations/merge_batcher_col.rs | 285 ++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 src/trace/implementations/merge_batcher_col.rs diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs new file mode 100644 index 000000000..00b01501f --- /dev/null +++ b/src/trace/implementations/merge_batcher_col.rs @@ -0,0 +1,285 @@ +//! A general purpose `Batcher` implementation based on radix sort. + +use std::collections::VecDeque; + +use timely::communication::message::RefOrMut; +use timely::progress::frontier::Antichain; + +use ::difference::Semigroup; + +use lattice::Lattice; +use trace::{Batch, Batcher, Builder}; + +/// Creates batches from unordered tuples. +pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup { + sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>, + lower: Antichain, + frontier: Antichain, + phantom: ::std::marker::PhantomData, +} + +impl Batcher for MergeBatcher +where + B: Batch, + B::Key: Ord+Clone, + B::Val: Ord+Clone, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, + B::R: Semigroup, +{ + fn new() -> Self { + MergeBatcher { + sorter: MergeSorter::new(), + frontier: Antichain::new(), + lower: Antichain::from_elem(::minimum()), + phantom: ::std::marker::PhantomData, + } + } + + #[inline(never)] + fn push_batch(&mut self, batch: RefOrMut>) { + // `batch` is either a shared reference or an owned allocations. + match batch { + RefOrMut::Ref(reference) => { + // This is a moment at which we could capture the allocations backing + // `batch` into a different form of region, rather than just cloning. + let mut owned: Vec<_> = self.sorter.empty(); + owned.clone_from(reference); + self.sorter.push(&mut owned); + }, + RefOrMut::Mut(reference) => { + self.sorter.push(reference); + } + } + } + + // Sealing a batch means finding those updates with times not greater or equal to any time + // in `upper`. All updates must have time greater or equal to the previously used `upper`, + // which we call `lower`, by assumption that after sealing a batcher we receive no more + // updates with times not greater or equal to `upper`. + #[inline(never)] + fn seal(&mut self, upper: Antichain) -> B { + + let mut builder = B::Builder::new(); + + let mut merged = Vec::new(); + self.sorter.finish_into(&mut merged); + + let mut kept = Vec::new(); + let mut keep = Vec::new(); + + self.frontier.clear(); + + // TODO: Re-use buffer, rather than dropping. + for mut buffer in merged.drain(..) { + for ((key, val), time, diff) in buffer.drain(..) { + if upper.less_equal(&time) { + self.frontier.insert(time.clone()); + if keep.len() == keep.capacity() { + if keep.len() > 0 { + kept.push(keep); + keep = self.sorter.empty(); + } + } + keep.push(((key, val), time, diff)); + } + else { + builder.push((key, val, time, diff)); + } + } + // Recycling buffer. + self.sorter.push(&mut buffer); + } + + // Finish the kept data. + if keep.len() > 0 { + kept.push(keep); + } + if kept.len() > 0 { + self.sorter.push_list(kept); + } + + // Drain buffers (fast reclaimation). + // TODO : This isn't obviously the best policy, but "safe" wrt footprint. + // In particular, if we are reading serialized input data, we may + // prefer to keep these buffers around to re-fill, if possible. 
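// Note on the drain loop below: pushing an empty, zero-capacity `Vec` makes
// `MergeSorter::push` swap a stashed allocation into `buffer` whenever more than
// two buffers are stashed, and that allocation is then dropped by the next
// `Vec::new()`. The loop therefore drains the stash down to at most two cached
// buffers; the `size_of` test disables it for zero-sized update types, whose
// `Vec`s report unbounded capacity.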
+ let mut buffer = Vec::new(); + self.sorter.push(&mut buffer); + // We recycle buffers with allocations (capacity, and not zero-sized). + while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 { + buffer = Vec::new(); + self.sorter.push(&mut buffer); + } + + let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); + self.lower = upper; + seal + } + + // the frontier of elements remaining after the most recent call to `self.seal`. + fn frontier(&mut self) -> timely::progress::frontier::AntichainRef { + self.frontier.borrow() + } +} + +struct MergeSorter { + queue: Vec>>, // each power-of-two length list of allocations. + stash: Vec>, +} + +impl MergeSorter { + + const BUFFER_SIZE_BYTES: usize = 1 << 13; + + fn buffer_size() -> usize { + let size = ::std::mem::size_of::<(D, T, R)>(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + #[inline] + pub fn new() -> Self { MergeSorter { queue: Vec::new(), stash: Vec::new() } } + + #[inline] + pub fn empty(&mut self) -> Vec<(D, T, R)> { + self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size())) + } + + #[inline(never)] + pub fn _sort(&mut self, list: &mut Vec>) { + for mut batch in list.drain(..) { + self.push(&mut batch); + } + self.finish_into(list); + } + + #[inline] + pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) { + // TODO: Reason about possible unbounded stash growth. How to / should we return them? + // TODO: Reason about mis-sized vectors, from deserialized data; should probably drop. + let mut batch = if self.stash.len() > 2 { + ::std::mem::replace(batch, self.stash.pop().unwrap()) + } + else { + ::std::mem::replace(batch, Vec::new()) + }; + + if batch.len() > 0 { + crate::consolidation::consolidate_updates(&mut batch); + self.queue.push(vec![batch]); + while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue.push(merged); + } + } + } + + // This is awkward, because it isn't a power-of-two length any more, and we don't want + // to break it down to be so. + pub fn push_list(&mut self, list: Vec>) { + while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue.push(merged); + } + self.queue.push(list); + } + + #[inline(never)] + pub fn finish_into(&mut self, target: &mut Vec>) { + while self.queue.len() > 1 { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue.push(merged); + } + + if let Some(mut last) = self.queue.pop() { + ::std::mem::swap(&mut last, target); + } + } + + // merges two sorted input lists into one sorted output list. + #[inline(never)] + fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { + + use std::cmp::Ordering; + + // TODO: `list1` and `list2` get dropped; would be better to reuse? 
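// Illustrative sketch (not part of the implementation): `push` above keeps a queue
// of sorted runs and merges the two most recent ones whenever the newer run has
// reached at least half the length of the older one, which keeps the number of
// runs logarithmic in the total data. The same policy over plain sorted `Vec<u64>`
// runs, with sorting standing in for `consolidate_updates`; names are hypothetical.
fn push_run_sketch(queue: &mut Vec<Vec<u64>>, mut run: Vec<u64>) {
    run.sort_unstable();
    queue.push(run);
    while queue.len() > 1 && queue[queue.len() - 1].len() >= queue[queue.len() - 2].len() / 2 {
        let newer = queue.pop().unwrap();
        let older = queue.pop().unwrap();
        // Two-finger merge of the sorted runs.
        let mut merged = Vec::with_capacity(older.len() + newer.len());
        let (mut i, mut j) = (0, 0);
        while i < older.len() && j < newer.len() {
            if older[i] <= newer[j] { merged.push(older[i]); i += 1; }
            else { merged.push(newer[j]); j += 1; }
        }
        merged.extend_from_slice(&older[i..]);
        merged.extend_from_slice(&newer[j..]);
        queue.push(merged);
    }
}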
+ let mut output = Vec::with_capacity(list1.len() + list2.len()); + let mut result = self.empty(); + + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); + + let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); + let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); + + // while we have valid data in each input, merge. + while !head1.is_empty() && !head2.is_empty() { + + while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 { + + let cmp = { + let x = head1.front().unwrap(); + let y = head2.front().unwrap(); + (&x.0, &x.1).cmp(&(&y.0, &y.1)) + }; + match cmp { + Ordering::Less => result.push(head1.pop_front().unwrap()), + Ordering::Greater => result.push(head2.pop_front().unwrap()), + Ordering::Equal => { + let (data1, time1, mut diff1) = head1.pop_front().unwrap(); + let (_data2, _time2, diff2) = head2.pop_front().unwrap(); + diff1.plus_equals(&diff2); + if !diff1.is_zero() { + result.push((data1, time1, diff1)); + } + } + } + } + + if result.capacity() == result.len() { + output.push(result); + result = self.empty(); + } + + if head1.is_empty() { + let done1 = Vec::from(head1); + if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } + head1 = VecDeque::from(list1.next().unwrap_or_default()); + } + if head2.is_empty() { + let done2 = Vec::from(head2); + if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } + head2 = VecDeque::from(list2.next().unwrap_or_default()); + } + } + + if result.len() > 0 { output.push(result); } + else if result.capacity() > 0 { self.stash.push(result); } + + if !head1.is_empty() { + let mut result = self.empty(); + for item1 in head1 { result.push(item1); } + output.push(result); + } + output.extend(list1); + + if !head2.is_empty() { + let mut result = self.empty(); + for item2 in head2 { result.push(item2); } + output.push(result); + } + output.extend(list2); + + output + } +} From 0c5b0f33b7a4741d4aabf3d544e62cb955a2a187 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 14 Nov 2023 10:50:49 -0500 Subject: [PATCH 2/8] Merge batcher col updates --- .../implementations/merge_batcher_col.rs | 244 +++++++++++------- src/trace/implementations/mod.rs | 1 + 2 files changed, 158 insertions(+), 87 deletions(-) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 00b01501f..2805ff773 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,8 +1,8 @@ -//! A general purpose `Batcher` implementation based on radix sort. - -use std::collections::VecDeque; +//! A general purpose `Batcher` implementation based on radix sort for TimelyStack. +use timely::Container; use timely::communication::message::RefOrMut; +use timely::container::columnation::{Columnation, TimelyStack}; use timely::progress::frontier::Antichain; use ::difference::Semigroup; @@ -11,40 +11,43 @@ use lattice::Lattice; use trace::{Batch, Batcher, Builder}; /// Creates batches from unordered tuples. 
-pub struct MergeBatcher where B::Key: Ord, B::Val: Ord, B::Time: Ord, B::R: Semigroup { - sorter: MergeSorter<(B::Key, B::Val), B::Time, B::R>, +pub struct ColumnatedMergeBatcher + where + B::Key: Ord+Clone+Columnation, + B::Val: Ord+Clone+Columnation, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation, + B::R: Semigroup+Columnation, +{ + sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>, lower: Antichain, frontier: Antichain, phantom: ::std::marker::PhantomData, } -impl Batcher for MergeBatcher -where - B: Batch, - B::Key: Ord+Clone, - B::Val: Ord+Clone, - B::Time: Lattice+timely::progress::Timestamp+Ord+Clone, - B::R: Semigroup, +impl Batcher for ColumnatedMergeBatcher + where + B::Key: Ord+Clone+Columnation+'static, + B::Val: Ord+Clone+Columnation+'static, + B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Columnation+'static, + B::R: Semigroup+Columnation+'static, { fn new() -> Self { - MergeBatcher { - sorter: MergeSorter::new(), + ColumnatedMergeBatcher { + sorter: MergeSorterColumnation::new(), frontier: Antichain::new(), lower: Antichain::from_elem(::minimum()), phantom: ::std::marker::PhantomData, } } - #[inline(never)] - fn push_batch(&mut self, batch: RefOrMut>) { + #[inline] + fn push_batch(&mut self, batch: RefOrMut>) { // `batch` is either a shared reference or an owned allocations. match batch { RefOrMut::Ref(reference) => { // This is a moment at which we could capture the allocations backing // `batch` into a different form of region, rather than just cloning. - let mut owned: Vec<_> = self.sorter.empty(); - owned.clone_from(reference); - self.sorter.push(&mut owned); + self.sorter.push(&mut reference.clone()); }, RefOrMut::Mut(reference) => { self.sorter.push(reference); @@ -56,23 +59,23 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - #[inline(never)] + #[inline] fn seal(&mut self, upper: Antichain) -> B { let mut builder = B::Builder::new(); - let mut merged = Vec::new(); + let mut merged = Default::default(); self.sorter.finish_into(&mut merged); let mut kept = Vec::new(); - let mut keep = Vec::new(); + let mut keep = TimelyStack::default(); self.frontier.clear(); - // TODO: Re-use buffer, rather than dropping. for mut buffer in merged.drain(..) { - for ((key, val), time, diff) in buffer.drain(..) { - if upper.less_equal(&time) { + let mut last: Option<&((_, _), _, _)> = None; + for datum @ ((key, val), time, diff) in &buffer[..] { + if upper.less_equal(time) { self.frontier.insert(time.clone()); if keep.len() == keep.capacity() { if keep.len() > 0 { @@ -80,14 +83,19 @@ where keep = self.sorter.empty(); } } - keep.push(((key, val), time, diff)); + keep.copy(datum); } else { - builder.push((key, val, time, diff)); + builder.push((key.clone(), val.clone(), time.clone(), diff.clone())); } + if let Some(last) = last { + assert!(last <= datum); + } + last = Some(datum); } // Recycling buffer. - self.sorter.push(&mut buffer); + buffer.clear(); + self.sorter.recycle(buffer); } // Finish the kept data. @@ -99,16 +107,7 @@ where } // Drain buffers (fast reclaimation). - // TODO : This isn't obviously the best policy, but "safe" wrt footprint. - // In particular, if we are reading serialized input data, we may - // prefer to keep these buffers around to re-fill, if possible. 
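// Illustrative sketch (not part of the patch): unlike the `Vec`-backed batcher, the
// columnated batcher above fills `TimelyStack`s, which keep their contents in region
// allocations and are appended to by copying from a reference (`copy`) rather than
// by moving owned tuples. Minimal usage, assuming `String: Columnation` as timely
// provides; the function name is hypothetical.
fn timely_stack_sketch() {
    use timely::container::columnation::TimelyStack;
    let mut stack = TimelyStack::<(String, u64, i64)>::with_capacity(8);
    stack.copy(&("key".to_string(), 3u64, 1i64));
    assert_eq!(stack[0].1, 3);
    // Clearing drops the items but keeps the backing allocation, which is what lets
    // the sorter stash and reuse these buffers.
    stack.clear();
}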
- let mut buffer = Vec::new(); - self.sorter.push(&mut buffer); - // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((B::Key,B::Val),B::Time,B::R)>() > 0 { - buffer = Vec::new(); - self.sorter.push(&mut buffer); - } + self.sorter.clear_stash(); let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); self.lower = upper; @@ -121,16 +120,56 @@ where } } -struct MergeSorter { - queue: Vec>>, // each power-of-two length list of allocations. - stash: Vec>, +pub struct TimelyStackQueue { + list: TimelyStack, + head: usize, +} + +impl TimelyStackQueue { + #[inline] + pub fn new() -> Self { TimelyStackQueue::from(Default::default()) } + #[inline] + pub fn pop(&mut self) -> &T { + self.head += 1; + &self.list[self.head - 1] + } + #[inline] + pub fn peek(&self) -> &T { + &self.list[self.head] + } + #[inline] + pub fn from(list: TimelyStack) -> Self { + TimelyStackQueue { + list, + head: 0, + } + } + #[inline] + pub fn done(mut self) -> TimelyStack { + self.list.clear(); + self.list + } + #[inline] + pub fn len(&self) -> usize { self.list.len() - self.head } + #[inline] + pub fn is_empty(&self) -> bool { self.head == self.list.len() } + + fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { + self.list.iter().skip(self.head) + } +} + +pub struct MergeSorterColumnation { + queue: Vec>>, // each power-of-two length list of allocations. + stash: Vec>, + pending: Vec<(D, T, R)>, } -impl MergeSorter { +impl MergeSorterColumnation { - const BUFFER_SIZE_BYTES: usize = 1 << 13; + const BUFFER_SIZE_BYTES: usize = 64 << 10; - fn buffer_size() -> usize { + const fn buffer_size() -> usize { let size = ::std::mem::size_of::<(D, T, R)>(); if size == 0 { Self::BUFFER_SIZE_BYTES @@ -142,35 +181,58 @@ impl MergeSorter { } #[inline] - pub fn new() -> Self { MergeSorter { queue: Vec::new(), stash: Vec::new() } } + pub fn new() -> Self { + Self { + queue: Vec::new(), + stash: Vec::new(), + pending: Vec::new() + } + } #[inline] - pub fn empty(&mut self) -> Vec<(D, T, R)> { - self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size())) + pub fn empty(&mut self) -> TimelyStack<(D, T, R)> { + self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) } - #[inline(never)] - pub fn _sort(&mut self, list: &mut Vec>) { - for mut batch in list.drain(..) { - self.push(&mut batch); + pub fn clear_stash(&mut self) { + self.stash.clear(); + } + + #[inline] + pub fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) { + assert!(buffer.is_empty()); + if buffer.capacity() == Self::buffer_size() { + self.stash.push(buffer); } - self.finish_into(list); } #[inline] pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) { - // TODO: Reason about possible unbounded stash growth. How to / should we return them? - // TODO: Reason about mis-sized vectors, from deserialized data; should probably drop. 
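// Illustrative sketch (not part of the patch): the replacement `push` below stages
// incoming updates in a plain `pending` vector, consolidates it whenever it fills,
// and only spills a columnar run once consolidation leaves it more than half full.
// The same policy over a generic `Vec`, with hypothetical names and sizes:
fn stage_updates_sketch(
    pending: &mut Vec<(u64, u64, i64)>,
    batch: &mut Vec<(u64, u64, i64)>,
    runs: &mut Vec<Vec<(u64, u64, i64)>>,
) {
    let capacity = 1024;
    if pending.capacity() < capacity { pending.reserve(capacity - pending.capacity()); }
    while !batch.is_empty() {
        let take = std::cmp::min(batch.len(), pending.capacity() - pending.len());
        pending.extend(batch.drain(..take));
        if pending.len() == pending.capacity() {
            // Sorts by (data, time) and adds up diffs, dropping zeros.
            differential_dataflow::consolidation::consolidate_updates(pending);
            if pending.len() > pending.capacity() / 2 {
                // Spill a run; `drain` keeps `pending`'s capacity for the next fill.
                runs.push(pending.drain(..).collect());
            }
        }
    }
}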
- let mut batch = if self.stash.len() > 2 { - ::std::mem::replace(batch, self.stash.pop().unwrap()) + if self.pending.capacity() == 0 { + self.pending.reserve(Self::buffer_size()); + } + + while batch.len() > 0 { + self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); + if self.pending.len() == self.pending.capacity() { + crate::consolidation::consolidate_updates(&mut self.pending); + if self.pending.len() > self.pending.capacity() / 2 { + self.flush_pending(); + } + } } - else { - ::std::mem::replace(batch, Vec::new()) - }; + } - if batch.len() > 0 { - crate::consolidation::consolidate_updates(&mut batch); - self.queue.push(vec![batch]); + #[inline] + fn flush_pending(&mut self) { + if self.pending.len() > 0 { + let mut stack = self.empty(); + stack.reserve_items(self.pending.iter()); + for tuple in self.pending.iter() { + stack.copy(tuple); + } + self.pending.clear(); + self.queue.push(vec![stack]); while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { let list1 = self.queue.pop().unwrap(); let list2 = self.queue.pop().unwrap(); @@ -182,7 +244,7 @@ impl MergeSorter { // This is awkward, because it isn't a power-of-two length any more, and we don't want // to break it down to be so. - pub fn push_list(&mut self, list: Vec>) { + pub fn push_list(&mut self, list: Vec>) { while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { let list1 = self.queue.pop().unwrap(); let list2 = self.queue.pop().unwrap(); @@ -192,8 +254,10 @@ impl MergeSorter { self.queue.push(list); } - #[inline(never)] - pub fn finish_into(&mut self, target: &mut Vec>) { + #[inline] + pub fn finish_into(&mut self, target: &mut Vec>) { + crate::consolidation::consolidate_updates(&mut self.pending); + self.flush_pending(); while self.queue.len() > 1 { let list1 = self.queue.pop().unwrap(); let list2 = self.queue.pop().unwrap(); @@ -208,7 +272,7 @@ impl MergeSorter { // merges two sorted input lists into one sorted output list. #[inline(never)] - fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { + fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { use std::cmp::Ordering; @@ -216,11 +280,11 @@ impl MergeSorter { let mut output = Vec::with_capacity(list1.len() + list2.len()); let mut result = self.empty(); - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); + let mut list1 = list1.into_iter().peekable(); + let mut list2 = list2.into_iter().peekable(); - let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); - let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); + let mut head1 = if list1.peek().is_some() { TimelyStackQueue::from(list1.next().unwrap()) } else { TimelyStackQueue::new() }; + let mut head2 = if list2.peek().is_some() { TimelyStackQueue::from(list2.next().unwrap()) } else { TimelyStackQueue::new() }; // while we have valid data in each input, merge. 
while !head1.is_empty() && !head2.is_empty() { @@ -228,19 +292,20 @@ impl MergeSorter { while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 { let cmp = { - let x = head1.front().unwrap(); - let y = head2.front().unwrap(); + let x = head1.peek(); + let y = head2.peek(); (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; match cmp { - Ordering::Less => result.push(head1.pop_front().unwrap()), - Ordering::Greater => result.push(head2.pop_front().unwrap()), + Ordering::Less => { result.copy(head1.pop()); } + Ordering::Greater => { result.copy(head2.pop()); } Ordering::Equal => { - let (data1, time1, mut diff1) = head1.pop_front().unwrap(); - let (_data2, _time2, diff2) = head2.pop_front().unwrap(); + let (data1, time1, diff1) = head1.pop(); + let (_data2, _time2, diff2) = head2.pop(); + let mut diff1 = diff1.clone(); diff1.plus_equals(&diff2); if !diff1.is_zero() { - result.push((data1, time1, diff1)); + result.copy_destructured(data1, time1, &diff1); } } } @@ -252,30 +317,35 @@ impl MergeSorter { } if head1.is_empty() { - let done1 = Vec::from(head1); - if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } - head1 = VecDeque::from(list1.next().unwrap_or_default()); + let done1 = head1.done(); + self.recycle(done1); + head1 = if list1.peek().is_some() { TimelyStackQueue::from(list1.next().unwrap()) } else { TimelyStackQueue::new() }; } if head2.is_empty() { - let done2 = Vec::from(head2); - if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } - head2 = VecDeque::from(list2.next().unwrap_or_default()); + let done2 = head2.done(); + self.recycle(done2); + head2 = if list2.peek().is_some() { TimelyStackQueue::from(list2.next().unwrap()) } else { TimelyStackQueue::new() }; } } - if result.len() > 0 { output.push(result); } - else if result.capacity() > 0 { self.stash.push(result); } + if result.len() > 0 { + output.push(result); + } else { + self.recycle(result); + } if !head1.is_empty() { let mut result = self.empty(); - for item1 in head1 { result.push(item1); } + result.reserve_items(head1.iter()); + for _ in 0 .. head1.len() { result.copy(head1.pop()); } output.push(result); } output.extend(list1); if !head2.is_empty() { let mut result = self.empty(); - for item2 in head2 { result.push(item2); } + result.reserve_items(head2.iter()); + for _ in 0 .. 
head2.len() { result.copy(head2.pop()); } output.push(result); } output.extend(list2); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 4eee120de..8bbe4f311 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -41,6 +41,7 @@ pub mod spine_fueled; mod merge_batcher; +pub(crate) mod merge_batcher_col; pub use self::merge_batcher::MergeBatcher as Batcher; From c584dd48e0eed933e51e73cb6d71bb784eefe3b1 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 14 Nov 2023 10:10:47 -0500 Subject: [PATCH 3/8] Introduce update type on Vector/TStack layouts Signed-off-by: Moritz Hoffmann --- src/trace/implementations/ord.rs | 76 +++++++++++++++++++++----------- tests/trace.rs | 4 +- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 989c274af..6187b8ba9 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -12,7 +12,6 @@ use std::rc::Rc; use std::convert::{TryFrom, TryInto}; use std::marker::PhantomData; use std::fmt::Debug; -use std::ops::Deref; use timely::container::columnation::TimelyStack; use timely::container::columnation::Columnation; @@ -36,6 +35,7 @@ use super::spine_fueled::Spine; use super::merge_batcher::MergeBatcher; use abomonation::abomonated::Abomonated; +use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher; /// A type that names constituent update types. pub trait Update { @@ -44,7 +44,7 @@ pub trait Update { /// Values associated with the key. type Val: Ord+Clone; /// Time at which updates occur. - type Time: Ord+Lattice+timely::progress::Timestamp+Clone; + type Time: Lattice+timely::progress::Timestamp; /// Way in which updates occur. type Diff: Semigroup+Clone; } @@ -73,12 +73,10 @@ pub trait Layout { /// Container for update keys. type KeyContainer: BatchContainer::Key> - +Deref::Key]> +RetainFrom<::Key>; /// Container for update vals. type ValContainer: BatchContainer::Val> - +Deref::Val]> +RetainFrom<::Val>; } @@ -114,10 +112,10 @@ where /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>>; +pub type OrdValSpine = Spine, Vec<((K,V),T,R)>>>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine>, Vec>>>; +pub type OrdValSpineAbom = Spine, Vec<((K,V),T,R)>>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; @@ -126,7 +124,7 @@ pub type OrdKeySpine = Spine = Spine>, Vec>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine>>>; +pub type ColValSpine = Spine, TimelyStack<((K,V),T,R)>>>>; /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; @@ -163,11 +161,13 @@ impl RetainFrom for TimelyStack { /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Abomonation)] -pub struct OrdValBatch { +pub struct OrdValBatch { /// Where all the dataz is. pub layer: KVTDLayer, /// Description of the update times this layer represents. pub desc: Description<::Time>, + /// Phantom data + pub phantom: PhantomData, } // Type aliases to make certain types readable. 
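// Illustrative sketch: with the `Layout`/container split above, the container type
// parameter selects which merge batcher a spine uses. Hypothetical downstream
// aliases, assuming keys and values that implement `Columnation` (as `String` does):
use differential_dataflow::trace::implementations::ord::{OrdValSpine, ColValSpine};

// Vec-backed layout, batched with the existing `MergeBatcher`.
type WordPairs = OrdValSpine<String, String, u64, i64>;
// Columnar layout, batched with the new `ColumnatedMergeBatcher`.
type WordPairsCol = ColValSpine<String, String, u64, i64>;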
@@ -180,19 +180,20 @@ type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBu type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; -impl BatchReader for OrdValBatch { +impl BatchReader for OrdValBatch { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Cursor = OrdValCursor; + type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { OrdValCursor { cursor: self.layer.cursor(), phantom: std::marker::PhantomData } } fn len(&self) -> usize { as Trie>::tuples(&self.layer) } fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdValBatch { +impl Batch for OrdValBatch> +{ type Batcher = MergeBatcher; type Builder = OrdValBuilder; type Merger = OrdValMerger; @@ -202,7 +203,24 @@ impl Batch for OrdValBatch { } } -impl OrdValBatch { +impl Batch for OrdValBatch> +where + ::Target: Columnation, + Self::Key: Columnation + 'static, + Self::Val: Columnation + 'static, + Self::Time: Columnation + 'static, + Self::R: Columnation + 'static, +{ + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdValBuilder; + type Merger = OrdValMerger; + + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + OrdValMerger::new(self, other, compaction_frontier) + } +} + +impl OrdValBatch { fn advance_builder_from(layer: &mut KVTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; @@ -305,8 +323,11 @@ pub struct OrdValMerger { should_compact: bool, } -impl Merger> for OrdValMerger { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option::Time>>) -> Self { +impl Merger> for OrdValMerger +where + OrdValBatch: Batch::Time> +{ + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option as BatchReader>::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -327,7 +348,7 @@ impl Merger> for OrdValMerger { should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdValBatch { + fn done(self) -> OrdValBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -335,9 +356,10 @@ impl Merger> for OrdValMerger { OrdValBatch { layer: self.result.done(), desc: self.description, + phantom: PhantomData, } } - fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdValBatch, source2: &OrdValBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.vals.len(); let mut effort = 0isize; @@ -376,7 +398,7 @@ impl Merger> for OrdValMerger { // if we are supplied a frontier, we should compact. if self.should_compact { - OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -388,18 +410,18 @@ impl Merger> for OrdValMerger { } /// A cursor for navigating a single layer. 
-pub struct OrdValCursor { - phantom: std::marker::PhantomData, +pub struct OrdValCursor { + phantom: std::marker::PhantomData<(L, C)>, cursor: OrderedCursor>, } -impl Cursor for OrdValCursor { +impl Cursor for OrdValCursor { type Key = ::Key; type Val = ::Val; type Time = ::Time; type R = ::Diff; - type Storage = OrdValBatch; + type Storage = OrdValBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &self.cursor.child.key(&storage.layer.vals) } @@ -426,7 +448,10 @@ pub struct OrdValBuilder { } -impl Builder> for OrdValBuilder { +impl Builder> for OrdValBuilder +where + OrdValBatch: Batch::Key, Val=::Val, Time=::Time, R=::Diff> +{ fn new() -> Self { OrdValBuilder { @@ -440,15 +465,16 @@ impl Builder> for OrdValBuilder { } #[inline] - fn push(&mut self, (key, val, time, diff): (::Key, ::Val, ::Time, ::Diff)) { + fn push(&mut self, (key, val, time, diff): ( as BatchReader>::Key, as BatchReader>::Val, as BatchReader>::Time, as BatchReader>::R)) { self.builder.push_tuple((key, (val, (time, diff)))); } #[inline(never)] - fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { + fn done(self, lower: Antichain< as BatchReader>::Time>, upper: Antichain< as BatchReader>::Time>, since: Antichain< as BatchReader>::Time>) -> OrdValBatch { OrdValBatch { layer: self.builder.done(), - desc: Description::new(lower, upper, since) + desc: Description::new(lower, upper, since), + phantom: PhantomData, } } } diff --git a/tests/trace.rs b/tests/trace.rs index 1d830e094..6cf59b608 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -11,11 +11,11 @@ use differential_dataflow::trace::{Trace, TraceReader, Batch, Batcher}; use differential_dataflow::trace::cursor::Cursor; use differential_dataflow::trace::implementations::spine_fueled::Spine; -pub type OrdValSpine = Spine>>>; +pub type OrdValSpine = Spine, Vec<((K, V), T, R)>>>>; type IntegerTrace = OrdValSpine; -fn get_trace() -> Spine>>> { +fn get_trace() -> Spine, Vec<((u64, u64), usize, i64)>>>> { let op_info = OperatorInfo::new(0, 0, &[]); let mut trace = IntegerTrace::new(op_info, None, None); { From 658287dfff6071b05ecb7fb035ad9cbcb2bcd236 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 14 Nov 2023 10:51:04 -0500 Subject: [PATCH 4/8] cleanup Signed-off-by: Moritz Hoffmann --- src/trace/implementations/merge_batcher.rs | 37 +--------------------- 1 file changed, 1 insertion(+), 36 deletions(-) diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 9fc2fb14e..00b01501f 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -121,7 +121,7 @@ where } } -pub struct MergeSorter { +struct MergeSorter { queue: Vec>>, // each power-of-two length list of allocations. stash: Vec>, } @@ -283,38 +283,3 @@ impl MergeSorter { output } } - -/// Reports the number of elements satisfing the predicate. -/// -/// This methods *relies strongly* on the assumption that the predicate -/// stays false once it becomes false, a joint property of the predicate -/// and the slice. This allows `advance` to use exponential search to -/// count the number of elements in time logarithmic in the result. 
-#[inline] -pub fn _advancebool>(slice: &[T], function: F) -> usize { - - // start with no advance - let mut index = 0; - if index < slice.len() && function(&slice[index]) { - - // advance in exponentially growing steps. - let mut step = 1; - while index + step < slice.len() && function(&slice[index + step]) { - index += step; - step = step << 1; - } - - // advance in exponentially shrinking steps. - step = step >> 1; - while step > 0 { - if index + step < slice.len() && function(&slice[index + step]) { - index += step; - } - step = step >> 1; - } - - index += 1; - } - - index -} From f40889584c99dd69ccb5508d0e7e5f614c5c05d4 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 17 Nov 2023 14:11:54 -0500 Subject: [PATCH 5/8] Cleanup of merge_batcher_col Signed-off-by: Moritz Hoffmann --- .../implementations/merge_batcher_col.rs | 90 +++++++++---------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 2805ff773..d6160df92 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,5 +1,6 @@ //! A general purpose `Batcher` implementation based on radix sort for TimelyStack. +use std::marker::PhantomData; use timely::Container; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; @@ -21,7 +22,7 @@ pub struct ColumnatedMergeBatcher sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>, lower: Antichain, frontier: Antichain, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, } impl Batcher for ColumnatedMergeBatcher @@ -36,7 +37,7 @@ impl Batcher for ColumnatedMergeBatcher sorter: MergeSorterColumnation::new(), frontier: Antichain::new(), lower: Antichain::from_elem(::minimum()), - phantom: ::std::marker::PhantomData, + phantom: PhantomData, } } @@ -77,11 +78,9 @@ impl Batcher for ColumnatedMergeBatcher for datum @ ((key, val), time, diff) in &buffer[..] { if upper.less_equal(time) { self.frontier.insert(time.clone()); - if keep.len() == keep.capacity() { - if keep.len() > 0 { - kept.push(keep); - keep = self.sorter.empty(); - } + if !keep.is_empty() && keep.len() == keep.capacity() { + kept.push(keep); + keep = self.sorter.empty(); } keep.copy(datum); } @@ -99,14 +98,14 @@ impl Batcher for ColumnatedMergeBatcher } // Finish the kept data. - if keep.len() > 0 { + if !keep.is_empty() { kept.push(keep); } - if kept.len() > 0 { + if !kept.is_empty() { self.sorter.push_list(kept); } - // Drain buffers (fast reclaimation). + // Drain buffers (fast reclamation). 
self.sorter.clear_stash(); let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(::minimum())); @@ -120,46 +119,47 @@ impl Batcher for ColumnatedMergeBatcher } } -pub struct TimelyStackQueue { +struct TimelyStackQueue { list: TimelyStack, head: usize, } impl TimelyStackQueue { - #[inline] - pub fn new() -> Self { TimelyStackQueue::from(Default::default()) } - #[inline] - pub fn pop(&mut self) -> &T { + + fn new() -> Self { TimelyStackQueue::from(Default::default()) } + + fn pop(&mut self) -> &T { self.head += 1; &self.list[self.head - 1] } - #[inline] - pub fn peek(&self) -> &T { + + fn peek(&self) -> &T { &self.list[self.head] } - #[inline] - pub fn from(list: TimelyStack) -> Self { + + fn from(list: TimelyStack) -> Self { TimelyStackQueue { list, head: 0, } } - #[inline] - pub fn done(mut self) -> TimelyStack { + + fn done(mut self) -> TimelyStack { self.list.clear(); self.list } - #[inline] - pub fn len(&self) -> usize { self.list.len() - self.head } - #[inline] - pub fn is_empty(&self) -> bool { self.head == self.list.len() } + fn len(&self) -> usize { self.list.len() - self.head } + + fn is_empty(&self) -> bool { self.head == self.list.len() } + + /// Return an iterator over the remaining elements. fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { self.list.iter().skip(self.head) } } -pub struct MergeSorterColumnation { +struct MergeSorterColumnation { queue: Vec>>, // each power-of-two length list of allocations. stash: Vec>, pending: Vec<(D, T, R)>, @@ -169,8 +169,9 @@ impl usize { - let size = ::std::mem::size_of::<(D, T, R)>(); + let size = std::mem::size_of::<(D, T, R)>(); if size == 0 { Self::BUFFER_SIZE_BYTES } else if size <= Self::BUFFER_SIZE_BYTES { @@ -180,8 +181,7 @@ impl Self { + fn new() -> Self { Self { queue: Vec::new(), stash: Vec::new(), @@ -189,30 +189,29 @@ impl TimelyStack<(D, T, R)> { + fn empty(&mut self) -> TimelyStack<(D, T, R)> { self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) } - pub fn clear_stash(&mut self) { + /// Remove all elements from the stash. + fn clear_stash(&mut self) { self.stash.clear(); } - #[inline] - pub fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) { + /// Insert an empty buffer into the stash. Panics if the buffer is not empty. 
+ fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) { assert!(buffer.is_empty()); if buffer.capacity() == Self::buffer_size() { self.stash.push(buffer); } } - #[inline] - pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) { + fn push(&mut self, batch: &mut Vec<(D, T, R)>) { if self.pending.capacity() == 0 { self.pending.reserve(Self::buffer_size()); } - while batch.len() > 0 { + while !batch.is_empty() { self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); if self.pending.len() == self.pending.capacity() { crate::consolidation::consolidate_updates(&mut self.pending); @@ -223,9 +222,10 @@ impl 0 { + if !self.pending.is_empty() { let mut stack = self.empty(); stack.reserve_items(self.pending.iter()); for tuple in self.pending.iter() { @@ -244,7 +244,7 @@ impl>) { + fn push_list(&mut self, list: Vec>) { while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { let list1 = self.queue.pop().unwrap(); let list2 = self.queue.pop().unwrap(); @@ -254,8 +254,7 @@ impl>) { + fn finish_into(&mut self, target: &mut Vec>) { crate::consolidation::consolidate_updates(&mut self.pending); self.flush_pending(); while self.queue.len() > 1 { @@ -266,12 +265,11 @@ impl>, list2: Vec>) -> Vec> { use std::cmp::Ordering; @@ -289,7 +287,7 @@ impl 0 && head1.len() > 0 && head2.len() > 0 { + while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { let cmp = { let x = head1.peek(); @@ -303,7 +301,7 @@ impl Date: Fri, 17 Nov 2023 15:22:01 -0500 Subject: [PATCH 6/8] ColKeySpine with TimelyStack in the merge batcher Signed-off-by: Moritz Hoffmann --- src/trace/implementations/ord.rs | 66 ++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 6187b8ba9..331a4c305 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -118,15 +118,15 @@ pub type OrdValSpine = Spine = Spine, Vec<((K,V),T,R)>>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>>; +pub type OrdKeySpine = Spine, Vec<((K,()),T,R)>>>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine>, Vec>>>; +pub type OrdKeySpineAbom = Spine, Vec<((K,()),T,R)>>, Vec>>>; /// A trace implementation backed by columnar storage. pub type ColValSpine = Spine, TimelyStack<((K,V),T,R)>>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine, TimelyStack<((K,()),T,R)>>>>; /// A container that can retain/discard from some offset onward. @@ -484,20 +484,22 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Abomonation)] -pub struct OrdKeyBatch { +pub struct OrdKeyBatch { /// Where all the dataz is. pub layer: KTDLayer, /// Description of the update times this layer represents. 
pub desc: Description<::Time>, + /// Phantom data + pub phantom: PhantomData, } -impl BatchReader for OrdKeyBatch { +impl BatchReader for OrdKeyBatch { type Key = ::Key; type Val = (); type Time = ::Time; type R = ::Diff; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -509,7 +511,7 @@ impl BatchReader for OrdKeyBatch { fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch { +impl Batch for OrdKeyBatch> { type Batcher = MergeBatcher; type Builder = OrdKeyBuilder; type Merger = OrdKeyMerger; @@ -519,7 +521,23 @@ impl Batch for OrdKeyBatch { } } -impl OrdKeyBatch { +impl Batch for OrdKeyBatch> +where + ::Target: Columnation + 'static, + Self::Key: Columnation + 'static, + Self::Time: Columnation + 'static, + Self::R: Columnation + 'static, +{ + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdKeyBuilder; + type Merger = OrdKeyMerger; + + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + OrdKeyMerger::new(self, other, compaction_frontier) + } +} + +impl OrdKeyBatch { fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; @@ -596,8 +614,11 @@ pub struct OrdKeyMerger { should_compact: bool, } -impl Merger> for OrdKeyMerger { - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { +impl Merger> for OrdKeyMerger +where + OrdKeyBatch: Batch::Time> +{ + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -618,7 +639,7 @@ impl Merger> for OrdKeyMerger { should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -626,9 +647,10 @@ impl Merger> for OrdKeyMerger { OrdKeyBatch { layer: self.result.done(), desc: self.description, + phantom: PhantomData, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -673,7 +695,7 @@ impl Merger> for OrdKeyMerger { // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -687,19 +709,19 @@ impl Merger> for OrdKeyMerger { /// A cursor for navigating a single layer. 
#[derive(Debug)] -pub struct OrdKeyCursor { +pub struct OrdKeyCursor { valid: bool, cursor: OrderedCursor::Time, ::Diff>>, - phantom: PhantomData, + phantom: PhantomData<(L, C)>, } -impl Cursor for OrdKeyCursor { +impl Cursor for OrdKeyCursor { type Key = ::Key; type Val = (); type Time = ::Time; type R = ::Diff; - type Storage = OrdKeyBatch; + type Storage = OrdKeyBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } @@ -726,7 +748,10 @@ pub struct OrdKeyBuilder { builder: KTDBuilder, } -impl Builder> for OrdKeyBuilder { +impl Builder> for OrdKeyBuilder +where + OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> +{ fn new() -> Self { OrdKeyBuilder { @@ -746,10 +771,11 @@ impl Builder> for OrdKeyBuilder { } #[inline(never)] - fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { + fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), - desc: Description::new(lower, upper, since) + desc: Description::new(lower, upper, since), + phantom: PhantomData, } } } From 516951652463e2e687c88d55d7035e2598df1295 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sun, 19 Nov 2023 20:30:18 -0500 Subject: [PATCH 7/8] Address comments Signed-off-by: Moritz Hoffmann --- .../implementations/merge_batcher_col.rs | 68 ++++++++++--------- src/trace/implementations/ord.rs | 3 + 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index d6160df92..775f56093 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -73,8 +73,7 @@ impl Batcher for ColumnatedMergeBatcher self.frontier.clear(); - for mut buffer in merged.drain(..) { - let mut last: Option<&((_, _), _, _)> = None; + for buffer in merged.drain(..) { for datum @ ((key, val), time, diff) in &buffer[..] { if upper.less_equal(time) { self.frontier.insert(time.clone()); @@ -87,13 +86,8 @@ impl Batcher for ColumnatedMergeBatcher else { builder.push((key.clone(), val.clone(), time.clone(), diff.clone())); } - if let Some(last) = last { - assert!(last <= datum); - } - last = Some(datum); } // Recycling buffer. - buffer.clear(); self.sorter.recycle(buffer); } @@ -124,9 +118,15 @@ struct TimelyStackQueue { head: usize, } -impl TimelyStackQueue { +impl Default for TimelyStackQueue { + fn default() -> Self { + Self::empty() + } +} + +impl TimelyStackQueue { - fn new() -> Self { TimelyStackQueue::from(Default::default()) } + fn empty() -> Self { TimelyStackQueue::from(Default::default()) } fn pop(&mut self) -> &T { self.head += 1; @@ -149,13 +149,11 @@ impl TimelyStackQueue { self.list } - fn len(&self) -> usize { self.list.len() - self.head } - - fn is_empty(&self) -> bool { self.head == self.list.len() } + fn is_empty(&self) -> bool { self.head == self.list[..].len() } /// Return an iterator over the remaining elements. 
fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { - self.list.iter().skip(self.head) + self.list[self.head..].iter() } } @@ -181,6 +179,11 @@ impl usize { + Self::buffer_size() * 2 + } + fn new() -> Self { Self { queue: Vec::new(), @@ -199,16 +202,17 @@ impl) { - assert!(buffer.is_empty()); + fn recycle(&mut self, mut buffer: TimelyStack<(D, T, R)>) { if buffer.capacity() == Self::buffer_size() { + buffer.clear(); self.stash.push(buffer); } } fn push(&mut self, batch: &mut Vec<(D, T, R)>) { - if self.pending.capacity() == 0 { - self.pending.reserve(Self::buffer_size()); + // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. + if self.pending.capacity() < Self::pending_buffer_size() { + self.pending.reserve(Self::pending_buffer_size() - self.pending.capacity()); } while !batch.is_empty() { @@ -216,6 +220,7 @@ impl self.pending.capacity() / 2 { + // Flush if `self.pending` is more than half full after consolidation. self.flush_pending(); } } @@ -223,15 +228,14 @@ impl 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { let list1 = self.queue.pop().unwrap(); @@ -278,11 +282,11 @@ impl RetainFrom for TimelyStack { } /// An immutable collection of update tuples, from a contiguous interval of logical times. +/// +/// The `L` parameter captures the updates should be laid out, and `C` determines which +/// merge batcher to select. #[derive(Abomonation)] pub struct OrdValBatch { /// Where all the dataz is. From 8fb8e6149485c6847cdede352d1eb47133b776ae Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sun, 19 Nov 2023 21:11:32 -0500 Subject: [PATCH 8/8] Simpler TimelyStackQueue Signed-off-by: Moritz Hoffmann --- src/trace/implementations/merge_batcher_col.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 775f56093..8a06fb68c 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -120,14 +120,12 @@ struct TimelyStackQueue { impl Default for TimelyStackQueue { fn default() -> Self { - Self::empty() + Self::from(Default::default()) } } impl TimelyStackQueue { - fn empty() -> Self { TimelyStackQueue::from(Default::default()) } - fn pop(&mut self) -> &T { self.head += 1; &self.list[self.head - 1] @@ -145,7 +143,6 @@ impl TimelyStackQueue { } fn done(mut self) -> TimelyStack { - self.list.clear(); self.list } @@ -285,8 +282,8 @@ impl
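A closing sketch, not part of the patch series: whichever implementation backs it, a
batcher is driven through the `Batcher` trait: updates are pushed in (owned or by
reference) and `seal` extracts a batch containing everything not beyond the supplied
frontier, retaining the rest. The helper name below is illustrative.

use differential_dataflow::trace::{Batch, Batcher};
use timely::communication::message::RefOrMut;
use timely::progress::Antichain;

/// Drain `updates` into a single batch of everything not beyond `upper`.
fn drain_into_batch<B: Batch>(
    updates: &mut Vec<((B::Key, B::Val), B::Time, B::R)>,
    upper: Antichain<B::Time>,
) -> B {
    let mut batcher = <B::Batcher as Batcher<B>>::new();
    // Owned data can be handed over without cloning; a shared reference would use
    // `RefOrMut::Ref` and be cloned internally.
    batcher.push_batch(RefOrMut::Mut(updates));
    // Updates at times greater or equal to `upper` stay in the batcher for a later
    // `seal`; everything else goes into the returned batch.
    batcher.seal(upper)
}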