From 304ed9d0408148ecccc0256774c0250e639f88a9 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Sat, 23 Dec 2023 10:34:13 -0500 Subject: [PATCH] Let Layout specify OffsetContainer (#449) * Let Layout specify OffsetContainer Signed-off-by: Moritz Hoffmann * Generalize ReadItem type --------- Signed-off-by: Moritz Hoffmann Co-authored-by: Frank McSherry --- src/trace/implementations/mod.rs | 91 +++++++++++++++++++++++++++- src/trace/implementations/ord_neu.rs | 42 +++++++------ src/trace/implementations/rhh.rs | 31 +++++----- 3 files changed, 130 insertions(+), 34 deletions(-) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index e4935de0b..2ef41901b 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -54,6 +54,7 @@ pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{ToOwned}; +use std::cmp::Ordering; use timely::container::columnation::{Columnation, TimelyStack}; use crate::lattice::Lattice; @@ -97,6 +98,8 @@ pub trait Layout { /// Container for update vals. type UpdContainer: for<'a> BatchContainer::Time, ::Diff), ReadItem<'a> = &'a (::Time, ::Diff)>; + /// Container for offsets. + type OffsetContainer: BatchContainer; } /// A layout that uses vectors @@ -113,6 +116,7 @@ where type KeyContainer = Vec; type ValContainer = Vec; type UpdContainer = Vec<(U::Time, U::Diff)>; + type OffsetContainer = OffsetList; } /// A layout based on timely stacks @@ -131,6 +135,7 @@ where type KeyContainer = TimelyStack; type ValContainer = TimelyStack; type UpdContainer = TimelyStack<(U::Time, U::Diff)>; + type OffsetContainer = OffsetList; } /// A type with a preferred container. @@ -183,10 +188,13 @@ where type KeyContainer = K::Container; type ValContainer = V::Container; type UpdContainer = Vec<(T, D)>; + type OffsetContainer = OffsetList; } use std::convert::TryInto; +use std::ops::Deref; use abomonation_derive::Abomonation; +use crate::trace::cursor::MyTrait; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)] @@ -234,6 +242,87 @@ impl OffsetList { } } +/// Helper struct to provide `MyTrait` for `Copy` types. +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] +pub struct Wrapper(T); + +impl Deref for Wrapper { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper { + type Owned = T; + + fn into_owned(self) -> Self::Owned { + self.0 + } + + fn clone_onto(&self, other: &mut Self::Owned) { + *other = self.0; + } + + fn compare(&self, other: &Self::Owned) -> Ordering { + self.0.cmp(other) + } + + fn borrow_as(other: &'a Self::Owned) -> Self { + Self(*other) + } +} + +impl BatchContainer for OffsetList { + type PushItem = usize; + type ReadItem<'a> = Wrapper; + + fn push(&mut self, item: Self::PushItem) { + self.push(item); + } + + fn copy_push(&mut self, item: &Self::PushItem) { + self.push(*item); + } + + fn copy(&mut self, item: Self::ReadItem<'_>) { + self.push(item.0); + } + + fn copy_slice(&mut self, slice: &[Self::PushItem]) { + for index in slice { + self.push(*index); + } + } + + fn copy_range(&mut self, other: &Self, start: usize, end: usize) { + for offset in start..end { + self.push(other.index(offset)); + } + } + + fn with_capacity(size: usize) -> Self { + Self::with_capacity(size) + } + + fn reserve(&mut self, _additional: usize) { + // Nop + } + + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Self::with_capacity(cont1.len() + cont2.len()) + } + + fn index(&self, index: usize) -> Self::ReadItem<'_> { + Wrapper(self.index(index)) + } + + fn len(&self) -> usize { + self.len() + } +} + pub use self::containers::{BatchContainer, SliceContainer, SliceContainer2}; /// Containers for data that resemble `Vec`, with leaner implementations. @@ -245,7 +334,7 @@ pub mod containers { use crate::trace::MyTrait; /// A general-purpose container resembling `Vec`. - pub trait BatchContainer: Default + 'static { + pub trait BatchContainer: 'static { /// The type of contained item. /// /// The container only supplies references to the item, so it needn't be sized. diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index ae8ff69bd..bf4dd9544 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -70,7 +70,7 @@ mod val_batch { use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::{BatchContainer, OffsetList}; + use crate::trace::implementations::BatchContainer; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -83,7 +83,7 @@ mod val_batch { /// Offsets used to provide indexes from keys to values. /// /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. - pub keys_offs: OffsetList, + pub keys_offs: L::OffsetContainer, /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`. pub vals: L::ValContainer, /// Offsets used to provide indexes from values to updates. @@ -94,7 +94,7 @@ mod val_batch { /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). /// /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. - pub vals_offs: OffsetList, + pub vals_offs: L::OffsetContainer, /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. pub updates: L::UpdContainer, } @@ -102,12 +102,12 @@ mod val_batch { impl OrdValStorage { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { - (self.keys_offs.index(index), self.keys_offs.index(index+1)) + (self.keys_offs.index(index).into_owned(), self.keys_offs.index(index+1).into_owned()) } /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. fn updates_for_value(&self, index: usize) -> (usize, usize) { - let mut lower = self.vals_offs.index(index); - let upper = self.vals_offs.index(index+1); + let mut lower = self.vals_offs.index(index).into_owned(); + let upper = self.vals_offs.index(index+1).into_owned(); // We use equal lower and upper to encode "singleton update; just before here". // It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -206,14 +206,17 @@ mod val_batch { let mut storage = OrdValStorage { keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), - keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), + keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), - vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), + vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), }; - storage.keys_offs.push(0); - storage.vals_offs.push(0); + // Mark explicit types because type inference fails to resolve it. + let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; + keys_offs.push(0); + let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; + vals_offs.push(0); OrdValMerger { key_cursor1: 0, @@ -546,9 +549,9 @@ mod val_batch { Self { result: OrdValStorage { keys: L::KeyContainer::with_capacity(keys), - keys_offs: OffsetList::with_capacity(keys + 1), + keys_offs: L::OffsetContainer::with_capacity(keys + 1), vals: L::ValContainer::with_capacity(vals), - vals_offs: OffsetList::with_capacity(vals + 1), + vals_offs: L::OffsetContainer::with_capacity(vals + 1), updates: L::UpdContainer::with_capacity(upds), }, singleton: None, @@ -636,7 +639,7 @@ mod key_batch { use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::{BatchContainer, OffsetList}; + use crate::trace::implementations::BatchContainer; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -654,7 +657,7 @@ mod key_batch { /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). /// /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. - pub keys_offs: OffsetList, + pub keys_offs: L::OffsetContainer, /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. pub updates: L::UpdContainer, } @@ -662,8 +665,8 @@ mod key_batch { impl OrdKeyStorage { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn updates_for_key(&self, index: usize) -> (usize, usize) { - let mut lower = self.keys_offs.index(index); - let upper = self.keys_offs.index(index+1); + let mut lower = self.keys_offs.index(index).into_owned(); + let upper = self.keys_offs.index(index+1).into_owned(); // We use equal lower and upper to encode "singleton update; just before here". // It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -763,11 +766,12 @@ mod key_batch { let mut storage = OrdKeyStorage { keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), - keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), + keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), }; - storage.keys_offs.push(0); + let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; + keys_offs.push(0); OrdKeyMerger { key_cursor1: 0, @@ -1011,7 +1015,7 @@ mod key_batch { Self { result: OrdKeyStorage { keys: L::KeyContainer::with_capacity(keys), - keys_offs: OffsetList::with_capacity(keys + 1), + keys_offs: L::OffsetContainer::with_capacity(keys + 1), updates: L::UpdContainer::with_capacity(upds), }, singleton: None, diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index aa54fe9ba..94ed3b95b 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -84,7 +84,7 @@ mod val_batch { use crate::hashable::Hashable; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::{BatchContainer, OffsetList}; + use crate::trace::implementations::BatchContainer; use crate::trace::cursor::MyTrait; use super::{Layout, Update, HashOrdered}; @@ -122,7 +122,7 @@ mod val_batch { /// Offsets used to provide indexes from keys to values. /// /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. - pub keys_offs: OffsetList, + pub keys_offs: L::OffsetContainer, /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`. pub vals: L::ValContainer, /// Offsets used to provide indexes from values to updates. @@ -133,7 +133,7 @@ mod val_batch { /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). /// /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. - pub vals_offs: OffsetList, + pub vals_offs: L::OffsetContainer, /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. pub updates: L::UpdContainer, } @@ -144,16 +144,16 @@ mod val_batch { { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { - let lower = self.keys_offs.index(index); - let upper = self.keys_offs.index(index+1); + let lower = self.keys_offs.index(index).into_owned(); + let upper = self.keys_offs.index(index+1).into_owned(); // Looking up values for an invalid key indicates something is wrong. assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index); (lower, upper) } /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. fn updates_for_value(&self, index: usize) -> (usize, usize) { - let mut lower = self.vals_offs.index(index); - let upper = self.vals_offs.index(index+1); + let mut lower = self.vals_offs.index(index).into_owned(); + let upper = self.vals_offs.index(index+1).into_owned(); // We use equal lower and upper to encode "singleton update; just before here". // It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -178,7 +178,7 @@ mod val_batch { // push additional blank entries in. while self.keys.len() < desired { // We insert a default (dummy) key and repeat the offset to indicate this. - let current_offset = self.keys_offs.index(self.keys.len()); + let current_offset = self.keys_offs.index(self.keys.len()).into_owned(); self.keys.push(Default::default()); self.keys_offs.push(current_offset); } @@ -339,17 +339,20 @@ mod val_batch { let mut storage = RhhValStorage { keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), - keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), + keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), - vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), + vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), key_count: 0, key_capacity: rhh_cap, divisor: RhhValStorage::::divisor_for_capacity(rhh_cap), }; - storage.keys_offs.push(0); - storage.vals_offs.push(0); + // Mark explicit types because type inference fails to resolve it. + let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; + keys_offs.push(0); + let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; + vals_offs.push(0); RhhValMerger { key_cursor1: 0, @@ -746,9 +749,9 @@ mod val_batch { Self { result: RhhValStorage { keys: L::KeyContainer::with_capacity(keys), - keys_offs: OffsetList::with_capacity(keys + 1), + keys_offs: L::OffsetContainer::with_capacity(keys + 1), vals: L::ValContainer::with_capacity(vals), - vals_offs: OffsetList::with_capacity(vals + 1), + vals_offs: L::OffsetContainer::with_capacity(vals + 1), updates: L::UpdContainer::with_capacity(upds), key_count: 0, key_capacity: rhh_capacity,