Skip to content

Commit

Permalink
Merge pull request #23121 from antiguru/columnated_merge_batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
antiguru authored Nov 20, 2023
2 parents bf315ae + 91e092d commit c44fb2f
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 76 deletions.
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/compute/src/extensions/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ where
impl<G, K, V, T, R> ArrangementSize for Arranged<G, TraceAgent<RowSpine<K, V, T, R>>>
where
G: Scope<Timestamp = T>,
G::Timestamp: Lattice + Ord,
G::Timestamp: Lattice + Ord + Columnation,
K: Data + Columnation,
V: Data + Columnation,
T: Lattice + Timestamp,
R: Semigroup,
R: Semigroup + Columnation,
{
fn log_arrangement_size(self) -> Self {
log_arrangement_size_inner(self, |trace| {
Expand All @@ -300,8 +300,8 @@ where
G: Scope<Timestamp = T>,
G::Timestamp: Lattice + Ord,
K: Data + Columnation,
T: Lattice + Timestamp,
R: Semigroup,
T: Lattice + Timestamp + Columnation,
R: Semigroup + Columnation,
{
fn log_arrangement_size(self) -> Self {
log_arrangement_size_inner(self, |trace| {
Expand Down
62 changes: 31 additions & 31 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ pub(crate) type ErrArrangementImport<S, T> = Arranged<
/// of regions or iteration.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// The scope within which all managed collections exist.
///
Expand Down Expand Up @@ -104,7 +104,7 @@ where

impl<S: Scope> Context<S>
where
S::Timestamp: Lattice + Refines<mz_repr::Timestamp>,
S::Timestamp: Lattice + Refines<mz_repr::Timestamp> + Columnation,
{
/// Creates a new empty Context.
pub fn for_dataflow_in<Plan>(
Expand Down Expand Up @@ -134,8 +134,8 @@ where

impl<S: Scope, T> Context<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Insert a collection bundle by an identifier.
///
Expand Down Expand Up @@ -230,15 +230,15 @@ impl ShutdownToken {
#[derive(Clone)]
pub enum SpecializedArrangement<S: Scope>
where
<S as ScopeParent>::Timestamp: Lattice,
<S as ScopeParent>::Timestamp: Lattice + Columnation,
{
RowUnit(KeyValArrangement<S, Row, ()>),
RowRow(KeyValArrangement<S, Row, Row>),
}

impl<S: Scope> SpecializedArrangement<S>
where
<S as ScopeParent>::Timestamp: Lattice,
<S as ScopeParent>::Timestamp: Lattice + Columnation,
{
/// The scope of the underlying arrangement's stream.
pub fn scope(&self) -> S {
Expand Down Expand Up @@ -293,7 +293,7 @@ where
refuel: usize,
) -> timely::dataflow::Stream<S, I::Item>
where
T: Timestamp + Lattice,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
I: IntoIterator,
I::Item: Data,
Expand Down Expand Up @@ -329,7 +329,7 @@ where

impl<'a, S: Scope> SpecializedArrangement<Child<'a, S, S::Timestamp>>
where
<S as ScopeParent>::Timestamp: Lattice,
<S as ScopeParent>::Timestamp: Lattice + Columnation,
{
/// Extracts the underlying arrangement flavor from a region.
pub fn leave_region(&self) -> SpecializedArrangement<S> {
Expand Down Expand Up @@ -366,7 +366,7 @@ where
#[derive(Clone)]
pub enum SpecializedArrangementImport<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
{
RowUnit(KeyValArrangementImport<S, Row, (), T>),
Expand All @@ -375,8 +375,8 @@ where

impl<S: Scope, T> SpecializedArrangementImport<S, T>
where
T: Timestamp + Lattice,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T> + Columnation,
{
/// The scope of the underlying trace's stream.
pub fn scope(&self) -> S {
Expand Down Expand Up @@ -467,7 +467,7 @@ where

impl<'a, S: Scope, T> SpecializedArrangementImport<Child<'a, S, S::Timestamp>, T>
where
T: Timestamp + Lattice,
T: Timestamp + Lattice + Columnation,
<S as ScopeParent>::Timestamp: Lattice + Refines<T>,
{
/// Extracts the underlying arrangement flavor from a region.
Expand All @@ -487,8 +487,8 @@ where
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// A dataflow-local arrangement.
Local(SpecializedArrangement<S>, ErrArrangement<S>),
Expand All @@ -505,8 +505,8 @@ where

impl<S: Scope, T> ArrangementFlavor<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Presents `self` as a stream of updates.
///
Expand Down Expand Up @@ -582,8 +582,8 @@ where
}
impl<S: Scope, T> ArrangementFlavor<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// The scope containing the collection bundle.
pub fn scope(&self) -> S {
Expand All @@ -610,8 +610,8 @@ where
}
impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Extracts the arrangement flavor from a region.
pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
Expand All @@ -633,17 +633,17 @@ where
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}

impl<S: Scope, T: Lattice> CollectionBundle<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Construct a new collection bundle from update streams.
pub fn from_collections(
Expand Down Expand Up @@ -715,8 +715,8 @@ where

impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Extracts the collection bundle from a region.
pub fn leave_region(&self) -> CollectionBundle<S, T> {
Expand All @@ -734,10 +734,10 @@ where
}
}

impl<S: Scope, T: Lattice> CollectionBundle<S, T>
impl<S: Scope, T> CollectionBundle<S, T>
where
T: Timestamp + Lattice,
S::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
{
/// Asserts that the arrangement for a specific key
/// (or the raw collection for no key) exists,
Expand Down Expand Up @@ -899,7 +899,7 @@ where

impl<S, T> CollectionBundle<S, T>
where
T: timely::progress::Timestamp + Lattice,
T: timely::progress::Timestamp + Lattice + Columnation,
S: Scope,
S::Timestamp:
Refines<T> + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp,
Expand Down
8 changes: 4 additions & 4 deletions src/compute/src/render/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ fn dispatch_build_halfjoin_trace<G, T, CF>(
)
where
G: Scope,
T: Timestamp + Lattice,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T>,
T: Timestamp + Lattice + Columnation,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
{
match trace {
Expand Down Expand Up @@ -576,8 +576,8 @@ fn dispatch_build_update_stream_trace<G, T>(
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
where
G: Scope,
T: Timestamp + Lattice,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T>,
T: Timestamp + Lattice + Columnation,
G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
{
match trace {
SpecializedArrangementImport::RowUnit(inner) => build_update_stream(
Expand Down
9 changes: 5 additions & 4 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use mz_repr::fixed_length::IntoRowByTypes;
use mz_repr::{ColumnType, DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use timely::container::columnation::Columnation;
use timely::dataflow::operators::OkErr;
use timely::dataflow::Scope;
use timely::progress::timestamp::{Refines, Timestamp};
Expand Down Expand Up @@ -125,8 +126,8 @@ impl LinearJoinSpec {
enum JoinedFlavor<G, T>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
{
/// Streamed data as a collection.
Collection(Collection<G, Row, Diff>),
Expand All @@ -139,8 +140,8 @@ where
impl<G, T> Context<G, T>
where
G: Scope,
G::Timestamp: Lattice + Refines<T>,
T: Timestamp + Lattice,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
{
pub(crate) fn render_join(
&mut self,
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/render/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,7 @@ where
}

/// A timestamp type that can be used for operations within MZ's dataflow layer.
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> {
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation {
/// The system timestamp component of the timestamp.
///
/// This is useful for manipulating the system time, as when delaying
Expand Down Expand Up @@ -970,7 +970,7 @@ impl RenderTimestamp for mz_repr::Timestamp {
}
}

impl<T: Timestamp + Lattice> RenderTimestamp for Product<mz_repr::Timestamp, T> {
impl<T: Timestamp + Lattice + Columnation> RenderTimestamp for Product<mz_repr::Timestamp, T> {
fn system_time(&mut self) -> &mut mz_repr::Timestamp {
&mut self.outer
}
Expand Down
Loading

0 comments on commit c44fb2f

Please sign in to comment.