Skip to content

Commit

Permalink
spine: fix logging of completed merges
Browse files Browse the repository at this point in the history
This commit makes sure completed merges are logged also for merges that
used the empty batch optimization.

Also, if we log the dropping of a merged batch, we shouldn't forget to
also log the merge event, so consumers don't become confused about the
whereabouts of the input batches.
  • Loading branch information
teskje committed Aug 28, 2023
1 parent 9505a43 commit 301e496
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ where
/// Drops and logs batches. Used in `set_logical_compaction` and drop.
fn drop_batches(&mut self) {
if let Some(logger) = &self.logger {
for batch in self.merging.drain(..) {
for (index, batch) in self.merging.drain(..).enumerate() {
match batch {
MergeState::Single(Some(batch)) => {
logger.log(::logging::DropEvent {
Expand All @@ -361,7 +361,16 @@ where
length: batch2.len(),
});
},
MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => {
MergeState::Double(MergeVariant::Complete(Some((batch, input_lengths)))) => {
if let Some((length1, length2)) = input_lengths {
logger.log(::logging::MergeEvent {
operator: self.operator.global_id,
scale: index,
length1,
length2,
complete: Some(batch.len()),
});
}
logger.log(::logging::DropEvent {
operator: self.operator.global_id,
length: batch.len(),
Expand Down Expand Up @@ -679,15 +688,15 @@ where

/// Completes and extracts what ever is at layer `index`.
fn complete_at(&mut self, index: usize) -> Option<B> {
if let Some((merged, inputs)) = self.merging[index].complete() {
if let Some((input1, input2)) = inputs {
if let Some((merged, input_lengths)) = self.merging[index].complete() {
if let Some((length1, length2)) = input_lengths {
// Log the completion of a merge from existing parts.
self.logger.as_ref().map(|l| l.log(
::logging::MergeEvent {
operator: self.operator.global_id,
scale: index,
length1: input1.len(),
length2: input2.len(),
length1,
length2,
complete: Some(merged.len()),
}
));
Expand Down Expand Up @@ -816,7 +825,7 @@ impl<B: Batch> MergeState<B> where B::Time: Eq {
/// with the `is_complete()` method.
///
/// There is the addional option of input batches.
fn complete(&mut self) -> Option<(B, Option<(B, B)>)> {
fn complete(&mut self) -> Option<(B, Option<(usize, usize)>)> {
match std::mem::replace(self, MergeState::Vacant) {
MergeState::Vacant => None,
MergeState::Single(batch) => batch.map(|b| (b, None)),
Expand Down Expand Up @@ -867,12 +876,13 @@ impl<B: Batch> MergeState<B> where B::Time: Eq {
match (batch1, batch2) {
(Some(batch1), Some(batch2)) => {
assert!(batch1.upper() == batch2.lower());
let input_lengths = Some((batch1.len(), batch2.len()));
if batch1.is_empty() {
let batch = batch2.merge_empty(&batch1);
MergeVariant::Complete(Some((batch, None)))
MergeVariant::Complete(Some((batch, input_lengths)))
} else if batch2.is_empty() {
let batch = batch1.merge_empty(&batch2);
MergeVariant::Complete(Some((batch, None)))
MergeVariant::Complete(Some((batch, input_lengths)))
} else {
let merger = batch1.begin_merge(&batch2, compaction_frontier);
MergeVariant::InProgress(batch1, batch2, merger)
Expand All @@ -891,7 +901,10 @@ enum MergeVariant<B: Batch> {
/// Describes an actual in-progress merge between two non-trivial batches.
InProgress(B, B, <B as Batch>::Merger),
/// A merge that requires no further work. May or may not represent a non-trivial batch.
Complete(Option<(B, Option<(B, B)>)>),
///
/// In the case of a non-trivial batch that is the result of merging two input batches, the
/// second component contains the lengths of those input batches.
Complete(Option<(B, Option<(usize, usize)>)>),
}

impl<B: Batch> MergeVariant<B> {
Expand All @@ -900,7 +913,7 @@ impl<B: Batch> MergeVariant<B> {
///
/// The result is either `None`, for structurally empty batches,
/// or a batch and optionally input batches from which it derived.
fn complete(mut self) -> Option<(B, Option<(B, B)>)> {
fn complete(mut self) -> Option<(B, Option<(usize, usize)>)> {
let mut fuel = isize::max_value();
self.work(&mut fuel);
if let MergeVariant::Complete(batch) = self { batch }
Expand All @@ -916,7 +929,8 @@ impl<B: Batch> MergeVariant<B> {
if let MergeVariant::InProgress(b1,b2,mut merge) = variant {
merge.work(&b1,&b2,fuel);
if *fuel > 0 {
*self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2)))));
let input_lengths = Some((b1.len(), b2.len()));
*self = MergeVariant::Complete(Some((merge.done(), input_lengths)));
}
else {
*self = MergeVariant::InProgress(b1,b2,merge);
Expand Down

0 comments on commit 301e496

Please sign in to comment.