From c6838a5d9634c06a20295ebb96e39921f2b4a28a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 6 Mar 2024 18:01:38 -0500 Subject: [PATCH] wip address comments, but tests fail Signed-off-by: Moritz Hoffmann --- examples/hello.rs | 6 +- src/trace/implementations/merge_batcher.rs | 184 +++++++++++---------- 2 files changed, 102 insertions(+), 88 deletions(-) diff --git a/examples/hello.rs b/examples/hello.rs index 419bbef2f..fb5a5498d 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -1,3 +1,4 @@ +use std::time::Instant; use rand::{Rng, SeedableRng, StdRng}; use differential_dataflow::input::Input; @@ -52,6 +53,7 @@ fn main() { // Load up graph data. Round-robin among workers. for _ in 0 .. (edges / peers) + if index < (edges % peers) { 1 } else { 0 } { input.update_at((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1_000_000, 1) + // input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1) } input.advance_to(1); @@ -67,7 +69,8 @@ fn main() { if index == 0 { let mut next = batch; - for round in 1 .. { + let start_time = Instant::now(); + for round in 1 .. 1_000_100 { input.advance_to(round); input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1); @@ -83,6 +86,7 @@ fn main() { next += batch; } } + println!("rounds finished after {:?}", start_time.elapsed()); } } }).unwrap(); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index f5ebb56c2..c075f2be1 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -55,10 +55,9 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. 
- #[inline] fn seal>(&mut self, upper: Antichain) -> B::Output { self.frontier.clear(); - let merged = self.sorter.extract_into(upper.borrow(), &mut self.frontier); + let extracted = self.sorter.extract(upper.borrow(), &mut self.frontier); // Determine the number of distinct keys, values, and updates, // and form a builder pre-sized for these numbers. @@ -67,7 +66,7 @@ where let mut vals = 0; let mut upds = 0; let mut prev_keyval = None; - for ((key, val), _time, _) in merged.iter().map(|t| t.iter()).flatten() { + for ((key, val), _time, _) in extracted.iter().flatten() { if let Some((p_key, p_val)) = prev_keyval { if p_key != key { keys += 1; @@ -85,9 +84,9 @@ where B::with_capacity(keys, vals, upds) }; - for buffer in merged.into_iter() { - for datum in &buffer[..] { - builder.copy(datum); + for mut buffer in extracted { + for datum in buffer.drain(..) { + builder.push(datum); } // Recycling buffer. self.sorter.recycle(buffer); @@ -112,9 +111,11 @@ where } struct MergeSorter { - /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. + /// each power-of-two length list of least times and allocations. Do not push/pop directly but use the corresponding functions. queue: Vec, Vec<(D, T, R)>)>>, + /// Empty, recycled allocations. Use [`MergeSorter::empty`] to pop an allocation. stash: Vec>, + /// Data that was pushed but not yet inserted into queue. Not necessarily sorted or compacted. pending: Vec<(D, T, R)>, logger: Option>, operator_id: usize, @@ -138,6 +139,7 @@ impl MergeSorter { #[inline] fn new(logger: Option>, operator_id: usize) -> Self { + // Construct `Self` with zero capacity to avoid allocations if never used. Self { logger, operator_id, @@ -165,6 +167,10 @@ impl MergeSorter { } } + /// Push an update into this sorter. + /// + /// We assume that the length of `batch` is shorter than the capacity of `self.pending`. + /// Otherwise, this function can get quadratic behavior. 
fn push(&mut self, batch: &mut Vec<(D, T, R)>) { // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. if self.pending.capacity() < Self::buffer_size() { @@ -172,10 +178,13 @@ impl MergeSorter { .reserve(Self::buffer_size() - self.pending.capacity()); } + // Consolidate to avoid redundant work. + crate::consolidation::consolidate_updates(batch); + while !batch.is_empty() { self.pending.extend( batch.drain( - ..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()), + std::cmp::min(0, batch.len().saturating_sub(self.pending.capacity() - self.pending.len())).., ), ); if self.pending.len() == self.pending.capacity() { @@ -192,15 +201,15 @@ impl MergeSorter { /// sorted. After this function returns, `self.pending` is empty. fn flush_pending(&mut self) { if !self.pending.is_empty() { - let mut stack = self.empty(); - let mut frontier = Antichain::new(); + let mut block = self.empty(); + let mut least_times = Antichain::new(); for tuple in self.pending.drain(..) { - frontier.insert_ref(&tuple.1); - stack.push(tuple); + least_times.insert_ref(&tuple.1); + block.push(tuple); } - let batch = vec![(frontier, stack)]; - self.account(&batch, 1); - self.queue.push(batch); + let chain = vec![(least_times, block)]; + self.account(&chain, 1); + self.queue.push(chain); while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() > self.queue[self.queue.len()-2].len() / 2) { let list1 = self.queue.pop().unwrap(); let list2 = self.queue.pop().unwrap(); @@ -210,39 +219,17 @@ impl MergeSorter { } } - /// Maintain the internal chain structure. Ensures that: - /// * All chains are sorted by size. - /// * Within each chain, adjacent blocks are reduced, i.e., their combined length is larger than - /// the block size. - /// * All chains are of geometrically increasing length. + /// Maintain the internal chain structure. Ensures that all chains are of geometrically + /// increasing length. 
The function assumes that chains themselves are well-formed, i.e., + /// they contain elements in increasing order. fn maintain(&mut self) { self.account(self.queue.iter().flatten(), -1); - // Step 1: Canonicalize each chain by adjacent blocks that combined fit into a single block. - for chain in &mut self.queue { - let mut target: Vec<(Antichain, Vec<_>)> = Vec::with_capacity(chain.len()); - for (frontier, block) in chain.drain(..) { - if target.last().map_or(false, |(_, last)| { - last.len() + block.len() <= Self::buffer_size() - }) { - // merge `target.last()` with `block` - let (last_frontier, last) = target.last_mut().unwrap(); - for item in block.into_iter() { - last_frontier.insert_ref(&item.1); - last.push(item); - } - } else { - target.push((frontier, block)); - } - } - *chain = target; - } - - // Step 2: Sort queue by chain length. Depending on how much we extracted, + // Step 1: Sort queue by chain length. Depending on how much we extracted, // the chains might be mis-ordered. self.queue.sort_by_key(|chain| std::cmp::Reverse(chain.len())); - // Step 3: Merge chains that are within a power of two. + // Step 2: Merge chains that are within a power of two. let mut index = self.queue.len().saturating_sub(1); while index > 0 { if self.queue[index-1].len() / 2 < self.queue[index].len() { @@ -259,7 +246,7 @@ impl MergeSorter { /// Extract all data that is not in advance of `upper`. Record the lower bound of the remaining /// data's time in `frontier`. 
- fn extract_into( + fn extract( &mut self, upper: AntichainRef, frontier: &mut Antichain, @@ -268,90 +255,114 @@ impl MergeSorter { crate::consolidation::consolidate_updates(&mut self.pending); self.flush_pending(); - let mut keep = self.empty(); + let mut keep_buffer = self.empty(); let mut keep_frontier = Antichain::new(); - let mut ship = self.empty(); - let mut ship_list = Vec::default(); + let mut ship_buffer = self.empty(); + let mut ship_chains = Vec::default(); self.account(self.queue.iter().flatten(), -1); // Walk all chains, separate ready data from data to keep. for mut chain in std::mem::take(&mut self.queue).drain(..) { - let mut block_list = Vec::default(); - let mut keep_list = Vec::default(); + println!("extract drain chain {:?}", chain.iter().map(|v| v.1.len())); + let mut ship_chain = Vec::default(); + let mut keep_chain = Vec::default(); for (block_frontier, mut block) in chain.drain(..) { // Is any data ready to be shipped? - let any = !PartialOrder::less_equal(&upper, &block_frontier.borrow()); - // Is all data ready to be shipped? - let all = any && block.iter().all(|(_, t, _)| !upper.less_equal(t)); - - if all { - // All data is ready, push what we accumulated, stash whole block. - if !ship.is_empty() { - block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty()))); + if PartialOrder::less_equal(&upper, &block_frontier.borrow()) { + // Keep the entire block. + if !keep_buffer.is_empty() { + for t in keep_frontier.iter() { + frontier.insert_ref(t); + } + keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty()))); + } + for t in block_frontier.iter() { + frontier.insert_ref(t); } - block_list.push((Antichain::new(), block)); - } else if any { + keep_chain.push((block_frontier, block)); + } else { + // Split the block: Some data may be ready. + // Iterate block, sorting items into ship and keep for datum in block.drain(..) 
{ if upper.less_equal(&datum.1) { - frontier.insert_ref(&datum.1); keep_frontier.insert_ref(&datum.1); - keep.push(datum); - if keep.capacity() == keep.len() { + keep_buffer.push(datum); + if keep_buffer.capacity() == keep_buffer.len() { // remember keep - keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty()))); + for t in keep_frontier.iter() { + frontier.insert_ref(t); + } + keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty()))); } } else { - ship.push(datum); - if ship.capacity() == ship.len() { + ship_buffer.push(datum); + if ship_buffer.capacity() == ship_buffer.len() { // Ship is full, push in on the block list, get an empty one. - block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty()))); + ship_chain.push((Antichain::new(), std::mem::replace(&mut ship_buffer, self.empty()))); } } } // Recycle leftovers self.recycle(block); - } else { - // Keep the entire block. - if !keep.is_empty() { - keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty()))); - } - keep_list.push((block_frontier, block)); } } // Capture any residue left after iterating blocks. - if !ship.is_empty() { - block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty()))); + if !ship_buffer.is_empty() { + ship_chain.push((Antichain::new(), std::mem::replace(&mut ship_buffer, self.empty()))); } - if !keep.is_empty() { - keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty()))); + if !keep_buffer.is_empty() { + keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty()))); } // Collect finished chains - if !block_list.is_empty() { - ship_list.push(block_list); + if !keep_chain.is_empty() { + if !ship_chain.is_empty() { + // Canonicalize the chain by adjacent blocks that combined fit into a single block. 
+ let mut target: Vec<(Antichain, Vec<_>)> = Vec::with_capacity(chain.len()); + for (frontier, mut block) in chain.drain(..) { + if let Some((last_frontier, last)) = target.last_mut().filter(|(_, last)| { + last.len() + block.len() <= Self::buffer_size() + }) { + // merge `target.last()` with `block` + for item in block.drain(..) { + last_frontier.insert_ref(&item.1); + last.push(item); + } + self.recycle(block); + } else { + target.push((frontier, block)); + } + } + keep_chain = target; + } + self.queue.push(keep_chain); } - if !keep_list.is_empty() { - self.queue.push(keep_list); + if !ship_chain.is_empty() { + ship_chains.push(ship_chain); } } self.account(self.queue.iter().flatten(), 1); - if ship_list.len() > 0 { + if !ship_chains.is_empty() { self.maintain(); } - while ship_list.len() > 1 { - let list1 = ship_list.pop().unwrap(); - let list2 = ship_list.pop().unwrap(); - ship_list.push(self.merge_by(list1, list2)); + // Merge `ship_chains` into a single element. Roll up from the smallest to the largest + // chain. + ship_chains.sort_by_key(|chain| std::cmp::Reverse(chain.len())); + + while ship_chains.len() > 1 { + let list1 = ship_chains.pop().unwrap(); + let list2 = ship_chains.pop().unwrap(); + ship_chains.push(self.merge_by(list1, list2)); } // Pop the last element, or return an empty chain. - ship_list.pop().unwrap_or_default().into_iter().map(|(_, list)| list).collect() + ship_chains.pop().unwrap_or_default().into_iter().map(|(_, list)| list).collect() } // merges two sorted input lists into one sorted output list. @@ -451,8 +462,7 @@ impl MergeSorter { } } -impl MergeSorter -{ +impl MergeSorter { /// Account size changes. Only performs work if a logger exists. /// /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute