Skip to content

Commit

Permalink
merge_batcher: use Rust iterators and VecDeque in place of VecQueue (T…
Browse files Browse the repository at this point in the history
…imelyDataflow#380)

The standard library offers `VecDeque` which also has cheap (i.e non
allocating) conversions to and from `Vec`s so we can use that directly
and reduce the number of unsafe calls in the project.

Note that while all uses of the API of `VecQueue` were correct its
methods were marked as safe but you could accidentally cause undefined
behavior:

```rust
let queue = VecQueue::new();
queue.pop(); // UB, length isn't checked
```

Signed-off-by: Petros Angelatos <[email protected]>
  • Loading branch information
petrosagg authored Feb 27, 2023
1 parent cc88469 commit 6ae61ad
Showing 1 changed file with 20 additions and 76 deletions.
96 changes: 20 additions & 76 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! A general purpose `Batcher` implementation based on radix sort.

use std::collections::VecDeque;

use timely::communication::message::RefOrMut;
use timely::progress::frontier::Antichain;

Expand Down Expand Up @@ -120,60 +122,6 @@ where
}


use std::slice::{from_raw_parts};

pub struct VecQueue<T> {
list: Vec<T>,
head: usize,
tail: usize,
}

impl<T> VecQueue<T> {
#[inline]
pub fn new() -> Self { VecQueue::from(Vec::new()) }
#[inline]
pub fn pop(&mut self) -> T {
debug_assert!(self.head < self.tail);
self.head += 1;
unsafe { ::std::ptr::read(self.list.as_mut_ptr().offset((self.head as isize) - 1)) }
}
#[inline]
pub fn peek(&self) -> &T {
debug_assert!(self.head < self.tail);
unsafe { self.list.get_unchecked(self.head) }
}
#[inline]
pub fn _peek_tail(&self) -> &T {
debug_assert!(self.head < self.tail);
unsafe { self.list.get_unchecked(self.tail-1) }
}
#[inline]
pub fn _slice(&self) -> &[T] {
debug_assert!(self.head < self.tail);
unsafe { from_raw_parts(self.list.get_unchecked(self.head), self.tail - self.head) }
}
#[inline]
pub fn from(mut list: Vec<T>) -> Self {
let tail = list.len();
unsafe { list.set_len(0); }
VecQueue {
list: list,
head: 0,
tail: tail,
}
}
// could leak, if self.head != self.tail.
#[inline]
pub fn done(self) -> Vec<T> {
debug_assert!(self.head == self.tail);
self.list
}
#[inline]
pub fn len(&self) -> usize { self.tail - self.head }
#[inline]
pub fn is_empty(&self) -> bool { self.head == self.tail }
}

#[inline]
unsafe fn push_unchecked<T>(vec: &mut Vec<T>, element: T) {
debug_assert!(vec.len() < vec.capacity());
Expand Down Expand Up @@ -277,28 +225,28 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
let mut output = Vec::with_capacity(list1.len() + list2.len());
let mut result = self.empty();

let mut list1 = VecQueue::from(list1);
let mut list2 = VecQueue::from(list2);
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();

let mut head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() };
let mut head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() };
let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
let mut head2 = VecDeque::from(list2.next().unwrap_or_default());

// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {

while (result.capacity() - result.len()) > 0 && head1.len() > 0 && head2.len() > 0 {

let cmp = {
let x = head1.peek();
let y = head2.peek();
let x = head1.front().unwrap();
let y = head2.front().unwrap();
(&x.0, &x.1).cmp(&(&y.0, &y.1))
};
match cmp {
Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop()); } }
Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop()); } }
Ordering::Less => { unsafe { push_unchecked(&mut result, head1.pop_front().unwrap()); } }
Ordering::Greater => { unsafe { push_unchecked(&mut result, head2.pop_front().unwrap()); } }
Ordering::Equal => {
let (data1, time1, mut diff1) = head1.pop();
let (_data2, _time2, diff2) = head2.pop();
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
diff1.plus_equals(&diff2);
if !diff1.is_zero() {
unsafe { push_unchecked(&mut result, (data1, time1, diff1)); }
Expand All @@ -313,14 +261,14 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
}

if head1.is_empty() {
let done1 = head1.done();
let done1 = Vec::from(head1);
if done1.capacity() == Self::buffer_size() { self.stash.push(done1); }
head1 = if !list1.is_empty() { VecQueue::from(list1.pop()) } else { VecQueue::new() };
head1 = VecDeque::from(list1.next().unwrap_or_default());
}
if head2.is_empty() {
let done2 = head2.done();
let done2 = Vec::from(head2);
if done2.capacity() == Self::buffer_size() { self.stash.push(done2); }
head2 = if !list2.is_empty() { VecQueue::from(list2.pop()) } else { VecQueue::new() };
head2 = VecDeque::from(list2.next().unwrap_or_default());
}
}

Expand All @@ -329,21 +277,17 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {

if !head1.is_empty() {
let mut result = self.empty();
for _ in 0 .. head1.len() { result.push(head1.pop()); }
for item1 in head1 { result.push(item1); }
output.push(result);
}
while !list1.is_empty() {
output.push(list1.pop());
}
output.extend(list1);

if !head2.is_empty() {
let mut result = self.empty();
for _ in 0 .. head2.len() { result.push(head2.pop()); }
for item2 in head2 { result.push(item2); }
output.push(result);
}
while !list2.is_empty() {
output.push(list2.pop());
}
output.extend(list2);

output
}
Expand Down

0 comments on commit 6ae61ad

Please sign in to comment.