Skip to content

Commit

Permalink
wip address comments, but tests fail
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Mar 6, 2024
1 parent 1d6a5e5 commit c6838a5
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 88 deletions.
6 changes: 5 additions & 1 deletion examples/hello.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::Instant;
use rand::{Rng, SeedableRng, StdRng};

use differential_dataflow::input::Input;
Expand Down Expand Up @@ -52,6 +53,7 @@ fn main() {
// Load up graph data. Round-robin among workers.
for _ in 0 .. (edges / peers) + if index < (edges % peers) { 1 } else { 0 } {
input.update_at((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1_000_000, 1)
// input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1)
}

input.advance_to(1);
Expand All @@ -67,7 +69,8 @@ fn main() {
if index == 0 {

let mut next = batch;
for round in 1 .. {
let start_time = Instant::now();
for round in 1 .. 1_000_100 {

input.advance_to(round);
input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1);
Expand All @@ -83,6 +86,7 @@ fn main() {
next += batch;
}
}
println!("rounds finished after {:?}", start_time.elapsed());
}
}
}).unwrap();
Expand Down
184 changes: 97 additions & 87 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ where
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
self.frontier.clear();
let merged = self.sorter.extract_into(upper.borrow(), &mut self.frontier);
let extracted = self.sorter.extract(upper.borrow(), &mut self.frontier);

