Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt Materialize to use the columnated merge batcher #23121

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/compute/src/extensions/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ where
impl<G, K, V, T, R> ArrangementSize for Arranged<G, TraceAgent<RowSpine<K, V, T, R>>>
where
G: Scope<Timestamp = T>,
G::Timestamp: Lattice + Ord,
G::Timestamp: Lattice + Ord + Columnation,
K: Data + Columnation,
V: Data + Columnation,
T: Lattice + Timestamp,
R: Semigroup,
R: Semigroup + Columnation,
{
fn log_arrangement_size(self) -> Self {
log_arrangement_size_inner(self, |trace| {
Expand All @@ -300,8 +300,8 @@ where
G: Scope<Timestamp = T>,
G::Timestamp: Lattice + Ord,
K: Data + Columnation,
T: Lattice + Timestamp,
R: Semigroup,
T: Lattice + Timestamp + Columnation,
R: Semigroup + Columnation,
{
fn log_arrangement_size(self) -> Self {
log_arrangement_size_inner(self, |trace| {
Expand Down
62 changes: 31 additions & 31 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ pub(crate) type ErrArrangementImport<S, T> = Arranged<
/// of regions or iteration.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// The scope within which all managed collections exist.
///
Expand Down Expand Up @@ -104,7 +104,7 @@ where

impl<S: Scope> Context<S>
where
S::Timestamp: Lattice + Refines<mz_repr::Timestamp>,
S::Timestamp: Lattice + Refines<mz_repr::Timestamp> + Columnation,
{
/// Creates a new empty Context.
pub fn for_dataflow_in<Plan>(
Expand Down Expand Up @@ -134,8 +134,8 @@ where

impl<S: Scope, T> Context<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
Comment on lines +137 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not completely clear to me whether these trait bounds are absolutely required here? We are forcing CollectionBundle to require Columnation on its timestamp types, but we could require these bounds only in the implementation including ensure_collections?

Another way to phrase the question more conceptually: Do we want to ensure Columnation for timestamp types in connection with arrangement usage only or across all contexts (arrangements, dataflow edges) in this PR? The difference is not large now in that the timestamps used in arrangements and the ones in dataflow edges are today the same, but changing the requirements expresses an opinion that they should at least share some more behavior than we've required so far.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, we need the bound on CollectionBundle because ArrangementFlavor needs it. We use both T and S::Timestamp in the handles, so the Columnation bound propagates everywhere.

{
/// Insert a collection bundle by an identifier.
///
Expand Down Expand Up @@ -230,15 +230,15 @@ impl ShutdownToken {
#[derive(Clone)]
pub enum SpecializedArrangement<S: Scope>
where
<S as ScopeParent>::Timestamp: Lattice,
<S as ScopeParent>::Timestamp: Lattice + Columnation,
{
RowUnit(KeyValArrangement<S, Row, ()>),
RowRow(KeyValArrangement<S, Row, Row>),
}

impl<S: Scope> SpecializedArrangement<S>
where
<S as ScopeParent>::Timestamp: Lattice,
<S as ScopeParent>::Timestamp: Lattice + Columnation,
{
/// The scope of the underlying arrangement's stream.
pub fn scope(&self) -> S {
Expand Down Expand Up @@ -293,7 +293,7 @@ where
refuel: usize,
) -> timely::dataflow::Stream<S, I::Item>
where
T: Timestamp + Lattice,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
I: IntoIterator,
I::Item: Data,
Expand Down Expand Up @@ -329,7 +329,7 @@ where

impl<'a, S: Scope> SpecializedArrangement<Child<'a, S, S::Timestamp>>
where
<S as ScopeParent>::Timestamp: Lattice,
<S as ScopeParent>::Timestamp: Lattice + Columnation,
{
/// Extracts the underlying arrangement flavor from a region.
pub fn leave_region(&self) -> SpecializedArrangement<S> {
Expand Down Expand Up @@ -366,7 +366,7 @@ where
#[derive(Clone)]
pub enum SpecializedArrangementImport<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
{
RowUnit(KeyValArrangementImport<S, Row, (), T>),
Expand All @@ -375,8 +375,8 @@ where

impl<S: Scope, T> SpecializedArrangementImport<S, T>
where
T: Timestamp + Lattice,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T> + Columnation,
{
/// The scope of the underlying trace's stream.
pub fn scope(&self) -> S {
Expand Down Expand Up @@ -467,7 +467,7 @@ where

impl<'a, S: Scope, T> SpecializedArrangementImport<Child<'a, S, S::Timestamp>, T>
where
T: Timestamp + Lattice,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
{
/// Extracts the underlying arrangement flavor from a region.
Expand All @@ -487,8 +487,8 @@ where
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// A dataflow-local arrangement.
Local(SpecializedArrangement<S>, ErrArrangement<S>),
Expand All @@ -505,8 +505,8 @@ where

impl<S: Scope, T> ArrangementFlavor<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Presents `self` as a stream of updates.
///
Expand Down Expand Up @@ -582,8 +582,8 @@ where
}
impl<S: Scope, T> ArrangementFlavor<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// The scope containing the collection bundle.
pub fn scope(&self) -> S {
Expand All @@ -610,8 +610,8 @@ where
}
impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Extracts the arrangement flavor from a region.
pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
Expand All @@ -633,17 +633,17 @@ where
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
Comment on lines +636 to +637
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, even if we'd like Columnation in the definition of CollectionBundle, then we'd need to require the bound for the scope timestamp type S::Timestamp here, but not for the anchor timestamp type T?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also needed:

error[E0277]: the trait bound `T: Columnation` is not satisfied
   --> src/compute/src/render/context.rs:372:13
    |
372 |     RowUnit(KeyValArrangementImport<S, Row, (), T>),
    |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Columnation` is not implemented for `T`
    |
    = note: required for `TStack<((mz_repr::Row, ()), T, i64), Overflowing<u32>>` to implement `differential_dataflow::trace::implementations::Layout`
    = note: required for `Spine<Rc<OrdValBatch<TStack<((mz_repr::Row, ()), T, i64), Overflowing<u32>>, TimelyStack<((mz_repr::Row, ()), T, i64)>>>>` to implement `TraceReader`
help: consider further restricting this bound
    |
369 |     T: Timestamp + Lattice + timely::container::columnation::Columnation,
    |                            +++++++++++++++++++++++++++++++++++++++++++++

{
pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}

impl<S: Scope, T: Lattice> CollectionBundle<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Construct a new collection bundle from update streams.
pub fn from_collections(
Expand Down Expand Up @@ -715,8 +715,8 @@ where

impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Extracts the collection bundle from a region.
pub fn leave_region(&self) -> CollectionBundle<S, T> {
Expand All @@ -734,10 +734,10 @@ where
}
}

impl<S: Scope, T: Lattice> CollectionBundle<S, T>
impl<S: Scope, T> CollectionBundle<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Asserts that the arrangement for a specific key
/// (or the raw collection for no key) exists,
Expand Down Expand Up @@ -899,7 +899,7 @@ where

impl<S, T> CollectionBundle<S, T>
where
T: timely::progress::Timestamp + Lattice,
T: timely::progress::Timestamp + Lattice + Columnation,
S: Scope,
S::Timestamp:
Refines<T> + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp,
Expand Down
8 changes: 4 additions & 4 deletions src/compute/src/render/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ fn dispatch_build_halfjoin_trace<G, T, CF>(
)
where
G: Scope,
T: Timestamp + Lattice,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T>,
T: Timestamp + Lattice + Columnation,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
{
match trace {
Expand Down Expand Up @@ -576,8 +576,8 @@ fn dispatch_build_update_stream_trace<G, T>(
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
where
G: Scope,
T: Timestamp + Lattice,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T>,
T: Timestamp + Lattice + Columnation,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
{
match trace {
SpecializedArrangementImport::RowUnit(inner) => build_update_stream(
Expand Down
9 changes: 5 additions & 4 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use mz_repr::fixed_length::IntoRowByTypes;
use mz_repr::{ColumnType, DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use timely::container::columnation::Columnation;
use timely::dataflow::operators::OkErr;
use timely::dataflow::Scope;
use timely::progress::timestamp::{Refines, Timestamp};
Expand Down Expand Up @@ -125,8 +126,8 @@ impl LinearJoinSpec {
enum JoinedFlavor<G, T>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
{
/// Streamed data as a collection.
Collection(Collection<G, Row, Diff>),
Expand All @@ -139,8 +140,8 @@ where
impl<G, T> Context<G, T>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
{
pub(crate) fn render_join(
&mut self,
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ where
}

/// A timestamp type that can be used for operations within MZ's dataflow layer.
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> {
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation {
/// The system timestamp component of the timestamp.
///
/// This is useful for manipulating the system time, as when delaying
Expand Down Expand Up @@ -970,7 +970,7 @@ impl RenderTimestamp for mz_repr::Timestamp {
}
}

impl<T: Timestamp + Lattice> RenderTimestamp for Product<mz_repr::Timestamp, T> {
impl<T: Timestamp + Lattice + Columnation> RenderTimestamp for Product<mz_repr::Timestamp, T> {
fn system_time(&mut self) -> &mut mz_repr::Timestamp {
&mut self.outer
}
Expand Down
Loading