Skip to content

Commit

Permalink
Quality of life updates: non-optional compaction, and Builder::copy.
Browse files Browse the repository at this point in the history
Co-authored-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
frankmcsherry and antiguru authored Nov 20, 2023
1 parent b82c3ee commit 42e032d
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<B: Batch> Batcher<B> for ColumnatedMergeBatcher<B>
keep.copy(datum);
}
else {
builder.push((key.clone(), val.clone(), time.clone(), diff.clone()));
builder.copy((key, val, time, diff));
}
}
// Recycling buffer.
Expand Down
41 changes: 19 additions & 22 deletions src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<L: Layout> Batch for OrdValBatch<L, Vec<L::Target>>
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger {
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
}
Expand All @@ -115,7 +115,7 @@ where
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger {
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
}
Expand Down Expand Up @@ -220,21 +220,18 @@ pub struct OrdValMerger<L: Layout> {
// result that we are currently assembling.
result: <KVTDLayer<L> as Trie>::MergeBuilder,
description: Description<<L::Target as Update>::Time>,
should_compact: bool,
}

impl<L: Layout, C> Merger<OrdValBatch<L, C>> for OrdValMerger<L>
where
OrdValBatch<L, C>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdValBatch<L, C>, batch2: &OrdValBatch<L, C>, compaction_frontier: Option<AntichainRef<<OrdValBatch<L, C> as BatchReader>::Time>>) -> Self {
fn new(batch1: &OrdValBatch<L, C>, batch2: &OrdValBatch<L, C>, compaction_frontier: AntichainRef<<OrdValBatch<L, C> as BatchReader>::Time>) -> Self {

assert!(batch1.upper() == batch2.lower());

let mut since = batch1.description().since().join(batch2.description().since());
if let Some(compaction_frontier) = compaction_frontier {
since = since.join(&compaction_frontier.to_owned());
}
since = since.join(&compaction_frontier.to_owned());

let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

Expand All @@ -245,7 +242,6 @@ where
upper2: batch2.layer.keys(),
result: <<KVTDLayer<L> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer),
description: description,
should_compact: compaction_frontier.is_some(),
}
}
fn done(self) -> OrdValBatch<L, C> {
Expand Down Expand Up @@ -297,9 +293,7 @@ where
effort = (self.result.vals.vals.vals.len() - starting_updates) as isize;

// if we are supplied a frontier, we should compact.
if self.should_compact {
OrdValBatch::<L, C>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
}
OrdValBatch::<L, C>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);

*fuel -= effort;

Expand Down Expand Up @@ -369,6 +363,10 @@ where
self.builder.push_tuple((key, (val, (time, diff))));
}

fn copy(&mut self, (key, val, time, diff): (&<OrdValBatch<L, C> as BatchReader>::Key, &<OrdValBatch<L, C> as BatchReader>::Val, &<OrdValBatch<L, C> as BatchReader>::Time, &<OrdValBatch<L, C> as BatchReader>::R)) {
self.builder.push_tuple((key.clone(), (val.clone(), (time.clone(), diff.clone()))));
}

