From 487b7383323828473e0eddee226be17d723f2931 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 17 Nov 2023 15:22:01 -0500 Subject: [PATCH] ColKeySpine with TimelyStack in the merge batcher Signed-off-by: Moritz Hoffmann --- src/trace/implementations/ord.rs | 66 ++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 6187b8ba9..331a4c305 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -118,15 +118,15 @@ pub type OrdValSpine = Spine = Spine, Vec<((K,V),T,R)>>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>>; +pub type OrdKeySpine = Spine, Vec<((K,()),T,R)>>>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine>, Vec>>>; +pub type OrdKeySpineAbom = Spine, Vec<((K,()),T,R)>>, Vec>>>; /// A trace implementation backed by columnar storage. pub type ColValSpine = Spine, TimelyStack<((K,V),T,R)>>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine, TimelyStack<((K,()),T,R)>>>>; /// A container that can retain/discard from some offset onward. @@ -484,20 +484,22 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Abomonation)] -pub struct OrdKeyBatch { +pub struct OrdKeyBatch { /// Where all the dataz is. pub layer: KTDLayer, /// Description of the update times this layer represents. pub desc: Description<::Time>, + /// Phantom data + pub phantom: PhantomData, } -impl BatchReader for OrdKeyBatch { +impl BatchReader for OrdKeyBatch { type Key = ::Key; type Val = (); type Time = ::Time; type R = ::Diff; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -509,7 +511,7 @@ impl BatchReader for OrdKeyBatch { fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch { +impl Batch for OrdKeyBatch> { type Batcher = MergeBatcher; type Builder = OrdKeyBuilder; type Merger = OrdKeyMerger; @@ -519,7 +521,23 @@ impl Batch for OrdKeyBatch { } } -impl OrdKeyBatch { +impl Batch for OrdKeyBatch> +where + ::Target: Columnation + 'static, + Self::Key: Columnation + 'static, + Self::Time: Columnation + 'static, + Self::R: Columnation + 'static, +{ + type Batcher = ColumnatedMergeBatcher; + type Builder = OrdKeyBuilder; + type Merger = OrdKeyMerger; + + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { + OrdKeyMerger::new(self, other, compaction_frontier) + } +} + +impl OrdKeyBatch { fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; @@ -596,8 +614,11 @@ pub struct OrdKeyMerger { should_compact: bool, } -impl Merger> for OrdKeyMerger { - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { +impl Merger> for OrdKeyMerger +where + OrdKeyBatch: Batch::Time> +{ + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -618,7 +639,7 @@ impl Merger> for OrdKeyMerger { should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -626,9 +647,10 @@ impl Merger> for OrdKeyMerger { OrdKeyBatch { layer: self.result.done(), desc: self.description, + phantom: PhantomData, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -673,7 +695,7 @@ impl Merger> for OrdKeyMerger { // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -687,19 +709,19 @@ impl Merger> for OrdKeyMerger { /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor { +pub struct OrdKeyCursor { valid: bool, cursor: OrderedCursor::Time, ::Diff>>, - phantom: PhantomData, + phantom: PhantomData<(L, C)>, } -impl Cursor for OrdKeyCursor { +impl Cursor for OrdKeyCursor { type Key = ::Key; type Val = (); type Time = ::Time; type R = ::Diff; - type Storage = OrdKeyBatch; + type Storage = OrdKeyBatch; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } @@ -726,7 +748,10 @@ pub struct OrdKeyBuilder { builder: KTDBuilder, } -impl Builder> for OrdKeyBuilder { +impl Builder> for OrdKeyBuilder +where + OrdKeyBatch: Batch::Key, Val=(), Time=::Time, R=::Diff> +{ fn new() -> Self { OrdKeyBuilder { @@ -746,10 +771,11 @@ impl Builder> for OrdKeyBuilder { } #[inline(never)] - fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { + fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), - desc: Description::new(lower, upper, since) + desc: Description::new(lower, upper, since), + phantom: PhantomData, } } }