Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable batcher implementation in ord_neu #422

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 53 additions & 19 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! and should consume fewer resources (computation and memory) when it applies.

use std::rc::Rc;
use timely::container::columnation::TimelyStack;

use trace::implementations::spine_fueled::Spine;

Expand All @@ -18,21 +19,24 @@ use self::val_batch::{OrdValBatch};


/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>>>>;
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>, Vec<((K,V),T,R)>>>>;
// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>>>>;
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>, TimelyStack<((K,V),T,R)>>>>;
// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>>;

mod val_batch {

use std::convert::TryInto;
use std::marker::PhantomData;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::progress::{Antichain, frontier::AntichainRef};

use trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
use trace::implementations::merge_batcher_col::ColumnatedMergeBatcher;
use trace::layers::BatchContainer;

use super::{Layout, Update};
Expand Down Expand Up @@ -82,8 +86,11 @@ mod val_batch {
}

/// An immutable collection of update tuples, from a contiguous interval of logical times.
///
/// The `L` parameter captures how the updates should be laid out, and `C` determines which
/// merge batcher to select.
#[derive(Abomonation)]
pub struct OrdValBatch<L: Layout> {
pub struct OrdValBatch<L: Layout, C> {
/// The updates themselves.
pub storage: OrdValStorage<L>,
/// Description of the update times this layer represents.
Expand All @@ -94,20 +101,22 @@ mod val_batch {
/// we may have many more updates than `storage.updates.len()`. It should equal that
/// length, plus the number of singleton optimizations employed.
pub updates: usize,
/// Phantom marker for Rust happiness.
pub phantom: PhantomData<C>,
}

impl<L: Layout> BatchReader for OrdValBatch<L> {
impl<L: Layout, C> BatchReader for OrdValBatch<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Cursor = OrdValCursor<L>;
type Cursor = OrdValCursor<L, C>;
fn cursor(&self) -> Self::Cursor {
OrdValCursor {
key_cursor: 0,
val_cursor: 0,
phantom: std::marker::PhantomData,
phantom: PhantomData,
}
}
fn len(&self) -> usize {
Expand All @@ -118,7 +127,7 @@ mod val_batch {
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
}

impl<L: Layout> Batch for OrdValBatch<L> {
impl<L: Layout> Batch for OrdValBatch<L, Vec<L::Target>> {
type Batcher = MergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;
Expand All @@ -128,6 +137,23 @@ mod val_batch {
}
}

impl<L: Layout> Batch for OrdValBatch<L, TimelyStack<L::Target>>
where
<L as Layout>::Target: Columnation,
Self::Key: Columnation + 'static,
Self::Val: Columnation + 'static,
Self::Time: Columnation + 'static,
Self::R: Columnation + 'static,
{
type Batcher = ColumnatedMergeBatcher<Self>;
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
}

/// State for an in-progress merge.
pub struct OrdValMerger<L: Layout> {
/// Key position to merge next in the first batch.
Expand All @@ -148,8 +174,11 @@ mod val_batch {
singletons: usize,
}

impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L> {
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {
impl<L: Layout, C> Merger<OrdValBatch<L, C>> for OrdValMerger<L>
where
OrdValBatch<L, C>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdValBatch<L, C>, batch2: &OrdValBatch<L, C>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

assert!(batch1.upper() == batch2.lower());
use lattice::Lattice;
Expand Down Expand Up @@ -181,14 +210,15 @@ mod val_batch {
singletons: 0,
}
}
fn done(self) -> OrdValBatch<L> {
fn done(self) -> OrdValBatch<L, C> {
OrdValBatch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: self.description,
phantom: PhantomData
}
}
fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {
fn work(&mut self, source1: &OrdValBatch<L, C>, source2: &OrdValBatch<L, C>, fuel: &mut isize) {

// An (incomplete) indication of the amount of work we've done so far.
let starting_updates = self.result.updates.len();
Expand Down Expand Up @@ -387,25 +417,25 @@ mod val_batch {
}

/// A cursor for navigating a single layer.
pub struct OrdValCursor<L: Layout> {
pub struct OrdValCursor<L: Layout, C> {
/// Absolute position of the current key.
key_cursor: usize,
/// Absolute position of the current value.
val_cursor: usize,
/// Phantom marker for Rust happiness.
phantom: std::marker::PhantomData<L>,
phantom: PhantomData<(L, C)>,
}

impl<L: Layout> Cursor for OrdValCursor<L> {
impl<L: Layout, C> Cursor for OrdValCursor<L, C> {
type Key = <L::Target as Update>::Key;
type Val = <L::Target as Update>::Val;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Storage = OrdValBatch<L>;
type Storage = OrdValBatch<L, C>;

fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &storage.storage.keys.index(self.key_cursor.try_into().ok().unwrap()) }
fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { &storage.storage.vals.index(self.val_cursor.try_into().ok().unwrap()) }
fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { storage.storage.keys.index(self.key_cursor) }
fn val<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Val { storage.storage.vals.index(self.val_cursor) }
Comment on lines +437 to +438
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow. I guess I must have been having a bad time and pasted that fragment onto everything and anything.

fn map_times<L2: FnMut(&Self::Time, &Self::R)>(&mut self, storage: &Self::Storage, mut logic: L2) {
let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
for index in lower .. upper {
Expand Down Expand Up @@ -489,7 +519,10 @@ mod val_batch {
}
}

impl<L: Layout> Builder<OrdValBatch<L>> for OrdValBuilder<L> {
impl<L: Layout, C> Builder<OrdValBatch<L, C>> for OrdValBuilder<L>
where
OrdValBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff>
{

fn new() -> Self { Self::with_capacity(0) }
fn with_capacity(cap: usize) -> Self {
Expand Down Expand Up @@ -564,7 +597,7 @@ mod val_batch {
}

#[inline(never)]
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L> {
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L, C> {
// Record the final offsets
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
// Remove any pending singleton, and if it was set increment our count.
Expand All @@ -574,6 +607,7 @@ mod val_batch {
updates: self.result.updates.len() + self.singletons,
storage: self.result,
description: Description::new(lower, upper, since),
phantom: PhantomData,
}
}
}
Expand Down