Skip to content

Commit

Permalink
Cleanup of merge_batcher_col
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 19, 2023
1 parent 658287d commit f408895
Showing 1 changed file with 44 additions and 46 deletions.
90 changes: 44 additions & 46 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! A general purpose `Batcher` implementation based on radix sort for TimelyStack.
use std::marker::PhantomData;
use timely::Container;
use timely::communication::message::RefOrMut;
use timely::container::columnation::{Columnation, TimelyStack};
Expand All @@ -21,7 +22,7 @@ pub struct ColumnatedMergeBatcher<B: Batch>
sorter: MergeSorterColumnation<(B::Key, B::Val), B::Time, B::R>,
lower: Antichain<B::Time>,
frontier: Antichain<B::Time>,
phantom: ::std::marker::PhantomData<B>,
phantom: PhantomData<B>,
}

impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
Expand All @@ -36,7 +37,7 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
sorter: MergeSorterColumnation::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
}
}

Expand Down Expand Up @@ -77,11 +78,9 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
for datum @ ((key, val), time, diff) in &buffer[..] {
if upper.less_equal(time) {
self.frontier.insert(time.clone());
if keep.len() == keep.capacity() {
if keep.len() > 0 {
kept.push(keep);
keep = self.sorter.empty();
}
if !keep.is_empty() && keep.len() == keep.capacity() {
kept.push(keep);
keep = self.sorter.empty();
}
keep.copy(datum);
}
Expand All @@ -99,14 +98,14 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
}

// Finish the kept data.
if keep.len() > 0 {
if !keep.is_empty() {
kept.push(keep);
}
if kept.len() > 0 {
if !kept.is_empty() {
self.sorter.push_list(kept);
}

// Drain buffers (fast reclaimation).
// Drain buffers (fast reclamation).
self.sorter.clear_stash();

let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()));
Expand All @@ -120,46 +119,47 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
}
}

pub struct TimelyStackQueue<T: Columnation> {
struct TimelyStackQueue<T: Columnation> {
list: TimelyStack<T>,
head: usize,
}

impl<T: Columnation + 'static> TimelyStackQueue<T> {
#[inline]
pub fn new() -> Self { TimelyStackQueue::from(Default::default()) }
#[inline]
pub fn pop(&mut self) -> &T {

fn new() -> Self { TimelyStackQueue::from(Default::default()) }

fn pop(&mut self) -> &T {
self.head += 1;
&self.list[self.head - 1]
}
#[inline]
pub fn peek(&self) -> &T {

fn peek(&self) -> &T {
&self.list[self.head]
}
#[inline]
pub fn from(list: TimelyStack<T>) -> Self {

fn from(list: TimelyStack<T>) -> Self {
TimelyStackQueue {
list,
head: 0,
}
}
#[inline]
pub fn done(mut self) -> TimelyStack<T> {

fn done(mut self) -> TimelyStack<T> {
self.list.clear();
self.list
}
#[inline]
pub fn len(&self) -> usize { self.list.len() - self.head }
#[inline]
pub fn is_empty(&self) -> bool { self.head == self.list.len() }

fn len(&self) -> usize { self.list.len() - self.head }

fn is_empty(&self) -> bool { self.head == self.list.len() }

/// Return an iterator over the remaining elements.
fn iter(&self) -> impl Iterator<Item=&T> + Clone + ExactSizeIterator {
self.list.iter().skip(self.head)
}
}

pub struct MergeSorterColumnation<D: Ord+Columnation, T: Ord+Columnation, R: Semigroup+Columnation> {
struct MergeSorterColumnation<D: Ord+Columnation, T: Ord+Columnation, R: Semigroup+Columnation> {
queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<TimelyStack<(D, T, R)>>,
pending: Vec<(D, T, R)>,
Expand All @@ -169,8 +169,9 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi

const BUFFER_SIZE_BYTES: usize = 64 << 10;

/// Buffer size (number of elements) to use for new/empty buffers.
const fn buffer_size() -> usize {
let size = ::std::mem::size_of::<(D, T, R)>();
let size = std::mem::size_of::<(D, T, R)>();
if size == 0 {
Self::BUFFER_SIZE_BYTES
} else if size <= Self::BUFFER_SIZE_BYTES {
Expand All @@ -180,39 +181,37 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
}
}

#[inline]
pub fn new() -> Self {
fn new() -> Self {
Self {
queue: Vec::new(),
stash: Vec::new(),
pending: Vec::new()
}
}

#[inline]
pub fn empty(&mut self) -> TimelyStack<(D, T, R)> {
fn empty(&mut self) -> TimelyStack<(D, T, R)> {
self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size()))
}

pub fn clear_stash(&mut self) {
/// Remove all elements from the stash.
fn clear_stash(&mut self) {
self.stash.clear();
}

#[inline]
pub fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) {
/// Insert an empty buffer into the stash. Panics if the buffer is not empty.
fn recycle(&mut self, buffer: TimelyStack<(D, T, R)>) {
assert!(buffer.is_empty());
if buffer.capacity() == Self::buffer_size() {
self.stash.push(buffer);
}
}

#[inline]
pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
if self.pending.capacity() == 0 {
self.pending.reserve(Self::buffer_size());
}

while batch.len() > 0 {
while !batch.is_empty() {
self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len())));
if self.pending.len() == self.pending.capacity() {
crate::consolidation::consolidate_updates(&mut self.pending);
Expand All @@ -223,9 +222,10 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
}
}

#[inline]
/// Move all elements in `pending` into `queue`. The data in `pending` must be compacted and
/// sorted.
fn flush_pending(&mut self) {
if self.pending.len() > 0 {
if !self.pending.is_empty() {
let mut stack = self.empty();
stack.reserve_items(self.pending.iter());
for tuple in self.pending.iter() {
Expand All @@ -244,7 +244,7 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi

// This is awkward, because it isn't a power-of-two length any more, and we don't want
// to break it down to be so.
pub fn push_list(&mut self, list: Vec<TimelyStack<(D, T, R)>>) {
fn push_list(&mut self, list: Vec<TimelyStack<(D, T, R)>>) {
while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
Expand All @@ -254,8 +254,7 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
self.queue.push(list);
}

#[inline]
pub fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
crate::consolidation::consolidate_updates(&mut self.pending);
self.flush_pending();
while self.queue.len() > 1 {
Expand All @@ -266,12 +265,11 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
}

if let Some(mut last) = self.queue.pop() {
::std::mem::swap(&mut last, target);
std::mem::swap(&mut last, target);
}
}

// merges two sorted input lists into one sorted output list.
#[inline(never)]
fn merge_by(&mut self, list1: Vec<TimelyStack<(D, T, R)>>, list2: Vec<TimelyStack<(D, T, R)>>) -> Vec<TimelyStack<(D, T, R)>> {

use std::cmp::Ordering;
Expand All @@ -289,7 +287,7 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {

while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {

let cmp = {
let x = head1.peek();
Expand All @@ -303,7 +301,7 @@ impl<D: Ord+Clone+Columnation+'static, T: Ord+Clone+Columnation+'static, R: Semi
let (data1, time1, diff1) = head1.pop();
let (_data2, _time2, diff2) = head2.pop();
let mut diff1 = diff1.clone();
diff1.plus_equals(&diff2);
diff1.plus_equals(diff2);
if !diff1.is_zero() {
result.copy_destructured(data1, time1, &diff1);
}
Expand Down

0 comments on commit f408895

Please sign in to comment.