From 0871fbd3063702b58f75aaf0984670e7461f564f Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Fri, 8 Dec 2023 15:05:18 -0500
Subject: [PATCH] Log size/capacity/allocations from the merge batchers (#434)

* Log size/capacity/allocations from columnar merge batcher

This logs size/capacity/allocation changes from the columnar merge batcher.

Signed-off-by: Moritz Hoffmann

* Merge batcher announce records

Signed-off-by: Moritz Hoffmann

---------

Signed-off-by: Moritz Hoffmann
---
 src/logging.rs                              |  19 ++++
 src/operators/arrange/arrangement.rs        |   2 +-
 src/trace/implementations/merge_batcher.rs  | 105 ++++++++++++-----
 .../implementations/merge_batcher_col.rs    | 107 +++++++++++++-----
 src/trace/mod.rs                            |   5 +-
 tests/trace.rs                              |   2 +-
 6 files changed, 183 insertions(+), 57 deletions(-)

diff --git a/src/logging.rs b/src/logging.rs
index c53919448..e42be4ee1 100644
--- a/src/logging.rs
+++ b/src/logging.rs
@@ -31,6 +31,8 @@ pub enum DifferentialEvent {
     MergeShortfall(MergeShortfall),
     /// Trace sharing event.
     TraceShare(TraceShare),
+    /// Batcher size event.
+    Batcher(BatcherEvent),
 }
 
 /// Either the start or end of a merge event.
@@ -45,6 +47,23 @@ pub struct BatchEvent {
 
 impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
 
+/// A change in the size of a batcher.
+#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
+pub struct BatcherEvent {
+    /// Operator identifier.
+    pub operator: usize,
+    /// Change in records.
+    pub records_diff: isize,
+    /// Change in used size.
+    pub size_diff: isize,
+    /// Change in capacity.
+    pub capacity_diff: isize,
+    /// Change in number of allocations.
+    pub allocations_diff: isize,
+}
+
+impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
+
 /// Either the start or end of a merge event.
 #[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
 pub struct DropEvent {
diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs
index 6f640074b..f9f39fc86 100644
--- a/src/operators/arrange/arrangement.rs
+++ b/src/operators/arrange/arrangement.rs
@@ -619,7 +619,7 @@ where
         };
 
         // Where we will deposit received updates, and from which we extract batches.
-        let mut batcher = Tr::Batcher::new();
+        let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);
 
         // Capabilities for the lower envelope of updates in `batcher`.
         let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs
index 53c1625bf..2c0feec3f 100644
--- a/src/trace/implementations/merge_batcher.rs
+++ b/src/trace/implementations/merge_batcher.rs
@@ -3,10 +3,12 @@
 use std::collections::VecDeque;
 
 use timely::communication::message::RefOrMut;
+use timely::logging::WorkerIdentifier;
+use timely::logging_core::Logger;
 use timely::progress::{frontier::Antichain, Timestamp};
 
 use crate::difference::Semigroup;
-
+use crate::logging::{BatcherEvent, DifferentialEvent};
 use crate::trace::{Batcher, Builder};
 
 /// Creates batches from unordered tuples.
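As a usage sketch that is not part of this patch: once the `Batcher` variant exists, the events can be observed through timely's logging registry. This assumes the "differential/arrange" log stream name under which differential dataflow publishes its events; the closure and variable names below are illustrative only.

// Sketch: print batcher size changes as they happen. Assumes the
// "differential/arrange" log stream and timely's log_register() API.
use differential_dataflow::logging::DifferentialEvent;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // Register a handler before building the dataflow, so arrange operators find it.
        worker.log_register().insert::<DifferentialEvent, _>("differential/arrange", |_time, data| {
            for (time, worker_index, event) in data.drain(..) {
                if let DifferentialEvent::Batcher(batcher) = event {
                    println!("{:?}\tworker {}\t{:?}", time, worker_index, batcher);
                }
            }
        });
        // ... build a dataflow containing arrangements here ...
    }).unwrap();
}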
@@ -26,11 +28,11 @@ where
     type Item = ((K,V),T,D);
     type Time = T;
 
-    fn new() -> Self {
+    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
         MergeBatcher {
-            sorter: MergeSorter::new(),
+            sorter: MergeSorter::new(logger, operator_id),
             frontier: Antichain::new(),
-            lower: Antichain::from_elem(<T as Timestamp>::minimum()),
+            lower: Antichain::from_elem(T::minimum()),
         }
     }
 
@@ -132,20 +134,23 @@ where
             self.sorter.push(&mut buffer);
         }
 
-        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as Timestamp>::minimum()));
+        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
         self.lower = upper;
         seal
     }
 
-    // the frontier of elements remaining after the most recent call to `self.seal`.
+    /// The frontier of elements remaining after the most recent call to `self.seal`.
     fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
         self.frontier.borrow()
     }
 }
 
 struct MergeSorter<D, T, R> {
-    queue: Vec<Vec<Vec<(D, T, R)>>>, // each power-of-two length list of allocations.
+    /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
+    queue: Vec<Vec<Vec<(D, T, R)>>>,
     stash: Vec<Vec<(D, T, R)>>,
+    logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
+    operator_id: usize,
 }
 
 impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
@@ -164,21 +169,20 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
     }
 
     #[inline]
-    pub fn new() -> Self { MergeSorter { queue: Vec::new(), stash: Vec::new() } }
+    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
+        Self {
+            logger,
+            operator_id,
+            queue: Vec::new(),
+            stash: Vec::new(),
+        }
+    }
 
     #[inline]
     pub fn empty(&mut self) -> Vec<(D, T, R)> {
         self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size()))
     }
 
-    #[inline(never)]
-    pub fn _sort(&mut self, list: &mut Vec<Vec<(D, T, R)>>) {
-        for mut batch in list.drain(..) {
-            self.push(&mut batch);
-        }
-        self.finish_into(list);
-    }
-
     #[inline]
     pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
         // TODO: Reason about possible unbounded stash growth. How to / should we return them?
@@ -192,12 +196,13 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
 
         if !batch.is_empty() {
             crate::consolidation::consolidate_updates(&mut batch);
-            self.queue.push(vec![batch]);
+            self.account([batch.len()], 1);
+            self.queue_push(vec![batch]);
             while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
-                let list1 = self.queue.pop().unwrap();
-                let list2 = self.queue.pop().unwrap();
+                let list1 = self.queue_pop().unwrap();
+                let list2 = self.queue_pop().unwrap();
                 let merged = self.merge_by(list1, list2);
-                self.queue.push(merged);
+                self.queue_push(merged);
             }
         }
     }
@@ -206,24 +211,24 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
     // to break it down to be so.
     pub fn push_list(&mut self, list: Vec<Vec<(D, T, R)>>) {
         while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
-            let list1 = self.queue.pop().unwrap();
-            let list2 = self.queue.pop().unwrap();
+            let list1 = self.queue_pop().unwrap();
+            let list2 = self.queue_pop().unwrap();
             let merged = self.merge_by(list1, list2);
-            self.queue.push(merged);
+            self.queue_push(merged);
         }
-        self.queue.push(list);
+        self.queue_push(list);
     }
 
     #[inline(never)]
     pub fn finish_into(&mut self, target: &mut Vec<Vec<(D, T, R)>>) {
         while self.queue.len() > 1 {
-            let list1 = self.queue.pop().unwrap();
-            let list2 = self.queue.pop().unwrap();
+            let list1 = self.queue_pop().unwrap();
+            let list2 = self.queue_pop().unwrap();
             let merged = self.merge_by(list1, list2);
-            self.queue.push(merged);
+            self.queue_push(merged);
         }
 
-        if let Some(mut last) = self.queue.pop() {
+        if let Some(mut last) = self.queue_pop() {
             ::std::mem::swap(&mut last, target);
         }
     }
@@ -231,6 +236,7 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
     // merges two sorted input lists into one sorted output list.
     #[inline(never)]
     fn merge_by(&mut self, list1: Vec<Vec<(D, T, R)>>, list2: Vec<Vec<(D, T, R)>>) -> Vec<Vec<(D, T, R)>> {
+        self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1);
 
         use std::cmp::Ordering;
 
@@ -305,3 +311,46 @@ impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
         output
     }
 }
+
+impl<D, T, R> MergeSorter<D, T, R> {
+    /// Pop a batch from `self.queue` and account size changes.
+    #[inline]
+    fn queue_pop(&mut self) -> Option<Vec<Vec<(D, T, R)>>> {
+        let batch = self.queue.pop();
+        self.account(batch.iter().flatten().map(Vec::len), -1);
+        batch
+    }
+
+    /// Push a batch to `self.queue` and account size changes.
+    #[inline]
+    fn queue_push(&mut self, batch: Vec<Vec<(D, T, R)>>) {
+        self.account(batch.iter().map(Vec::len), 1);
+        self.queue.push(batch);
+    }
+
+    /// Account size changes. Only performs work if a logger exists.
+    ///
+    /// Calculate the size based on the lengths passed along, with each attribute
+    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
+    fn account<I: IntoIterator<Item=usize>>(&self, items: I, diff: isize) {
+        if let Some(logger) = &self.logger {
+            let mut records = 0isize;
+            for len in items {
+                records = records.saturating_add_unsigned(len);
+            }
+            logger.log(BatcherEvent {
+                operator: self.operator_id,
+                records_diff: records * diff,
+                size_diff: 0,
+                capacity_diff: 0,
+                allocations_diff: 0,
+            })
+        }
+    }
+}
+
+impl<D, T, R> Drop for MergeSorter<D, T, R> {
+    fn drop(&mut self) {
+        while self.queue_pop().is_some() { }
+    }
+}
diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs
index 5e25140f3..fcd9f5ff8 100644
--- a/src/trace/implementations/merge_batcher_col.rs
+++ b/src/trace/implementations/merge_batcher_col.rs
@@ -3,19 +3,21 @@
 use timely::Container;
 use timely::communication::message::RefOrMut;
 use timely::container::columnation::{Columnation, TimelyStack};
+use timely::logging::WorkerIdentifier;
+use timely::logging_core::Logger;
 use timely::progress::{frontier::Antichain, Timestamp};
 
 use crate::difference::Semigroup;
-
+use crate::logging::{BatcherEvent, DifferentialEvent};
 use crate::trace::{Batcher, Builder};
 
 /// Creates batches from unordered tuples.
 pub struct ColumnatedMergeBatcher<K, V, T, D>
 where
-    K: Columnation,
-    V: Columnation,
-    T: Columnation,
-    D: Columnation,
+    K: Columnation + 'static,
+    V: Columnation + 'static,
+    T: Columnation + 'static,
+    D: Columnation + 'static,
 {
     sorter: MergeSorterColumnation<(K, V), T, D>,
     lower: Antichain<T>,
@@ -32,9 +34,9 @@ where
     type Item = ((K,V),T,D);
     type Time = T;
 
-    fn new() -> Self {
+    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
         ColumnatedMergeBatcher {
-            sorter: MergeSorterColumnation::new(),
+            sorter: MergeSorterColumnation::new(logger, operator_id),
             frontier: Antichain::new(),
             lower: Antichain::from_elem(<T as Timestamp>::minimum()),
         }
     }
@@ -130,12 +132,12 @@ where
         // Drain buffers (fast reclamation).
         self.sorter.clear_stash();
 
-        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(<T as Timestamp>::minimum()));
+        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
         self.lower = upper;
         seal
     }
 
-    // the frontier of elements remaining after the most recent call to `self.seal`.
+    /// The frontier of elements remaining after the most recent call to `self.seal`.
     fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
         self.frontier.borrow()
     }
@@ -182,10 +184,13 @@ impl<T: Columnation> TimelyStackQueue<T> {
     }
 }
 
-struct MergeSorterColumnation<D: Columnation, T: Columnation, R: Columnation> {
-    queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
+struct MergeSorterColumnation<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> {
+    /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
+    queue: Vec<Vec<TimelyStack<(D, T, R)>>>,
     stash: Vec<TimelyStack<(D, T, R)>>,
     pending: Vec<(D, T, R)>,
+    logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
+    operator_id: usize,
 }
 
 impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
@@ -209,11 +214,13 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
         }
     }
 
-    fn new() -> Self {
+    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
         Self {
+            logger,
+            operator_id,
             queue: Vec::new(),
             stash: Vec::new(),
-            pending: Vec::new()
+            pending: Vec::new(),
         }
     }
@@ -261,12 +268,12 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
             while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
-                let list1 = self.queue.pop().unwrap();
-                let list2 = self.queue.pop().unwrap();
+                let list1 = self.queue_pop().unwrap();
+                let list2 = self.queue_pop().unwrap();
                 let merged = self.merge_by(list1, list2);
-                self.queue.push(merged);
+                self.queue_push(merged);
             }
         }
     }
@@ -275,32 +282,31 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
     fn push_list(&mut self, list: Vec<Vec<TimelyStack<(D, T, R)>>>) {
         while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
-            let list1 = self.queue.pop().unwrap();
-            let list2 = self.queue.pop().unwrap();
+            let list1 = self.queue_pop().unwrap();
+            let list2 = self.queue_pop().unwrap();
             let merged = self.merge_by(list1, list2);
-            self.queue.push(merged);
+            self.queue_push(merged);
         }
-        self.queue.push(list);
+        self.queue_push(list);
     }
 
     fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
         crate::consolidation::consolidate_updates(&mut self.pending);
         self.flush_pending();
         while self.queue.len() > 1 {
-            let list1 = self.queue.pop().unwrap();
-            let list2 = self.queue.pop().unwrap();
+            let list1 = self.queue_pop().unwrap();
+            let list2 = self.queue_pop().unwrap();
             let merged = self.merge_by(list1, list2);
-            self.queue.push(merged);
+            self.queue_push(merged);
         }
 
-        if let Some(mut last) = self.queue.pop() {
+        if let Some(mut last) = self.queue_pop() {
             std::mem::swap(&mut last, target);
         }
     }
 
     // merges two sorted input lists into one sorted output list.
     fn merge_by(&mut self, list1: Vec<TimelyStack<(D, T, R)>>, list2: Vec<TimelyStack<(D, T, R)>>) -> Vec<TimelyStack<(D, T, R)>> {
-
         use std::cmp::Ordering;
 
         // TODO: `list1` and `list2` get dropped; would be better to reuse?
@@ -378,3 +384,52 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
         output
     }
 }
+
+impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> MergeSorterColumnation<D, T, R> {
+    /// Pop a batch from `self.queue` and account size changes.
+    #[inline]
+    fn queue_pop(&mut self) -> Option<Vec<TimelyStack<(D, T, R)>>> {
+        let batch = self.queue.pop();
+        self.account(batch.iter().flatten(), -1);
+        batch
+    }
+
+    /// Push a batch to `self.queue` and account size changes.
+    #[inline]
+    fn queue_push(&mut self, batch: Vec<TimelyStack<(D, T, R)>>) {
+        self.account(&batch, 1);
+        self.queue.push(batch);
+    }
+
+    /// Account size changes. Only performs work if a logger exists.
+    ///
+    /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
+    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
+    fn account<'a, I: IntoIterator<Item=&'a TimelyStack<(D, T, R)>>>(&self, items: I, diff: isize) {
+        if let Some(logger) = &self.logger {
+            let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
+            for stack in items {
+                records = records.saturating_add_unsigned(stack.len());
+                stack.heap_size(|s, c| {
+                    siz = siz.saturating_add_unsigned(s);
+                    capacity = capacity.saturating_add_unsigned(c);
+                    allocations += isize::from(c > 0);
+                });
+            }
+            logger.log(BatcherEvent {
+                operator: self.operator_id,
+                records_diff: records * diff,
+                size_diff: siz * diff,
+                capacity_diff: capacity * diff,
+                allocations_diff: allocations * diff,
+            })
+        }
+    }
+}
+
+impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> Drop for MergeSorterColumnation<D, T, R> {
+    fn drop(&mut self) {
+        while self.queue_pop().is_some() { }
+    }
+}
diff --git a/src/trace/mod.rs b/src/trace/mod.rs
index 98ae340cc..8531c555c 100644
--- a/src/trace/mod.rs
+++ b/src/trace/mod.rs
@@ -13,9 +13,12 @@ pub mod implementations;
 pub mod wrappers;
 
 use timely::communication::message::RefOrMut;
+use timely::logging::WorkerIdentifier;
+use timely::logging_core::Logger;
 use timely::progress::{Antichain, frontier::AntichainRef};
 use timely::progress::Timestamp;
 
+use crate::logging::DifferentialEvent;
 use crate::trace::cursor::MyTrait;
 
 // use ::difference::Semigroup;
@@ -313,7 +316,7 @@ pub trait Batcher {
     /// Times at which batches are formed.
     type Time: Timestamp;
     /// Allocates a new empty batcher.
-    fn new() -> Self;
+    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
     /// Adds an unordered batch of elements to the batcher.
     fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>);
     /// Returns all updates not greater or equal to an element of `upper`.
diff --git a/tests/trace.rs b/tests/trace.rs
index abf8b539f..dac7eff22 100644
--- a/tests/trace.rs
+++ b/tests/trace.rs
@@ -12,7 +12,7 @@ fn get_trace() -> ValSpine {
     let op_info = OperatorInfo::new(0, 0, &[]);
     let mut trace = IntegerTrace::new(op_info, None, None);
     {
-        let mut batcher = <IntegerTrace as Trace>::Batcher::new();
+        let mut batcher = <IntegerTrace as Trace>::Batcher::new(None, 0);
 
         use timely::communication::message::RefOrMut;
 
         batcher.push_batch(RefOrMut::Mut(&mut vec![
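A minimal sketch, not part of this patch, of how a consumer might fold the `BatcherEvent` diffs into running totals per operator; `BatcherStats` and `apply` are illustrative names, not library API.

// Sketch: accumulate signed BatcherEvent deltas into per-operator totals.
use std::collections::HashMap;
use differential_dataflow::logging::DifferentialEvent;

#[derive(Default, Debug)]
struct BatcherStats { records: isize, size: isize, capacity: isize, allocations: isize }

fn apply(stats: &mut HashMap<usize, BatcherStats>, event: &DifferentialEvent) {
    if let DifferentialEvent::Batcher(e) = event {
        let entry = stats.entry(e.operator).or_default();
        // Each event carries signed deltas, so summing them yields the current totals.
        entry.records += e.records_diff;
        entry.size += e.size_diff;
        entry.capacity += e.capacity_diff;
        entry.allocations += e.allocations_diff;
    }
}

Because the columnar batcher reports size and capacity through `TimelyStack::heap_size`, such running totals approximate the memory currently held by each operator's batcher.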