Skip to content

Commit

Permalink
Configurable batcher implementation in ord_neu
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 21, 2023
1 parent 6027145 commit 07efd6d
Showing 1 changed file with 53 additions and 19 deletions.
72 changes: 53 additions & 19 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! and should consume fewer resources (computation and memory) when it applies.

use std::rc::Rc;
use timely::container::columnation::TimelyStack;

use trace::implementations::spine_fueled::Spine;

Expand All @@ -18,21 +19,24 @@ use self::val_batch::{OrdValBatch};


/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>>>>;
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>, Vec<((K,V),T,R)>>>>;
// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>>>>;
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>, TimelyStack<((K,V),T,R)>>>>;
// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>>;

mod val_batch {

use std::convert::TryInto;
use std::marker::PhantomData;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::progress::{Antichain, frontier::AntichainRef};

use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher;
use trace::layers::BatchContainer;

use super::{Layout, Update};
Expand Down Expand Up @@ -82,8 +86,11 @@ mod val_batch {
}

/// An immutable collection of update tuples, from a contiguous interval of logical times.
///
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct OrdValBatch<L: Layout> {
pub struct OrdValBatch<L: Layout, C> {
/// The updates themselves.
pub storage: OrdValStorage<L>,
/// Description of the update times this layer represents.
Expand All @@ -94,20 +101,22 @@ mod val_batch {
/// we may have many more updates than `storage.updates.len()`. It should equal that
/// length, plus the number of singleton optimizations employed.
pub updates: usize,
/// Phantom marker for Rust happiness.
pub phantom: PhantomData<C>,
}

impl<L: Layout> BatchReader for OrdValBatch<L> {
impl<L: Layout, C> BatchReader for OrdValBatch<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Cursor = OrdValCursor<L>;
type Cursor = OrdValCursor<L, C>;
fn cursor(&self) -> Self::Cursor {
OrdValCursor {
key_cursor: 0,
val_cursor: 0,
phantom: std::marker::PhantomData,
phantom: PhantomData,
}
}
fn len(&self) -> usize {
Expand All @@ -118,7 +127,7 @@ mod val_batch {
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
}

impl<L: Layout> Batch for OrdValBatch<L> {
impl<L: Layout> Batch for OrdValBatch<L, Vec<L::Target>> {
type Batcher = MergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;
Expand All @@ -128,6 +137,23 @@ mod val_batch {
}
}

impl<L: Layout> Batch for OrdValBatch<L, TimelyStack<L::Target>>
where
<L as Layout>::Target: Columnation,
Self::Key: Columnation + 'static,
Self::Val: Columnation + 'static,
Self::Time: Columnation + 'static,
Self::R: Columnation + 'static,
{
type Batcher = ColumnatedMergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
}

/// State for an in-progress merge.
pub struct OrdValMerger<L: Layout> {
/// Key position to merge next in the first batch.
Expand All @@ -148,8 +174,11 @@ mod val_batch {
singletons: usize,
}

impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L> {
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
impl<L: Layout, C> Merger<OrdValBatch<L, C>> for OrdValMerger<L>
where
OrdValBatch<L, C>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdValBatch<L, C>, batch2: &OrdValBatch<L, C>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

assert!(batch1.upper() == batch2.lower());
use lattice::Lattice;
Expand Down Expand Up @@ -181,14 +210,15 @@ mod val_batch {
singletons: 0,
}
}
fn done(self) -> OrdValBatch<L> {
fn done(self) -> OrdValBatch<L, C> {
OrdValBatch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: self.description,
phantom: PhantomData
}
}
fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
fn work(&mut self, source1: &OrdValBatch<L, C>, source2: &OrdValBatch<L, C>, fuel: &mut isize) {

// An (incomplete) indication of the amount of work we've done so far.
let starting_updates = self.result.updates.len();
Expand Down Expand Up @@ -387,25 +417,25 @@ mod val_batch {
}

/// A cursor for navigating a single layer.
pub struct OrdValCursor<L: Layout> {
pub struct OrdValCursor<L: Layout, C> {
/// Absolute position of the current key.
key_cursor: usize,
/// Absolute position of the current value.
val_cursor: usize,
/// Phantom marker for Rust happiness.
phantom: std::marker::PhantomData<L>,
phantom: PhantomData<(L, C)>,
}

impl<L: Layout> Cursor for OrdValCursor<L> {
impl<L: Layout, C> Cursor for OrdValCursor<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Storage = OrdValBatch<L>;
type Storage = OrdValBatch<L, C>;

fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) }
fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) }
fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { storage.storage.vals.index(self.val_cursor) }
fn map_times<L2: FnMut(&Self::Time, &Self::R)>(&mut self, storage: &Self::Storage, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
Expand Down Expand Up @@ -489,7 +519,10 @@ mod val_batch {
}
}

impl<L: Layout> Builder<OrdValBatch<L>> for OrdValBuilder<L> {
impl<L: Layout, C> Builder<OrdValBatch<L, C>> for OrdValBuilder<L>
where
OrdValBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
{

fn new() -> Self { Self::with_capacity(0) }
fn with_capacity(cap: usize) -> Self {
Expand Down Expand Up @@ -564,7 +597,7 @@ mod val_batch {
}

#[inline(never)]
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L> {
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L, C> {
// Record the final offsets
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
// Remove any pending singleton, and if it was set increment our count.
Expand All @@ -574,6 +607,7 @@ mod val_batch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: Description::new(lower, upper, since),
phantom: PhantomData,
}
}
}
Expand Down

0 comments on commit 07efd6d

Please sign in to comment.