// Determine the number of distinct keys, values, and updates,
// and form a builder pre-sized for these numbers.
Expand All @@ -67,7 +66,7 @@ where
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for ((key, val), _time, _) in merged.iter().map(|t| t.iter()).flatten() {
for ((key, val), _time, _) in extracted.iter().flatten() {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
Expand All @@ -85,9 +84,9 @@ where
B::with_capacity(keys, vals, upds)
};

for buffer in merged.into_iter() {
for datum in &buffer[..] {
builder.copy(datum);
for mut buffer in extracted {
for datum in buffer.drain(..) {
builder.push(datum);
}
// Recycling buffer.
self.sorter.recycle(buffer);
Expand All @@ -112,9 +111,11 @@ where
}

struct MergeSorter<D, T, R> {
/// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
/// each power-of-two length list of least times and allocations. Do not push/pop directly but use the corresponding functions.
queue: Vec<Vec<(Antichain<T>, Vec<(D, T, R)>)>>,
/// Empty, recycled allocations. Use [`MergeSorter::empty`] to pop an allocation.
stash: Vec<Vec<(D, T, R)>>,
/// Data that was pushed but not yet inserted into queue. Not necessarily sorted or compacted.
pending: Vec<(D, T, R)>,
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
operator_id: usize,
Expand All @@ -138,6 +139,7 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {

#[inline]
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
// Construct `Self` with zero capacity to avoid allocations if never used.
Self {
logger,
operator_id,
Expand Down Expand Up @@ -165,17 +167,24 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
}
}

/// Push an update into this sorter.
///
/// We assume that the length of `batch` is shorter than the capacity of `self.pending`.
/// Otherwise, this function can get quadratic behavior.
fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
// Ensure `self.pending` has a capacity of `Self::buffer_size()`.
if self.pending.capacity() < Self::buffer_size() {
self.pending
.reserve(Self::buffer_size() - self.pending.capacity());
}

// Consolidate to avoid redundant work.
crate::consolidation::consolidate_updates(batch);

while !batch.is_empty() {
self.pending.extend(
batch.drain(
..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()),
std::cmp::min(0, batch.len().saturating_sub(self.pending.capacity() - self.pending.len()))..,
),
);
if self.pending.len() == self.pending.capacity() {
Expand All @@ -192,15 +201,15 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
/// sorted. After this function returns, `self.pending` is empty.
fn flush_pending(&mut self) {
if !self.pending.is_empty() {
let mut stack = self.empty();
let mut frontier = Antichain::new();
let mut block = self.empty();
let mut least_times = Antichain::new();
for tuple in self.pending.drain(..) {
frontier.insert_ref(&tuple.1);
stack.push(tuple);
least_times.insert_ref(&tuple.1);
block.push(tuple);
}
let batch = vec![(frontier, stack)];
self.account(&batch, 1);
self.queue.push(batch);
let chain = vec![(least_times, block)];
self.account(&chain, 1);
self.queue.push(chain);
while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() > self.queue[self.queue.len()-2].len() / 2) {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
Expand All @@ -210,39 +219,17 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
}
}

/// Maintain the internal chain structure. Ensures that:
/// * All chains are sorted by size.
/// * Within each chain, adjacent blocks are reduced, i.e., their combined length is larger than
/// the block size.
/// * All chains are of geometrically increasing length.
/// Maintain the internal chain structure. Ensures that all chains are of geometrically
/// increasing length. The function assumes that the chains themselves are well-formed, i.e.,
/// they contain elements in increasing order.
fn maintain(&mut self) {
self.account(self.queue.iter().flatten(), -1);

// Step 1: Canonicalize each chain by adjacent blocks that combined fit into a single block.
for chain in &mut self.queue {
let mut target: Vec<(Antichain<T>, Vec<_>)> = Vec::with_capacity(chain.len());
for (frontier, block) in chain.drain(..) {
if target.last().map_or(false, |(_, last)| {
last.len() + block.len() <= Self::buffer_size()
}) {
// merge `target.last()` with `block`
let (last_frontier, last) = target.last_mut().unwrap();
for item in block.into_iter() {
last_frontier.insert_ref(&item.1);
last.push(item);
}
} else {
target.push((frontier, block));
}
}
*chain = target;
}

// Step 2: Sort queue by chain length. Depending on how much we extracted,
// Step 1: Sort queue by chain length. Depending on how much we extracted,
// the chains might be mis-ordered.
self.queue.sort_by_key(|chain| std::cmp::Reverse(chain.len()));

// Step 3: Merge chains that are within a power of two.
// Step 2: Merge chains that are within a power of two.
let mut index = self.queue.len().saturating_sub(1);
while index > 0 {
if self.queue[index-1].len() / 2 < self.queue[index].len() {
Expand All @@ -259,7 +246,7 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {

/// Extract all data that is not in advance of `upper`. Record the lower bound of the remaining
/// data's time in `frontier`.
fn extract_into(
fn extract(
&mut self,
upper: AntichainRef<T>,
frontier: &mut Antichain<T>,
Expand All @@ -268,90 +255,114 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
crate::consolidation::consolidate_updates(&mut self.pending);
self.flush_pending();

let mut keep = self.empty();
let mut keep_buffer = self.empty();
let mut keep_frontier = Antichain::new();
let mut ship = self.empty();
let mut ship_list = Vec::default();
let mut ship_buffer = self.empty();
let mut ship_chains = Vec::default();

self.account(self.queue.iter().flatten(), -1);

// Walk all chains, separate ready data from data to keep.
for mut chain in std::mem::take(&mut self.queue).drain(..) {
let mut block_list = Vec::default();
let mut keep_list = Vec::default();
println!("extract drain chain {:?}", chain.iter().map(|v| v.1.len()));

Check failure on line 267 in src/trace/implementations/merge_batcher.rs

View workflow job for this annotation

GitHub Actions / test mdBook

`T` doesn't implement `Debug`

Check failure on line 267 in src/trace/implementations/merge_batcher.rs

View workflow job for this annotation

GitHub Actions / test mdBook

`D` doesn't implement `Debug`

Check failure on line 267 in src/trace/implementations/merge_batcher.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu

`T` doesn't implement `Debug`

Check failure on line 267 in src/trace/implementations/merge_batcher.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu

`D` doesn't implement `Debug`

Check failure on line 267 in src/trace/implementations/merge_batcher.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu

`T` doesn't implement `Debug`

Check failure on line 267 in src/trace/implementations/merge_batcher.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu

`D` doesn't implement `Debug`
let mut ship_chain = Vec::default();
let mut keep_chain = Vec::default();
for (block_frontier, mut block) in chain.drain(..) {
// Is any data ready to be shipped?
let any = !PartialOrder::less_equal(&upper, &block_frontier.borrow());
// Is all data ready to be shipped?
let all = any && block.iter().all(|(_, t, _)| !upper.less_equal(t));

if all {
// All data is ready, push what we accumulated, stash whole block.
if !ship.is_empty() {
block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty())));
if PartialOrder::less_equal(&upper, &block_frontier.borrow()) {
// Keep the entire block.
if !keep_buffer.is_empty() {
for t in keep_frontier.iter() {
frontier.insert_ref(t);
}
keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty())));
}
for t in block_frontier.iter() {
frontier.insert_ref(t);
}
block_list.push((Antichain::new(), block));
} else if any {
keep_chain.push((block_frontier, block));
} else {
// Split the block: Some data may be ready.

// Iterate block, sorting items into ship and keep
for datum in block.drain(..) {
if upper.less_equal(&datum.1) {
frontier.insert_ref(&datum.1);
keep_frontier.insert_ref(&datum.1);
keep.push(datum);
if keep.capacity() == keep.len() {
keep_buffer.push(datum);
if keep_buffer.capacity() == keep_buffer.len() {
// remember keep
keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty())));
for t in keep_frontier.iter() {
frontier.insert_ref(t);
}
keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty())));
}
} else {
ship.push(datum);
if ship.capacity() == ship.len() {
ship_buffer.push(datum);
if ship_buffer.capacity() == ship_buffer.len() {
// Ship is full, push in on the block list, get an empty one.
block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty())));
ship_chain.push((Antichain::new(), std::mem::replace(&mut ship_buffer, self.empty())));
}
}
}
// Recycle leftovers
self.recycle(block);
} else {
// Keep the entire block.
if !keep.is_empty() {
keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty())));
}
keep_list.push((block_frontier, block));
}
}

