cherry-pick fix: add stream change log to 1.10 (#17583)(#17515)(#17132) #18240

Merged · 3 commits · Aug 26, 2024
178 changes: 178 additions & 0 deletions e2e_test/streaming/changelog.slt
@@ -0,0 +1,178 @@
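# `WITH <alias> AS changelog FROM <table>` binds the change log of <table>;
# it adds a `changelog_op` column (included in `*`) and a hidden
# `_changelog_row_id` column (available when selected by name).
# changelog_op encoding, as exercised by the queries below:
#   1 = insert, 2 = delete, 3 = update (new row), 4 = update (old row)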
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int);

statement ok
create table t2 (v1 int, v2 int);

statement ok
create table t3 (v1 int primary key, v2 int);

statement ok
create materialized view mv1 as with sub as changelog from t1 select * from sub;

statement ok
create materialized view mv2 as with sub as changelog from t2 select * from sub;

statement ok
create materialized view mv3 as with sub as changelog from t1 select v1, v2 from sub;

statement ok
create materialized view mv4 as with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv5 as with sub1 as changelog from t1, sub2 as changelog from t2
select sub1.v1 as v11, sub1.v2 as v12, sub2.v1 as v21, sub2.v2 as v22, sub1.changelog_op as op1, sub2.changelog_op as op2 from sub1 inner join sub2 on sub1.v1 = sub2.v1;

statement ok
create materialized view mv6 as with sub as changelog from t3 select * from sub;

statement ok
create materialized view mv7(col1,col2,col3) as with sub as changelog from t3 select * from sub;

statement ok
create materialized view mv8 as with sub as changelog from t2 select *, _changelog_row_id as row_id from sub;

statement ok
insert into t1 values(1,1),(2,2);

statement ok
insert into t2 values(1,10),(2,20);

statement ok
insert into t3 values(5,5),(6,6);

statement ok
update t1 set v2 = 100 where v1 = 1;

statement ok
update t2 set v2 = 100 where v1 = 1;

statement ok
update t3 set v2 = 500 where v1 = 5;

statement ok
delete from t1 where v1 = 2;

statement ok
alter materialized view mv7 rename to mv7_rename;

statement ok
alter table t3 rename to t3_rename;

query III rowsort
select * from mv1 order by v1;
----
1 1 1
1 1 4
1 100 3
2 2 1
2 2 2

query III rowsort
select * from mv2 order by v1;
----
1 10 1
1 10 4
1 100 3
2 20 1

query III rowsort
select * from mv3 order by v1;
----
1 1
1 1
1 100
2 2
2 2

query III rowsort
select * from mv4 order by v11,v21;
----
1 1 1 10
1 1 1 10
1 1 1 10
1 1 1 10
1 1 1 100
1 1 1 100
1 100 1 10
1 100 1 10
1 100 1 100
2 2 2 20
2 2 2 20


query III rowsort
select * from mv5 order by v11,v21;
----
1 1 1 10 1 1
1 1 1 10 1 4
1 1 1 10 4 1
1 1 1 10 4 4
1 1 1 100 1 3
1 1 1 100 4 3
1 100 1 10 3 1
1 100 1 10 3 4
1 100 1 100 3 3
2 2 2 20 1 1
2 2 2 20 2 1

query III rowsort
select * from mv6 order by v1;
----
5 5 1
5 5 4
5 500 3
6 6 1

query III rowsort
select * from mv7_rename order by col1;
----
5 5 1
5 5 4
5 500 3
6 6 1

query III rowsort
select v1 from mv8 order by v1;
----
1
1
1
2

statement ok
drop materialized view mv8;

statement ok
drop materialized view mv7_rename;

statement ok
drop materialized view mv6;

statement ok
drop materialized view mv5;

statement ok
drop materialized view mv4;

statement ok
drop materialized view mv3;

statement ok
drop materialized view mv2;

statement ok
drop materialized view mv1;

statement ok
drop table t3_rename;

statement ok
drop table t2;

statement ok
drop table t1;
24 changes: 24 additions & 0 deletions integration_tests/snowflake-sink/README.md
@@ -43,3 +43,27 @@ launch your risingwave cluster, and execute the following sql commands respectively
- `create_sink.sql`

note: the column names in your materialized view must exactly match those in your pre-defined snowflake table, because the snowflake pipe we set up earlier in `snowflake_prep.sql` loads data by matching columns by name.
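For context, the pipe created in `snowflake_prep.sql` is roughly of the following shape (a hypothetical sketch; all identifiers here are placeholders, and the repo's actual statement may differ):

```
-- Hypothetical sketch of the pipe behind the column-name-matching requirement;
-- all identifiers are placeholders (see snowflake_prep.sql for the real one).
CREATE OR REPLACE PIPE example_snowflake_pipe AS
  COPY INTO example_table
  FROM @example_stage
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE; -- map each staged JSON field to the same-named column
```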

## 3. Sink data into Snowflake with UPSERT

1. To sink data into Snowflake with upsert, first set up Snowflake and S3 as in step 1.

2. Execute the following SQL files in order:
- `upsert/create_source.sql`
- `upsert/create_mv.sql`
- `upsert/create_sink.sql`

After these run, RisingWave continuously imports its change log into the Snowflake table. Each change-log row carries an `__op` column (1 = insert, 2 = delete, 3 = the new row of an update, 4 = the old row of an update) and an `__row_id` column that orders the changes, which the next step uses for deduplication.

3. Then use the following SQL statement to create the dynamic table; selecting from it yields the upsert result (the latest surviving row for each primary key):
```
CREATE OR REPLACE DYNAMIC TABLE user_behaviors
TARGET_LAG = '1 minute'
WAREHOUSE = test_warehouse
AS SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY {primary_key} ORDER BY __row_id DESC) AS dedupe_id
FROM t3
) AS subquery
WHERE dedupe_id = 1 AND (__op = 1 OR __op = 3)
```
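
Once the dynamic table's `TARGET_LAG` has elapsed, you can query it to confirm the deduplication; a minimal sanity check, assuming the table and key names from this example:

```
-- Minimal sanity check: each primary key should appear exactly once,
-- carrying the latest upserted values for that key.
SELECT * FROM user_behaviors ORDER BY user_id LIMIT 10;
```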
13 changes: 13 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_mv.sql
@@ -0,0 +1,13 @@
-- please note that the column names of your mv must be *exactly*
-- the same as the column names in your snowflake table, since columns are matched by name.

CREATE MATERIALIZED VIEW ss_mv AS
WITH sub AS changelog FROM user_behaviors
SELECT
user_id,
target_id,
event_timestamp AT TIME ZONE 'America/Indiana/Indianapolis' as event_timestamp,
changelog_op AS __op,
_changelog_row_id::bigint AS __row_id
FROM
sub;
16 changes: 16 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_sink.sql
@@ -0,0 +1,16 @@
CREATE SINK snowflake_sink FROM ss_mv WITH (
connector = 'snowflake',
type = 'append-only',
snowflake.database = 'EXAMPLE_DB',
snowflake.schema = 'EXAMPLE_SCHEMA',
snowflake.pipe = 'EXAMPLE_SNOWFLAKE_PIPE',
snowflake.account_identifier = '<ORG_NAME>-<ACCOUNT_NAME>',
snowflake.user = 'XZHSEH',
snowflake.rsa_public_key_fp = 'EXAMPLE_FP',
snowflake.private_key = 'EXAMPLE_PK',
snowflake.s3_bucket = 'EXAMPLE_S3_BUCKET',
snowflake.aws_access_key_id = 'EXAMPLE_AWS_ID',
snowflake.aws_secret_access_key = 'EXAMPLE_SECRET_KEY',
snowflake.aws_region = 'EXAMPLE_REGION',
snowflake.s3_path = 'EXAMPLE_S3_PATH',
);
19 changes: 19 additions & 0 deletions integration_tests/snowflake-sink/upsert/create_source.sql
@@ -0,0 +1,19 @@
-- please note that this will create a source that generates 1,000 rows over 10 seconds
-- (100 rows per second, user_id 1 through 1000); you may want to change the
-- configuration for better testing / demo purposes

CREATE TABLE user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMPTZ,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
datagen.rows.per.second = '100'
) FORMAT PLAIN ENCODE JSON;
6 changes: 6 additions & 0 deletions proto/stream_plan.proto
@@ -287,6 +287,11 @@ message FilterNode {
expr.ExprNode search_condition = 1;
}

message ChangeLogNode {
// Whether the changelog op column is included in the final output.
bool need_op = 1;
}

message CdcFilterNode {
expr.ExprNode search_condition = 1;
uint32 upstream_source_id = 2;
@@ -812,6 +817,7 @@ message StreamNode {
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 142;
ChangeLogNode changelog = 143;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
9 changes: 8 additions & 1 deletion src/frontend/src/binder/bind_context.rs
@@ -23,6 +23,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_sqlparser::ast::TableAlias;

use crate::binder::Relation;
use crate::error::{ErrorCode, Result};

type LiteResult<T> = std::result::Result<T, ErrorCode>;
@@ -94,11 +95,17 @@ pub enum BindingCteState {
#[default]
Init,
/// We know the schema once the base term is resolved.
BaseResolved { base: BoundSetExpr },
BaseResolved {
base: BoundSetExpr,
},
/// We get the whole bound result of the (recursive) CTE.
Bound {
query: Either<BoundQuery, RecursiveUnion>,
},

ChangeLog {
table: Relation,
},
}

/// the entire `RecursiveUnion` represents a *bound* recursive cte.