From e2016ae668d770afd701f4a864a6ca58ac27de2a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 28 Nov 2023 20:21:31 -0500 Subject: [PATCH] Merge batcher announce records Signed-off-by: Moritz Hoffmann --- src/logging.rs | 2 ++ src/trace/implementations/merge_batcher_col.rs | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/logging.rs b/src/logging.rs index 29127ad35..1728d1094 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -50,6 +50,8 @@ impl From for DifferentialEvent { fn from(e: BatchEvent) -> Self { D pub struct BatcherEvent { /// Operator identifier. pub operator: usize, + /// Change in records. + pub records_diff: isize, /// Change in used size. pub size_diff: isize, /// Change in capacity. diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 0f9fa06d6..8f211ea28 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -230,8 +230,9 @@ impl>>(&self, items: I, diff: isize) { if let Some(logger) = &self.logger { - let (mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize); + let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize); for stack in items { + records = records.saturating_add_unsigned(stack.len()); stack.heap_size(|s, c| { siz = siz.saturating_add_unsigned(s); capacity = capacity.saturating_add_unsigned(c); @@ -240,6 +241,7 @@ impl Drop for MergeSorterColumnation { +// fn drop(&mut self) { +// self.account(self.queue.iter().flatten(), -1); +// } +// }