Skip to content

Commit

Permalink
Merge batcher announce records
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 29, 2023
1 parent b6e23da commit e2016ae
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { D
pub struct BatcherEvent {
/// Operator identifier.
pub operator: usize,
/// Change in records.
pub records_diff: isize,
/// Change in used size.
pub size_diff: isize,
/// Change in capacity.
Expand Down
11 changes: 10 additions & 1 deletion src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,9 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
/// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
fn account<'a, I: IntoIterator<Item=&'a TimelyStack<(D, T, R)>>>(&self, items: I, diff: isize) {
if let Some(logger) = &self.logger {
let (mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize);
let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
for stack in items {
records = records.saturating_add_unsigned(stack.len());
stack.heap_size(|s, c| {
siz = siz.saturating_add_unsigned(s);
capacity = capacity.saturating_add_unsigned(c);
Expand All @@ -240,6 +241,7 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
}
logger.log(BatcherEvent {
operator: self.operator_id,
records_diff: records * diff,
size_diff: siz * diff,
capacity_diff: capacity * diff,
allocations_diff: allocations * diff,
Expand Down Expand Up @@ -412,3 +414,10 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
output
}
}

// TODO: Enable once we don't require `'static` to interact with containers.
// impl<D: Columnation, T: Columnation, R: Columnation> Drop for MergeSorterColumnation<D, T, R> {
// fn drop(&mut self) {
// self.account(self.queue.iter().flatten(), -1);
// }
// }

0 comments on commit e2016ae

Please sign in to comment.