// Capture any residue left after iterating blocks.
if !ship.is_empty() {
block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty())));
if !ship_buffer.is_empty() {
ship_chain.push((Antichain::new(), std::mem::replace(&mut ship_buffer, self.empty())));
}
if !keep.is_empty() {
keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty())));
if !keep_buffer.is_empty() {
keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty())));
}

// Collect finished chains
if !block_list.is_empty() {
ship_list.push(block_list);
if !keep_chain.is_empty() {
if !ship_chain.is_empty() {
// Canonicalize the chain by merging adjacent blocks that combined fit into a single block.
let mut target: Vec<(Antichain<T>, Vec<_>)> = Vec::with_capacity(chain.len());
for (frontier, mut block) in chain.drain(..) {
if let Some((last_frontier, last)) = target.last_mut().filter(|(_, last)| {
last.len() + block.len() <= Self::buffer_size()
}) {
// merge `target.last()` with `block`
for item in block.drain(..) {
last_frontier.insert_ref(&item.1);
last.push(item);
}
self.recycle(block);
} else {
target.push((frontier, block));
}
}
keep_chain = target;
}
self.queue.push(keep_chain);
}
if !keep_list.is_empty() {
self.queue.push(keep_list);
if !ship_chain.is_empty() {
ship_chains.push(ship_chain);
}
}

self.account(self.queue.iter().flatten(), 1);

if ship_list.len() > 0 {
if !ship_chains.is_empty() {
self.maintain();
}

while ship_list.len() > 1 {
let list1 = ship_list.pop().unwrap();
let list2 = ship_list.pop().unwrap();
ship_list.push(self.merge_by(list1, list2));
// Merge `ship_chains` into a single element. Roll up from the smallest to the largest
// chain.
ship_chains.sort_by_key(|chain| std::cmp::Reverse(chain.len()));

while ship_chains.len() > 1 {
let list1 = ship_chains.pop().unwrap();
let list2 = ship_chains.pop().unwrap();
ship_chains.push(self.merge_by(list1, list2));
}

// Pop the last element, or return an empty chain.
ship_list.pop().unwrap_or_default().into_iter().map(|(_, list)| list).collect()
ship_chains.pop().unwrap_or_default().into_iter().map(|(_, list)| list).collect()
}

// merges two sorted input lists into one sorted output list.
Expand Down Expand Up @@ -451,8 +462,7 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
}
}

impl<D, T, R> MergeSorter<D, T, R>
{
impl<D, T, R> MergeSorter<D, T, R> {
/// Account size changes. Only performs work if a logger exists.
///
/// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
Expand Down

0 comments on commit c6838a5

Please sign in to comment.