From a7a89ea128c28bcae295ddf304f7fcd19d8f4824 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Sat, 16 Nov 2024 14:59:56 +0800
Subject: [PATCH] feat: support backfill_rate_limit for source backfill

---
 .../alter/rate_limit_source_kafka_shared.slt  | 134 ++++++++++++++++++
 proto/stream_plan.proto                       |  10 +-
 src/connector/src/sink/encoder/json.rs        |   2 +-
 src/meta/service/src/stream_service.rs        |   4 +-
 src/meta/src/controller/streaming_job.rs      |  13 +-
 src/meta/src/manager/metadata.rs              |   4 +-
 .../source/source_backfill_executor.rs        |  27 ++++
 .../src/executor/source/source_executor.rs    |   2 +-
 8 files changed, 177 insertions(+), 19 deletions(-)
 create mode 100644 e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt

diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt
new file mode 100644
index 0000000000000..29c0b83aa40d8
--- /dev/null
+++ b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt
@@ -0,0 +1,134 @@
+control substitution on
+
+############## Create kafka seed data
+
+statement ok
+create table kafka_seed_data (v1 int);
+
+statement ok
+insert into kafka_seed_data select * from generate_series(1, 1000);
+
+############## Sink into kafka
+
+statement ok
+create sink kafka_sink
+from
+  kafka_seed_data with (
+    ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+    topic = 'test_rate_limit_shared',
+    type = 'append-only',
+    force_append_only='true'
+);
+
+############## Source from kafka (rate_limit = 0)
+
+# Wait for the topic to be created
+skipif in-memory
+sleep 5s
+
+statement ok
+create source kafka_source (v1 int) with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'test_rate_limit_shared',
+  source_rate_limit = 0,
+) FORMAT PLAIN ENCODE JSON
+
+statement ok
+flush;
+
+############## Check data
+
+skipif in-memory
+sleep 3s
+
+############## Create MV on source
+
+statement ok
+create materialized view rl_mv1 as select count(*) from kafka_source;
+
+############## Although the source is rate limited, the MV's SourceBackfill is not.
+
+statement ok
+flush;
+
+query I
+select * from rl_mv1;
+----
+1000
+
+############## Insert more data. It will not go into the MV, since the source's rate limit is 0.
+
+statement ok
+insert into kafka_seed_data select * from generate_series(1, 1000);
+
+sleep 3s
+
+query I
+select * from rl_mv1;
+----
+1000
+
+statement ok
+SET BACKGROUND_DDL=true;
+
+statement ok
+SET BACKFILL_RATE_LIMIT=0;
+
+statement ok
+create materialized view rl_mv2 as select count(*) from kafka_source;
+
+sleep 1s
+
+query T
+SELECT progress from rw_ddl_progress;
+----
+0 rows consumed
+
+############## Alter Source (rate_limit = 0 --> rate_limit = 1000)
+
+statement ok
+alter source kafka_source set source_rate_limit to 1000;
+
+sleep 3s
+
+query I
+select * from rl_mv1;
+----
+2000
+
+query T
+SELECT progress from rw_ddl_progress;
+----
+0 rows consumed
+
+
+
+statement error
+alter materialized view rl_mv2 set source_rate_limit = 1000;
+----
+db error: ERROR: Failed to run the query
+
+Caused by:
+  sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit
+LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000;
+                                           ^
+
+
+statement ok
+alter materialized view rl_mv2 set backfill_rate_limit = 2000;
+
+sleep 3s
+
+query ?
+select * from rl_mv2;
+----
+2000
+
+
+############## Cleanup
+
+statement ok
+drop source kafka_source cascade;
+
+statement ok
+drop table kafka_seed_data cascade;
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index ad0601189ec9f..70c0d229394bb 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -189,7 +189,7 @@ message StreamSource {
   map<string, string> with_properties = 6;
   catalog.StreamSourceInfo info = 7;
   string source_name = 8;
-  // Streaming rate limit
+  // Source rate limit
   optional uint32 rate_limit = 9;
   map<string, secret.SecretRef> secret_refs = 10;
 }
@@ -205,7 +205,7 @@ message StreamFsFetch {
   map<string, string> with_properties = 6;
   catalog.StreamSourceInfo info = 7;
   string source_name = 8;
-  // Streaming rate limit
+  // Source rate limit
   optional uint32 rate_limit = 9;
   map<string, secret.SecretRef> secret_refs = 10;
 }
