Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jun 14, 2024
1 parent f4df317 commit 2a1d9bf
Show file tree
Hide file tree
Showing 27 changed files with 344 additions and 259 deletions.
34 changes: 26 additions & 8 deletions e2e_test/streaming/changed_log.slt
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,27 @@ statement ok
create table t3 (v1 int primary key, v2 int);

statement ok
create materialized view mv1 as with sub as changedlog from t1 select * from sub;
create materialized view mv1 as with sub as changelog from t1 select * from sub;

statement ok
create materialized view mv2 as with sub as changedlog from t2 select * from sub;
create materialized view mv2 as with sub as changelog from t2 select * from sub;

statement ok
create materialized view mv3 as with sub as changedlog from t1 select v1, v2 from sub;
create materialized view mv3 as with sub as changelog from t1 select v1, v2 from sub;

statement ok
create materialized view mv4 as with sub1 as changedlog from t1, sub2 as changedlog from t2
create materialized view mv4 as with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv5 as with sub1 as changedlog from t1, sub2 as changedlog from t2
create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.op as op1, sub2.op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv6 as with sub as changedlog from t3 select * from sub;
create materialized view mv6 as with sub as changelog from t3 select * from sub;

statement ok
create materialized view mv7(col1,col2,col3) as with sub as changelog from t3 select * from sub;

statement ok
insert into t1 values(1,1),(2,2);
Expand All @@ -51,6 +54,12 @@ update t3 set v2 = 500 where v1 = 5;
statement ok
delete from t1 where v1 = 2;

statement ok
alter materialized view mv7 rename to mv7_rename;

statement ok
alter table t3 rename to t3_rename;

query III rowsort
select * from mv1 order by v1;
----
Expand Down Expand Up @@ -116,6 +125,16 @@ select * from mv6 order by v1;
5 500 3
6 6 1

query III rowsort
select * from mv7_rename order by col1;
----
5 5 1
5 5 4
5 500 3
6 6 1

statement ok
drop materialized view mv7_rename;

statement ok
drop materialized view mv6;
Expand All @@ -135,9 +154,8 @@ drop materialized view mv2;
statement ok
drop materialized view mv1;


statement ok
drop table t3;
drop table t3_rename;

