From 301e496db518550521bc6583616ef80416c8bf72 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Sun, 27 Aug 2023 17:07:19 +0200 Subject: [PATCH] spine: fix logging of completed merges This commit makes sure completed merges are logged also for merges that used the empty batch optimization. Also, if we log the dropping of a merged batch, we shouldn't forget to also log the merge event, so consumers don't become confused about the whereabouts of the input batches. --- src/trace/implementations/spine_fueled.rs | 38 ++++++++++++++++------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 10717de20..62b5628ab 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -343,7 +343,7 @@ where /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { if let Some(logger) = &self.logger { - for batch in self.merging.drain(..) { + for (index, batch) in self.merging.drain(..).enumerate() { match batch { MergeState::Single(Some(batch)) => { logger.log(::logging::DropEvent { @@ -361,7 +361,16 @@ where length: batch2.len(), }); }, - MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { + MergeState::Double(MergeVariant::Complete(Some((batch, input_lengths)))) => { + if let Some((length1, length2)) = input_lengths { + logger.log(::logging::MergeEvent { + operator: self.operator.global_id, + scale: index, + length1, + length2, + complete: Some(batch.len()), + }); + } logger.log(::logging::DropEvent { operator: self.operator.global_id, length: batch.len(), @@ -679,15 +688,15 @@ where /// Completes and extracts what ever is at layer `index`. fn complete_at(&mut self, index: usize) -> Option { - if let Some((merged, inputs)) = self.merging[index].complete() { - if let Some((input1, input2)) = inputs { + if let Some((merged, input_lengths)) = self.merging[index].complete() { + if let Some((length1, length2)) = input_lengths { // Log the completion of a merge from existing parts. self.logger.as_ref().map(|l| l.log( ::logging::MergeEvent { operator: self.operator.global_id, scale: index, - length1: input1.len(), - length2: input2.len(), + length1, + length2, complete: Some(merged.len()), } )); @@ -816,7 +825,7 @@ impl MergeState where B::Time: Eq { /// with the `is_complete()` method. /// /// There is the addional option of input batches. - fn complete(&mut self) -> Option<(B, Option<(B, B)>)> { + fn complete(&mut self) -> Option<(B, Option<(usize, usize)>)> { match std::mem::replace(self, MergeState::Vacant) { MergeState::Vacant => None, MergeState::Single(batch) => batch.map(|b| (b, None)), @@ -867,12 +876,13 @@ impl MergeState where B::Time: Eq { match (batch1, batch2) { (Some(batch1), Some(batch2)) => { assert!(batch1.upper() == batch2.lower()); + let input_lengths = Some((batch1.len(), batch2.len())); if batch1.is_empty() { let batch = batch2.merge_empty(&batch1); - MergeVariant::Complete(Some((batch, None))) + MergeVariant::Complete(Some((batch, input_lengths))) } else if batch2.is_empty() { let batch = batch1.merge_empty(&batch2); - MergeVariant::Complete(Some((batch, None))) + MergeVariant::Complete(Some((batch, input_lengths))) } else { let merger = batch1.begin_merge(&batch2, compaction_frontier); MergeVariant::InProgress(batch1, batch2, merger) @@ -891,7 +901,10 @@ enum MergeVariant { /// Describes an actual in-progress merge between two non-trivial batches. InProgress(B, B, ::Merger), /// A merge that requires no further work. May or may not represent a non-trivial batch. - Complete(Option<(B, Option<(B, B)>)>), + /// + /// In the case of a non-trivial batch that is the result of merging two input batches, the + /// second component contains the lengths of those input batches. + Complete(Option<(B, Option<(usize, usize)>)>), } impl MergeVariant { @@ -900,7 +913,7 @@ impl MergeVariant { /// /// The result is either `None`, for structurally empty batches, /// or a batch and optionally input batches from which it derived. - fn complete(mut self) -> Option<(B, Option<(B, B)>)> { + fn complete(mut self) -> Option<(B, Option<(usize, usize)>)> { let mut fuel = isize::max_value(); self.work(&mut fuel); if let MergeVariant::Complete(batch) = self { batch } @@ -916,7 +929,8 @@ impl MergeVariant { if let MergeVariant::InProgress(b1,b2,mut merge) = variant { merge.work(&b1,&b2,fuel); if *fuel > 0 { - *self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2))))); + let input_lengths = Some((b1.len(), b2.len())); + *self = MergeVariant::Complete(Some((merge.done(), input_lengths))); } else { *self = MergeVariant::InProgress(b1,b2,merge);