diff --git a/e2e_test/streaming/changed_log.slt b/e2e_test/streaming/changed_log.slt
new file mode 100644
index 000000000000..04ef16ff8919
--- /dev/null
+++ b/e2e_test/streaming/changed_log.slt
@@ -0,0 +1,164 @@
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+create table t1 (v1 int, v2 int);
+
+statement ok
+create table t2 (v1 int, v2 int);
+
+statement ok
+create table t3 (v1 int primary key, v2 int);
+
+statement ok
+create materialized view mv1 as with sub as changelog from t1 select * from sub;
+
+statement ok
+create materialized view mv2 as with sub as changelog from t2 select * from sub;
+
+statement ok
+create materialized view mv3 as with sub as changelog from t1 select v1, v2 from sub;
+
+statement ok
+create materialized view mv4 as with sub1 as changelog from t1, sub2 as changelog from t2
+select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1 inner join sub2 on sub1.v1 = sub2.v1;
+
+statement ok
+create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2
+select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.changelog_op as op1, sub2.changelog_op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;
+
+statement ok
+create materialized view mv6 as with sub as changelog from t3 select * from sub;
+
+statement ok
+create materialized view mv7(col1,col2,col3) as with sub as changelog from t3 select * from sub;
+
+statement ok
+insert into t1 values(1,1),(2,2);
+
+statement ok
+insert into t2 values(1,10),(2,20);
+
+statement ok
+insert into t3 values(5,5),(6,6);
+
+statement ok
+update t1 set v2 = 100 where v1 = 1;
+
+statement ok
+update t2 set v2 = 100 where v1 = 1;
+
+statement ok
+update t3 set v2 = 500 where v1 = 5;
+
+statement ok
+delete from t1 where v1 = 2;
+
+statement ok
+alter materialized view mv7 rename to mv7_rename;
+
+statement ok
+alter table t3 rename to t3_rename;
+
+query III rowsort
+select * from mv1 order by v1;
+----
+1 1 1
+1 1 4
+1 100 3
+2 2 1
+2 2 2
+
+query III rowsort
+select * from mv2 order by v1;
+----
+1 10 1
+1 10 4
+1 100 3
+2 20 1
+
+query III rowsort
+select * from mv3 order by v1;
+----
+1 1
+1 1
+1 100
+2 2
+2 2
+
+query III rowsort
+select * from mv4 order by v11,v21;
+----
+1 1 1 10
+1 1 1 10
+1 1 1 10
+1 1 1 10
+1 1 1 100
+1 1 1 100
+1 100 1 10
+1 100 1 10
+1 100 1 100
+2 2 2 20
+2 2 2 20
+
+
+query III rowsort
+select * from mv5 order by v11,v21;
+----
+1 1 1 10 1 1
+1 1 1 10 1 4
+1 1 1 10 4 1
+1 1 1 10 4 4
+1 1 1 100 1 3
+1 1 1 100 4 3
+1 100 1 10 3 1
+1 100 1 10 3 4
+1 100 1 100 3 3
+2 2 2 20 1 1
+2 2 2 20 2 1
+
+query III rowsort
+select * from mv6 order by v1;
+----
+5 5 1
+5 5 4
+5 500 3
+6 6 1
+
+query III rowsort
+select * from mv7_rename order by col1;
+----
+5 5 1
+5 5 4
+5 500 3
+6 6 1
+
+statement ok
+drop materialized view mv7_rename;
+
+statement ok
+drop materialized view mv6;
+
+statement ok
+drop materialized view mv5;
+
+statement ok
+drop materialized view mv4;
+
+statement ok
+drop materialized view mv3;
+
+statement ok
+drop materialized view mv2;
+
+statement ok
+drop materialized view mv1;
+
+statement ok
+drop table t3_rename;
+
+statement ok
+drop table t2;
+
+statement ok
+drop table t1;
\ No newline at end of file
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index a43e34bde8df..75b3d6f177dd 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -287,6 +287,10 @@ message FilterNode { expr.ExprNode search_condition = 1; } +message ChangeLogNode { + bool need_op = 1; +} + +message
CdcFilterNode { expr.ExprNode search_condition = 1; uint32 upstream_source_id = 2; @@ -824,6 +828,7 @@ message StreamNode { StreamCdcScanNode stream_cdc_scan = 139; CdcFilterNode cdc_filter = 140; SourceBackfillNode source_backfill = 142; + ChangeLogNode changelog = 143; } // The id for the operator. This is local per mview. // TODO: should better be a uint32. diff --git a/src/frontend/src/binder/bind_context.rs b/src/frontend/src/binder/bind_context.rs index 2876ec4aa944..688977f84991 100644 --- a/src/frontend/src/binder/bind_context.rs +++ b/src/frontend/src/binder/bind_context.rs @@ -23,6 +23,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::TableAlias; +use crate::binder::Relation; use crate::error::{ErrorCode, Result}; type LiteResult = std::result::Result; @@ -94,11 +95,17 @@ pub enum BindingCteState { #[default] Init, /// We know the schema form after the base term resolved. - BaseResolved { base: BoundSetExpr }, + BaseResolved { + base: BoundSetExpr, + }, /// We get the whole bound result of the (recursive) CTE. Bound { query: Either, }, + + ChangeLog { + table: Relation, + }, } /// the entire `RecursiveUnion` represents a *bound* recursive cte. diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index cf1241733461..8b526a78d53f 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -49,9 +49,9 @@ pub use insert::BoundInsert; use pgwire::pg_server::{Session, SessionId}; pub use query::BoundQuery; pub use relation::{ - BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, - BoundWatermark, BoundWindowTableFunction, Relation, ResolveQualifiedNameError, - WindowTableFunctionKind, + BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource, + BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, + ResolveQualifiedNameError, WindowTableFunctionKind, }; pub use select::{BoundDistinct, BoundSelect}; pub use set_expr::*; @@ -574,165 +574,167 @@ mod tests { Share( BoundShare { share_id: 0, - input: Right( - RecursiveUnion { - all: true, - base: Select( - BoundSelect { - distinct: All, - select_items: [ - Literal( - Literal { - data: Some( - Int32( - 1, + input: Query( + Right( + RecursiveUnion { + all: true, + base: Select( + BoundSelect { + distinct: All, + select_items: [ + Literal( + Literal { + data: Some( + Int32( + 1, + ), ), - ), - data_type: Some( - Int32, - ), - }, - ), - ], - aliases: [ - Some( - "a", - ), - ], - from: None, - where_clause: None, - group_by: GroupKey( - [], - ), - having: None, - schema: Schema { - fields: [ - a:Int32, + data_type: Some( + Int32, + ), + }, + ), ], + aliases: [ + Some( + "a", + ), + ], + from: None, + where_clause: None, + group_by: GroupKey( + [], + ), + having: None, + schema: Schema { + fields: [ + a:Int32, + ], + }, }, - }, - ), - recursive: Select( - BoundSelect { - distinct: All, - select_items: [ - FunctionCall( - FunctionCall { - func_type: Add, - return_type: Int32, - inputs: [ - InputRef( - InputRef { - index: 0, - data_type: Int32, - }, - ), - Literal( - Literal { - data: Some( - Int32( - 1, + ), + recursive: Select( + BoundSelect { + distinct: All, + select_items: [ + FunctionCall( + FunctionCall { + func_type: Add, + return_type: Int32, + inputs: [ + InputRef( + InputRef { + index: 0, + data_type: Int32, + }, + ), + Literal( + Literal { + data: Some( + Int32( + 1, + ), ), + data_type: Some( + Int32, + ), + }, + ), + ], + }, + ), + ], 
+ aliases: [ + None, + ], + from: Some( + BackCteRef( + BoundBackCteRef { + share_id: 0, + base: Select( + BoundSelect { + distinct: All, + select_items: [ + Literal( + Literal { + data: Some( + Int32( + 1, + ), + ), + data_type: Some( + Int32, + ), + }, + ), + ], + aliases: [ + Some( + "a", + ), + ], + from: None, + where_clause: None, + group_by: GroupKey( + [], ), - data_type: Some( - Int32, - ), + having: None, + schema: Schema { + fields: [ + a:Int32, + ], + }, }, ), - ], - }, + }, + ), ), - ], - aliases: [ - None, - ], - from: Some( - BackCteRef( - BoundBackCteRef { - share_id: 0, - base: Select( - BoundSelect { - distinct: All, - select_items: [ - Literal( - Literal { - data: Some( - Int32( - 1, - ), - ), - data_type: Some( - Int32, + where_clause: Some( + FunctionCall( + FunctionCall { + func_type: LessThan, + return_type: Boolean, + inputs: [ + InputRef( + InputRef { + index: 0, + data_type: Int32, + }, + ), + Literal( + Literal { + data: Some( + Int32( + 10, ), - }, - ), - ], - aliases: [ - Some( - "a", - ), - ], - from: None, - where_clause: None, - group_by: GroupKey( - [], + ), + data_type: Some( + Int32, + ), + }, ), - having: None, - schema: Schema { - fields: [ - a:Int32, - ], - }, - }, - ), - }, + ], + }, + ), ), - ), - where_clause: Some( - FunctionCall( - FunctionCall { - func_type: LessThan, - return_type: Boolean, - inputs: [ - InputRef( - InputRef { - index: 0, - data_type: Int32, - }, - ), - Literal( - Literal { - data: Some( - Int32( - 10, - ), - ), - data_type: Some( - Int32, - ), - }, - ), - ], - }, + group_by: GroupKey( + [], ), - ), - group_by: GroupKey( - [], - ), - having: None, - schema: Schema { - fields: [ - ?column?:Int32, - ], + having: None, + schema: Schema { + fields: [ + ?column?:Int32, + ], + }, }, + ), + schema: Schema { + fields: [ + a:Int32, + ], }, - ), - schema: Schema { - fields: [ - a:Int32, - ], }, - }, + ), ), }, ), diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs index 2aa1d066953c..459e1b7921e9 100644 --- a/src/frontend/src/binder/query.rs +++ b/src/frontend/src/binder/query.rs @@ -20,7 +20,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_sqlparser::ast::{ - Cte, Expr, Fetch, OrderByExpr, Query, SetExpr, SetOperator, Value, With, + Cte, CteInner, Expr, Fetch, ObjectName, OrderByExpr, Query, SetExpr, SetOperator, Value, With, }; use thiserror_ext::AsReport; @@ -286,51 +286,81 @@ impl Binder { for cte_table in with.cte_tables { // note that the new `share_id` for the rcte is generated here let share_id = self.next_share_id(); - let Cte { alias, query, .. } = cte_table; + let Cte { alias, cte_inner } = cte_table; let table_name = alias.name.real_value(); if with.recursive { - let ( - SetExpr::SetOperation { - op: SetOperator::Union, - all, - left, - right, - }, - with, - ) = Self::validate_rcte(query)? - else { + if let CteInner::Query(query) = cte_inner { + let ( + SetExpr::SetOperation { + op: SetOperator::Union, + all, + left, + right, + }, + with, + ) = Self::validate_rcte(query)? 
+ else { + return Err(ErrorCode::BindError( + "expect `SetOperation` as the return type of validation".into(), + ) + .into()); + }; + + let entry = self + .context + .cte_to_relation + .entry(table_name) + .insert_entry(Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::Init, + alias, + }))) + .get() + .clone(); + + self.bind_rcte(with, entry, *left, *right, all)?; + } else { return Err(ErrorCode::BindError( - "expect `SetOperation` as the return type of validation".into(), + "RECURSIVE CTE only support query".to_string(), ) .into()); - }; - - let entry = self - .context - .cte_to_relation - .entry(table_name) - .insert_entry(Rc::new(RefCell::new(BindingCte { - share_id, - state: BindingCteState::Init, - alias, - }))) - .get() - .clone(); - - self.bind_rcte(with, entry, *left, *right, all)?; + } } else { - let bound_query = self.bind_query(query)?; - self.context.cte_to_relation.insert( - table_name, - Rc::new(RefCell::new(BindingCte { - share_id, - state: BindingCteState::Bound { - query: either::Either::Left(bound_query), - }, - alias, - })), - ); + match cte_inner { + CteInner::Query(query) => { + let bound_query = self.bind_query(query)?; + self.context.cte_to_relation.insert( + table_name, + Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::Bound { + query: either::Either::Left(bound_query), + }, + alias, + })), + ); + } + CteInner::ChangeLog(from_table_name) => { + self.push_context(); + let from_table_relation = self.bind_relation_by_name( + ObjectName::from(vec![from_table_name]), + None, + None, + )?; + self.pop_context()?; + self.context.cte_to_relation.insert( + table_name, + Rc::new(RefCell::new(BindingCte { + share_id, + state: BindingCteState::ChangeLog { + table: from_table_relation, + }, + alias, + })), + ); + } + } } } Ok(()) diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 4de9c3cfaa9a..2cdf3ea07db4 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -15,7 +15,6 @@ use std::collections::hash_map::Entry; use std::ops::Deref; -use either::Either; use itertools::{EitherOrBoth, Itertools}; use risingwave_common::bail; use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; @@ -44,7 +43,7 @@ mod window_table_function; pub use cte_ref::BoundBackCteRef; pub use join::BoundJoin; -pub use share::BoundShare; +pub use share::{BoundShare, BoundShareInput}; pub use subquery::BoundSubquery; pub use table_or_source::{BoundBaseTable, BoundSource, BoundSystemTable}; pub use watermark::BoundWatermark; @@ -383,20 +382,25 @@ impl Binder { Ok(Relation::BackCteRef(Box::new(BoundBackCteRef { share_id, base }))) } BindingCteState::Bound { query } => { - let schema = match &query { - Either::Left(normal) => normal.schema(), - Either::Right(recursive) => &recursive.schema, - }; + let input = BoundShareInput::Query(query); self.bind_table_to_context( - schema.fields.iter().map(|f| (false, f.clone())), + input.fields()?, table_name.clone(), Some(original_alias), )?; // we could always share the cte, // no matter it's recursive or not. 
- let input = query; - Ok(Relation::Share(Box::new(BoundShare { share_id, input }))) + Ok(Relation::Share(Box::new(BoundShare { share_id, input}))) } + BindingCteState::ChangeLog { table } => { + let input = BoundShareInput::ChangeLog(table); + self.bind_table_to_context( + input.fields()?, + table_name.clone(), + Some(original_alias), + )?; + Ok(Relation::Share(Box::new(BoundShare { share_id, input }))) + }, } } else { self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, as_of) diff --git a/src/frontend/src/binder/relation/share.rs b/src/frontend/src/binder/relation/share.rs index 3a64189bd886..8b8afbadeb52 100644 --- a/src/frontend/src/binder/relation/share.rs +++ b/src/frontend/src/binder/relation/share.rs @@ -13,24 +13,98 @@ // limitations under the License. use either::Either; +use itertools::Itertools; +use risingwave_common::catalog::Field; use crate::binder::bind_context::RecursiveUnion; use crate::binder::statement::RewriteExprsRecursive; -use crate::binder::{BoundQuery, ShareId}; +use crate::binder::{BoundQuery, Relation, ShareId}; +use crate::error::{ErrorCode, Result}; +use crate::optimizer::plan_node::generic::{CHANGELOG_OP, _CHANGELOG_ROW_ID}; /// Share a relation during binding and planning. /// It could be used to share a (recursive) CTE, a source, a view and so on. + +#[derive(Debug, Clone)] +pub enum BoundShareInput { + Query(Either), + ChangeLog(Relation), +} +impl BoundShareInput { + pub fn fields(&self) -> Result> { + match self { + BoundShareInput::Query(q) => match q { + Either::Left(q) => Ok(q + .schema() + .fields() + .iter() + .cloned() + .map(|f| (false, f)) + .collect_vec()), + Either::Right(r) => Ok(r + .schema + .fields() + .iter() + .cloned() + .map(|f| (false, f)) + .collect_vec()), + }, + BoundShareInput::ChangeLog(r) => { + let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r { + ( + bound_base_table.table_catalog.columns().to_vec(), + bound_base_table.table_catalog.name().to_string(), + ) + } else { + return Err(ErrorCode::BindError( + "Change log CTE must be a base table".to_string(), + ) + .into()); + }; + let fields = fields + .into_iter() + .map(|x| { + ( + x.is_hidden, + Field::with_name(x.data_type().clone(), x.name()), + ) + }) + .chain(vec![ + ( + false, + Field::with_name( + risingwave_common::types::DataType::Int16, + CHANGELOG_OP.to_string(), + ), + ), + ( + true, + Field::with_name( + risingwave_common::types::DataType::Serial, + _CHANGELOG_ROW_ID.to_string(), + ), + ), + ]) + .collect(); + Ok(fields) + } + } + } +} #[derive(Debug, Clone)] pub struct BoundShare { pub(crate) share_id: ShareId, - pub(crate) input: Either, + pub(crate) input: BoundShareInput, } impl RewriteExprsRecursive for BoundShare { fn rewrite_exprs_recursive(&mut self, rewriter: &mut impl crate::expr::ExprRewriter) { match &mut self.input { - Either::Left(q) => q.rewrite_exprs_recursive(rewriter), - Either::Right(r) => r.rewrite_exprs_recursive(rewriter), + BoundShareInput::Query(q) => match q { + Either::Left(q) => q.rewrite_exprs_recursive(rewriter), + Either::Right(r) => r.rewrite_exprs_recursive(rewriter), + }, + BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter), }; } } diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 94b2980fcb63..7e08064fccc8 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -25,6 +25,7 @@ use risingwave_sqlparser::parser::Parser; use 
thiserror_ext::AsReport; use super::BoundShare; +use crate::binder::relation::BoundShareInput; use crate::binder::{Binder, Relation}; use crate::catalog::root_catalog::SchemaPath; use crate::catalog::source_catalog::SourceCatalog; @@ -281,7 +282,10 @@ impl Binder { }; let input = Either::Left(query); Ok(( - Relation::Share(Box::new(BoundShare { share_id, input })), + Relation::Share(Box::new(BoundShare { + share_id, + input: BoundShareInput::Query(input), + })), columns.iter().map(|c| (false, c.clone())).collect_vec(), )) } diff --git a/src/frontend/src/binder/select.rs b/src/frontend/src/binder/select.rs index d9848ed76973..e15a9eac7324 100644 --- a/src/frontend/src/binder/select.rs +++ b/src/frontend/src/binder/select.rs @@ -25,11 +25,12 @@ use risingwave_sqlparser::ast::{ use super::bind_context::{Clause, ColumnBinding}; use super::statement::RewriteExprsRecursive; -use super::UNNAMED_COLUMN; +use super::{BoundShareInput, UNNAMED_COLUMN}; use crate::binder::{Binder, Relation}; use crate::catalog::check_valid_column_name; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{CorrelatedId, Depth, Expr as _, ExprImpl, ExprType, FunctionCall, InputRef}; +use crate::optimizer::plan_node::generic::CHANGELOG_OP; use crate::utils::group_by::GroupBy; #[derive(Debug, Clone)] @@ -282,6 +283,17 @@ impl Binder { }) .collect::>>()?; + if let Some(Relation::Share(bound)) = &from { + if matches!(bound.input, BoundShareInput::ChangeLog(_)) + && fields.iter().filter(|&x| x.name.eq(CHANGELOG_OP)).count() > 1 + { + return Err(ErrorCode::BindError( + "The source table of changelog cannot have `changelog_op`, please rename it first".to_string() + ) + .into()); + } + } + Ok(BoundSelect { distinct, select_items, diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 1993e1293880..c33625491082 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -548,7 +548,6 @@ impl PlanRoot { ).into()); } let plan = self.gen_optimized_logical_plan_for_stream()?; - let (plan, out_col_change) = { let (plan, out_col_change) = plan.logical_rewrite_for_stream(&mut Default::default())?; @@ -879,7 +878,6 @@ impl PlanRoot { let stream_plan = self.gen_optimized_stream_plan(emit_on_window_close)?; assert_eq!(self.phase, PlanPhase::Stream); assert_eq!(stream_plan.convention(), Convention::Stream); - StreamMaterialize::create( stream_plan, mv_name, diff --git a/src/frontend/src/optimizer/plan_node/generic/changelog.rs b/src/frontend/src/optimizer/plan_node/generic/changelog.rs new file mode 100644 index 000000000000..df550f29c419 --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/generic/changelog.rs @@ -0,0 +1,91 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use pretty_xmlish::{Str, XmlNode}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::util::column_index_mapping::ColIndexMapping; + +use super::{DistillUnit, GenericPlanNode}; +use crate::optimizer::plan_node::stream::prelude::GenericPlanRef; +use crate::optimizer::plan_node::utils::childless_record; +use crate::optimizer::property::FunctionalDependencySet; +use crate::utils::ColIndexMappingRewriteExt; +use crate::OptimizerContextRef; + +pub const CHANGELOG_OP: &str = "changelog_op"; +pub const _CHANGELOG_ROW_ID: &str = "_changelog_row_id"; +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ChangeLog { + pub input: PlanRef, + // If there is no op in the output result, it is false, example 'create materialized view mv1 as with sub as changelog from t1 select v1 from sub;' + pub need_op: bool, + // False before rewrite, true after rewrite + pub need_changelog_row_id: bool, +} +impl DistillUnit for ChangeLog { + fn distill_with_name<'a>(&self, name: impl Into>) -> XmlNode<'a> { + childless_record(name, vec![]) + } +} +impl ChangeLog { + pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self { + ChangeLog { + input, + need_op, + need_changelog_row_id, + } + } + + pub fn i2o_col_mapping(&self) -> ColIndexMapping { + let mut map = vec![None; self.input.schema().len()]; + (0..self.input.schema().len()).for_each(|i| map[i] = Some(i)); + ColIndexMapping::new(map, self.schema().len()) + } +} +impl GenericPlanNode for ChangeLog { + fn schema(&self) -> Schema { + let mut fields = self.input.schema().fields.clone(); + if self.need_op { + fields.push(Field::with_name( + risingwave_common::types::DataType::Int16, + CHANGELOG_OP, + )); + } + if self.need_changelog_row_id { + fields.push(Field::with_name( + risingwave_common::types::DataType::Serial, + _CHANGELOG_ROW_ID, + )); + } + Schema::new(fields) + } + + fn stream_key(&self) -> Option> { + if self.need_changelog_row_id { + let keys = vec![self.schema().len() - 1]; + Some(keys) + } else { + None + } + } + + fn ctx(&self) -> OptimizerContextRef { + self.input.ctx() + } + + fn functional_dependency(&self) -> FunctionalDependencySet { + let i2o = self.i2o_col_mapping(); + i2o.rewrite_functional_dependency_set(self.input.functional_dependency().clone()) + } +} diff --git a/src/frontend/src/optimizer/plan_node/generic/filter.rs b/src/frontend/src/optimizer/plan_node/generic/filter.rs index 2bc4ee032468..71d4ca9fd05c 100644 --- a/src/frontend/src/optimizer/plan_node/generic/filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/filter.rs @@ -22,10 +22,6 @@ use crate::optimizer::plan_node::utils::childless_record; use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{Condition, ConditionDisplay}; -/// [`Filter`] iterates over its input and returns elements for which `predicate` evaluates to -/// true, filtering out the others. -/// -/// If the condition allows nulls, then a null value is treated the same as false. 
#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Filter { pub predicate: Condition, diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 392f07337184..38efb6fe2a27 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -78,6 +78,8 @@ mod cte_ref; pub use cte_ref::*; mod recursive_union; pub use recursive_union::*; +mod changelog; +pub use changelog::*; mod now; pub use now::*; diff --git a/src/frontend/src/optimizer/plan_node/logical_changelog.rs b/src/frontend/src/optimizer/plan_node/logical_changelog.rs new file mode 100644 index 000000000000..1cdbb7e7bfcc --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/logical_changelog.rs @@ -0,0 +1,173 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; + +use super::expr_visitable::ExprVisitable; +use super::generic::{GenericPlanRef, CHANGELOG_OP, _CHANGELOG_ROW_ID}; +use super::utils::impl_distill_by_unit; +use super::{ + gen_filter_and_pushdown, generic, ColPrunable, ColumnPruningContext, ExprRewritable, Logical, + LogicalProject, PlanBase, PlanTreeNodeUnary, PredicatePushdown, RewriteStreamContext, + StreamChangeLog, StreamRowIdGen, ToBatch, ToStream, ToStreamContext, +}; +use crate::error::ErrorCode::BindError; +use crate::error::Result; +use crate::expr::{ExprImpl, InputRef}; +use crate::optimizer::property::Distribution; +use crate::utils::{ColIndexMapping, Condition}; +use crate::PlanRef; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogicalChangeLog { + pub base: PlanBase, + core: generic::ChangeLog, +} + +impl LogicalChangeLog { + pub fn create(input: PlanRef) -> PlanRef { + Self::new(input, true, true).into() + } + + pub fn new(input: PlanRef, need_op: bool, need_changelog_row_id: bool) -> Self { + let core = generic::ChangeLog::new(input, need_op, need_changelog_row_id); + Self::with_core(core) + } + + pub fn with_core(core: generic::ChangeLog) -> Self { + let base = PlanBase::new_logical_with_core(&core); + LogicalChangeLog { base, core } + } +} + +impl PlanTreeNodeUnary for LogicalChangeLog { + fn input(&self) -> PlanRef { + self.core.input.clone() + } + + fn clone_with_input(&self, input: PlanRef) -> Self { + Self::new(input, self.core.need_op, self.core.need_changelog_row_id) + } + + fn rewrite_with_input( + &self, + input: PlanRef, + input_col_change: ColIndexMapping, + ) -> (Self, ColIndexMapping) { + let changelog = Self::new(input, self.core.need_op, true); + if self.core.need_op { + let mut output_vec = input_col_change.to_parts().0.to_vec(); + let len = input_col_change.to_parts().1; + output_vec.push(Some(len)); + let out_col_change = ColIndexMapping::new(output_vec, len + 1); + (changelog, out_col_change) + } else { + (changelog, input_col_change) + } + } +} + +impl_plan_tree_node_for_unary! 
{LogicalChangeLog} +impl_distill_by_unit!(LogicalChangeLog, core, "LogicalChangeLog"); + +impl ExprRewritable for LogicalChangeLog {} + +impl ExprVisitable for LogicalChangeLog {} + +impl PredicatePushdown for LogicalChangeLog { + fn predicate_pushdown( + &self, + predicate: Condition, + ctx: &mut super::PredicatePushdownContext, + ) -> PlanRef { + gen_filter_and_pushdown(self, predicate, Condition::true_cond(), ctx) + } +} + +impl ColPrunable for LogicalChangeLog { + fn prune_col(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { + let fields = self.schema().fields(); + let mut need_op = false; + let mut need_changelog_row_id = false; + let new_required_cols: Vec<_> = required_cols + .iter() + .filter_map(|a| { + if let Some(f) = fields.get(*a) { + if f.name == CHANGELOG_OP { + need_op = true; + None + } else if f.name == _CHANGELOG_ROW_ID { + need_changelog_row_id = true; + None + } else { + Some(*a) + } + } else { + Some(*a) + } + }) + .collect(); + + let new_input = self.input().prune_col(&new_required_cols, ctx); + Self::new(new_input, need_op, need_changelog_row_id).into() + } +} + +impl ToBatch for LogicalChangeLog { + fn to_batch(&self) -> Result { + Err(BindError("With changelog cte only support with create mv/sink".to_string()).into()) + } +} + +impl ToStream for LogicalChangeLog { + fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { + let new_input = self.input().to_stream(ctx)?; + + let mut new_logical = self.core.clone(); + new_logical.input = new_input; + let plan = StreamChangeLog::new(new_logical).into(); + let row_id_index = self.schema().fields().len() - 1; + let plan = StreamRowIdGen::new_with_dist( + plan, + row_id_index, + Distribution::HashShard(vec![row_id_index]), + ) + .into(); + + Ok(plan) + } + + fn logical_rewrite_for_stream( + &self, + ctx: &mut RewriteStreamContext, + ) -> Result<(PlanRef, ColIndexMapping)> { + let original_schema = self.input().schema().clone(); + let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?; + let exprs = (0..original_schema.len()) + .map(|x| { + ExprImpl::InputRef( + InputRef::new( + input_col_change.map(x), + original_schema.fields[x].data_type.clone(), + ) + .into(), + ) + }) + .collect_vec(); + let project = LogicalProject::new(input.clone(), exprs); + let (project, out_col_change) = project.rewrite_with_input(input, input_col_change); + let (changelog, out_col_change) = self.rewrite_with_input(project.into(), out_col_change); + Ok((changelog.into(), out_col_change)) + } +} diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 2567cbf01c6e..295afd076216 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -850,6 +850,7 @@ mod batch_values; mod logical_agg; mod logical_apply; mod logical_cdc_scan; +mod logical_changelog; mod logical_cte_ref; mod logical_dedup; mod logical_delete; @@ -878,6 +879,7 @@ mod logical_topn; mod logical_union; mod logical_update; mod logical_values; +mod stream_changelog; mod stream_dedup; mod stream_delta_join; mod stream_dml; @@ -951,6 +953,7 @@ pub use batch_values::BatchValues; pub use logical_agg::LogicalAgg; pub use logical_apply::LogicalApply; pub use logical_cdc_scan::LogicalCdcScan; +pub use logical_changelog::LogicalChangeLog; pub use logical_cte_ref::LogicalCteRef; pub use logical_dedup::LogicalDedup; pub use logical_delete::LogicalDelete; @@ -981,6 +984,7 @@ pub use logical_union::LogicalUnion; pub use 
logical_update::LogicalUpdate; pub use logical_values::LogicalValues; pub use stream_cdc_table_scan::StreamCdcTableScan; +pub use stream_changelog::StreamChangeLog; pub use stream_dedup::StreamDedup; pub use stream_delta_join::StreamDeltaJoin; pub use stream_dml::StreamDml; @@ -1071,6 +1075,7 @@ macro_rules! for_all_plan_nodes { , { Logical, IcebergScan } , { Logical, RecursiveUnion } , { Logical, CteRef } + , { Logical, ChangeLog } , { Batch, SimpleAgg } , { Batch, HashAgg } , { Batch, SortAgg } @@ -1134,6 +1139,7 @@ macro_rules! for_all_plan_nodes { , { Stream, EowcSort } , { Stream, OverWindow } , { Stream, FsFetch } + , { Stream, ChangeLog } } }; } @@ -1175,6 +1181,7 @@ macro_rules! for_logical_plan_nodes { , { Logical, IcebergScan } , { Logical, RecursiveUnion } , { Logical, CteRef } + , { Logical, ChangeLog } } }; } @@ -1256,6 +1263,7 @@ macro_rules! for_stream_plan_nodes { , { Stream, EowcSort } , { Stream, OverWindow } , { Stream, FsFetch } + , { Stream, ChangeLog } } }; } diff --git a/src/frontend/src/optimizer/plan_node/stream_changelog.rs b/src/frontend/src/optimizer/plan_node/stream_changelog.rs new file mode 100644 index 000000000000..d84aa8c7e0cd --- /dev/null +++ b/src/frontend/src/optimizer/plan_node/stream_changelog.rs @@ -0,0 +1,79 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use risingwave_pb::stream_plan::ChangeLogNode; + +use super::expr_visitable::ExprVisitable; +use super::stream::prelude::PhysicalPlanRef; +use super::stream::StreamPlanRef; +use super::utils::impl_distill_by_unit; +use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode}; +use crate::stream_fragmenter::BuildFragmentGraphState; +use crate::PlanRef; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct StreamChangeLog { + pub base: PlanBase, + core: generic::ChangeLog, +} + +impl StreamChangeLog { + pub fn new(core: generic::ChangeLog) -> Self { + let input = core.input.clone(); + let dist = input.distribution().clone(); + // Filter executor won't change the append-only behavior of the stream. + let mut watermark_columns = input.watermark_columns().clone(); + if core.need_op { + watermark_columns.grow(input.watermark_columns().len() + 2); + } else { + watermark_columns.grow(input.watermark_columns().len() + 1); + } + let base = PlanBase::new_stream_with_core( + &core, + dist, + input.append_only(), + input.emit_on_window_close(), + watermark_columns, + ); + StreamChangeLog { base, core } + } +} + +impl PlanTreeNodeUnary for StreamChangeLog { + fn input(&self) -> PlanRef { + self.core.input.clone() + } + + fn clone_with_input(&self, input: PlanRef) -> Self { + let mut core = self.core.clone(); + core.input = input; + Self::new(core) + } +} + +impl_plan_tree_node_for_unary! 
{ StreamChangeLog } +impl_distill_by_unit!(StreamChangeLog, core, "StreamChangeLog"); + +impl StreamNode for StreamChangeLog { + fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { + PbNodeBody::Changelog(ChangeLogNode { + need_op: self.core.need_op, + }) + } +} + +impl ExprRewritable for StreamChangeLog {} + +impl ExprVisitable for StreamChangeLog {} diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 35991349f17c..1e93514f6c0f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -34,7 +34,7 @@ use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::{Explain, TableCatalog}; +use crate::TableCatalog; /// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted /// to stream scan + merge node (for upstream materialize) + batch table scan when converting to `MView` @@ -241,12 +241,7 @@ impl StreamTableScan { let stream_key = self .stream_key() - .unwrap_or_else(|| { - panic!( - "should always have a stream key in the stream plan but not, sub plan: {}", - PlanRef::from(self.clone()).explain_to_string() - ) - }) + .unwrap_or(&[]) .iter() .map(|x| *x as u32) .collect_vec(); diff --git a/src/frontend/src/planner/changelog.rs b/src/frontend/src/planner/changelog.rs new file mode 100644 index 000000000000..cddfa629938f --- /dev/null +++ b/src/frontend/src/planner/changelog.rs @@ -0,0 +1,25 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+use crate::binder::Relation; +use crate::error::Result; +use crate::optimizer::plan_node::LogicalChangeLog; +use crate::{PlanRef, Planner}; + +impl Planner { + pub(super) fn plan_changelog(&mut self, relation: Relation) -> Result { + let root = self.plan_relation(relation)?; + let plan = LogicalChangeLog::create(root); + Ok(plan) + } +} diff --git a/src/frontend/src/planner/mod.rs b/src/frontend/src/planner/mod.rs index 12bf25c0372d..7ccbb905cb47 100644 --- a/src/frontend/src/planner/mod.rs +++ b/src/frontend/src/planner/mod.rs @@ -18,6 +18,7 @@ use crate::binder::{BoundStatement, ShareId}; use crate::error::Result; use crate::optimizer::{OptimizerContextRef, PlanRoot}; +mod changelog; mod delete; mod insert; mod query; diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 51d475616e21..7941d2837a06 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -22,8 +22,8 @@ use risingwave_common::types::{DataType, Interval, ScalarImpl}; use risingwave_sqlparser::ast::AsOf; use crate::binder::{ - BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, - BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind, + BoundBackCteRef, BoundBaseTable, BoundJoin, BoundShare, BoundShareInput, BoundSource, + BoundSystemTable, BoundWatermark, BoundWindowTableFunction, Relation, WindowTableFunctionKind, }; use crate::error::{ErrorCode, Result}; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; @@ -224,7 +224,7 @@ impl Planner { pub(super) fn plan_share(&mut self, share: BoundShare) -> Result { match share.input { - Either::Left(nonrecursive_query) => { + BoundShareInput::Query(Either::Left(nonrecursive_query)) => { let id = share.share_id; match self.share_cache.get(&id) { None => { @@ -239,11 +239,18 @@ impl Planner { } } // for the recursive union in rcte - Either::Right(recursive_union) => self.plan_recursive_union( + BoundShareInput::Query(Either::Right(recursive_union)) => self.plan_recursive_union( *recursive_union.base, *recursive_union.recursive, share.share_id, ), + BoundShareInput::ChangeLog(relation) => { + let id = share.share_id; + let result = self.plan_changelog(relation)?; + let logical_share = LogicalShare::create(result); + self.share_cache.insert(id, logical_share.clone()); + Ok(logical_share) + } } } @@ -272,10 +279,12 @@ impl Planner { .map(|col| col.data_type().clone()) .collect(), Relation::Subquery(q) => q.query.schema().data_types(), - Relation::Share(share) => match &share.input { - Either::Left(nonrecursive) => nonrecursive.schema().data_types(), - Either::Right(recursive) => recursive.schema.data_types(), - }, + Relation::Share(share) => share + .input + .fields()? 
+ .into_iter() + .map(|(_, f)| f.data_type) + .collect(), r => { return Err(ErrorCode::BindError(format!( "Invalid input relation to tumble: {r:?}" diff --git a/src/meta/src/controller/rename.rs b/src/meta/src/controller/rename.rs index 84bbce4dc1ee..3947413ba868 100644 --- a/src/meta/src/controller/rename.rs +++ b/src/meta/src/controller/rename.rs @@ -153,7 +153,12 @@ impl QueryRewriter<'_> { fn visit_query(&self, query: &mut Query) { if let Some(with) = &mut query.with { for cte_table in &mut with.cte_tables { - self.visit_query(&mut cte_table.query); + match &mut cte_table.cte_inner { + risingwave_sqlparser::ast::CteInner::Query(query) => self.visit_query(query), + risingwave_sqlparser::ast::CteInner::ChangeLog(from) => { + *from = Ident::new_unchecked(self.to) + } + } } } self.visit_set_expr(&mut query.body); diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index c27508e8e80d..9bc2c0603a12 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -46,9 +46,9 @@ pub use self::legacy_source::{ }; pub use self::operator::{BinaryOperator, QualifiedOperator, UnaryOperator}; pub use self::query::{ - Cte, Distinct, Fetch, Join, JoinConstraint, JoinOperator, LateralView, OrderByExpr, Query, - Select, SelectItem, SetExpr, SetOperator, TableAlias, TableFactor, TableWithJoins, Top, Values, - With, + Cte, CteInner, Distinct, Fetch, Join, JoinConstraint, JoinOperator, LateralView, OrderByExpr, + Query, Select, SelectItem, SetExpr, SetOperator, TableAlias, TableFactor, TableWithJoins, Top, + Values, With, }; pub use self::statement::*; pub use self::value::{ diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index 5425864bf4e5..83e84907a109 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -282,20 +282,28 @@ impl fmt::Display for With { #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct Cte { pub alias: TableAlias, - pub query: Query, - pub from: Option, + pub cte_inner: CteInner, } impl fmt::Display for Cte { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{} AS ({})", self.alias, self.query)?; - if let Some(ref fr) = self.from { - write!(f, " FROM {}", fr)?; + match &self.cte_inner { + CteInner::Query(query) => write!(f, "{} AS ({})", self.alias, query)?, + CteInner::ChangeLog(ident) => { + write!(f, "{} AS changelog from {}", self.alias, ident.value)? + } } Ok(()) } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum CteInner { + Query(Query), + ChangeLog(Ident), +} + /// One item of the comma-separated list following `SELECT` #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 7015bfac429f..34121747823f 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3978,37 +3978,36 @@ impl Parser<'_> { /// Parse a CTE (`alias [( col1, col2, ... 
)] AS (subquery)`) fn parse_cte(&mut self) -> PResult { let name = self.parse_identifier_non_reserved()?; - - let mut cte = if self.parse_keyword(Keyword::AS) { - self.expect_token(&Token::LParen)?; - let query = self.parse_query()?; - self.expect_token(&Token::RParen)?; + let cte = if self.parse_keyword(Keyword::AS) { + let cte_inner = self.parse_cte_inner()?; let alias = TableAlias { name, columns: vec![], }; - Cte { - alias, - query, - from: None, - } + Cte { alias, cte_inner } } else { let columns = self.parse_parenthesized_column_list(Optional)?; self.expect_keyword(Keyword::AS)?; - self.expect_token(&Token::LParen)?; + let cte_inner = self.parse_cte_inner()?; + let alias = TableAlias { name, columns }; + Cte { alias, cte_inner } + }; + Ok(cte) + } + + fn parse_cte_inner(&mut self) -> PResult { + if let Ok(()) = self.expect_token(&Token::LParen) { let query = self.parse_query()?; self.expect_token(&Token::RParen)?; - let alias = TableAlias { name, columns }; - Cte { - alias, - query, - from: None, + Ok(CteInner::Query(query)) + } else { + let changelog = self.parse_identifier_non_reserved()?; + if changelog.to_string().to_lowercase() != "changelog" { + parser_err!("Expected 'changelog' but found '{}'", changelog); } - }; - if self.parse_keyword(Keyword::FROM) { - cte.from = Some(self.parse_identifier()?); + self.expect_keyword(Keyword::FROM)?; + Ok(CteInner::ChangeLog(self.parse_identifier()?)) } - Ok(cte) } /// Parse a "query body", which is an expression with roughly the diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index a3159400b88f..16d0d94b601d 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -2688,17 +2688,23 @@ fn parse_ctes() { fn assert_ctes_in_select(expected: &[&str], sel: &Query) { for (i, exp) in expected.iter().enumerate() { - let Cte { alias, query, .. } = &sel.with.as_ref().unwrap().cte_tables[i]; - assert_eq!(*exp, query.to_string()); - assert_eq!( - if i == 0 { - Ident::new_unchecked("a") - } else { - Ident::new_unchecked("b") - }, - alias.name - ); - assert!(alias.columns.is_empty()); + let Cte { + alias, cte_inner, .. + } = &sel.with.as_ref().unwrap().cte_tables[i]; + if let CteInner::Query(query) = cte_inner { + assert_eq!(*exp, query.to_string()); + assert_eq!( + if i == 0 { + Ident::new_unchecked("a") + } else { + Ident::new_unchecked("b") + }, + alias.name + ); + assert!(alias.columns.is_empty()); + } else { + panic!("expected CteInner::Query") + } } } @@ -2731,7 +2737,75 @@ fn parse_ctes() { // CTE in a CTE... 
let sql = &format!("WITH outer_cte AS ({}) SELECT * FROM outer_cte", with); let select = verified_query(sql); - assert_ctes_in_select(&cte_sqls, &only(&select.with.unwrap().cte_tables).query); + if let CteInner::Query(query) = &only(&select.with.unwrap().cte_tables).cte_inner { + assert_ctes_in_select(&cte_sqls, query); + } else { + panic!("expected CteInner::Query") + } +} + +#[test] +fn parse_changelog_ctes() { + let cte_sqls = vec!["foo", "bar"]; + let with = &format!( + "WITH a AS changelog from {}, b AS changelog from {} SELECT foo + bar FROM a, b", + cte_sqls[0], cte_sqls[1] + ); + + fn assert_changelog_ctes(expected: &[&str], sel: &Query) { + for (i, exp) in expected.iter().enumerate() { + let Cte { alias, cte_inner } = &sel.with.as_ref().unwrap().cte_tables[i]; + if let CteInner::ChangeLog(from) = cte_inner { + assert_eq!(*exp, from.to_string()); + assert_eq!( + if i == 0 { + Ident::new_unchecked("a") + } else { + Ident::new_unchecked("b") + }, + alias.name + ); + assert!(alias.columns.is_empty()); + } else { + panic!("expected CteInner::ChangeLog") + } + } + } + + // Top-level CTE + assert_changelog_ctes(&cte_sqls, &verified_query(with)); + // CTE in a subquery + let sql = &format!("SELECT ({})", with); + let select = verified_only_select(sql); + match expr_from_projection(only(&select.projection)) { + Expr::Subquery(ref subquery) => { + assert_changelog_ctes(&cte_sqls, subquery.as_ref()); + } + _ => panic!("expected subquery"), + } + // CTE in a derived table + let sql = &format!("SELECT * FROM ({})", with); + let select = verified_only_select(sql); + match only(select.from).relation { + TableFactor::Derived { subquery, .. } => { + assert_changelog_ctes(&cte_sqls, subquery.as_ref()) + } + _ => panic!("expected derived table"), + } + // CTE in a view + let sql = &format!("CREATE VIEW v AS {}", with); + match verified_stmt(sql) { + Statement::CreateView { query, .. } => assert_changelog_ctes(&cte_sqls, &query), + _ => panic!("expected CREATE VIEW"), + } + // CTE in a CTE... + let sql = &format!("WITH outer_cte AS ({}) SELECT * FROM outer_cte", with); + let select = verified_query(sql); + if let CteInner::Query(query) = &only(&select.with.unwrap().cte_tables).cte_inner { + assert_changelog_ctes(&cte_sqls, query); + } else { + panic!("expected CteInner::Query") + } } #[test] @@ -2749,6 +2823,21 @@ fn parse_cte_renamed_columns() { .alias .columns ); + + let sql_changelog = "WITH cte (col1, col2) AS changelog from baz SELECT * FROM cte"; + + let query_changelog = verified_query(sql_changelog); + assert_eq!( + vec![Ident::new_unchecked("col1"), Ident::new_unchecked("col2")], + query_changelog + .with + .unwrap() + .cte_tables + .first() + .unwrap() + .alias + .columns + ); } #[test] @@ -2770,8 +2859,7 @@ fn parse_recursive_cte() { name: Ident::new_unchecked("nums"), columns: vec![Ident::new_unchecked("val")], }, - query: cte_query, - from: None, + cte_inner: CteInner::Query(cte_query), }; assert_eq!(with.cte_tables.first().unwrap(), &expected); } diff --git a/src/stream/src/executor/changelog.rs b/src/stream/src/executor/changelog.rs new file mode 100644 index 000000000000..2a816da81a4a --- /dev/null +++ b/src/stream/src/executor/changelog.rs @@ -0,0 +1,84 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::fmt::Formatter; +use core::iter::repeat; +use std::fmt::Debug; +use std::sync::Arc; + +use futures::prelude::stream::StreamExt; +use futures_async_stream::try_stream; +use risingwave_common::array::{ArrayImpl, I16Array, Op, SerialArray, StreamChunk}; + +use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message, StreamExecutorError}; + +pub struct ChangeLogExecutor { + _ctx: ActorContextRef, + input: Executor, + need_op: bool, +} + +impl Debug for ChangeLogExecutor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ChangeLogExecutor").finish() + } +} + +impl Execute for ChangeLogExecutor { + fn execute(self: Box) -> BoxedMessageStream { + self.execute_inner().boxed() + } +} +impl ChangeLogExecutor { + pub fn new(ctx: ActorContextRef, input: Executor, need_op: bool) -> Self { + Self { + _ctx: ctx, + input, + need_op, + } + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn execute_inner(self) { + let input = self.input.execute(); + #[for_await] + for msg in input { + let msg = msg?; + match msg { + Message::Chunk(chunk) => { + let (ops, mut columns, bitmap) = chunk.into_inner(); + let new_ops = vec![Op::Insert; ops.len()]; + // They are all 0, will be add in row id gen executor. + let changelog_row_id_array = Arc::new(ArrayImpl::Serial( + SerialArray::from_iter(repeat(None).take(ops.len())), + )); + let new_chunk = if self.need_op { + let ops: Vec> = + ops.iter().map(|op| Some(op.to_i16())).collect(); + let ops_array = Arc::new(ArrayImpl::Int16(I16Array::from_iter(ops))); + columns.push(ops_array); + columns.push(changelog_row_id_array); + StreamChunk::with_visibility(new_ops, columns, bitmap) + } else { + columns.push(changelog_row_id_array); + StreamChunk::with_visibility(new_ops, columns, bitmap) + }; + yield Message::Chunk(new_chunk); + } + Message::Watermark(_w) => {} + m => yield m, + } + } + } +} diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index a6a215228366..36c6e8d173e3 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -61,6 +61,7 @@ mod backfill; mod barrier_recv; mod batch_query; mod chain; +mod changelog; mod dedup; mod dispatch; pub mod dml; @@ -114,6 +115,7 @@ pub use backfill::no_shuffle_backfill::*; pub use barrier_recv::BarrierRecvExecutor; pub use batch_query::BatchQueryExecutor; pub use chain::ChainExecutor; +pub use changelog::ChangeLogExecutor; pub use dedup::AppendOnlyDedupExecutor; pub use dispatch::{DispatchExecutor, DispatcherImpl}; pub use dynamic_filter::DynamicFilterExecutor; diff --git a/src/stream/src/from_proto/changelog.rs b/src/stream/src/from_proto/changelog.rs new file mode 100644 index 000000000000..231d894ccdd2 --- /dev/null +++ b/src/stream/src/from_proto/changelog.rs @@ -0,0 +1,38 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_pb::stream_plan::ChangeLogNode; +use risingwave_storage::StateStore; + +use super::ExecutorBuilder; +use crate::error::StreamResult; +use crate::executor::{ChangeLogExecutor, Executor}; +use crate::task::ExecutorParams; + +pub struct ChangeLogExecutorBuilder; + +impl ExecutorBuilder for ChangeLogExecutorBuilder { + type Node = ChangeLogNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &Self::Node, + _store: impl StateStore, + ) -> StreamResult { + let [input]: [_; 1] = params.input.try_into().unwrap(); + + let exec = ChangeLogExecutor::new(params.actor_context, input, node.need_op); + Ok((params.info, exec).into()) + } +} diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs index ba433595718a..caf1d72f4ef0 100644 --- a/src/stream/src/from_proto/mod.rs +++ b/src/stream/src/from_proto/mod.rs @@ -19,6 +19,7 @@ mod append_only_dedup; mod barrier_recv; mod batch_query; mod cdc_filter; +mod changelog; mod dml; mod dynamic_filter; mod eowc_over_window; @@ -95,6 +96,7 @@ use self::union::*; use self::watermark_filter::WatermarkFilterBuilder; use crate::error::StreamResult; use crate::executor::{Execute, Executor, ExecutorInfo}; +use crate::from_proto::changelog::ChangeLogExecutorBuilder; use crate::from_proto::values::ValuesExecutorBuilder; use crate::task::ExecutorParams; @@ -172,5 +174,6 @@ pub async fn create_executor( NodeBody::OverWindow => OverWindowExecutorBuilder, NodeBody::StreamFsFetch => FsFetchExecutorBuilder, NodeBody::SourceBackfill => SourceBackfillExecutorBuilder, + NodeBody::Changelog => ChangeLogExecutorBuilder, } } diff --git a/src/tests/sqlsmith/src/reducer.rs b/src/tests/sqlsmith/src/reducer.rs index a636c2ec4413..a2875de6ae9a 100644 --- a/src/tests/sqlsmith/src/reducer.rs +++ b/src/tests/sqlsmith/src/reducer.rs @@ -20,7 +20,7 @@ use anyhow::anyhow; use itertools::Itertools; use regex::Regex; use risingwave_sqlparser::ast::{ - Cte, Expr, FunctionArgExpr, Join, Query, Select, SetExpr, Statement, TableFactor, + Cte, CteInner, Expr, FunctionArgExpr, Join, Query, Select, SetExpr, Statement, TableFactor, TableWithJoins, With, }; @@ -123,8 +123,10 @@ pub(crate) fn find_ddl_references(sql_statements: &[Statement]) -> HashSet) { let Query { with, body, .. } = query; if let Some(With { cte_tables, .. }) = with { - for Cte { query, .. } in cte_tables { - find_ddl_references_for_query(query, ddl_references) + for Cte { cte_inner, .. 
} in cte_tables { + if let CteInner::Query(query) = cte_inner { + find_ddl_references_for_query(query, ddl_references) + } } } find_ddl_references_for_query_in_set_expr(body, ddl_references); diff --git a/src/tests/sqlsmith/src/sql_gen/query.rs b/src/tests/sqlsmith/src/sql_gen/query.rs index 11a229844c95..dcead4275c7a 100644 --- a/src/tests/sqlsmith/src/sql_gen/query.rs +++ b/src/tests/sqlsmith/src/sql_gen/query.rs @@ -130,11 +130,9 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { fn gen_with_inner(&mut self) -> (With, Vec) { let alias = self.gen_table_alias_with_prefix("with"); let (query, query_schema) = self.gen_local_query(); - let from = None; let cte = Cte { alias: alias.clone(), - query, - from, + cte_inner: risingwave_sqlparser::ast::CteInner::Query(query), }; let with_tables = vec![Table::new(alias.name.real_value(), query_schema)];
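
For orientation, a minimal usage sketch of the syntax this change introduces, distilled from the e2e test above (the table and view names here are illustrative, not part of the patch):

create table t (v1 int, v2 int);

-- `changelog from t` exposes t's change stream as a CTE. The extra Int16
-- `changelog_op` column carries the operation code, and a hidden Serial
-- `_changelog_row_id` column is generated to serve as the stream key.
create materialized view t_log as
with sub as changelog from t
select v1, v2, changelog_op from sub;

Per the planner change (`ToBatch` for `LogicalChangeLog` returns an error), a changelog CTE is only usable in streaming jobs such as CREATE MATERIALIZED VIEW / CREATE SINK, not in ad-hoc batch queries.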