feat: support backfill_rate_limit for source backfill #19445

Merged
134 changes: 134 additions & 0 deletions e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt
@@ -0,0 +1,134 @@
control substitution on

############## Create kafka seed data

statement ok
create table kafka_seed_data (v1 int);

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

############## Sink into kafka

statement ok
create sink kafka_sink
from
kafka_seed_data with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
type = 'append-only',
force_append_only='true'
);

############## Source from kafka (rate_limit = 0)

# Wait for the topic to be created
skipif in-memory
sleep 5s

statement ok
create source kafka_source (v1 int) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'test_rate_limit_shared',
source_rate_limit = 0,
) FORMAT PLAIN ENCODE JSON

statement ok
flush;

############## Check data

skipif in-memory
sleep 3s

############## Create MV on source

statement ok
create materialized view rl_mv1 as select count(*) from kafka_source;

############## Although the source is rate limited, the MV's SourceBackfill is not.

statement ok
flush;

query I
select * from rl_mv1;
----
1000

############## Insert more data. The new rows will not go into the MV.

statement ok
insert into kafka_seed_data select * from generate_series(1, 1000);

sleep 3s

query I
select * from rl_mv1;
----
1000

statement ok
SET BACKGROUND_DDL=true;

statement ok
SET BACKFILL_RATE_LIMIT=0;

statement ok
create materialized view rl_mv2 as select count(*) from kafka_source;

sleep 1s

query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed

############## Alter Source (rate_limit = 0 --> rate_limit = 1000)

statement ok
alter source kafka_source set source_rate_limit to 1000;

sleep 3s

query I
select * from rl_mv1;
----
2000

query T
SELECT progress from rw_ddl_progress;
----
0 rows consumed



statement error
alter materialized view rl_mv2 set source_rate_limit = 1000;
----
db error: ERROR: Failed to run the query

Caused by:
sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit
LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
^


statement ok
alter materialized view rl_mv2 set backfill_rate_limit = 2000;

sleep 3s

query ?
select * from rl_mv2;
----
2000


############## Cleanup

statement ok
drop source kafka_source cascade;

statement ok
drop table kafka_seed_data cascade;
10 changes: 5 additions & 5 deletions proto/stream_plan.proto
@@ -189,7 +189,7 @@ message StreamSource {
   map<string, string> with_properties = 6;
   catalog.StreamSourceInfo info = 7;
   string source_name = 8;
-  // Streaming rate limit
+  // Source rate limit
   optional uint32 rate_limit = 9;
   map<string, secret.SecretRef> secret_refs = 10;
 }
@@ -205,7 +205,7 @@ message StreamFsFetch {
   map<string, string> with_properties = 6;
   catalog.StreamSourceInfo info = 7;
   string source_name = 8;
-  // Streaming rate limit
+  // Source rate limit
   optional uint32 rate_limit = 9;
   map<string, secret.SecretRef> secret_refs = 10;
 }
@@ -231,7 +231,7 @@ message SourceBackfillNode {
   catalog.StreamSourceInfo info = 4;
   string source_name = 5;
   map<string, string> with_properties = 6;
-  // Streaming rate limit
+  // Backfill rate limit
   optional uint32 rate_limit = 7;
 
   // fields above are the same as StreamSource

Comment on lines +234 to 235:

[Contributor] Shall we directly change the field name to distinguish the two?

[Member Author] It will be a breaking change.

[Member Author] IIRC the SQL meta backend stores plans with JSON encoding.
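A side note on the thread above: here is a minimal, self-contained sketch of why the rename would be breaking, assuming (per the comment) that persisted plans are encoded as JSON keyed by field name. The struct names and plain serde stand-ins below are hypothetical, not RisingWave's generated protobuf types; binary protobuf keys fields by number, so a rename would be safe there, but JSON keys them by name.

use serde::{Deserialize, Serialize};

// Hypothetical stand-in for the schema before the rename.
#[derive(Serialize, Deserialize)]
struct SourceBackfillNodeOld {
    rate_limit: Option<u32>, // persisted under the JSON key "rate_limit"
}

// Hypothetical stand-in for the schema after a rename.
#[derive(Serialize, Deserialize)]
struct SourceBackfillNodeRenamed {
    backfill_rate_limit: Option<u32>, // new key: old rows no longer match it
}

fn main() {
    let stored = serde_json::to_string(&SourceBackfillNodeOld {
        rate_limit: Some(1000),
    })
    .unwrap();

    // Reading old persisted JSON through the renamed schema silently drops
    // the value (serde ignores the unknown "rate_limit" key and defaults the
    // missing Option field to None) instead of failing loudly.
    let reread: SourceBackfillNodeRenamed = serde_json::from_str(&stored).unwrap();
    assert_eq!(reread.backfill_rate_limit, None);
}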
@@ -609,7 +609,7 @@ message StreamScanNode {
   // Used iff `ChainType::Backfill`.
   plan_common.StorageTableDesc table_desc = 7;
 
-  // The rate limit for the stream scan node.
+  // The backfill rate limit for the stream scan node.
   optional uint32 rate_limit = 8;
 
   // Snapshot read every N barriers
@@ -646,7 +646,7 @@ message StreamCdcScanNode {
   // The external table that will be backfilled for CDC.
   plan_common.ExternalTableDesc cdc_table_desc = 5;
 
-  // The rate limit for the stream cdc scan node.
+  // The backfill rate limit for the stream cdc scan node.
   optional uint32 rate_limit = 6;
 
   // Whether skip the backfill and only consume from upstream.
2 changes: 1 addition & 1 deletion src/connector/src/sink/encoder/json.rs
@@ -204,7 +204,7 @@ fn datum_to_json_object(
 
     let data_type = field.data_type();
 
-    tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
+    tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
 
     let value = match (data_type, scalar_ref) {
         (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
4 changes: 2 additions & 2 deletions src/meta/service/src/stream_service.rs
@@ -114,12 +114,12 @@ impl StreamManagerService for StreamServiceImpl {
             }
             ThrottleTarget::Mv => {
                 self.metadata_manager
-                    .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
+                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
                     .await?
             }
             ThrottleTarget::CdcTable => {
                 self.metadata_manager
-                    .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
+                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
                     .await?
             }
             ThrottleTarget::Unspecified => {
13 changes: 5 additions & 8 deletions src/meta/src/controller/streaming_job.rs
@@ -1317,7 +1317,6 @@ impl CatalogController {
             .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
             .collect_vec();
 
-        // TODO: limit source backfill?
        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
@@ -1384,7 +1383,7 @@ impl CatalogController {
 
     // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
     // return the actor_ids to be applied
-    pub async fn update_mv_rate_limit_by_job_id(
+    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
@@ -1411,7 +1410,7 @@
        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0)
-               || (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
+               || (*fragment_type_mask & PbFragmentTypeFlag::SourceScan as i32 != 0)
            {
                visit_stream_node(stream_node, |node| match node {
                    PbNodeBody::StreamCdcScan(node) => {

Comment on the changed condition:

[Member Author] The previous code looks wrong: altering the backfill rate limit would also affect an MV on a non-shared source.

[Contributor] Seems so.
@@ -1422,11 +1421,9 @@
                        node.rate_limit = rate_limit;
                        found = true;
                    }
-                   PbNodeBody::Source(node) => {
-                       if let Some(inner) = node.source_inner.as_mut() {
-                           inner.rate_limit = rate_limit;
-                           found = true;
-                       }
-                   }
+                   PbNodeBody::SourceBackfill(node) => {
+                       node.rate_limit = rate_limit;
+                       found = true;
+                   }
                    _ => {}
                });
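To illustrate the bug discussed in the thread above, here is a minimal, self-contained sketch of the fixed targeting logic, using a hypothetical stand-in enum rather than the real PbNodeBody: a BACKFILL_RATE_LIMIT alter should touch only backfill-side nodes, never the Source node that drives steady-state ingestion.

// Simplified, hypothetical stand-in for PbNodeBody; for illustration only.
#[derive(Debug, PartialEq)]
enum NodeBody {
    StreamScan { rate_limit: Option<u32> },
    StreamCdcScan { rate_limit: Option<u32> },
    SourceBackfill { rate_limit: Option<u32> },
    Source { rate_limit: Option<u32> },
}

/// Applies a backfill rate limit; returns whether the node was a valid target.
fn apply_backfill_rate_limit(node: &mut NodeBody, new_limit: Option<u32>) -> bool {
    match node {
        // Backfill executors are the only legitimate targets.
        NodeBody::StreamScan { rate_limit }
        | NodeBody::StreamCdcScan { rate_limit }
        | NodeBody::SourceBackfill { rate_limit } => {
            *rate_limit = new_limit;
            true
        }
        // Steady-state ingestion is governed by SOURCE_RATE_LIMIT instead;
        // matching it here was the pre-fix bug.
        NodeBody::Source { .. } => false,
    }
}

fn main() {
    let mut source = NodeBody::Source { rate_limit: Some(0) };
    assert!(!apply_backfill_rate_limit(&mut source, Some(1000)));
    assert_eq!(source, NodeBody::Source { rate_limit: Some(0) });

    let mut backfill = NodeBody::SourceBackfill { rate_limit: None };
    assert!(apply_backfill_rate_limit(&mut backfill, Some(2000)));
    assert_eq!(backfill, NodeBody::SourceBackfill { rate_limit: Some(2000) });
}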
4 changes: 2 additions & 2 deletions src/meta/src/manager/metadata.rs
@@ -657,14 +657,14 @@ impl MetadataManager {
             .collect())
     }
 
-    pub async fn update_mv_rate_limit_by_table_id(
+    pub async fn update_backfill_rate_limit_by_table_id(
        &self,
        table_id: TableId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let fragment_actors = self
            .catalog_controller
-           .update_mv_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
+           .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
            .await?;
        Ok(fragment_actors
            .into_iter()
27 changes: 27 additions & 0 deletions src/stream/src/executor/source/source_backfill_executor.rs
@@ -570,6 +570,33 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
                             )
                             .await?;
                         }
+                        Mutation::Throttle(actor_to_apply) => {
+                            if let Some(new_rate_limit) =
+                                actor_to_apply.get(&self.actor_ctx.id)
+                                && *new_rate_limit != self.rate_limit_rps
+                            {
+                                tracing::info!(
+                                    "updating rate limit from {:?} to {:?}",
+                                    self.rate_limit_rps,
+                                    *new_rate_limit
+                                );
+                                self.rate_limit_rps = *new_rate_limit;
+                                // rebuild reader
+                                let (reader, _backfill_info) = self
+                                    .build_stream_source_reader(
+                                        &source_desc,
+                                        backfill_stage
+                                            .get_latest_unfinished_splits()?,
+                                    )
+                                    .await?;
+
+                                backfill_stream = select_with_strategy(
+                                    input.by_ref().map(Either::Left),
+                                    reader.map(Either::Right),
+                                    select_strategy,
+                                );
+                            }
+                        }
                         _ => {}
                     }
                 }
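The Throttle arm above rebuilds the reader rather than adjusting a live stream, because the rate limit is fixed when the reader is built. Below is a sketch of that pattern under hypothetical, simplified types; the real executor works on async streams and Kafka splits, not the plain structs shown here.

// Hypothetical simplified types, for illustration; not the executor's real API.
#[derive(Clone, Debug, PartialEq)]
struct Split(String);

struct Reader {
    rate_limit_rps: Option<u32>, // baked in when the reader is built
    splits: Vec<Split>,
}

fn build_reader(splits: Vec<Split>, rate_limit_rps: Option<u32>) -> Reader {
    Reader { rate_limit_rps, splits }
}

struct BackfillState {
    rate_limit_rps: Option<u32>,
    reader: Reader,
    unfinished_splits: Vec<Split>,
}

impl BackfillState {
    fn handle_throttle(&mut self, new_limit: Option<u32>) {
        // No-op when the limit is unchanged, mirroring the check in the diff.
        if new_limit == self.rate_limit_rps {
            return;
        }
        self.rate_limit_rps = new_limit;
        // Rebuild from the latest *unfinished* splits so the new limit takes
        // effect without redoing splits that already finished backfilling.
        self.reader = build_reader(self.unfinished_splits.clone(), new_limit);
    }
}

fn main() {
    let splits = vec![Split("topic-0".into())];
    let mut state = BackfillState {
        rate_limit_rps: Some(0),
        reader: build_reader(splits.clone(), Some(0)),
        unfinished_splits: splits,
    };
    state.handle_throttle(Some(1000));
    assert_eq!(state.reader.rate_limit_rps, Some(1000));
}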
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_executor.rs
@@ -608,7 +608,7 @@ impl<S: StateStore> SourceExecutor<S> {
                 if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                     && *new_rate_limit != self.rate_limit_rps
                 {
-                    tracing::debug!(
+                    tracing::info!(
                         "updating rate limit from {:?} to {:?}",
                         self.rate_limit_rps,
                         *new_rate_limit