Skip to content

Commit

Permalink
cherry-pick: shared source data loss (#19443) and rate limit (#19445, #19466) (#19482)

Browse files Browse the repository at this point in the history

Signed-off-by: xxchan <[email protected]>
Co-authored-by: Noel Kwan <[email protected]>
  • Loading branch information
xxchan and kwannoel authored Nov 20, 2024
1 parent b0c5ffd commit 79dbb2c
Show file tree
Hide file tree
Showing 37 changed files with 720 additions and 148 deletions.
32 changes: 31 additions & 1 deletion e2e_test/ddl/drop/drop_creating_mv.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@ risedev psql -c 'create materialized view m1 as select * from t;' &
onlyif can-use-recover
sleep 5s

onlyif can-use-recover
query I
select background_ddl from rw_catalog.rw_materialized_views where name='m1';
----
f

onlyif can-use-recover
statement ok
drop materialized view m1;

############## Test drop background mv BEFORE recovery

statement ok
set background_ddl=true;

Expand All @@ -33,6 +40,12 @@ create materialized view m1 as select * from t;
onlyif can-use-recover
sleep 5s

onlyif can-use-recover
query I
select background_ddl from rw_catalog.rw_materialized_views where name='m1';
----
t

onlyif can-use-recover
statement ok
drop materialized view m1;
Expand All @@ -48,12 +61,24 @@ create materialized view m1 as select * from t;
onlyif can-use-recover
sleep 5s

onlyif can-use-recover
query I
select background_ddl from rw_catalog.rw_materialized_views where name='m1';
----
t

onlyif can-use-recover
statement ok
recover;

onlyif can-use-recover
sleep 10s
sleep 5s

onlyif can-use-recover
query I
select background_ddl from rw_catalog.rw_materialized_views where name='m1';
----
t

onlyif can-use-recover
statement ok
Expand All @@ -69,6 +94,11 @@ set background_ddl=false;
statement ok
create materialized view m1 as select * from t;

query I
select background_ddl from rw_catalog.rw_materialized_views where name='m1';
----
f

statement ok
drop materialized view m1;

Expand Down
6 changes: 0 additions & 6 deletions e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ create source s with (

sleep 2s

# At the beginning, the source is paused. It will resume after a downstream is created.
system ok
internal_table.mjs --name s --type '' --count
----
count: 0


statement ok
create table tt1_shared (v1 int,
Expand Down
44 changes: 38 additions & 6 deletions e2e_test/source_inline/fs/posix_fs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,36 @@ create materialized view diamonds_mv as select * from diamonds_source;
sleep 1s

# no output due to rate limit
query TTTT rowsort
statement count 0
select * from diamonds;
----

query TTTT rowsort

statement count 0
select * from diamonds_mv;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds FS_FETCH {FS_FETCH} 0
diamonds SOURCE {SOURCE} 0
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0

statement ok
ALTER TABLE diamonds SET source_rate_limit TO DEFAULT;

statement ok
ALTER source diamonds_source SET source_rate_limit TO DEFAULT;

sleep 10s
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0


sleep 3s

query TTTT rowsort
select * from diamonds;
Expand All @@ -63,6 +78,23 @@ select * from diamonds;
1.28 Good J 63.1
1.3 Fair E 64.7


statement count 0
select * from diamonds_mv;



statement ok
ALTER SOURCE diamonds_source SET source_rate_limit TO DEFAULT;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----


sleep 3s

query TTTT rowsort
select * from diamonds_mv;
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,38 @@ select * from rl_mv3;
----
0

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 0
rl_mv2 SOURCE {SOURCE} 0
rl_mv3 SOURCE {SOURCE} 0

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to 1000;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 1000
rl_mv2 SOURCE {SOURCE} 1000
rl_mv3 SOURCE {SOURCE} 1000

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to default;

# rate limit becomes None
query T
select count(*) from rw_rate_limit;
----
0

skipif in-memory
sleep 3s

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# e2e test: SOURCE_RATE_LIMIT behavior for a shared Kafka source and the
# SOURCE_BACKFILL (SOURCE_SCAN) rate limit of MVs built on top of it.
control substitution on

############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
type = 'append-only',
force_append_only='true'
);

############## Source from kafka (rate_limit = 0)

# Wait for the topic to create
skipif in-memory
sleep 5s

# rate_limit = 0 pauses ingestion on the shared source itself.
statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
source_rate_limit = 0,
) FORMAT PLAIN ENCODE JSON;

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

############## Create MV on source

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

############## Although source is rate limited, the MV's SourceBackfill is not.

statement ok
flush;

# Backfill is not rate-limited, so the MV sees the 1000 pre-existing rows.
query I
select * from rl_mv1;
----
1000

############## Insert more data. They will not go into the MV.

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

sleep 3s

# Still 1000: new rows are blocked by the source's rate_limit = 0.
query I
select * from rl_mv1;
----
1000

statement ok
SET BACKGROUND_DDL=true;

statement ok
SET BACKFILL_RATE_LIMIT=0;

# rl_mv2's backfill is rate-limited to 0, so it makes no progress.
statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

sleep 1s

query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed

# Both the source and rl_mv2's SOURCE_SCAN fragment report rate_limit = 0.
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 0
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

statement ok
alter source kafka_source set source_rate_limit to 1000;

# Only the SOURCE fragment changes; the MV's backfill limit is independent.
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0

sleep 3s

# With the source unthrottled, rl_mv1 now sees all 2000 rows.
query I
select * from rl_mv1;
----
2000

# rl_mv2 is still stuck at 0: its own backfill limit is unchanged.
query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed



# source_rate_limit cannot be set on a materialized view — only
# BACKFILL_RATE_LIMIT applies there.
statement error
alter materialized view rl_mv2 set source_rate_limit = 1000;
----
db error: ERROR: Failed to run the query

Caused by:
sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit
LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
^


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


statement ok
alter materialized view rl_mv2 set backfill_rate_limit = 2000;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 2000

sleep 3s

# Backfill unthrottled: rl_mv2 catches up to all 2000 rows.
query T
select * from rl_mv2;
----
2000



############## Cleanup

statement ok
drop source kafka_source cascade;

statement ok
drop table kafka_seed_data cascade;
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ select count(*) from kafka_source;
############## Alter source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement ok
alter table kafka_source set source_rate_limit to 1000;

skipif in-memory
Expand Down
Loading

0 comments on commit 79dbb2c

Please sign in to comment.