From 06bad4341595a8d5ca293a94d8cb9317d0580bc2 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 10 Nov 2023 15:40:31 -0500 Subject: [PATCH 1/2] Adapt Materialize to use the columnated merge batcher Signed-off-by: Moritz Hoffmann --- Cargo.lock | 15 +++--- src/compute/src/extensions/arrange.rs | 4 +- src/compute/src/render/context.rs | 62 +++++++++++----------- src/compute/src/render/join/delta_join.rs | 8 +-- src/compute/src/render/join/linear_join.rs | 9 ++-- src/compute/src/render/mod.rs | 4 +- src/compute/src/render/reduce.rs | 4 +- src/compute/src/render/threshold.rs | 16 +++--- src/compute/src/render/top_k.rs | 58 +++++++++++++++++++- src/expr/Cargo.toml | 1 + src/expr/src/relation/mod.rs | 18 ++++++- 11 files changed, 137 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b334664e9019..57ac836febd01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1745,7 +1745,7 @@ checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499" [[package]] name = "differential-dataflow" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#40fbd33810198116f7a3562e86495233184acbd4" +source = "git+https://github.com/MaterializeInc/differential-dataflow.git#d9896ce1f322fb89c3e6955441fb2d2c2e07bc1d" dependencies = [ "abomonation", "abomonation_derive", @@ -1802,7 +1802,7 @@ checksum = "923dea538cea0aa3025e8685b20d6ee21ef99c4f77e954a30febbaac5ec73a97" [[package]] name = "dogsdogsdogs" version = "0.1.0" -source = "git+https://github.com/MaterializeInc/differential-dataflow.git#40fbd33810198116f7a3562e86495233184acbd4" +source = "git+https://github.com/MaterializeInc/differential-dataflow.git#d9896ce1f322fb89c3e6955441fb2d2c2e07bc1d" dependencies = [ "abomonation", "abomonation_derive", @@ -4297,6 +4297,7 @@ dependencies = [ "sha1", "sha2", "subtle", + "timely", "tracing", "uncased", "uuid", @@ -8531,7 +8532,7 @@ dependencies = [ [[package]] name = "timely" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#53adc68bb890c1ce60f0b507d6a4996a3d577f21" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#9adb32c061330b5a82876606141121362c7c659a" dependencies = [ "abomonation", "abomonation_derive", @@ -8549,12 +8550,12 @@ dependencies = [ [[package]] name = "timely_bytes" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#53adc68bb890c1ce60f0b507d6a4996a3d577f21" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#9adb32c061330b5a82876606141121362c7c659a" [[package]] name = "timely_communication" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#53adc68bb890c1ce60f0b507d6a4996a3d577f21" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#9adb32c061330b5a82876606141121362c7c659a" dependencies = [ "abomonation", "abomonation_derive", @@ -8570,7 +8571,7 @@ dependencies = [ [[package]] name = "timely_container" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#53adc68bb890c1ce60f0b507d6a4996a3d577f21" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#9adb32c061330b5a82876606141121362c7c659a" dependencies = [ "columnation", "serde", @@ -8579,7 +8580,7 @@ dependencies = [ [[package]] name = "timely_logging" version = "0.12.0" -source = "git+https://github.com/MaterializeInc/timely-dataflow.git#53adc68bb890c1ce60f0b507d6a4996a3d577f21" +source = "git+https://github.com/MaterializeInc/timely-dataflow.git#9adb32c061330b5a82876606141121362c7c659a" [[package]] name = "tiny-keccak" diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index ed013b478a42a..69aec7dfa2beb 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -269,11 +269,11 @@ where impl ArrangementSize for Arranged>> where G: Scope, - G::Timestamp: Lattice + Ord, + G::Timestamp: Lattice + Ord + Columnation, K: Data + Columnation, V: Data + Columnation, T: Lattice + Timestamp, - R: Semigroup, + R: Semigroup + Columnation, { fn log_arrangement_size(self) -> Self { log_arrangement_size_inner(self, |trace| { diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index e6081a57cf2cf..21ff25cab9ac8 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -74,8 +74,8 @@ pub(crate) type ErrArrangementImport = Arranged< /// of regions or iteration. pub struct Context where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// The scope within which all managed collections exist. /// @@ -104,7 +104,7 @@ where impl Context where - S::Timestamp: Lattice + Refines, + S::Timestamp: Lattice + Refines + Columnation, { /// Creates a new empty Context. pub fn for_dataflow_in( @@ -134,8 +134,8 @@ where impl Context where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// Insert a collection bundle by an identifier. /// @@ -230,7 +230,7 @@ impl ShutdownToken { #[derive(Clone)] pub enum SpecializedArrangement where - ::Timestamp: Lattice, + ::Timestamp: Lattice + Columnation, { RowUnit(KeyValArrangement), RowRow(KeyValArrangement), @@ -238,7 +238,7 @@ where impl SpecializedArrangement where - ::Timestamp: Lattice, + ::Timestamp: Lattice + Columnation, { /// The scope of the underlying arrangement's stream. pub fn scope(&self) -> S { @@ -293,7 +293,7 @@ where refuel: usize, ) -> timely::dataflow::Stream where - T: Timestamp + Lattice, + T: Timestamp + Lattice + Columnation, ::Timestamp: Lattice + Refines, I: IntoIterator, I::Item: Data, @@ -329,7 +329,7 @@ where impl<'a, S: Scope> SpecializedArrangement> where - ::Timestamp: Lattice, + ::Timestamp: Lattice + Columnation, { /// Extracts the underlying arrangement flavor from a region. pub fn leave_region(&self) -> SpecializedArrangement { @@ -366,7 +366,7 @@ where #[derive(Clone)] pub enum SpecializedArrangementImport where - T: Timestamp + Lattice, + T: Timestamp + Lattice + Columnation, ::Timestamp: Lattice + Refines, { RowUnit(KeyValArrangementImport), @@ -375,8 +375,8 @@ where impl SpecializedArrangementImport where - T: Timestamp + Lattice, - ::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + ::Timestamp: Lattice + Refines + Columnation, { /// The scope of the underlying trace's stream. pub fn scope(&self) -> S { @@ -467,7 +467,7 @@ where impl<'a, S: Scope, T> SpecializedArrangementImport, T> where - T: Timestamp + Lattice, + T: Timestamp + Lattice + Columnation, ::Timestamp: Lattice + Refines, { /// Extracts the underlying arrangement flavor from a region. @@ -487,8 +487,8 @@ where #[derive(Clone)] pub enum ArrangementFlavor where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// A dataflow-local arrangement. Local(SpecializedArrangement, ErrArrangement), @@ -505,8 +505,8 @@ where impl ArrangementFlavor where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// Presents `self` as a stream of updates. /// @@ -582,8 +582,8 @@ where } impl ArrangementFlavor where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// The scope containing the collection bundle. pub fn scope(&self) -> S { @@ -610,8 +610,8 @@ where } impl<'a, S: Scope, T> ArrangementFlavor, T> where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// Extracts the arrangement flavor from a region. pub fn leave_region(&self) -> ArrangementFlavor { @@ -633,8 +633,8 @@ where #[derive(Clone)] pub struct CollectionBundle where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { pub collection: Option<(Collection, Collection)>, pub arranged: BTreeMap, ArrangementFlavor>, @@ -642,8 +642,8 @@ where impl CollectionBundle where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// Construct a new collection bundle from update streams. pub fn from_collections( @@ -715,8 +715,8 @@ where impl<'a, S: Scope, T> CollectionBundle, T> where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// Extracts the collection bundle from a region. pub fn leave_region(&self) -> CollectionBundle { @@ -734,10 +734,10 @@ where } } -impl CollectionBundle +impl CollectionBundle where - T: Timestamp + Lattice, - S::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + S::Timestamp: Lattice + Refines + Columnation, { /// Asserts that the arrangement for a specific key /// (or the raw collection for no key) exists, @@ -899,7 +899,7 @@ where impl CollectionBundle where - T: timely::progress::Timestamp + Lattice, + T: timely::progress::Timestamp + Lattice + Columnation, S: Scope, S::Timestamp: Refines + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp, diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index baf4253396f75..6d8669f182398 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -365,8 +365,8 @@ fn dispatch_build_halfjoin_trace( ) where G: Scope, - T: Timestamp + Lattice, - G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines, + T: Timestamp + Lattice + Columnation, + G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines + Columnation, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, { match trace { @@ -576,8 +576,8 @@ fn dispatch_build_update_stream_trace( ) -> (Collection, Collection) where G: Scope, - T: Timestamp + Lattice, - G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines, + T: Timestamp + Lattice + Columnation, + G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines + Columnation, { match trace { SpecializedArrangementImport::RowUnit(inner) => build_update_stream( diff --git a/src/compute/src/render/join/linear_join.rs b/src/compute/src/render/join/linear_join.rs index cd65773273a58..0be75a249e782 100644 --- a/src/compute/src/render/join/linear_join.rs +++ b/src/compute/src/render/join/linear_join.rs @@ -24,6 +24,7 @@ use mz_repr::fixed_length::IntoRowByTypes; use mz_repr::{ColumnType, DatumVec, Diff, Row, RowArena, SharedRow}; use mz_storage_types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; +use timely::container::columnation::Columnation; use timely::dataflow::operators::OkErr; use timely::dataflow::Scope; use timely::progress::timestamp::{Refines, Timestamp}; @@ -125,8 +126,8 @@ impl LinearJoinSpec { enum JoinedFlavor where G: Scope, - G::Timestamp: Lattice + Refines, - T: Timestamp + Lattice, + G::Timestamp: Lattice + Refines + Columnation, + T: Timestamp + Lattice + Columnation, { /// Streamed data as a collection. Collection(Collection), @@ -139,8 +140,8 @@ where impl Context where G: Scope, - G::Timestamp: Lattice + Refines, - T: Timestamp + Lattice, + G::Timestamp: Lattice + Refines + Columnation, + T: Timestamp + Lattice + Columnation, { pub(crate) fn render_join( &mut self, diff --git a/src/compute/src/render/mod.rs b/src/compute/src/render/mod.rs index 4e85d5eb0794d..26c1c004c9165 100644 --- a/src/compute/src/render/mod.rs +++ b/src/compute/src/render/mod.rs @@ -935,7 +935,7 @@ where } /// A timestamp type that can be used for operations within MZ's dataflow layer. -pub trait RenderTimestamp: Timestamp + Lattice + Refines { +pub trait RenderTimestamp: Timestamp + Lattice + Refines + Columnation { /// The system timestamp component of the timestamp. /// /// This is useful for manipulating the system time, as when delaying @@ -970,7 +970,7 @@ impl RenderTimestamp for mz_repr::Timestamp { } } -impl RenderTimestamp for Product { +impl RenderTimestamp for Product { fn system_time(&mut self) -> &mut mz_repr::Timestamp { &mut self.outer } diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index 03c63bbd0614c..af4a4678b7eb1 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -50,8 +50,8 @@ use crate::typedefs::{ErrValSpine, RowKeySpine, RowSpine}; impl Context where G: Scope, - G::Timestamp: Lattice + Refines, - T: Timestamp + Lattice, + G::Timestamp: Lattice + Refines + Columnation, + T: Timestamp + Lattice + Columnation, { /// Renders a `MirRelationExpr::Reduce` using various non-obvious techniques to /// minimize worst-case incremental update times and memory footprint. diff --git a/src/compute/src/render/threshold.rs b/src/compute/src/render/threshold.rs index 3994f0cfeaf8f..50c275aa450ab 100644 --- a/src/compute/src/render/threshold.rs +++ b/src/compute/src/render/threshold.rs @@ -38,7 +38,7 @@ fn threshold_arrangement( ) -> Arranged> where G: Scope, - G::Timestamp: Lattice + Refines, + G::Timestamp: Lattice + Refines + Columnation, T: Timestamp + Lattice, K: Columnation + Data, V: Columnation + Data, @@ -63,7 +63,7 @@ fn dispatch_threshold_arrangement_local( ) -> SpecializedArrangement where G: Scope, - G::Timestamp: Lattice, + G::Timestamp: Lattice + Columnation, L: Fn(&Diff) -> bool + 'static, { match oks { @@ -87,8 +87,8 @@ fn dispatch_threshold_arrangement_trace( ) -> SpecializedArrangement where G: Scope, - T: Timestamp + Lattice, - G::Timestamp: Lattice + Refines, + T: Timestamp + Lattice + Columnation, + G::Timestamp: Lattice + Refines + Columnation, L: Fn(&Diff) -> bool + 'static, { match oks { @@ -114,8 +114,8 @@ pub fn build_threshold_basic( ) -> CollectionBundle where G: Scope, - G::Timestamp: Lattice + Refines, - T: Timestamp + Lattice, + G::Timestamp: Lattice + Refines + Columnation, + T: Timestamp + Lattice + Columnation, { let arrangement = input .arrangement(&key) @@ -139,8 +139,8 @@ where impl Context where G: Scope, - G::Timestamp: Lattice + Refines, - T: Timestamp + Lattice, + G::Timestamp: Lattice + Refines + Columnation, + T: Timestamp + Lattice + Columnation, { pub(crate) fn render_threshold( &self, diff --git a/src/compute/src/render/top_k.rs b/src/compute/src/render/top_k.rs index e884e3dfa6712..4dc17890dca26 100644 --- a/src/compute/src/render/top_k.rs +++ b/src/compute/src/render/top_k.rs @@ -26,6 +26,7 @@ use mz_ore::soft_assert_or_log; use mz_repr::{DatumVec, Diff, Row, SharedRow}; use mz_storage_types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; +use timely::container::columnation::Columnation; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Operator; use timely::dataflow::Scope; @@ -381,7 +382,7 @@ fn build_topk_negated_stage( ) -> Collection where G: Scope, - G::Timestamp: Lattice, + G::Timestamp: Lattice + Columnation, R: MaybeValidatingRow, { // We only want to arrange parts of the input that are not part of the actual output @@ -645,6 +646,7 @@ pub mod monoids { use mz_expr::ColumnOrder; use mz_repr::{DatumVec, Diff, Row}; use serde::{Deserialize, Serialize}; + use timely::container::columnation::{Columnation, Region}; /// A monoid containing a row and an ordering. #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] @@ -697,6 +699,60 @@ pub mod monoids { } } + impl Columnation for Top1Monoid { + type InnerRegion = Top1MonoidRegion; + } + + #[derive(Default)] + pub struct Top1MonoidRegion { + row_region: ::InnerRegion, + order_key_region: as Columnation>::InnerRegion, + } + + impl Region for Top1MonoidRegion { + type Item = Top1Monoid; + + unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item { + let row = self.row_region.copy(&item.row); + let order_key = self.order_key_region.copy(&item.order_key); + Self::Item { row, order_key } + } + + fn clear(&mut self) { + self.row_region.clear(); + self.order_key_region.clear(); + } + + fn reserve_items<'a, I>(&mut self, items1: I) + where + Self: 'a, + I: Iterator + Clone, + { + let items2 = items1.clone(); + self.row_region + .reserve_items(items1.into_iter().map(|s| &s.row)); + self.order_key_region + .reserve_items(items2.into_iter().map(|s| &s.order_key)); + } + + fn reserve_regions<'a, I>(&mut self, regions1: I) + where + Self: 'a, + I: Iterator + Clone, + { + let regions2 = regions1.clone(); + self.row_region + .reserve_regions(regions1.into_iter().map(|s| &s.row_region)); + self.order_key_region + .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region)); + } + + fn heap_size(&self, mut callback: impl FnMut(usize, usize)) { + self.row_region.heap_size(&mut callback); + self.order_key_region.heap_size(callback); + } + } + /// A shared portion of a thread-local top-1 monoid implementation. #[derive(Debug)] pub struct Top1MonoidShared { diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index af9c029260831..2bb67e5a18bbe 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -50,6 +50,7 @@ serde_regex = "1.1.0" sha1 = "0.10.5" sha2 = "0.10.6" subtle = "2.4.1" +timely = { version = "0.12.0", default-features = false, features = ["bincode"] } tracing = "0.1.37" uncased = "0.9.7" uuid = { version = "1.2.2", features = ["v5"] } diff --git a/src/expr/src/relation/mod.rs b/src/expr/src/relation/mod.rs index be8505830e3a3..689b7342df530 100644 --- a/src/expr/src/relation/mod.rs +++ b/src/expr/src/relation/mod.rs @@ -32,6 +32,7 @@ use mz_repr::explain::{ use mz_repr::{ColumnName, ColumnType, Datum, Diff, GlobalId, RelationType, Row, ScalarType}; use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize}; +use timely::container::columnation::{Columnation, CopyRegion}; use crate::explain::HumanizedExpr; use crate::relation::func::{AggregateFunc, LagLeadType, TableFunc}; @@ -2238,7 +2239,18 @@ impl VisitChildren for MirRelationExpr { /// Specification for an ordering by a column. #[derive( - Arbitrary, Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect, + Arbitrary, + Debug, + Clone, + Copy, + Eq, + PartialEq, + Ord, + PartialOrd, + Serialize, + Deserialize, + Hash, + MzReflect, )] pub struct ColumnOrder { /// The column index. @@ -2251,6 +2263,10 @@ pub struct ColumnOrder { pub nulls_last: bool, } +impl Columnation for ColumnOrder { + type InnerRegion = CopyRegion; +} + impl RustType for ColumnOrder { fn into_proto(&self) -> ProtoColumnOrder { ProtoColumnOrder { From 91e092d951ff325ac2c60cf7df61d7b7dc713282 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 17 Nov 2023 15:38:35 -0500 Subject: [PATCH 2/2] Columnation for reduction monoids Signed-off-by: Moritz Hoffmann --- src/compute/src/extensions/arrange.rs | 4 +- src/compute/src/render/reduce.rs | 77 ++++++++++++++++++++++----- 2 files changed, 67 insertions(+), 14 deletions(-) diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index 69aec7dfa2beb..5fadeddeb44b8 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -300,8 +300,8 @@ where G: Scope, G::Timestamp: Lattice + Ord, K: Data + Columnation, - T: Lattice + Timestamp, - R: Semigroup, + T: Lattice + Timestamp + Columnation, + R: Semigroup + Columnation, { fn log_arrangement_size(self) -> Self { log_arrangement_size_inner(self, |trace| { diff --git a/src/compute/src/render/reduce.rs b/src/compute/src/render/reduce.rs index af4a4678b7eb1..868be52ab2d9e 100644 --- a/src/compute/src/render/reduce.rs +++ b/src/compute/src/render/reduce.rs @@ -31,7 +31,7 @@ use mz_storage_types::errors::DataflowError; use mz_timely_util::operator::CollectionExt; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use timely::container::columnation::Columnation; +use timely::container::columnation::{Columnation, CopyRegion}; use timely::dataflow::Scope; use timely::progress::timestamp::Refines; use timely::progress::Timestamp; @@ -1513,7 +1513,7 @@ fn finalize_accum<'a>(aggr_func: &'a AggregateFunc, accum: &'a Accum, total: Dif /// point representation has less precision than a double. It is entirely possible /// that the values of the accumulator overflow, thus we have to use wrapping arithmetic /// to preserve group guarantees. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] enum Accum { /// Accumulates boolean values. Bool { @@ -1755,6 +1755,10 @@ impl Multiply for Accum { } } +impl Columnation for Accum { + type InnerRegion = CopyRegion; +} + /// Monoids for in-place compaction of monotonic streams. mod monoids { @@ -1778,6 +1782,7 @@ mod monoids { use mz_ore::soft_panic_or_log; use mz_repr::{Datum, Diff, Row}; use serde::{Deserialize, Serialize}; + use timely::container::columnation::{Columnation, Region}; /// A monoid containing a single-datum row. #[derive(Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)] @@ -1786,6 +1791,15 @@ mod monoids { Max(Row), } + impl ReductionMonoid { + pub fn finalize(&self) -> &Row { + use ReductionMonoid::*; + match self { + Min(row) | Max(row) => row, + } + } + } + impl Multiply for ReductionMonoid { type Output = Self; @@ -1839,7 +1853,6 @@ mod monoids { } } } - fn is_zero(&self) -> bool { // It totally looks like we could return true here for `Datum::Null`, but don't do this! // DD uses true results of this method to make stuff disappear. This makes sense when @@ -1850,6 +1863,55 @@ mod monoids { } } + impl Columnation for ReductionMonoid { + type InnerRegion = ReductionMonoidRegion; + } + + /// Region for [`ReductionMonoid`]. This region is special in that it stores both enum variants + /// in the same backing region. Alternatively, it could store it in two regions, but we select + /// the former for simplicity reasons. + #[derive(Default)] + pub struct ReductionMonoidRegion { + inner: ::InnerRegion, + } + + impl Region for ReductionMonoidRegion { + type Item = ReductionMonoid; + + unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item { + use ReductionMonoid::*; + match item { + Min(row) => Min(self.inner.copy(row)), + Max(row) => Max(self.inner.copy(row)), + } + } + + fn clear(&mut self) { + self.inner.clear(); + } + + fn reserve_items<'a, I>(&mut self, items: I) + where + Self: 'a, + I: Iterator + Clone, + { + self.inner + .reserve_items(items.map(ReductionMonoid::finalize)); + } + + fn reserve_regions<'a, I>(&mut self, regions: I) + where + Self: 'a, + I: Iterator + Clone, + { + self.inner.reserve_regions(regions.map(|r| &r.inner)); + } + + fn heap_size(&self, callback: impl FnMut(usize, usize)) { + self.inner.heap_size(callback); + } + } + /// Get the correct monoid implementation for a given aggregation function. Note that /// all hierarchical aggregation functions need to supply a monoid implementation. pub fn get_monoid(row: Row, func: &AggregateFunc) -> Option { @@ -1911,15 +1973,6 @@ mod monoids { | AggregateFunc::WindowAggregate { .. } => None, } } - - impl ReductionMonoid { - pub fn finalize(&self) -> &Row { - use ReductionMonoid::*; - match &self { - Min(row) | Max(row) => row, - } - } - } } mod window_agg_helpers {