#[inline(never)]
fn done(self, lower: Antichain<<OrdValBatch<L, C> as BatchReader>::Time>, upper: Antichain<<OrdValBatch<L, C> as BatchReader>::Time>, since: Antichain<<OrdValBatch<L, C> as BatchReader>::Time>) -> OrdValBatch<L, C> {
OrdValBatch {
Expand Down Expand Up @@ -416,7 +414,7 @@ impl<L: Layout> Batch for OrdKeyBatch<L, Vec<L::Target>> {
type Builder = OrdKeyBuilder<L>;
type Merger = OrdKeyMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger {
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdKeyMerger::new(self, other, compaction_frontier)
}
}
Expand All @@ -432,7 +430,7 @@ where
type Builder = OrdKeyBuilder<L>;
type Merger = OrdKeyMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger {
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdKeyMerger::new(self, other, compaction_frontier)
}
}
Expand Down Expand Up @@ -511,21 +509,18 @@ pub struct OrdKeyMerger<L: Layout> {
// result that we are currently assembling.
result: <KTDLayer<L> as Trie>::MergeBuilder,
description: Description<<L::Target as Update>::Time>,
should_compact: bool,
}

impl<L: Layout, C> Merger<OrdKeyBatch<L, C>> for OrdKeyMerger<L>
where
OrdKeyBatch<L, C>: Batch<Time=<L::Target as Update>::Time>
{
fn new(batch1: &OrdKeyBatch<L, C>, batch2: &OrdKeyBatch<L, C>, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self {
fn new(batch1: &OrdKeyBatch<L, C>, batch2: &OrdKeyBatch<L, C>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

assert!(batch1.upper() == batch2.lower());

let mut since = batch1.description().since().join(batch2.description().since());
if let Some(compaction_frontier) = compaction_frontier {
since = since.join(&compaction_frontier.to_owned());
}
since = since.join(&compaction_frontier.to_owned());

let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

Expand All @@ -536,7 +531,6 @@ where
upper2: batch2.layer.keys(),
result: <<KTDLayer<L> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer),
description: description,
should_compact: compaction_frontier.is_some(),
}
}
fn done(self) -> OrdKeyBatch<L, C> {
Expand Down Expand Up @@ -594,9 +588,7 @@ where
effort = (self.result.vals.vals.len() - starting_updates) as isize;

// if we are supplied a frontier, we should compact.
if self.should_compact {
OrdKeyBatch::<L, C>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
}
OrdKeyBatch::<L, C>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);

*fuel -= effort;

Expand Down Expand Up @@ -670,6 +662,11 @@ where
self.builder.push_tuple((key, (time, diff)));
}

#[inline]
fn copy(&mut self, (key, _, time, diff): (&<L::Target as Update>::Key, &(), &<L::Target as Update>::Time, &<L::Target as Update>::Diff)) {
self.builder.push_tuple((key.clone(), (time.clone(), diff.clone())));
}

#[inline(never)]
fn done(self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdKeyBatch<L, C> {
OrdKeyBatch {
Expand Down
35 changes: 30 additions & 5 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ mod val_batch {
type Builder = OrdValBuilder<L>;
type Merger = OrdValMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger {
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
OrdValMerger::new(self, other, compaction_frontier)
}
}
Expand All @@ -133,14 +133,13 @@ mod val_batch {
}

impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L> {
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self {
fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

assert!(batch1.upper() == batch2.lower());
use lattice::Lattice;
let mut since = batch1.description().since().join(batch2.description().since());
if let Some(compaction_frontier) = compaction_frontier {
since = since.join(&compaction_frontier.to_owned());
}
since = since.join(&compaction_frontier.to_owned());

let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

let batch1 = &batch1.storage;
Expand Down Expand Up @@ -470,6 +469,32 @@ mod val_batch {
}
}

#[inline]
fn copy(&mut self, (key, val, time, diff): (&<L::Target as Update>::Key, &<L::Target as Update>::Val, &<L::Target as Update>::Time, &<L::Target as Update>::Diff)) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last() == Some(key) {
// Perhaps this is a continuation of an already received value.
if self.result.vals.last() == Some(val) {
// TODO: here we could look for repetition, and not push the update in that case.
// More logic (and state) would be required to correctly wrangle this.
self.result.updates.push((time.clone(), diff.clone()));
} else {
// New value; complete representation of prior value.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
self.result.updates.push((time.clone(), diff.clone()));
self.result.vals.copy(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.push(self.result.updates.len().try_into().ok().unwrap());
self.result.keys_offs.push(self.result.vals.len().try_into().ok().unwrap());
self.result.updates.push((time.clone(), diff.clone()));
self.result.vals.copy(val);
self.result.keys.copy(key);
}
}

#[inline(never)]
fn done(mut self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdValBatch<L> {
// Record the final offsets
Expand Down
4 changes: 2 additions & 2 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ where
complete: None,
}
));
let compaction_frontier = Some(self.logical_frontier.borrow());
let compaction_frontier = self.logical_frontier.borrow();
self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier);
}
MergeState::Double(_) => {
Expand Down Expand Up @@ -869,7 +869,7 @@ impl<B: Batch> MergeState<B> where B::Time: Eq {
/// empty batch whose upper and lower froniers are equal. This
/// option exists purely for bookkeeping purposes, and no computation
/// is performed to merge the two batches.
fn begin_merge(batch1: Option<B>, batch2: Option<B>, compaction_frontier: Option<AntichainRef<B::Time>>) -> MergeState<B> {
fn begin_merge(batch1: Option<B>, batch2: Option<B>, compaction_frontier: AntichainRef<B::Time>) -> MergeState<B> {
let variant =
match (batch1, batch2) {
(Some(batch1), Some(batch2)) => {
Expand Down
19 changes: 14 additions & 5 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized {
/// The result of this method can be exercised to eventually produce the same result
/// that a call to `self.merge(other)` would produce, but it can be done in a measured
/// fashion. This can help to avoid latency spikes where a large merge needs to happen.
fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<Self::Time>>) -> Self::Merger {
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<Self::Time>) -> Self::Merger {
Self::Merger::new(self, other, compaction_frontier)
}
/// Creates an empty batch with the stated bounds.
Expand Down Expand Up @@ -316,7 +316,14 @@ pub trait Builder<Output: Batch> {
/// Allocates an empty builder with some capacity.
fn with_capacity(cap: usize) -> Self;
/// Adds an element to the batch.
fn push(&mut self, element: (Output::Key, Output::Val, Output::Time, Output::R));
///
/// The default implementation uses `self.copy` with references to the owned arguments.
/// One should override it if the builder can take advantage of owned arguments.
fn push(&mut self, element: (Output::Key, Output::Val, Output::Time, Output::R)) {
self.copy((&element.0, &element.1, &element.2, &element.3));
}
/// Adds an element to the batch.
fn copy(&mut self, element: (&Output::Key, &Output::Val, &Output::Time, &Output::R));
/// Adds an ordered sequence of elements to the batch.
fn extend<I: Iterator<Item=(Output::Key,Output::Val,Output::Time,Output::R)>>(&mut self, iter: I) {
for item in iter { self.push(item); }
Expand All @@ -329,7 +336,7 @@ pub trait Builder<Output: Batch> {
pub trait Merger<Output: Batch> {
/// Creates a new merger to merge the supplied batches, optionally compacting
/// up to the supplied frontier.
fn new(source1: &Output, source2: &Output, compaction_frontier: Option<AntichainRef<Output::Time>>) -> Self;
fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef<Output::Time>) -> Self;
/// Perform some amount of work, decrementing `fuel`.
///
/// If `fuel` is non-zero after the call, the merging is complete and
Expand Down Expand Up @@ -441,6 +448,7 @@ pub mod rc_blanket_impls {
fn new() -> Self { RcBuilder { builder: <B::Builder as Builder<B>>::new() } }
fn with_capacity(cap: usize) -> Self { RcBuilder { builder: <B::Builder as Builder<B>>::with_capacity(cap) } }
fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) }
fn copy(&mut self, element: (&B::Key, &B::Val, &B::Time, &B::R)) { self.builder.copy(element) }
fn done(self, lower: Antichain<B::Time>, upper: Antichain<B::Time>, since: Antichain<B::Time>) -> Rc<B> { Rc::new(self.builder.done(lower, upper, since)) }
}

Expand All @@ -449,7 +457,7 @@ pub mod rc_blanket_impls {

/// Represents a merge in progress.
impl<B:Batch> Merger<Rc<B>> for RcMerger<B> {
fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: Option<AntichainRef<B::Time>>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: AntichainRef<B::Time>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
fn done(self) -> Rc<B> { Rc::new(self.merger.done()) }
}
Expand Down Expand Up @@ -562,6 +570,7 @@ pub mod abomonated_blanket_impls {
fn new() -> Self { AbomonatedBuilder { builder: <B::Builder as Builder<B>>::new() } }
fn with_capacity(cap: usize) -> Self { AbomonatedBuilder { builder: <B::Builder as Builder<B>>::with_capacity(cap) } }
fn push(&mut self, element: (B::Key, B::Val, B::Time, B::R)) { self.builder.push(element) }
fn copy(&mut self, element: (&B::Key, &B::Val, &B::Time, &B::R)) { self.builder.copy(element) }
fn done(self, lower: Antichain<B::Time>, upper: Antichain<B::Time>, since: Antichain<B::Time>) -> Abomonated<B, Vec<u8>> {
let batch = self.builder.done(lower, upper, since);
let mut bytes = Vec::with_capacity(measure(&batch));
Expand All @@ -575,7 +584,7 @@ pub mod abomonated_blanket_impls {

/// Represents a merge in progress.
impl<B:Batch+Abomonation> Merger<Abomonated<B,Vec<u8>>> for AbomonatedMerger<B> {
fn new(source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, compaction_frontier: Option<AntichainRef<B::Time>>) -> Self {
fn new(source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, compaction_frontier: AntichainRef<B::Time>) -> Self {
AbomonatedMerger { merger: B::begin_merge(source1, source2, compaction_frontier) }
}
fn work(&mut self, source1: &Abomonated<B,Vec<u8>>, source2: &Abomonated<B,Vec<u8>>, fuel: &mut isize) {
Expand Down

0 comments on commit 42e032d

Please sign in to comment.