statement ok
drop table t2;
Expand Down
4 changes: 2 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message ChangedLogNode {
message ChangeLogNode {
bool need_op = 1;
}

Expand Down Expand Up @@ -816,7 +816,7 @@ message StreamNode {
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 142;
ChangedLogNode changed_log = 143;
ChangeLogNode change_log = 143;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum BindingCteState {
query: Either<BoundQuery, RecursiveUnion>,
},

ChangedLog {
ChangeLog {
table: Relation,
},
}
Expand Down
136 changes: 70 additions & 66 deletions src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_sqlparser::ast::{
Cte, Expr, Fetch, ObjectName, OrderByExpr, Query, SetExpr, SetOperator, Value, With,
Cte, CteInner, Expr, Fetch, ObjectName, OrderByExpr, Query, SetExpr, SetOperator, Value, With,
};
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -145,7 +145,7 @@ impl Binder {
/// After finishing binding, we pop the previous context from the stack.
pub fn bind_query(&mut self, query: Query) -> Result<BoundQuery> {
self.push_context();
let result = self.bind_query_inner(query.clone());
let result = self.bind_query_inner(query);
self.pop_context()?;
result
}
Expand Down Expand Up @@ -286,77 +286,81 @@ impl Binder {
for cte_table in with.cte_tables {
// note that the new `share_id` for the rcte is generated here
let share_id = self.next_share_id();
let Cte { alias, query, from } = cte_table;
let Cte { alias, cte_inner } = cte_table;
let table_name = alias.name.real_value();

if with.recursive {
let query = query.ok_or_else(|| {
ErrorCode::BindError("RECURSIVE CTE don't support changedlog from".to_string())
})?;
let (
SetExpr::SetOperation {
op: SetOperator::Union,
all,
left,
right,
},
with,
) = Self::validate_rcte(query)?
else {
if let CteInner::Query(query) = cte_inner {
let (
SetExpr::SetOperation {
op: SetOperator::Union,
all,
left,
right,
},
with,
) = Self::validate_rcte(query)?
else {
return Err(ErrorCode::BindError(
"expect `SetOperation` as the return type of validation".into(),
)
.into());
};

let entry = self
.context
.cte_to_relation
.entry(table_name)
.insert_entry(Rc::new(RefCell::new(BindingCte {
share_id,
state: BindingCteState::Init,
alias,
})))
.get()
.clone();

self.bind_rcte(with, entry, *left, *right, all)?;
} else {
return Err(ErrorCode::BindError(
"expect `SetOperation` as the return type of validation".into(),
"RECURSIVE CTE only support query".to_string(),
)
.into());
};

let entry = self
.context
.cte_to_relation
.entry(table_name)
.insert_entry(Rc::new(RefCell::new(BindingCte {
share_id,
state: BindingCteState::Init,
alias,
})))
.get()
.clone();

self.bind_rcte(with, entry, *left, *right, all)?;
} else if let Some(query) = query {
let bound_query = self.bind_query(query)?;
self.context.cte_to_relation.insert(
table_name,
Rc::new(RefCell::new(BindingCte {
share_id,
state: BindingCteState::Bound {
query: either::Either::Left(bound_query),
},
alias,
})),
);
}
} else {
let from_table_name = from.ok_or_else(|| {
ErrorCode::BindError(
"CTE with changedlog from must have a table/mv".to_string(),
)
})?;
self.push_context();
let from_table_relation = self.bind_relation_by_name(
ObjectName::from(vec![from_table_name]),
None,
None,
)?;
self.pop_context()?;
self.context.cte_to_relation.insert(
table_name,
Rc::new(RefCell::new(BindingCte {
share_id,
state: BindingCteState::ChangedLog {
table: from_table_relation,
},
alias,
})),
);
match cte_inner {
CteInner::Query(query) => {
let bound_query = self.bind_query(query)?;
self.context.cte_to_relation.insert(
table_name,
Rc::new(RefCell::new(BindingCte {
share_id,
state: BindingCteState::Bound {
query: either::Either::Left(bound_query),
},
alias,
})),
);
}
CteInner::ChangeLog(from_table_name) => {
self.push_context();
let from_table_relation = self.bind_relation_by_name(
ObjectName::from(vec![from_table_name]),
None,
None,
)?;
self.pop_context()?;
self.context.cte_to_relation.insert(
table_name,
Rc::new(RefCell::new(BindingCte {
share_id,
state: BindingCteState::ChangeLog {
table: from_table_relation,
},
alias,
})),
);
}
}
}
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ impl Binder {
// no matter it's recursive or not.
Ok(Relation::Share(Box::new(BoundShare { share_id, input})))
}
BindingCteState::ChangedLog { table } => {
let input = BoundShareInput::ChangedLog(table);
BindingCteState::ChangeLog { table } => {
let input = BoundShareInput::ChangeLog(table);
self.bind_table_to_context(
input.fields()?,
table_name.clone(),
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/binder/relation/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::error::{ErrorCode, Result};
#[derive(Debug, Clone)]
pub enum BoundShareInput {
Query(Either<BoundQuery, RecursiveUnion>),
ChangedLog(Relation),
ChangeLog(Relation),
}
impl BoundShareInput {
pub fn fields(&self) -> Result<Vec<(bool, Field)>> {
Expand All @@ -48,7 +48,7 @@ impl BoundShareInput {
.map(|f| (false, f))
.collect_vec()),
},
BoundShareInput::ChangedLog(r) => {
BoundShareInput::ChangeLog(r) => {
let (fields, _name) = if let Relation::BaseTable(bound_base_table) = r {
(
bound_base_table.table_catalog.columns().to_vec(),
Expand Down Expand Up @@ -80,7 +80,7 @@ impl BoundShareInput {
true,
Field::with_name(
risingwave_common::types::DataType::Serial,
"_changedlog_row_id".to_string(),
"_changelog_row_id".to_string(),
),
),
])
Expand All @@ -103,7 +103,7 @@ impl RewriteExprsRecursive for BoundShare {
Either::Left(q) => q.rewrite_exprs_recursive(rewriter),
Either::Right(r) => r.rewrite_exprs_recursive(rewriter),
},
BoundShareInput::ChangedLog(r) => r.rewrite_exprs_recursive(rewriter),
BoundShareInput::ChangeLog(r) => r.rewrite_exprs_recursive(rewriter),
};
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/binder/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl Binder {
&& !except_indices.contains(&c.index)
}));

select_list.extend(exprs.clone());
select_list.extend(exprs);
aliases.extend(names);
// TODO: we will need to be able to handle wildcard expressions bound to
// aliases in the future. We'd then need a
Expand Down
Loading

0 comments on commit 2a1d9bf

Please sign in to comment.