-
Notifications
You must be signed in to change notification settings - Fork 599
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: support backfill_rate_limit for source backfill (#19445)
- Loading branch information
Showing
8 changed files
with
177 additions
and
19 deletions.
There are no files selected for viewing
134 changes: 134 additions & 0 deletions
134
e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
control substitution on | ||
|
||
############## Create kafka seed data | ||
|
||
statement ok | ||
create table kafka_seed_data (v1 int); | ||
|
||
statement ok | ||
insert into kafka_seed_data select * from generate_series(1, 1000); | ||
|
||
############## Sink into kafka | ||
|
||
statement ok | ||
create sink kafka_sink | ||
from | ||
kafka_seed_data with ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'test_rate_limit_shared', | ||
type = 'append-only', | ||
force_append_only='true' | ||
); | ||
|
||
############## Source from kafka (rate_limit = 0) | ||
|
||
# Wait for the topic to create | ||
skipif in-memory | ||
sleep 5s | ||
|
||
statement ok | ||
create source kafka_source (v1 int) with ( | ||
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, | ||
topic = 'test_rate_limit_shared', | ||
source_rate_limit = 0, | ||
) FORMAT PLAIN ENCODE JSON | ||
|
||
statement ok | ||
flush; | ||
|
||
############## Check data | ||
|
||
skipif in-memory | ||
sleep 3s | ||
|
||
############## Create MV on source | ||
|
||
statement ok | ||
create materialized view rl_mv1 as select count(*) from kafka_source; | ||
|
||
############## Although source is rate limited, the MV's SourceBackfill is not. | ||
|
||
statement ok | ||
flush; | ||
|
||
query I | ||
select * from rl_mv1; | ||
---- | ||
1000 | ||
|
||
############## Insert more data. They will not go into the MV. | ||
|
||
statement ok | ||
insert into kafka_seed_data select * from generate_series(1, 1000); | ||
|
||
sleep 3s | ||
|
||
query I | ||
select * from rl_mv1; | ||
---- | ||
1000 | ||
|
||
statement ok | ||
SET BACKGROUND_DDL=true; | ||
|
||
statement ok | ||
SET BACKFILL_RATE_LIMIT=0; | ||
|
||
statement ok | ||
create materialized view rl_mv2 as select count(*) from kafka_source; | ||
|
||
sleep 1s | ||
|
||
query T | ||
SELECT progress from rw_ddl_progress; | ||
---- | ||
0 rows consumed | ||
|
||
############## Alter Source (rate_limit = 0 --> rate_limit = 1000) | ||
|
||
statement ok | ||
alter source kafka_source set source_rate_limit to 1000; | ||
|
||
sleep 3s | ||
|
||
query I | ||
select * from rl_mv1; | ||
---- | ||
2000 | ||
|
||
query T | ||
SELECT progress from rw_ddl_progress; | ||
---- | ||
0 rows consumed | ||
|
||
|
||
|
||
statement error | ||
alter materialized view rl_mv2 set source_rate_limit = 1000; | ||
---- | ||
db error: ERROR: Failed to run the query | ||
|
||
Caused by: | ||
sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit | ||
LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000; | ||
^ | ||
|
||
|
||
statement ok | ||
alter materialized view rl_mv2 set backfill_rate_limit = 2000; | ||
|
||
sleep 3s | ||
|
||
query ? | ||
select * from rl_mv2; | ||
---- | ||
2000 | ||
|
||
|
||
############## Cleanup | ||
|
||
statement ok | ||
drop source kafka_source cascade; | ||
|
||
statement ok | ||
drop table kafka_seed_data cascade; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters