From 42e032d5de5a8f7e2e572df926d3ac27a68192fa Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 20 Nov 2023 10:15:24 -0500 Subject: [PATCH] Quality of life updates: non-optional compaction, and `Builder::copy`. Co-authored-by: Moritz Hoffmann --- .../implementations/merge_batcher_col.rs | 2 +- src/trace/implementations/ord.rs | 41 +++++++++---------- src/trace/implementations/ord_neu.rs | 35 +++++++++++++--- src/trace/implementations/spine_fueled.rs | 4 +- src/trace/mod.rs | 19 ++++++--- 5 files changed, 66 insertions(+), 35 deletions(-) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 8aab38528..9cc82fb38 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -84,7 +84,7 @@ impl Batcher for ColumnatedMergeBatcher keep.copy(datum); } else { - builder.push((key.clone(), val.clone(), time.clone(), diff.clone())); + builder.copy((key, val, time, diff)); } } // Recycling buffer. diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 0866b5a34..28272588a 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -98,7 +98,7 @@ impl Batch for OrdValBatch> type Builder = OrdValBuilder; type Merger = OrdValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } } @@ -115,7 +115,7 @@ where type Builder = OrdValBuilder; type Merger = OrdValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } } @@ -220,21 +220,18 @@ pub struct OrdValMerger { // result that we are currently assembling. result: as Trie>::MergeBuilder, description: Description<::Time>, - should_compact: bool, } impl Merger> for OrdValMerger where OrdValBatch: Batch::Time> { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option as BatchReader>::Time>>) -> Self { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef< as BatchReader>::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); let mut since = batch1.description().since().join(batch2.description().since()); - if let Some(compaction_frontier) = compaction_frontier { - since = since.join(&compaction_frontier.to_owned()); - } + since = since.join(&compaction_frontier.to_owned()); let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since); @@ -245,7 +242,6 @@ where upper2: batch2.layer.keys(), result: < as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, - should_compact: compaction_frontier.is_some(), } } fn done(self) -> OrdValBatch { @@ -297,9 +293,7 @@ where effort = (self.result.vals.vals.vals.len() - starting_updates) as isize; // if we are supplied a frontier, we should compact. - if self.should_compact { - OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); - } + OrdValBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); *fuel -= effort; @@ -369,6 +363,10 @@ where self.builder.push_tuple((key, (val, (time, diff)))); } + fn copy(&mut self, (key, val, time, diff): (& as BatchReader>::Key, & as BatchReader>::Val, & as BatchReader>::Time, & as BatchReader>::R)) { + self.builder.push_tuple((key.clone(), (val.clone(), (time.clone(), diff.clone())))); + } + #[inline(never)] fn done(self, lower: Antichain< as BatchReader>::Time>, upper: Antichain< as BatchReader>::Time>, since: Antichain< as BatchReader>::Time>) -> OrdValBatch { OrdValBatch { @@ -416,7 +414,7 @@ impl Batch for OrdKeyBatch> { type Builder = OrdKeyBuilder; type Merger = OrdKeyMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } } @@ -432,7 +430,7 @@ where type Builder = OrdKeyBuilder; type Merger = OrdKeyMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } } @@ -511,21 +509,18 @@ pub struct OrdKeyMerger { // result that we are currently assembling. result: as Trie>::MergeBuilder, description: Description<::Time>, - should_compact: bool, } impl Merger> for OrdKeyMerger where OrdKeyBatch: Batch::Time> { - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef<::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); let mut since = batch1.description().since().join(batch2.description().since()); - if let Some(compaction_frontier) = compaction_frontier { - since = since.join(&compaction_frontier.to_owned()); - } + since = since.join(&compaction_frontier.to_owned()); let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since); @@ -536,7 +531,6 @@ where upper2: batch2.layer.keys(), result: < as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, - should_compact: compaction_frontier.is_some(), } } fn done(self) -> OrdKeyBatch { @@ -594,9 +588,7 @@ where effort = (self.result.vals.vals.len() - starting_updates) as isize; // if we are supplied a frontier, we should compact. - if self.should_compact { - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); - } + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); *fuel -= effort; @@ -670,6 +662,11 @@ where self.builder.push_tuple((key, (time, diff))); } + #[inline] + fn copy(&mut self, (key, _, time, diff): (&::Key, &(), &::Time, &::Diff)) { + self.builder.push_tuple((key.clone(), (time.clone(), diff.clone()))); + } + #[inline(never)] fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { OrdKeyBatch { diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 8aac140bc..96f27a753 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -109,7 +109,7 @@ mod val_batch { type Builder = OrdValBuilder; type Merger = OrdValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } } @@ -133,14 +133,13 @@ mod val_batch { } impl Merger> for OrdValMerger { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: Option::Time>>) -> Self { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); use lattice::Lattice; let mut since = batch1.description().since().join(batch2.description().since()); - if let Some(compaction_frontier) = compaction_frontier { - since = since.join(&compaction_frontier.to_owned()); - } + since = since.join(&compaction_frontier.to_owned()); + let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since); let batch1 = &batch1.storage; @@ -470,6 +469,32 @@ mod val_batch { } } + #[inline] + fn copy(&mut self, (key, val, time, diff): (&::Key, &::Val, &::Time, &::Diff)) { + + // Perhaps this is a continuation of an already received key. + if self.result.keys.last() == Some(key) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last() == Some(val) { + // TODO: here we could look for repetition, and not push the update in that case. + // More logic (and state) would be required to correctly wrangle this. + self.result.updates.push((time.clone(), diff.clone())); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); + self.result.updates.push((time.clone(), diff.clone())); + self.result.vals.copy(val); + } + } else { + // New key; complete representation of prior key. + self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap()); + self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap()); + self.result.updates.push((time.clone(), diff.clone())); + self.result.vals.copy(val); + self.result.keys.copy(key); + } + } + #[inline(never)] fn done(mut self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdValBatch { // Record the final offsets diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 33edd4d79..f1c5f538b 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -675,7 +675,7 @@ where complete: None, } )); - let compaction_frontier = Some(self.logical_frontier.borrow()); + let compaction_frontier = self.logical_frontier.borrow(); self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier); } MergeState::Double(_) => { @@ -869,7 +869,7 @@ impl MergeState where B::Time: Eq { /// empty batch whose upper and lower froniers are equal. This /// option exists purely for bookkeeping purposes, and no computation /// is performed to merge the two batches. - fn begin_merge(batch1: Option, batch2: Option, compaction_frontier: Option>) -> MergeState { + fn begin_merge(batch1: Option, batch2: Option, compaction_frontier: AntichainRef) -> MergeState { let variant = match (batch1, batch2) { (Some(batch1), Some(batch2)) => { diff --git a/src/trace/mod.rs b/src/trace/mod.rs index d9c5e3ca3..46e70add5 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -288,7 +288,7 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized { /// The result of this method can be exercised to eventually produce the same result /// that a call to `self.merge(other)` would produce, but it can be done in a measured /// fashion. This can help to avoid latency spikes where a large merge needs to happen. - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef) -> Self::Merger { Self::Merger::new(self, other, compaction_frontier) } /// Creates an empty batch with the stated bounds. @@ -316,7 +316,14 @@ pub trait Builder { /// Allocates an empty builder with some capacity. fn with_capacity(cap: usize) -> Self; /// Adds an element to the batch. - fn push(&mut self, element: (Output::Key, Output::Val, Output::Time, Output::R)); + /// + /// The default implementation uses `self.copy` with references to the owned arguments. + /// One should override it if the builder can take advantage of owned arguments. + fn push(&mut self, element: (Output::Key, Output::Val, Output::Time, Output::R)) { + self.copy((&element.0, &element.1, &element.2, &element.3)); + } + /// Adds an element to the batch. + fn copy(&mut self, element: (&Output::Key, &Output::Val, &Output::Time, &Output::R)); /// Adds an ordered sequence of elements to the batch. fn extend>(&mut self, iter: I) { for item in iter { self.push(item); } @@ -329,7 +336,7 @@ pub trait Builder { pub trait Merger { /// Creates a new merger to merge the supplied batches, optionally compacting /// up to the supplied frontier. - fn new(source1: &Output, source2: &Output, compaction_frontier: Option>) -> Self; + fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef) -> Self; /// Perform some amount of work, decrementing `fuel`. /// /// If `fuel` is non-zero after the call, the merging is complete and @@ -441,6 +448,7 @@ pub mod rc_blanket_impls { fn new() -> Self { RcBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { RcBuilder { builder: >::with_capacity(cap) } } fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } + fn copy(&mut self, element: (&B::Key, &B::Val, &B::Time, &B::R)) { self.builder.copy(element) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -449,7 +457,7 @@ pub mod rc_blanket_impls { /// Represents a merge in progress. impl Merger> for RcMerger { - fn new(source1: &Rc, source2: &Rc, compaction_frontier: Option>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } + fn new(source1: &Rc, source2: &Rc, compaction_frontier: AntichainRef) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } fn work(&mut self, source1: &Rc, source2: &Rc, fuel: &mut isize) { self.merger.work(source1, source2, fuel) } fn done(self) -> Rc { Rc::new(self.merger.done()) } } @@ -562,6 +570,7 @@ pub mod abomonated_blanket_impls { fn new() -> Self { AbomonatedBuilder { builder: >::new() } } fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: >::with_capacity(cap) } } fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) } + fn copy(&mut self, element: (&B::Key, &B::Val, &B::Time, &B::R)) { self.builder.copy(element) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Abomonated> { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch)); @@ -575,7 +584,7 @@ pub mod abomonated_blanket_impls { /// Represents a merge in progress. impl Merger>> for AbomonatedMerger { - fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: Option>) -> Self { + fn new(source1: &Abomonated>, source2: &Abomonated>, compaction_frontier: AntichainRef) -> Self { AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } } fn work(&mut self, source1: &Abomonated>, source2: &Abomonated>, fuel: &mut isize) {