Skip to content

Commit

Permalink
ColKeySpine with TimelyStack in the merge batcher
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 17, 2023
1 parent ab3bf43 commit 487b738
Showing 1 changed file with 46 additions and 20 deletions.
66 changes: 46 additions & 20 deletions src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T
pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<Rc<Abomonated<OrdValBatch<Vector<((K,V),T,R), O>, Vec<((K,V),T,R)>>, Vec<u8>>>>;

/// A trace implementation for empty values using a spine of ordered lists.
pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>>;
pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>, Vec<((K,()),T,R)>>>>;

/// A trace implementation for empty values using a spine of abomonated ordered lists.
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<Rc<Abomonated<OrdKeyBatch<Vector<((K,()),T,R), O>>, Vec<u8>>>>;
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<Rc<Abomonated<OrdKeyBatch<Vector<((K,()),T,R), O>, Vec<((K,()),T,R)>>, Vec<u8>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>, TimelyStack<((K,V),T,R)>>>>;
/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>>;
pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>, TimelyStack<((K,()),T,R)>>>>;


/// A container that can retain/discard from some offset onward.
Expand Down Expand Up @@ -484,20 +484,22 @@ where

/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Abomonation)]
pub struct OrdKeyBatch<L: Layout> {
pub struct OrdKeyBatch<L: Layout, C> {
/// Where all the dataz is.
pub layer: KTDLayer<L>,
/// Description of the update times this layer represents.
pub desc: Description<<L::Target as Update>::Time>,
/// Phantom data
pub phantom: PhantomData<C>,
}

impl<L: Layout> BatchReader for OrdKeyBatch<L> {
impl<L: Layout, C> BatchReader for OrdKeyBatch<L, C> {
type Key = <L::Target as Update>::Key;
type Val = ();
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Cursor = OrdKeyCursor<L>;
type Cursor = OrdKeyCursor<L, C>;
fn cursor(&self) -> Self::Cursor {
OrdKeyCursor {
valid: true,
Expand All @@ -509,7 +511,7 @@ impl<L: Layout> BatchReader for OrdKeyBatch<L> {
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.desc }
}

impl<L: Layout> Batch for OrdKeyBatch<L> {
impl<L: Layout> Batch for OrdKeyBatch<L, Vec<L::Target>> {
type Batcher = MergeBatcher<Self>;
type Builder = OrdKeyBuilder<L>;
type Merger = OrdKeyMerger<L>;
Expand All @@ -519,7 +521,23 @@ impl<L: Layout> Batch for OrdKeyBatch<L> {
}
}

impl<L: Layout> OrdKeyBatch<L> {
impl<L: Layout> Batch for OrdKeyBatch<L, TimelyStack<L::Target>>
where
<L as Layout>::Target: Columnation + 'static,
Self::Key: Columnation + 'static,
Self::Time: Columnation + 'static,
Self::R: Columnation + 'static,
{
type Batcher = ColumnatedMergeBatcher<Self>;
type Builder = OrdKeyBuilder<L>;
type Merger = OrdKeyMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger {
OrdKeyMerger::new(self, other, compaction_frontier)
}
}

impl<L: Layout, C> OrdKeyBatch<L, C> {
fn advance_builder_from(layer: &mut KTDBuilder<L>, frontier: AntichainRef<<L::Target as Update>::Time>, key_pos: usize) {

let key_start = key_pos;
Expand Down Expand Up @@ -596,8 +614,11 @@ pub struct OrdKeyMerger<L: Layout> {
should_compact: bool,
}

impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> {
fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self {
impl<L: Layout, C> Merger<OrdKeyBatch<L, C>> for OrdKeyMerger<L>
where
OrdKeyBatch<L, C>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdKeyBatch<L, C>, batch2: &OrdKeyBatch<L, C>, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self {

assert!(batch1.upper() == batch2.lower());

Expand All @@ -618,17 +639,18 @@ impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> {
should_compact: compaction_frontier.is_some(),
}
}
fn done(self) -> OrdKeyBatch<L> {
fn done(self) -> OrdKeyBatch<L, C> {

assert!(self.lower1 == self.upper1);
assert!(self.lower2 == self.upper2);

OrdKeyBatch {
layer: self.result.done(),
desc: self.description,
phantom: PhantomData,
}
}
fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {
fn work(&mut self, source1: &OrdKeyBatch<L, C>, source2: &OrdKeyBatch<L, C>, fuel: &mut isize) {

let starting_updates = self.result.vals.vals.len();
let mut effort = 0isize;
Expand Down Expand Up @@ -673,7 +695,7 @@ impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> {

// if we are supplied a frontier, we should compact.
if self.should_compact {
OrdKeyBatch::<L>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
OrdKeyBatch::<L, C>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
}

*fuel -= effort;
Expand All @@ -687,19 +709,19 @@ impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> {

/// A cursor for navigating a single layer.
#[derive(Debug)]
pub struct OrdKeyCursor<L: Layout> {
pub struct OrdKeyCursor<L: Layout, C> {
valid: bool,
cursor: OrderedCursor<OrderedLeaf<<L::Target as Update>::Time, <L::Target as Update>::Diff>>,
phantom: PhantomData<L>,
phantom: PhantomData<(L, C)>,
}

impl<L: Layout> Cursor for OrdKeyCursor<L> {
impl<L: Layout, C> Cursor for OrdKeyCursor<L, C> {
type Key = <L::Target as Update>::Key;
type Val = ();
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Storage = OrdKeyBatch<L>;
type Storage = OrdKeyBatch<L, C>;

fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) }
fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
Expand All @@ -726,7 +748,10 @@ pub struct OrdKeyBuilder<L: Layout> {
builder: KTDBuilder<L>,
}

impl<L: Layout> Builder<OrdKeyBatch<L>> for OrdKeyBuilder<L> {
impl<L: Layout, C> Builder<OrdKeyBatch<L, C>> for OrdKeyBuilder<L>
where
OrdKeyBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=(), Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
{

fn new() -> Self {
OrdKeyBuilder {
Expand All @@ -746,10 +771,11 @@ impl<L: Layout> Builder<OrdKeyBatch<L>> for OrdKeyBuilder<L> {
}

#[inline(never)]
fn done(self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdKeyBatch<L> {
fn done(self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdKeyBatch<L, C> {
OrdKeyBatch {
layer: self.builder.done(),
desc: Description::new(lower, upper, since)
desc: Description::new(lower, upper, since),
phantom: PhantomData,
}
}
}

0 comments on commit 487b738

Please sign in to comment.