@@ -231,7 +231,7 @@ message SourceBackfillNode {
   catalog.StreamSourceInfo info = 4;
   string source_name = 5;
   map<string, string> with_properties = 6;
-  // Streaming rate limit
+  // Backfill rate limit
   optional uint32 rate_limit = 7;
 
   // fields above are the same as StreamSource
@@ -609,7 +609,7 @@ message StreamScanNode {
   // Used iff `ChainType::Backfill`.
   plan_common.StorageTableDesc table_desc = 7;
 
-  // The rate limit for the stream scan node.
+  // The backfill rate limit for the stream scan node.
   optional uint32 rate_limit = 8;
 
   // Snapshot read every N barriers
@@ -646,7 +646,7 @@ message StreamCdcScanNode {
   // The external table that will be backfilled for CDC.
   plan_common.ExternalTableDesc cdc_table_desc = 5;
 
-  // The rate limit for the stream cdc scan node.
+  // The backfill rate limit for the stream cdc scan node.
   optional uint32 rate_limit = 6;
 
   // Whether skip the backfill and only consume from upstream.
diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs
index 7691b3de5f447..b3c0580a5a780 100644
--- a/src/connector/src/sink/encoder/json.rs
+++ b/src/connector/src/sink/encoder/json.rs
@@ -204,7 +204,7 @@ fn datum_to_json_object(
 
     let data_type = field.data_type();
 
-    tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
+    tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref);
 
     let value = match (data_type, scalar_ref) {
         (DataType::Boolean, ScalarRefImpl::Bool(v)) => {
diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs
index 91f73a2920252..dfd8ec21187fd 100644
--- a/src/meta/service/src/stream_service.rs
+++ b/src/meta/service/src/stream_service.rs
@@ -114,12 +114,12 @@ impl StreamManagerService for StreamServiceImpl {
             }
             ThrottleTarget::Mv => {
                 self.metadata_manager
-                    .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
+                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
                     .await?
             }
             ThrottleTarget::CdcTable => {
                 self.metadata_manager
-                    .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate)
+                    .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate)
                     .await?
             }
             ThrottleTarget::Unspecified => {
diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs
index d3e170aff75ae..5139b5069d9d1 100644
--- a/src/meta/src/controller/streaming_job.rs
+++ b/src/meta/src/controller/streaming_job.rs
@@ -1317,7 +1317,6 @@ impl CatalogController {
             .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
             .collect_vec();
 
-        // TODO: limit source backfill?
         fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
             let mut found = false;
             if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
@@ -1384,7 +1383,7 @@ impl CatalogController {
 
     // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
     // return the actor_ids to be applied
-    pub async fn update_mv_rate_limit_by_job_id(
+    pub async fn update_backfill_rate_limit_by_job_id(
         &self,
         job_id: ObjectId,
         rate_limit: Option<u32>,
@@ -1411,7 +1410,7 @@ impl CatalogController {
         fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
             let mut found = false;
             if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0)
-                || (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0)
+                || (*fragment_type_mask & PbFragmentTypeFlag::SourceScan as i32 != 0)
             {
                 visit_stream_node(stream_node, |node| match node {
                     PbNodeBody::StreamCdcScan(node) => {
@@ -1422,11 +1421,9 @@ impl CatalogController {
                         node.rate_limit = rate_limit;
                         found = true;
                     }
-                    PbNodeBody::Source(node) => {
-                        if let Some(inner) = node.source_inner.as_mut() {
-                            inner.rate_limit = rate_limit;
-                            found = true;
-                        }
+                    PbNodeBody::SourceBackfill(node) => {
+                        node.rate_limit = rate_limit;
+                        found = true;
                     }
                     _ => {}
                 });
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 40d3c025c0c8b..db53f5fb8b6b3 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -657,14 +657,14 @@ impl MetadataManager {
             .collect())
     }
 
-    pub async fn update_mv_rate_limit_by_table_id(
+    pub async fn update_backfill_rate_limit_by_table_id(
         &self,
         table_id: TableId,
         rate_limit: Option<u32>,
     ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
         let fragment_actors = self
             .catalog_controller
-            .update_mv_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
+            .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit)
             .await?;
         Ok(fragment_actors
             .into_iter()
diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs
index 6ada0f2b62ebb..bbf71b281d3e3 100644
--- a/src/stream/src/executor/source/source_backfill_executor.rs
+++ b/src/stream/src/executor/source/source_backfill_executor.rs
@@ -570,6 +570,33 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
                                 )
                                 .await?;
                             }
+                            Mutation::Throttle(actor_to_apply) => {
+                                if let Some(new_rate_limit) =
+                                    actor_to_apply.get(&self.actor_ctx.id)
+                                    && *new_rate_limit != self.rate_limit_rps
+                                {
+                                    tracing::info!(
+                                        "updating rate limit from {:?} to {:?}",
+                                        self.rate_limit_rps,
+                                        *new_rate_limit
+                                    );
+                                    self.rate_limit_rps = *new_rate_limit;
+                                    // rebuild reader
+                                    let (reader, _backfill_info) = self
+                                        .build_stream_source_reader(
+                                            &source_desc,
+                                            backfill_stage
+                                                .get_latest_unfinished_splits()?,
+                                        )
+                                        .await?;
+
+                                    backfill_stream = select_with_strategy(
+                                        input.by_ref().map(Either::Left),
+                                        reader.map(Either::Right),
+                                        select_strategy,
+                                    );
+                                }
+                            }
                             _ => {}
                         }
                     }
diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs
index 118b33c08ae5f..80b252014d284 100644
--- a/src/stream/src/executor/source/source_executor.rs
+++ b/src/stream/src/executor/source/source_executor.rs
@@ -608,7 +608,7 @@ impl<S: StateStore> SourceExecutor<S> {
                     if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id)
                         && *new_rate_limit != self.rate_limit_rps
                     {
-                        tracing::debug!(
+                        tracing::info!(
                             "updating rate limit from {:?} to {:?}",
                             self.rate_limit_rps,
                             *new_rate_limit