Skip to content

Commit

Permalink
feat(steam): support stream changed log (#17132)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jun 26, 2024
1 parent e5447c9 commit 61e9e52
Show file tree
Hide file tree
Showing 31 changed files with 1,178 additions and 272 deletions.
164 changes: 164 additions & 0 deletions e2e_test/streaming/changed_log.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int);

statement ok
create table t2 (v1 int, v2 int);

statement ok
create table t3 (v1 int primary key, v2 int);

statement ok
create materialized view mv1 as with sub as changelog from t1 select * from sub;

statement ok
create materialized view mv2 as with sub as changelog from t2 select * from sub;

statement ok
create materialized view mv3 as with sub as changelog from t1 select v1, v2 from sub;

statement ok
create materialized view mv4 as with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.changelog_op as op1, sub2.changelog_op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv6 as with sub as changelog from t3 select * from sub;

statement ok
create materialized view mv7(col1,col2,col3) as with sub as changelog from t3 select * from sub;

statement ok
insert into t1 values(1,1),(2,2);

statement ok
insert into t2 values(1,10),(2,20);

statement ok
insert into t3 values(5,5),(6,6);

statement ok
update t1 set v2 = 100 where v1 = 1;

statement ok
update t2 set v2 = 100 where v1 = 1;

statement ok
update t3 set v2 = 500 where v1 = 5;

statement ok
delete from t1 where v1 = 2;

statement ok
alter materialized view mv7 rename to mv7_rename;

statement ok
alter table t3 rename to t3_rename;

query III rowsort
select * from mv1 order by v1;
----
1 1 1
1 1 4
1 100 3
2 2 1
2 2 2

query III rowsort
select * from mv2 order by v1;
----
1 10 1
1 10 4
1 100 3
2 20 1

query III rowsort
select * from mv3 order by v1;
----
1 1
1 1
1 100
2 2
2 2

query III rowsort
select * from mv4 order by v11,v21;
----
1 1 1 10
1 1 1 10
1 1 1 10
1 1 1 10
1 1 1 100
1 1 1 100
1 100 1 10
1 100 1 10
1 100 1 100
2 2 2 20
2 2 2 20


query III rowsort
select * from mv5 order by v11,v21;
----
1 1 1 10 1 1
1 1 1 10 1 4
1 1 1 10 4 1
1 1 1 10 4 4
1 1 1 100 1 3
1 1 1 100 4 3
1 100 1 10 3 1
1 100 1 10 3 4
1 100 1 100 3 3
2 2 2 20 1 1
2 2 2 20 2 1

query III rowsort
select * from mv6 order by v1;
----
5 5 1
5 5 4
5 500 3
6 6 1

query III rowsort
select * from mv7_rename order by col1;
----
5 5 1
5 5 4
5 500 3
6 6 1

statement ok
drop materialized view mv7_rename;

statement ok
drop materialized view mv6;

statement ok
drop materialized view mv5;

statement ok
drop materialized view mv4;

statement ok
drop materialized view mv3;

statement ok
drop materialized view mv2;

statement ok
drop materialized view mv1;

statement ok
drop table t3_rename;

statement ok
drop table t2;

statement ok
drop table t1;
5 changes: 5 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,10 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message ChangeLogNode {
bool need_op = 1;
}

message CdcFilterNode {
expr.ExprNode search_condition = 1;
uint32 upstream_source_id = 2;
Expand Down Expand Up @@ -824,6 +828,7 @@ message StreamNode {
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 142;
ChangeLogNode changelog = 143;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::TableAlias;

use crate::binder::Relation;
use crate::error::{ErrorCode, Result};

type LiteResult<T> = std::result::Result<T, ErrorCode>;
Expand Down Expand Up @@ -94,11 +95,17 @@ pub enum BindingCteState {
#[default]
Init,
/// We know the schema form after the base term resolved.
BaseResolved { base: BoundSetExpr },
BaseResolved {
base: BoundSetExpr,
},
/// We get the whole bound result of the (recursive) CTE.
Bound {
query: Either<BoundQuery, RecursiveUnion>,
},

ChangeLog {
table: Relation,
},
}

/// the entire `RecursiveUnion` represents a *bound* recursive cte.
Expand Down
Loading

0 comments on commit 61e9e52

Please sign in to comment.