diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 1f3923bc1..1b3bad7df 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -53,8 +53,6 @@ pub mod rhh; pub use self::ord::OrdValSpine as ValSpine; pub use self::ord::OrdKeySpine as KeySpine; -use std::ops::{Add, Sub}; -use std::convert::{TryInto, TryFrom}; use std::borrow::{Borrow, ToOwned}; use timely::container::columnation::{Columnation, TimelyStack}; @@ -96,10 +94,6 @@ where pub trait Layout { /// The represented update. type Target: Update + ?Sized; - /// Offsets to use from keys into vals. - type KeyOffset: OrdOffset; - /// Offsets to use from vals into updates. - type ValOffset: OrdOffset; /// Container for update keys. type KeyContainer: RetainFrom<::Key>+ @@ -114,29 +108,27 @@ pub trait Layout { } /// A layout that uses vectors -pub struct Vector { - phantom: std::marker::PhantomData<(U, O)>, +pub struct Vector { + phantom: std::marker::PhantomData, } -impl Layout for Vector +impl Layout for Vector where U::Key: ToOwned + Sized + Clone, U::Val: ToOwned + Sized + Clone, { type Target = U; - type KeyOffset = O; - type ValOffset = O; type KeyContainer = Vec; type ValContainer = Vec; type UpdContainer = Vec<(U::Time, U::Diff)>; } /// A layout based on timely stacks -pub struct TStack { - phantom: std::marker::PhantomData<(U, O)>, +pub struct TStack { + phantom: std::marker::PhantomData, } -impl Layout for TStack +impl Layout for TStack where U::Key: Columnation + ToOwned, U::Val: Columnation + ToOwned, @@ -144,8 +136,6 @@ where U::Diff: Columnation, { type Target = U; - type KeyOffset = O; - type ValOffset = O; type KeyContainer = TimelyStack; type ValContainer = TimelyStack; type UpdContainer = TimelyStack<(U::Time, U::Diff)>; @@ -168,11 +158,11 @@ impl PreferredContainer for [T] { } /// An update and layout description based on preferred containers. -pub struct Preferred { - phantom: std::marker::PhantomData<(Box, Box, T, D, O)>, +pub struct Preferred { + phantom: std::marker::PhantomData<(Box, Box, T, D)>, } -impl Update for Preferred +impl Update for Preferred where K: Ord+ToOwned + ?Sized, K::Owned: Ord+Clone, @@ -180,7 +170,6 @@ where V::Owned: Ord+Clone, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, - O: OrdOffset, { type Key = K; type KeyOwned = K::Owned; @@ -190,7 +179,7 @@ where type Diff = R; } -impl Layout for Preferred +impl Layout for Preferred where K: Ord+ToOwned+PreferredContainer + ?Sized, K::Owned: Ord+Clone, @@ -198,11 +187,8 @@ where V::Owned: Ord+Clone, T: Ord+Lattice+timely::progress::Timestamp+Clone, D: Semigroup+Clone, - O: OrdOffset, { - type Target = Preferred; - type KeyOffset = O; - type ValOffset = O; + type Target = Preferred; type KeyContainer = K::Container; type ValContainer = V::Container; type UpdContainer = Vec<(T, D)>; @@ -239,16 +225,90 @@ impl RetainFrom for TimelyStack { } } -/// Trait for types used as offsets into an ordered layer. -/// This is usually `usize`, but `u32` can also be used in applications -/// where huge batches do not occur to reduce metadata size. -pub trait OrdOffset: Copy + PartialEq + Add + Sub + TryFrom + TryInto -{} +use std::convert::TryInto; -impl OrdOffset for O -where - O: Copy + PartialEq + Add + Sub + TryFrom + TryInto, -{} +/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. 
+#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)]
+pub struct OffsetList {
+    /// Offsets that fit within a `u32`.
+    smol: Vec<u32>,
+    /// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
+    chonk: Vec<u64>,
+}
+
+impl OffsetList {
+    /// Inserts the offset, as a `u32` if that is still on the table.
+    pub fn push(&mut self, offset: usize) {
+        if self.chonk.is_empty() {
+            if let Ok(smol) = offset.try_into() {
+                self.smol.push(smol);
+            }
+            else {
+                self.chonk.push(offset.try_into().unwrap())
+            }
+        }
+        else {
+            self.chonk.push(offset.try_into().unwrap())
+        }
+    }
+    /// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
+    pub fn index(&self, index: usize) -> usize {
+        if index < self.smol.len() {
+            self.smol[index].try_into().unwrap()
+        }
+        else {
+            self.chonk[index - self.smol.len()].try_into().unwrap()
+        }
+    }
+    /// Set the offset at location `index`.
+    ///
+    /// Complicated if `offset` does not fit into `self.smol`.
+    pub fn set(&mut self, index: usize, offset: usize) {
+        if index < self.smol.len() {
+            if let Ok(off) = offset.try_into() {
+                self.smol[index] = off;
+            }
+            else {
+                // Move all `smol` elements from `index` onward to the front of `chonk`.
+                self.chonk.splice(0..0, self.smol.drain(index ..).map(|x| x.try_into().unwrap()));
+                self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
+            }
+        }
+        else {
+            self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
+        }
+    }
+    /// The last element in the list of offsets, if non-empty.
+    pub fn last(&self) -> Option<usize> {
+        if self.chonk.is_empty() {
+            self.smol.last().map(|x| (*x).try_into().unwrap())
+        }
+        else {
+            self.chonk.last().map(|x| (*x).try_into().unwrap())
+        }
+    }
+    /// The number of offsets in the list.
+    pub fn len(&self) -> usize {
+        self.smol.len() + self.chonk.len()
+    }
+    /// Allocate a new list with a specified capacity.
+    pub fn with_capacity(cap: usize) -> Self {
+        Self {
+            smol: Vec::with_capacity(cap),
+            chonk: Vec::new(),
+        }
+    }
+    /// Trim all elements at index `length` and greater.
+    pub fn truncate(&mut self, length: usize) {
+        if length > self.smol.len() {
+            self.chonk.truncate(length - self.smol.len());
+        }
+        else {
+            assert!(self.chonk.is_empty());
+            self.smol.truncate(length);
+        }
+    }
+}

 pub use self::containers::{BatchContainer, SliceContainer};
diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs
index f63b58a14..2db42f9fd 100644
--- a/src/trace/implementations/ord.rs
+++ b/src/trace/implementations/ord.rs
@@ -9,7 +9,6 @@
 //! and should consume fewer resources (computation and memory) when it applies.

 use std::rc::Rc;
-use std::convert::{TryFrom, TryInto};
 use std::fmt::Debug;

 use timely::progress::{Antichain, frontier::AntichainRef};
@@ -40,44 +39,44 @@ use trace::rc_blanket_impls::RcBuilder;
 use trace::abomonated_blanket_impls::AbomonatedBuilder;

 /// A trace implementation using a spine of ordered lists.
-pub type OrdValSpine = Spine<
-    Rc>>,
+pub type OrdValSpine = Spine<
+    Rc>>,
     MergeBatcher,
-    RcBuilder>>,
+    RcBuilder>>,
 >;

 /// A trace implementation using a spine of abomonated ordered lists.
-pub type OrdValSpineAbom = Spine<
-    Rc>, Vec>>,
+pub type OrdValSpineAbom = Spine<
+    Rc>, Vec>>,
     MergeBatcher,
-    AbomonatedBuilder>>,
+    AbomonatedBuilder>>,
 >;

 /// A trace implementation for empty values using a spine of ordered lists.
-pub type OrdKeySpine = Spine< - Rc>>, +pub type OrdKeySpine = Spine< + Rc>>, MergeBatcher, - RcBuilder>>, + RcBuilder>>, >; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine< - Rc>, Vec>>, +pub type OrdKeySpineAbom = Spine< + Rc>, Vec>>, MergeBatcher, - AbomonatedBuilder>>, + AbomonatedBuilder>>, >; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine< - Rc>>, +pub type ColValSpine = Spine< + Rc>>, ColumnatedMergeBatcher, - RcBuilder>>, + RcBuilder>>, >; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine< - Rc>>, +pub type ColKeySpine = Spine< + Rc>>, ColumnatedMergeBatcher, - RcBuilder>>, + RcBuilder>>, >; /// An immutable collection of update tuples, from a contiguous interval of logical times. @@ -98,13 +97,13 @@ where // Type aliases to make certain types readable. type TDLayer = OrderedLeaf<<::Target as Update>::Time, <::Target as Update>::Diff>; -type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValOffset, ::ValContainer>; -type KTDLayer = OrderedLayer<<::Target as Update>::Key, TDLayer, ::KeyOffset, ::KeyContainer>; -type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyOffset, ::KeyContainer>; +type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValContainer>; +type KTDLayer = OrderedLayer<<::Target as Update>::Key, TDLayer, ::KeyContainer>; +type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyContainer>; type TDBuilder = OrderedLeafBuilder<<::Target as Update>::Time, <::Target as Update>::Diff>; -type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValOffset, ::ValContainer>; -type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; -type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; +type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValContainer>; +type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyContainer>; +type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyContainer>; impl BatchReader for OrdValBatch where @@ -143,8 +142,8 @@ where fn advance_builder_from(layer: &mut KVTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; - let val_start: usize = layer.offs[key_pos].try_into().ok().unwrap(); - let time_start: usize = layer.vals.offs[val_start].try_into().ok().unwrap(); + let val_start: usize = layer.offs.index(key_pos); + let time_start: usize = layer.vals.offs.index(val_start); // We have unique ownership of the batch, and can advance times in place. // We must still sort, collapse, and remove empty updates. @@ -167,10 +166,10 @@ where // we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s // initial value. 
- let lower: usize = layer.vals.offs[i].try_into().ok().unwrap(); - let upper: usize = layer.vals.offs[i+1].try_into().ok().unwrap(); + let lower: usize = layer.vals.offs.index(i); + let upper: usize = layer.vals.offs.index(i+1); - layer.vals.offs[i] = L::ValOffset::try_from(write_position).ok().unwrap(); + layer.vals.offs.set(i, write_position); let updates = &mut layer.vals.vals.vals[..]; @@ -183,7 +182,7 @@ where } } layer.vals.vals.vals.truncate(write_position); - layer.vals.offs[layer.vals.keys.len()] = L::ValOffset::try_from(write_position).ok().unwrap(); + layer.vals.offs.set(layer.vals.keys.len(), write_position); // 3. Remove values with empty histories. In addition, we need to update offsets // in `layer.offs` to correctly reference the potentially moved values. @@ -193,14 +192,14 @@ where let keys_off = &mut layer.offs; layer.vals.keys.retain_from(val_start, |index, _item| { // As we pass each key offset, record its new position. - if index == keys_off[keys_pos].try_into().ok().unwrap() { - keys_off[keys_pos] = L::KeyOffset::try_from(write_position).ok().unwrap(); + if index == keys_off.index(keys_pos) { + keys_off.set(keys_pos, write_position); keys_pos += 1; } - let lower = vals_off[index].try_into().ok().unwrap(); - let upper = vals_off[index+1].try_into().ok().unwrap(); + let lower = vals_off.index(index); + let upper = vals_off.index(index+1); if lower < upper { - vals_off[write_position+1] = vals_off[index+1]; + vals_off.set(write_position+1, vals_off.index(index+1)); write_position += 1; true } @@ -208,16 +207,16 @@ where }); debug_assert_eq!(write_position, layer.vals.keys.len()); layer.vals.offs.truncate(write_position + 1); - layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap(); + layer.offs.set(layer.keys.len(), write_position); // 4. Remove empty keys. let offs = &mut layer.offs; let mut write_position = key_start; layer.keys.retain_from(key_start, |index, _item| { - let lower = offs[index].try_into().ok().unwrap(); - let upper = offs[index+1].try_into().ok().unwrap(); + let lower = offs.index(index); + let upper = offs.index(index+1); if lower < upper { - offs[write_position+1] = offs[index+1]; + offs.set(write_position+1, offs.index(index+1)); write_position += 1; true } @@ -466,7 +465,7 @@ where fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; - let time_start: usize = layer.offs[key_pos].try_into().ok().unwrap(); + let time_start: usize = layer.offs.index(key_pos); // We will zip through the time leaves, calling advance on each, // then zip through the value layer, sorting and collapsing each, @@ -489,10 +488,10 @@ where // we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s // initial value. - let lower: usize = layer.offs[i].try_into().ok().unwrap(); - let upper: usize = layer.offs[i+1].try_into().ok().unwrap(); + let lower: usize = layer.offs.index(i); + let upper: usize = layer.offs.index(i+1); - layer.offs[i] = L::KeyOffset::try_from(write_position).ok().unwrap(); + layer.offs.set(i, write_position); let updates = &mut layer.vals.vals[..]; @@ -505,16 +504,16 @@ where } } layer.vals.vals.truncate(write_position); - layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap(); + layer.offs.set(layer.keys.len(), write_position); // 4. Remove empty keys. 
let offs = &mut layer.offs; let mut write_position = key_start; layer.keys.retain_from(key_start, |index, _item| { - let lower = offs[index].try_into().ok().unwrap(); - let upper = offs[index+1].try_into().ok().unwrap(); + let lower = offs.index(index); + let upper = offs.index(index+1); if lower < upper { - offs[write_position+1] = offs[index+1]; + offs.set(write_position+1, offs.index(index+1)); write_position += 1; true } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index b9d39a7e6..65f1d3155 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -20,31 +20,31 @@ use super::{Update, Layout, Vector, TStack, Preferred}; use self::val_batch::{OrdValBatch, OrdValBuilder}; /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine< - Rc>>, +pub type OrdValSpine = Spine< + Rc>>, MergeBatcher, - RcBuilder>>, + RcBuilder>>, >; // /// A trace implementation for empty values using a spine of ordered lists. -// pub type OrdKeySpine = Spine>>>; +// pub type OrdKeySpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine< - Rc>>, +pub type ColValSpine = Spine< + Rc>>, ColumnatedMergeBatcher, - RcBuilder>>, + RcBuilder>>, >; /// A trace implementation backed by columnar storage. -pub type PreferredSpine = Spine< - Rc>>, +pub type PreferredSpine = Spine< + Rc>>, ColumnatedMergeBatcher<::Owned,::Owned,T,R>, - RcBuilder>>, + RcBuilder>>, >; // /// A trace implementation backed by columnar storage. -// pub type ColKeySpine = Spine>>>; +// pub type ColKeySpine = Spine>>>; mod val_batch { @@ -54,7 +54,7 @@ mod val_batch { use timely::progress::{Antichain, frontier::AntichainRef}; use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use trace::implementations::BatchContainer; + use trace::implementations::{BatchContainer, OffsetList}; use super::{Layout, Update}; @@ -66,7 +66,7 @@ mod val_batch { /// Offsets used to provide indexes from keys to values. /// /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. - pub keys_offs: Vec, + pub keys_offs: OffsetList, /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`. pub vals: L::ValContainer, /// Offsets used to provide indexes from values to updates. @@ -77,7 +77,7 @@ mod val_batch { /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). /// /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. - pub vals_offs: Vec, + pub vals_offs: OffsetList, /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. pub updates: L::UpdContainer, } @@ -85,12 +85,12 @@ mod val_batch { impl OrdValStorage { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { - (self.keys_offs[index].try_into().ok().unwrap(), self.keys_offs[index+1].try_into().ok().unwrap()) + (self.keys_offs.index(index), self.keys_offs.index(index+1)) } /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. fn updates_for_value(&self, index: usize) -> (usize, usize) { - let mut lower = self.vals_offs[index].try_into().ok().unwrap(); - let upper = self.vals_offs[index+1].try_into().ok().unwrap(); + let mut lower = self.vals_offs.index(index); + let upper = self.vals_offs.index(index+1); // We use equal lower and upper to encode "singleton update; just before here". 
// It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -187,9 +187,9 @@ mod val_batch { let mut storage = OrdValStorage { keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), - keys_offs: Vec::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), + keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), - vals_offs: Vec::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), + vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), }; @@ -307,7 +307,7 @@ mod val_batch { &mut self, (source1, mut lower1, upper1): (&OrdValStorage, usize, usize), (source2, mut lower2, upper2): (&OrdValStorage, usize, usize), - ) -> Option { + ) -> Option { // Capture the initial number of values to determine if the merge was ultimately non-empty. let init_vals = self.result.vals.len(); while lower1 < upper1 && lower2 < upper2 { @@ -386,7 +386,7 @@ mod val_batch { } /// Consolidates `self.updates_stash` and produces the offset to record, if any. - fn consolidate_updates(&mut self) -> Option { + fn consolidate_updates(&mut self) -> Option { use consolidation; consolidation::consolidate(&mut self.update_stash); if !self.update_stash.is_empty() { @@ -527,9 +527,9 @@ mod val_batch { Self { result: OrdValStorage { keys: L::KeyContainer::with_capacity(keys), - keys_offs: Vec::with_capacity(keys + 1), + keys_offs: OffsetList::with_capacity(keys + 1), vals: L::ValContainer::with_capacity(vals), - vals_offs: Vec::with_capacity(vals + 1), + vals_offs: OffsetList::with_capacity(vals + 1), updates: L::UpdContainer::with_capacity(upds), }, singleton: None, diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 5d67a8c9a..eb426bf3b 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -18,22 +18,22 @@ use super::{Update, Layout, Vector, TStack}; use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. -pub type VecSpine = Spine< - Rc>>, +pub type VecSpine = Spine< + Rc>>, MergeBatcher, - RcBuilder>>, + RcBuilder>>, >; // /// A trace implementation for empty values using a spine of ordered lists. -// pub type OrdKeySpine = Spine>>>; +// pub type OrdKeySpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColSpine = Spine< - Rc>>, +pub type ColSpine = Spine< + Rc>>, ColumnatedMergeBatcher, - RcBuilder>>, + RcBuilder>>, >; // /// A trace implementation backed by columnar storage. -// pub type ColKeySpine = Spine>>>; +// pub type ColKeySpine = Spine>>>; /// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`. pub trait HashOrdered: Hashable { } @@ -77,7 +77,7 @@ mod val_batch { use timely::progress::{Antichain, frontier::AntichainRef}; use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use trace::implementations::BatchContainer; + use trace::implementations::{BatchContainer, OffsetList}; use super::{Layout, Update, HashOrdered}; @@ -115,7 +115,7 @@ mod val_batch { /// Offsets used to provide indexes from keys to values. /// /// The length of this list is one longer than `keys`, so that we can avoid bounds logic. 
- pub keys_offs: Vec, + pub keys_offs: OffsetList, /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`. pub vals: L::ValContainer, /// Offsets used to provide indexes from values to updates. @@ -126,7 +126,7 @@ mod val_batch { /// single common update values (e.g. in a snapshot, the minimal time and a diff of one). /// /// The length of this list is one longer than `vals`, so that we can avoid bounds logic. - pub vals_offs: Vec, + pub vals_offs: OffsetList, /// Concatenated ordered lists of updates, bracketed by offsets in `vals_offs`. pub updates: L::UpdContainer, } @@ -138,16 +138,16 @@ mod val_batch { { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { - let lower = self.keys_offs[index].try_into().ok().unwrap(); - let upper = self.keys_offs[index+1].try_into().ok().unwrap(); + let lower = self.keys_offs.index(index); + let upper = self.keys_offs.index(index+1); // Looking up values for an invalid key indicates something is wrong. assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index); (lower, upper) } /// Lower and upper bounds in `self.updates` corresponding to the value at `index`. fn updates_for_value(&self, index: usize) -> (usize, usize) { - let mut lower = self.vals_offs[index].try_into().ok().unwrap(); - let upper = self.vals_offs[index+1].try_into().ok().unwrap(); + let mut lower = self.vals_offs.index(index); + let upper = self.vals_offs.index(index+1); // We use equal lower and upper to encode "singleton update; just before here". // It should only apply when there is a prior element, so `lower` should be greater than zero. if lower == upper { @@ -166,13 +166,13 @@ mod val_batch { /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified, /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may /// not know the final offset at the moment of key insertion can prepare for receiving the offset. - fn insert_key(&mut self, key: &::Key, offset: Option) { + fn insert_key(&mut self, key: &::Key, offset: Option) { let desired = self.desired_location(key); // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, // push additional blank entries in. while self.keys.len() < desired { // We insert a default (dummy) key and repeat the offset to indicate this. - let current_offset = self.keys_offs[self.keys.len()]; + let current_offset = self.keys_offs.index(self.keys.len()); self.keys.push(Default::default()); self.keys_offs.push(current_offset); } @@ -201,7 +201,7 @@ mod val_batch { /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range. fn live_key(&self, index: usize) -> bool { - self.keys_offs[index] != self.keys_offs[index+1] + self.keys_offs.index(index) != self.keys_offs.index(index+1) } /// Advances `index` until it references a live key, or is `keys.len()`. 
@@ -333,9 +333,9 @@ mod val_batch { let mut storage = RhhValStorage { keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys), - keys_offs: Vec::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), + keys_offs: OffsetList::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()), vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals), - vals_offs: Vec::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), + vals_offs: OffsetList::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()), updates: L::UpdContainer::merge_capacity(&batch1.updates, &batch2.updates), key_count: 0, key_capacity: rhh_cap, @@ -467,7 +467,7 @@ mod val_batch { &mut self, (source1, mut lower1, upper1): (&RhhValStorage, usize, usize), (source2, mut lower2, upper2): (&RhhValStorage, usize, usize), - ) -> Option { + ) -> Option { // Capture the initial number of values to determine if the merge was ultimately non-empty. let init_vals = self.result.vals.len(); while lower1 < upper1 && lower2 < upper2 { @@ -546,7 +546,7 @@ mod val_batch { } /// Consolidates `self.updates_stash` and produces the offset to record, if any. - fn consolidate_updates(&mut self) -> Option { + fn consolidate_updates(&mut self) -> Option { use consolidation; consolidation::consolidate(&mut self.update_stash); if !self.update_stash.is_empty() { @@ -742,9 +742,9 @@ mod val_batch { Self { result: RhhValStorage { keys: L::KeyContainer::with_capacity(keys), - keys_offs: Vec::with_capacity(keys + 1), + keys_offs: OffsetList::with_capacity(keys + 1), vals: L::ValContainer::with_capacity(vals), - vals_offs: Vec::with_capacity(vals + 1), + vals_offs: OffsetList::with_capacity(vals + 1), updates: L::UpdContainer::with_capacity(upds), key_count: 0, key_capacity: rhh_capacity, diff --git a/src/trace/layers/ordered.rs b/src/trace/layers/ordered.rs index 2b59bb5ba..60cc9ae32 100644 --- a/src/trace/layers/ordered.rs +++ b/src/trace/layers/ordered.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; -use trace::implementations::{BatchContainer, OrdOffset}; +use trace::implementations::{BatchContainer, OffsetList}; use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder}; @@ -10,11 +10,10 @@ use super::{Trie, Cursor, Builder, MergeBuilder, TupleBuilder}; /// /// In this representation, the values for `keys[i]` are found at `vals[offs[i] .. offs[i+1]]`. #[derive(Debug, Eq, PartialEq, Clone, Abomonation)] -pub struct OrderedLayer> +pub struct OrderedLayer> where K: Ord, C: BatchContainer, - O: OrdOffset { /// The keys of the layer. pub keys: C, @@ -22,22 +21,21 @@ where /// /// The bounds for `keys[i]` are `(offs[i], offs[i+1]`). The offset array is guaranteed to be one /// element longer than the keys array, ensuring that these accesses do not panic. - pub offs: Vec, + pub offs: OffsetList, /// The ranges of values associated with the keys. 
pub vals: L, } -impl Trie for OrderedLayer +impl Trie for OrderedLayer where K: Ord+Clone, C: BatchContainer, L: Trie, - O: OrdOffset { type Item = (K, L::Item); type Cursor = OrderedCursor; - type MergeBuilder = OrderedBuilder; - type TupleBuilder = OrderedBuilder; + type MergeBuilder = OrderedBuilder; + type TupleBuilder = OrderedBuilder; fn keys(&self) -> usize { self.keys.len() } fn tuples(&self) -> usize { self.vals.tuples() } @@ -45,11 +43,11 @@ where if lower < upper { - let child_lower = self.offs[lower]; - let child_upper = self.offs[lower + 1]; + let child_lower = self.offs.index(lower); + let child_upper = self.offs.index(lower + 1); OrderedCursor { bounds: (lower, upper), - child: self.vals.cursor_from(child_lower.try_into().ok().unwrap(), child_upper.try_into().ok().unwrap()), + child: self.vals.cursor_from(child_lower, child_upper), pos: lower, } } @@ -64,35 +62,33 @@ where } /// Assembles a layer of this -pub struct OrderedBuilder> +pub struct OrderedBuilder> where K: Ord, C: BatchContainer, - O: OrdOffset { /// Keys pub keys: C, /// Offsets - pub offs: Vec, + pub offs: OffsetList, /// The next layer down pub vals: L, } -impl Builder for OrderedBuilder +impl Builder for OrderedBuilder where K: Ord+Clone, C: BatchContainer, L: Builder, - O: OrdOffset { - type Trie = OrderedLayer; + type Trie = OrderedLayer; fn boundary(&mut self) -> usize { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); + self.offs.set(self.keys.len(), self.vals.boundary()); self.keys.len() } fn done(mut self) -> Self::Trie { - if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().ok().unwrap() == 0 { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); + if self.keys.len() > 0 && self.offs.index(self.keys.len()) == 0 { + self.offs.set(self.keys.len(), self.vals.boundary()); } OrderedLayer { keys: self.keys, @@ -102,16 +98,15 @@ where } } -impl MergeBuilder for OrderedBuilder +impl MergeBuilder for OrderedBuilder where K: Ord+Clone, C: BatchContainer, L: MergeBuilder, - O: OrdOffset { fn with_capacity(other1: &Self::Trie, other2: &Self::Trie) -> Self { - let mut offs = Vec::with_capacity(other1.keys() + other2.keys() + 1); - offs.push(O::try_from(0 as usize).ok().unwrap()); + let mut offs = OffsetList::with_capacity(other1.keys() + other2.keys() + 1); + offs.push(0); OrderedBuilder { keys: C::merge_capacity(&other1.keys, &other2.keys), offs: offs, @@ -121,14 +116,14 @@ where #[inline] fn copy_range(&mut self, other: &Self::Trie, lower: usize, upper: usize) { debug_assert!(lower < upper); - let other_basis = other.offs[lower]; - let self_basis = self.offs.last().map(|&x| x).unwrap_or(O::try_from(0).ok().unwrap()); + let other_basis = other.offs.index(lower); + let self_basis = self.offs.last().unwrap_or(0); self.keys.copy_range(&other.keys, lower, upper); for index in lower .. upper { - self.offs.push((other.offs[index + 1] + self_basis) - other_basis); + self.offs.push((other.offs.index(index + 1) + self_basis) - other_basis); } - self.vals.copy_range(&other.vals, other_basis.try_into().ok().unwrap(), other.offs[upper].try_into().ok().unwrap()); + self.vals.copy_range(&other.vals, other_basis, other.offs.index(upper)); } fn push_merge(&mut self, other1: (&Self::Trie, usize, usize), other2: (&Self::Trie, usize, usize)) -> usize { @@ -149,12 +144,11 @@ where } } -impl OrderedBuilder +impl OrderedBuilder where K: Ord+Clone, C: BatchContainer, L: MergeBuilder, - O: OrdOffset { /// Performs one step of merging. 
#[inline] @@ -175,12 +169,12 @@ where let lower = self.vals.boundary(); // record vals_length so we can tell if anything was pushed. let upper = self.vals.push_merge( - (&trie1.vals, trie1.offs[*lower1].try_into().ok().unwrap(), trie1.offs[*lower1 + 1].try_into().ok().unwrap()), - (&trie2.vals, trie2.offs[*lower2].try_into().ok().unwrap(), trie2.offs[*lower2 + 1].try_into().ok().unwrap()) + (&trie1.vals, trie1.offs.index(*lower1), trie1.offs.index(*lower1 + 1)), + (&trie2.vals, trie2.offs.index(*lower2), trie2.offs.index(*lower2 + 1)) ); if upper > lower { self.keys.copy(&trie1.keys.index(*lower1)); - self.offs.push(O::try_from(upper).ok().unwrap()); + self.offs.push(upper); } *lower1 += 1; @@ -197,18 +191,17 @@ where } } -impl TupleBuilder for OrderedBuilder +impl TupleBuilder for OrderedBuilder where K: Ord+Clone, C: BatchContainer, L: TupleBuilder, - O: OrdOffset { type Item = (K, L::Item); - fn new() -> Self { OrderedBuilder { keys: C::default(), offs: vec![O::try_from(0).ok().unwrap()], vals: L::new() } } + fn new() -> Self { Self::with_capacity(0) } fn with_capacity(cap: usize) -> Self { - let mut offs = Vec::with_capacity(cap + 1); - offs.push(O::try_from(0).ok().unwrap()); + let mut offs = OffsetList::with_capacity(cap + 1); + offs.push(0); OrderedBuilder{ keys: C::with_capacity(cap), offs: offs, @@ -219,12 +212,12 @@ where fn push_tuple(&mut self, (key, val): (K, L::Item)) { // if first element, prior element finish, or different element, need to push and maybe punctuate. - if self.keys.len() == 0 || self.offs[self.keys.len()].try_into().ok().unwrap() != 0 || self.keys.index(self.keys.len()-1) != &key { - if self.keys.len() > 0 && self.offs[self.keys.len()].try_into().ok().unwrap() == 0 { - self.offs[self.keys.len()] = O::try_from(self.vals.boundary()).ok().unwrap(); + if self.keys.len() == 0 || self.offs.index(self.keys.len()) != 0 || self.keys.index(self.keys.len()-1) != &key { + if self.keys.len() > 0 && self.offs.index(self.keys.len()) == 0 { + self.offs.set(self.keys.len(), self.vals.boundary()); } self.keys.push(key); - self.offs.push(O::try_from(0).ok().unwrap()); // <-- indicates "unfinished". + self.offs.push(0); // <-- indicates "unfinished". 
} self.vals.push_tuple(val); } @@ -239,43 +232,42 @@ pub struct OrderedCursor { pub child: L::Cursor, } -impl Cursor> for OrderedCursor +impl Cursor> for OrderedCursor where K: Ord, C: BatchContainer, L: Trie, - O: OrdOffset { type Key = K; - fn key<'a>(&self, storage: &'a OrderedLayer) -> &'a Self::Key { &storage.keys.index(self.pos) } - fn step(&mut self, storage: &OrderedLayer) { + fn key<'a>(&self, storage: &'a OrderedLayer) -> &'a Self::Key { &storage.keys.index(self.pos) } + fn step(&mut self, storage: &OrderedLayer) { self.pos += 1; if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); + self.child.reposition(&storage.vals, storage.offs.index(self.pos), storage.offs.index(self.pos + 1)); } else { self.pos = self.bounds.1; } } - fn seek(&mut self, storage: &OrderedLayer, key: &Self::Key) { + fn seek(&mut self, storage: &OrderedLayer, key: &Self::Key) { self.pos += storage.keys.advance(self.pos, self.bounds.1, |k| k.lt(key)); if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); + self.child.reposition(&storage.vals, storage.offs.index(self.pos), storage.offs.index(self.pos + 1)); } } // fn size(&self) -> usize { self.bounds.1 - self.bounds.0 } - fn valid(&self, _storage: &OrderedLayer) -> bool { self.pos < self.bounds.1 } - fn rewind(&mut self, storage: &OrderedLayer) { + fn valid(&self, _storage: &OrderedLayer) -> bool { self.pos < self.bounds.1 } + fn rewind(&mut self, storage: &OrderedLayer) { self.pos = self.bounds.0; if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); + self.child.reposition(&storage.vals, storage.offs.index(self.pos), storage.offs.index(self.pos + 1)); } } - fn reposition(&mut self, storage: &OrderedLayer, lower: usize, upper: usize) { + fn reposition(&mut self, storage: &OrderedLayer, lower: usize, upper: usize) { self.pos = lower; self.bounds = (lower, upper); if self.valid(storage) { - self.child.reposition(&storage.vals, storage.offs[self.pos].try_into().ok().unwrap(), storage.offs[self.pos + 1].try_into().ok().unwrap()); + self.child.reposition(&storage.vals, storage.offs.index(self.pos), storage.offs.index(self.pos + 1)); } } }