Skip to content

Commit

Permalink
Columnation for reduction monoids
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 20, 2023
1 parent ed9bd6f commit 7a99184
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
4 changes: 2 additions & 2 deletions src/compute/src/extensions/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ where
G: Scope<Timestamp = T>,
G::Timestamp: Lattice + Ord,
K: Data + Columnation,
T: Lattice + Timestamp,
R: Semigroup,
T: Lattice + Timestamp + Columnation,
R: Semigroup + Columnation,
{
fn log_arrangement_size(self) -> Self {
log_arrangement_size_inner(self, |trace| {
Expand Down
68 changes: 65 additions & 3 deletions src/compute/src/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use timely::container::columnation::Columnation;
use timely::container::columnation::{Columnation, CopyRegion};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use timely::progress::Timestamp;
Expand Down Expand Up @@ -1513,7 +1513,7 @@ fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Dif
/// point representation has less precision than a double. It is entirely possible
/// that the values of the accumulator overflow, thus we have to use wrapping arithmetic
/// to preserve group guarantees.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum Accum {
/// Accumulates boolean values.
Bool {
Expand Down Expand Up @@ -1755,6 +1755,10 @@ impl Multiply<Diff> for Accum {
}
}

impl Columnation for Accum {
type InnerRegion = CopyRegion<Self>;
}

/// Monoids for in-place compaction of monotonic streams.
mod monoids {

Expand All @@ -1778,6 +1782,7 @@ mod monoids {
use mz_ore::soft_panic_or_log;
use mz_repr::{Datum, Diff, Row};
use serde::{Deserialize, Serialize};
use timely::container::columnation::{Columnation, Region};

/// A monoid containing a single-datum row.
#[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
Expand All @@ -1786,6 +1791,16 @@ mod monoids {
Max(Row),
}

impl ReductionMonoid {
/// Return the inner row independent of the enum variant.
fn row(&self) -> &Row {
use ReductionMonoid::*;
match self {
Min(row) | Max(row) => row,
}
}
}

impl Multiply<Diff> for ReductionMonoid {
type Output = Self;

Expand Down Expand Up @@ -1839,7 +1854,6 @@ mod monoids {
}
}
}

fn is_zero(&self) -> bool {
// It totally looks like we could return true here for `Datum::Null`, but don't do this!
// DD uses true results of this method to make stuff disappear. This makes sense when
Expand All @@ -1850,6 +1864,54 @@ mod monoids {
}
}

impl Columnation for ReductionMonoid {
type InnerRegion = ReductionMonoidRegion;
}

/// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants
/// in the same backing region. Alternatively, it could store it in two regions, but we select
/// the former for simplicity reasons.
#[derive(Default)]
pub struct ReductionMonoidRegion {
inner: <Row as Columnation>::InnerRegion,
}

impl Region for ReductionMonoidRegion {
type Item = ReductionMonoid;

unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
use ReductionMonoid::*;
match item {
Min(row) => Min(self.inner.copy(row)),
Max(row) => Max(self.inner.copy(row)),
}
}

fn clear(&mut self) {
self.inner.clear();
}

fn reserve_items<'a, I>(&mut self, items: I)
where
Self: 'a,
I: Iterator<Item = &'a Self::Item> + Clone,
{
self.inner.reserve_items(items.map(ReductionMonoid::row));
}

fn reserve_regions<'a, I>(&mut self, regions: I)
where
Self: 'a,
I: Iterator<Item = &'a Self> + Clone,
{
self.inner.reserve_regions(regions.map(|r| &r.inner));
}

fn heap_size(&self, callback: impl FnMut(usize, usize)) {
self.inner.heap_size(callback);
}
}

/// Get the correct monoid implementation for a given aggregation function. Note that
/// all hierarchical aggregation functions need to supply a monoid implementation.
pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option<ReductionMonoid> {
Expand Down

0 comments on commit 7a99184

Please sign in to comment.