Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick: shared source data loss (#19443) and rate limit (#19445, #19466) #19482

Merged
merged 6 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ create source s with (

sleep 2s

# At the beginning, the source is paused. It will resume after a downstream is created.
system ok
internal_table.mjs --name s --type '' --count
----
count: 0


statement ok
create table tt1_shared (v1 int,
Expand Down
44 changes: 38 additions & 6 deletions e2e_test/source_inline/fs/posix_fs.slt
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,36 @@ create materialized view diamonds_mv as select * from diamonds_source;
sleep 1s

# no output due to rate limit
query TTTT rowsort
statement count 0
select * from diamonds;
----

query TTTT rowsort

statement count 0
select * from diamonds_mv;


query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds FS_FETCH {FS_FETCH} 0
diamonds SOURCE {SOURCE} 0
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0

statement ok
ALTER TABLE diamonds SET source_rate_limit TO DEFAULT;

statement ok
ALTER source diamonds_source SET source_rate_limit TO DEFAULT;

sleep 10s
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----
diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0
diamonds_mv SOURCE {SOURCE} 0




query TTTT rowsort
select * from diamonds;
Expand All @@ -63,6 +78,23 @@ select * from diamonds;
1.28 Good J 63.1
1.3 Fair E 64.7


statement count 0
select * from diamonds_mv;



statement ok
ALTER SOURCE diamonds_source SET source_rate_limit TO DEFAULT;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name, node_name;
----


sleep 10s

query TTTT rowsort
select * from diamonds_mv;
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,38 @@ select * from rl_mv3;
----
0

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 0
rl_mv2 SOURCE {SOURCE} 0
rl_mv3 SOURCE {SOURCE} 0

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to 1000;

query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
rl_mv1 SOURCE {SOURCE} 1000
rl_mv2 SOURCE {SOURCE} 1000
rl_mv3 SOURCE {SOURCE} 1000

skipif in-memory
query I
statement count 0
alter source kafka_source set source_rate_limit to default;

# rate limit becomes None
query T
select count(*) from rw_rate_limit;
----
0

skipif in-memory
sleep 3s

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Enable ${VAR} substitution so the Kafka connection options below are expanded.
control substitution on

############## Create kafka seed data

# Seed table: the sink below forwards its rows to Kafka.
statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

# Append-only sink writing the seed rows into topic 'test_rate_limit_shared',
# the same topic that kafka_source reads from later in this file.
statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
type = 'append-only',
force_append_only='true'
);

############## Source from kafka (rate_limit = 0)

# Wait for the topic to be created
skipif in-memory
sleep 5s

# The source is created with source_rate_limit = 0.
statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
source_rate_limit = 0,
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

# NOTE(review): nothing is asserted under this header; the sleep presumably
# just gives the source time to settle before the MV is created - confirm.
skipif in-memory
sleep 3s

############## Create MV on source

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

############## Although source is rate limited, the MV's SourceBackfill is not.

statement ok
flush;

# All 1000 seed rows are visible in rl_mv1 even though source_rate_limit = 0.
query I
select * from rl_mv1;
----
1000

############## Insert more data. It will not go into the MV.

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

sleep 3s

# Still 1000: the second batch has not reached rl_mv1 while the rate limit is 0.
query I
select * from rl_mv1;
----
1000

# Create rl_mv2 as a background DDL with the backfill rate limit set to 0,
# so the checks below can observe it before any rows are consumed.
statement ok
SET BACKGROUND_DDL=true;

statement ok
SET BACKFILL_RATE_LIMIT=0;

statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

sleep 1s

# rl_mv2 has made no backfill progress.
query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed

# Rate limits currently reported: 0 on the source and 0 on rl_mv2's backfill.
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 0
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

statement ok
alter source kafka_source set source_rate_limit to 1000;

# Only the source's limit changes; rl_mv2's backfill limit is still 0.
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0

sleep 3s

# rl_mv1 now includes the second batch of 1000 rows.
query I
select * from rl_mv1;
----
2000

# rl_mv2's backfill is still not progressing.
query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed



# source_rate_limit cannot be set on a materialized view; the parser rejects it.
statement error
alter materialized view rl_mv2 set source_rate_limit = 1000;
----
db error: ERROR: Failed to run the query

Caused by:
sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit
LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
^


# The rejected statement left the reported rate limits unchanged.
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0


# Raise the backfill rate limit of rl_mv2 from 0 to 2000.
statement ok
alter materialized view rl_mv2 set backfill_rate_limit = 2000;


# The new limit is reported on rl_mv2's SOURCE_BACKFILL node; the source's
# limit is unchanged.
query T
select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id
order by name;
----
kafka_source SOURCE {SOURCE} 1000
rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 2000

sleep 3s

# rl_mv2 has now backfilled all 2000 rows. The result is an integer count, so
# the column type is I, matching the rl_mv1 count checks in this file.
query I
select * from rl_mv2;
----
2000



############## Cleanup

# rl_mv1, rl_mv2 and kafka_sink are never dropped explicitly in this file;
# the cascade drops below are relied on to remove them.
statement ok
drop source kafka_source cascade;

statement ok
drop table kafka_seed_data cascade;
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ select count(*) from kafka_source;
############## Alter source (rate_limit = 0 --> rate_limit = 1000)

skipif in-memory
query I
statement ok
alter table kafka_source set source_rate_limit to 1000;

skipif in-memory
Expand Down
Loading