diff --git a/e2e_test/ddl/drop/drop_creating_mv.slt b/e2e_test/ddl/drop/drop_creating_mv.slt index e9d8423f3cdec..678d880df90fd 100644 --- a/e2e_test/ddl/drop/drop_creating_mv.slt +++ b/e2e_test/ddl/drop/drop_creating_mv.slt @@ -18,11 +18,18 @@ risedev psql -c 'create materialized view m1 as select * from t;' & onlyif can-use-recover sleep 5s +onlyif can-use-recover +query I +select background_ddl from rw_catalog.rw_materialized_views where name='m1'; +---- +f + onlyif can-use-recover statement ok drop materialized view m1; ############## Test drop background mv BEFORE recovery + statement ok set background_ddl=true; @@ -33,6 +40,12 @@ create materialized view m1 as select * from t; onlyif can-use-recover sleep 5s +onlyif can-use-recover +query I +select background_ddl from rw_catalog.rw_materialized_views where name='m1'; +---- +t + onlyif can-use-recover statement ok drop materialized view m1; @@ -48,12 +61,24 @@ create materialized view m1 as select * from t; onlyif can-use-recover sleep 5s +onlyif can-use-recover +query I +select background_ddl from rw_catalog.rw_materialized_views where name='m1'; +---- +t + onlyif can-use-recover statement ok recover; onlyif can-use-recover -sleep 10s +sleep 5s + +onlyif can-use-recover +query I +select background_ddl from rw_catalog.rw_materialized_views where name='m1'; +---- +t onlyif can-use-recover statement ok @@ -69,6 +94,11 @@ set background_ddl=false; statement ok create materialized view m1 as select * from t; +query I +select background_ddl from rw_catalog.rw_materialized_views where name='m1'; +---- +f + statement ok drop materialized view m1; diff --git a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt.serial b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt.serial index 10854d97b6440..dd58a8dbf28b3 100644 --- a/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt.serial +++ b/e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt.serial @@ -50,12 +50,6 @@ create source s with ( sleep 2s -# At the beginning, the source is paused. It will resume after a downstream is created. 
-system ok -internal_table.mjs --name s --type '' --count ----- -count: 0 - statement ok create table tt1_shared (v1 int, diff --git a/e2e_test/source_inline/fs/posix_fs.slt b/e2e_test/source_inline/fs/posix_fs.slt index da56502e417e8..5408daf28321a 100644 --- a/e2e_test/source_inline/fs/posix_fs.slt +++ b/e2e_test/source_inline/fs/posix_fs.slt @@ -33,21 +33,36 @@ create materialized view diamonds_mv as select * from diamonds_source; sleep 1s # no output due to rate limit -query TTTT rowsort +statement count 0 select * from diamonds; ----- -query TTTT rowsort + +statement count 0 select * from diamonds_mv; + + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name, node_name; ---- +diamonds FS_FETCH {FS_FETCH} 0 +diamonds SOURCE {SOURCE} 0 +diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0 +diamonds_mv SOURCE {SOURCE} 0 statement ok ALTER TABLE diamonds SET source_rate_limit TO DEFAULT; -statement ok -ALTER source diamonds_source SET source_rate_limit TO DEFAULT; -sleep 10s +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name, node_name; +---- +diamonds_mv FS_FETCH {MVIEW,FS_FETCH} 0 +diamonds_mv SOURCE {SOURCE} 0 + + +sleep 3s query TTTT rowsort select * from diamonds; @@ -63,6 +78,23 @@ select * from diamonds; 1.28 Good J 63.1 1.3 Fair E 64.7 + +statement count 0 +select * from diamonds_mv; + + + +statement ok +ALTER SOURCE diamonds_source SET source_rate_limit TO DEFAULT; + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name, node_name; +---- + + +sleep 3s + query TTTT rowsort select * from diamonds_mv; ---- diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt.serial similarity index 80% rename from e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt rename to e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt.serial index 96fd016c5812d..8353166b5a874 100644 --- a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt +++ b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka.slt.serial @@ -80,16 +80,38 @@ select * from rl_mv3; ---- 0 +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +rl_mv1 SOURCE {SOURCE} 0 +rl_mv2 SOURCE {SOURCE} 0 +rl_mv3 SOURCE {SOURCE} 0 + ############## Alter Source (rate_limit = 0 --> rate_limit = 1000) skipif in-memory -query I +statement count 0 alter source kafka_source set source_rate_limit to 1000; +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +rl_mv1 SOURCE {SOURCE} 1000 +rl_mv2 SOURCE {SOURCE} 1000 +rl_mv3 SOURCE {SOURCE} 1000 + skipif in-memory -query I +statement count 0 alter source kafka_source set source_rate_limit to default; +# rate limit becomes None +query T +select count(*) from rw_rate_limit; +---- +0 + skipif in-memory sleep 3s diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt.serial b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt.serial new file mode 100644 index 0000000000000..a9a730930b1b2 --- /dev/null +++ b/e2e_test/source_inline/kafka/alter/rate_limit_source_kafka_shared.slt.serial @@ -0,0 +1,166 @@ +control substitution on + +############## Create kafka seed data + +statement ok 
+create table kafka_seed_data (v1 int); + +statement ok +insert into kafka_seed_data select * from generate_series(1, 1000); + +############## Sink into kafka + +statement ok +create sink kafka_sink +from + kafka_seed_data with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_rate_limit_shared', + type = 'append-only', + force_append_only='true' +); + +############## Source from kafka (rate_limit = 0) + +# Wait for the topic to create +skipif in-memory +sleep 5s + +statement ok +create source kafka_source (v1 int) with ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_rate_limit_shared', + source_rate_limit = 0, +) FORMAT PLAIN ENCODE JSON + +statement ok +flush; + +############## Check data + +skipif in-memory +sleep 3s + +############## Create MV on source + +statement ok +create materialized view rl_mv1 as select count(*) from kafka_source; + +############## Although source is rate limited, the MV's SourceBackfill is not. + +statement ok +flush; + +query I +select * from rl_mv1; +---- +1000 + +############## Insert more data. They will not go into the MV. + +statement ok +insert into kafka_seed_data select * from generate_series(1, 1000); + +sleep 3s + +query I +select * from rl_mv1; +---- +1000 + +statement ok +SET BACKGROUND_DDL=true; + +statement ok +SET BACKFILL_RATE_LIMIT=0; + +statement ok +create materialized view rl_mv2 as select count(*) from kafka_source; + +sleep 1s + +query T +SELECT progress from rw_ddl_progress; +---- +0 rows consumed + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 0 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0 + + +############## Alter Source (rate_limit = 0 --> rate_limit = 1000) + +statement ok +alter source kafka_source set source_rate_limit to 1000; + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 1000 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0 + +sleep 3s + +query I +select * from rl_mv1; +---- +2000 + +query T +SELECT progress from rw_ddl_progress; +---- +0 rows consumed + + + +statement error +alter materialized view rl_mv2 set source_rate_limit = 1000; +---- +db error: ERROR: Failed to run the query + +Caused by: + sql parser error: expected SCHEMA/PARALLELISM/BACKFILL_RATE_LIMIT after SET, found: source_rate_limit +LINE 1: alter materialized view rl_mv2 set source_rate_limit = 1000; + ^ + + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 1000 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 0 + + +statement ok +alter materialized view rl_mv2 set backfill_rate_limit = 2000; + + +query T +select name, node_name, fragment_type, rate_limit from rw_rate_limit join rw_relations on table_id=id +order by name; +---- +kafka_source SOURCE {SOURCE} 1000 +rl_mv2 SOURCE_BACKFILL {SOURCE_SCAN} 2000 + +sleep 3s + +query T +select * from rl_mv2; +---- +2000 + + + +############## Cleanup + +statement ok +drop source kafka_source cascade; + +statement ok +drop table kafka_seed_data cascade; diff --git a/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt b/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt.serial similarity index 99% rename from e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt rename to e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt.serial index 
ac2a665fd10c0..5d22fc85dea4f 100644 --- a/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt +++ b/e2e_test/source_inline/kafka/alter/rate_limit_table_kafka.slt.serial @@ -63,7 +63,7 @@ select count(*) from kafka_source; ############## Alter source (rate_limit = 0 --> rate_limit = 1000) skipif in-memory -query I +statement ok alter table kafka_source set source_rate_limit to 1000; skipif in-memory diff --git a/e2e_test/source_inline/kafka/shared_source.slt.serial b/e2e_test/source_inline/kafka/shared_source.slt.serial index 3397f90f081da..af6b371d21c49 100644 --- a/e2e_test/source_inline/kafka/shared_source.slt.serial +++ b/e2e_test/source_inline/kafka/shared_source.slt.serial @@ -59,11 +59,17 @@ select count(*) from rw_internal_tables where name like '%s0%'; sleep 1s -# SourceExecutor's ingestion does not start (state table is empty), even after sleep +statement ok +flush; + +# SourceExecutor's starts from latest. system ok internal_table.mjs --name s0 --type source ---- -(empty) +0,"{""split_info"": {""partition"": 0, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +1,"{""split_info"": {""partition"": 1, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +3,"{""split_info"": {""partition"": 3, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" statement ok @@ -72,12 +78,6 @@ create materialized view mv_1 as select * from s0; # Wait enough time to ensure SourceExecutor consumes all Kafka data. sleep 2s -# SourceExecutor's ingestion started, but it only starts from latest (offset 1). -system ok -internal_table.mjs --name s0 --type source ----- -(empty) - # SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark). # (meaning upstream already consumed offset 0, so we only need to backfill to offset 0) @@ -144,7 +144,7 @@ EOF sleep 2s -# SourceExecutor's finally got new data now. +# SourceExecutor's got new data. system ok internal_table.mjs --name s0 --type source ---- @@ -185,16 +185,6 @@ select v1, v2 from mv_1; 4 dd -# start_offset changed to 1 -system ok -internal_table.mjs --name s0 --type source ----- -0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" - - # Transition from SourceCachingUp to Finished after consuming one upstream message. 
system ok internal_table.mjs --name mv_1 --type sourcebackfill @@ -334,6 +324,47 @@ internal_table.mjs --name s0 --type source # # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;" +# Test: rate limit and resume won't lose data + +statement ok +alter source s0 set source_rate_limit to 0; + + +system ok +cat < with_properties = 6; catalog.StreamSourceInfo info = 7; string source_name = 8; - // Streaming rate limit + // Source rate limit optional uint32 rate_limit = 9; map secret_refs = 10; } @@ -205,7 +205,7 @@ message StreamFsFetch { map with_properties = 6; catalog.StreamSourceInfo info = 7; string source_name = 8; - // Streaming rate limit + // Source rate limit optional uint32 rate_limit = 9; map secret_refs = 10; } @@ -231,7 +231,7 @@ message SourceBackfillNode { catalog.StreamSourceInfo info = 4; string source_name = 5; map with_properties = 6; - // Streaming rate limit + // Backfill rate limit optional uint32 rate_limit = 7; // fields above are the same as StreamSource @@ -607,7 +607,7 @@ message StreamScanNode { // Used iff `ChainType::Backfill`. plan_common.StorageTableDesc table_desc = 7; - // The rate limit for the stream scan node. + // The backfill rate limit for the stream scan node. optional uint32 rate_limit = 8; // Snapshot read every N barriers @@ -644,7 +644,7 @@ message StreamCdcScanNode { // The external table that will be backfilled for CDC. plan_common.ExternalTableDesc cdc_table_desc = 5; - // The rate limit for the stream cdc scan node. + // The backfill rate limit for the stream cdc scan node. optional uint32 rate_limit = 6; // Whether skip the backfill and only consume from upstream. @@ -983,6 +983,8 @@ enum FragmentTypeFlag { FRAGMENT_TYPE_FLAG_CDC_FILTER = 256; FRAGMENT_TYPE_FLAG_SOURCE_SCAN = 1024; FRAGMENT_TYPE_FLAG_SNAPSHOT_BACKFILL_STREAM_SCAN = 2048; + // Note: this flag is not available in old fragments, so only suitable for debugging purpose. + FRAGMENT_TYPE_FLAG_FS_FETCH = 4096; } // The streaming context associated with a stream plan diff --git a/src/batch/src/executor/source.rs b/src/batch/src/executor/source.rs index 7a37be9183898..1ba28404a4d8b 100644 --- a/src/batch/src/executor/source.rs +++ b/src/batch/src/executor/source.rs @@ -157,9 +157,9 @@ impl SourceExecutor { ConnectorProperties::default(), None, )); - let stream = self + let (stream, _) = self .source - .build_stream(Some(self.split_list), self.column_ids, source_ctx) + .build_stream(Some(self.split_list), self.column_ids, source_ctx, false) .await?; #[for_await] diff --git a/src/connector/src/sink/encoder/json.rs b/src/connector/src/sink/encoder/json.rs index 7691b3de5f447..b3c0580a5a780 100644 --- a/src/connector/src/sink/encoder/json.rs +++ b/src/connector/src/sink/encoder/json.rs @@ -204,7 +204,7 @@ fn datum_to_json_object( let data_type = field.data_type(); - tracing::debug!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref); + tracing::trace!("datum_to_json_object: {:?}, {:?}", data_type, scalar_ref); let value = match (data_type, scalar_ref) { (DataType::Boolean, ScalarRefImpl::Bool(v)) => { diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 59e3585431a60..e031a85a34d62 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -69,7 +69,7 @@ pub trait TryFromBTreeMap: Sized + UnknownFields { /// Represents `WITH` options for sources. /// /// Each instance should add a `#[derive(with_options::WithOptions)]` marker. 
-pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions { +pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug { const SOURCE_NAME: &'static str; type Split: SplitMetaData + TryFrom @@ -108,7 +108,7 @@ impl TryFromBTreeMap for P { } } -pub async fn create_split_reader( +pub async fn create_split_reader( prop: P, splits: Vec, parser_config: ParserConfig, @@ -375,6 +375,10 @@ pub trait SplitReader: Sized + Send { fn backfill_info(&self) -> HashMap { HashMap::new() } + + async fn seek_to_latest(&mut self) -> Result> { + Err(anyhow!("seek_to_latest is not supported for this connector").into()) + } } /// Information used to determine whether we should start and finish source backfill. diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 3f3626449153a..9e99c7db9e5e8 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -58,7 +58,7 @@ pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String { format!("{}.{}", source_id, external_table_name) } -pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static { +pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static { const CDC_CONNECTOR_NAME: &'static str; fn source_type() -> CdcSourceType; } diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index a425de418ef4a..56b7b23bebac4 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -155,7 +155,6 @@ impl SplitEnumerator for KafkaSplitEnumerator { partition, start_offset: start_offsets.remove(&partition).unwrap(), stop_offset: stop_offsets.remove(&partition).unwrap(), - hack_seek_to_latest: false, }) .collect(); @@ -263,7 +262,6 @@ impl KafkaSplitEnumerator { partition: *partition, start_offset: Some(start_offset), stop_offset: Some(stop_offset), - hack_seek_to_latest:false } }) .collect::>()) diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index d58f1b70dd9fc..3847970ee335b 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -37,13 +37,15 @@ use crate::source::kafka::{ }; use crate::source::{ into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId, - SplitMetaData, SplitReader, + SplitImpl, SplitMetaData, SplitReader, }; pub struct KafkaSplitReader { consumer: StreamConsumer, offsets: HashMap, Option)>, backfill_info: HashMap, + splits: Vec, + sync_call_timeout: Duration, bytes_per_second: usize, max_num_messages: usize, parser_config: ParserConfig, @@ -110,12 +112,10 @@ impl SplitReader for KafkaSplitReader { let mut offsets = HashMap::new(); let mut backfill_info = HashMap::new(); - for split in splits { + for split in splits.clone() { offsets.insert(split.id(), (split.start_offset, split.stop_offset)); - if split.hack_seek_to_latest { - tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::End)?; - } else if let Some(offset) = split.start_offset { + if let Some(offset) = split.start_offset { tpl.add_partition_offset( split.topic.as_str(), split.partition, @@ -168,8 +168,10 @@ impl SplitReader for KafkaSplitReader { Ok(Self { consumer, offsets, + splits, backfill_info, bytes_per_second, + sync_call_timeout: properties.common.sync_call_timeout, max_num_messages, parser_config, source_ctx, @@ -185,6 +187,28 @@ impl SplitReader 
for KafkaSplitReader { fn backfill_info(&self) -> HashMap { self.backfill_info.clone() } + + async fn seek_to_latest(&mut self) -> Result> { + let mut latest_splits: Vec = Vec::new(); + let mut tpl = TopicPartitionList::with_capacity(self.splits.len()); + for mut split in self.splits.clone() { + // we can't get latest offset if we use Offset::End, so we just fetch watermark here. + let (_low, high) = self + .consumer + .fetch_watermarks( + split.topic.as_str(), + split.partition, + self.sync_call_timeout, + ) + .await?; + tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?; + split.start_offset = Some(high - 1); + latest_splits.push(split.into()); + } + // replace the previous assignment + self.consumer.assign(&tpl)?; + Ok(latest_splits) + } } impl KafkaSplitReader { diff --git a/src/connector/src/source/kafka/split.rs b/src/connector/src/source/kafka/split.rs index 791836ac2c858..fa969bb37111f 100644 --- a/src/connector/src/source/kafka/split.rs +++ b/src/connector/src/source/kafka/split.rs @@ -32,12 +32,6 @@ pub struct KafkaSplit { /// A better approach would be to make it **inclusive**. pub(crate) start_offset: Option, pub(crate) stop_offset: Option, - #[serde(skip)] - /// Used by shared source to hackily seek to the latest offset without fetching start offset first. - /// XXX: But why do we fetch low watermark for latest start offset..? - /// - /// When this is `true`, `start_offset` will be ignored. - pub(crate) hack_seek_to_latest: bool, } impl SplitMetaData for KafkaSplit { @@ -72,16 +66,10 @@ impl KafkaSplit { partition, start_offset, stop_offset, - hack_seek_to_latest: false, } } pub fn get_topic_and_partition(&self) -> (String, i32) { (self.topic.clone(), self.partition) } - - /// This should only be used for a fresh split, not persisted in state table yet. - pub fn seek_to_latest_offset(&mut self) { - self.hack_seek_to_latest = true; - } } diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 89335f8f0d80e..f849e7ba21aa3 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -37,8 +37,8 @@ use crate::source::filesystem::opendal_source::{ use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{ create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column, - ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader, - WaitCheckpointTask, + ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitImpl, + SplitReader, WaitCheckpointTask, }; use crate::{dispatch_source_prop, WithOptionsSecResolved}; @@ -211,14 +211,17 @@ impl SourceReader { } /// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s). + /// + /// If `seek_to_latest` is true, will also return the latest splits after seek. 
pub async fn build_stream( &self, state: ConnectorState, column_ids: Vec, source_ctx: Arc, - ) -> ConnectorResult { + seek_to_latest: bool, + ) -> ConnectorResult<(BoxChunkSourceStream, Option>)> { let Some(splits) = state else { - return Ok(pending().boxed()); + return Ok((pending().boxed(), None)); }; let config = self.config.clone(); let columns = self.get_target_columns(column_ids)?; @@ -243,7 +246,7 @@ impl SourceReader { let support_multiple_splits = config.support_multiple_splits(); dispatch_source_prop!(config, prop, { - let readers = if support_multiple_splits { + let mut readers = if support_multiple_splits { tracing::debug!( "spawning connector split reader for multiple splits {:?}", splits @@ -268,7 +271,20 @@ impl SourceReader { .await? }; - Ok(select_all(readers.into_iter().map(|r| r.into_stream())).boxed()) + let latest_splits = if seek_to_latest { + let mut latest_splits = Vec::new(); + for reader in &mut readers { + latest_splits.extend(reader.seek_to_latest().await?); + } + Some(latest_splits) + } else { + None + }; + + Ok(( + select_all(readers.into_iter().map(|r| r.into_stream())).boxed(), + latest_splits, + )) }) } } diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index be23e2a99f11b..8b9616edeae9f 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -249,7 +249,7 @@ │ │ │ └─LogicalProject { exprs: [rw_subscriptions.id, rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl] } │ │ │ └─LogicalSysScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl, rw_subscriptions.initialized_at, rw_subscriptions.created_at, rw_subscriptions.initialized_at_cluster_version, rw_subscriptions.created_at_cluster_version] } │ │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] } - │ │ └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.append_only, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at, rw_materialized_views.initialized_at_cluster_version, rw_materialized_views.created_at_cluster_version] } + │ │ └─LogicalSysScan { table: rw_materialized_views, columns: [rw_materialized_views.id, rw_materialized_views.name, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.append_only, rw_materialized_views.acl, rw_materialized_views.initialized_at, rw_materialized_views.created_at, rw_materialized_views.initialized_at_cluster_version, rw_materialized_views.created_at_cluster_version, rw_materialized_views.background_ddl] } │ └─LogicalProject { exprs: [rw_views.id, rw_views.name, 'view':Varchar, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] } │ └─LogicalSysScan { table: rw_views, columns: [rw_views.id, rw_views.name, rw_views.schema_id, rw_views.owner, rw_views.definition, rw_views.acl] } 
└─LogicalShare { id: 20 } diff --git a/src/frontend/src/catalog/schema_catalog.rs b/src/frontend/src/catalog/schema_catalog.rs index 5f4d884bf53c9..86d0353f29073 100644 --- a/src/frontend/src/catalog/schema_catalog.rs +++ b/src/frontend/src/catalog/schema_catalog.rs @@ -585,6 +585,14 @@ impl SchemaCatalog { } /// Iterate all materialized views, excluding the indices. + pub fn iter_all_mvs(&self) -> impl Iterator> { + self.table_by_name + .iter() + .filter(|(_, v)| v.is_mview() && valid_table_name(&v.name)) + .map(|(_, v)| v) + } + + /// Iterate created materialized views, excluding the indices. pub fn iter_created_mvs(&self) -> impl Iterator> { self.table_by_name .iter() diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs index 9c546f1ec7294..947560e44e62e 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/mod.rs @@ -39,6 +39,7 @@ mod rw_indexes; mod rw_internal_tables; mod rw_materialized_views; mod rw_meta_snapshot; +mod rw_rate_limit; mod rw_relation_info; mod rw_relations; mod rw_schemas; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs index ebe7369e46485..55294d6e6a494 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_fragments.rs @@ -33,7 +33,7 @@ struct RwFragment { max_parallelism: i32, } -fn extract_fragment_type_flag(mask: u32) -> Vec { +pub(super) fn extract_fragment_type_flag(mask: u32) -> Vec { let mut result = vec![]; for i in 0..32 { let bit = 1 << i; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs index c593b35b18f87..a0e8d98b24b69 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_materialized_views.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use risingwave_common::catalog::CreateType; use risingwave_common::types::{Fields, Timestamptz}; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; @@ -33,6 +34,7 @@ struct RwMaterializedView { created_at: Option, initialized_at_cluster_version: Option, created_at_cluster_version: Option, + background_ddl: bool, } #[system_catalog(table, "rw_catalog.rw_materialized_views")] @@ -45,7 +47,7 @@ fn read_rw_materialized_views(reader: &SysCatalogReaderImpl) -> Result Result, + node_name: String, + table_id: i32, + rate_limit: i32, +} + +#[system_catalog(table, "rw_catalog.rw_rate_limit")] +async fn read_rw_rate_limit(reader: &SysCatalogReaderImpl) -> Result> { + let rate_limits = reader.meta_client.list_rate_limits().await?; + + Ok(rate_limits + .into_iter() + .map(|info| RwRateLimit { + fragment_id: info.fragment_id as i32, + fragment_type: extract_fragment_type_flag(info.fragment_type_mask) + .into_iter() + .flat_map(|t| t.as_str_name().strip_prefix("FRAGMENT_TYPE_FLAG_")) + .map(|s| s.into()) + .collect(), + table_id: info.job_id as i32, + rate_limit: info.rate_limit as i32, + node_name: info.node_name, + }) + .collect()) +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index bdbc67a4b9720..a1822e3c8e7f1 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -33,6 +33,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus}; @@ -123,6 +124,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn get_cluster_recovery_status(&self) -> Result; async fn get_cluster_limits(&self) -> Result>; + + async fn list_rate_limits(&self) -> Result>; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -298,4 +301,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { async fn get_cluster_limits(&self) -> Result> { self.0.get_cluster_limits().await } + + async fn list_rate_limits(&self) -> Result> { + self.0.list_rate_limits().await + } } diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index daa48d99969ca..f30b0abf5b4c4 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -361,6 +361,10 @@ fn build_fragment( current_fragment.requires_singleton = true; } + NodeBody::StreamFsFetch(_) => { + current_fragment.fragment_type_mask |= FragmentTypeFlag::FsFetch as u32; + } + _ => {} }; diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 288cfaa377e20..66feee855e219 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -56,6 +56,7 @@ use risingwave_pb::meta::list_actor_splits_response::ActorSplit; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_object_dependencies_response::PbObjectDependencies; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use 
risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_pb::meta::{ @@ -1065,6 +1066,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn get_cluster_limits(&self) -> RpcResult> { Ok(vec![]) } + + async fn list_rate_limits(&self) -> RpcResult> { + Ok(vec![]) + } } #[cfg(test)] diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 4adde9de62a26..168b54a648da3 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -109,12 +109,12 @@ impl StreamManagerService for StreamServiceImpl { } ThrottleTarget::Mv => { self.metadata_manager - .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate) + .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate) .await? } ThrottleTarget::CdcTable => { self.metadata_manager - .update_mv_rate_limit_by_table_id(TableId::from(request.id), request.rate) + .update_backfill_rate_limit_by_table_id(TableId::from(request.id), request.rate) .await? } ThrottleTarget::Unspecified => { @@ -414,4 +414,16 @@ impl StreamManagerService for StreamServiceImpl { Ok(Response::new(ListActorSplitsResponse { actor_splits })) } + + async fn list_rate_limits( + &self, + _request: Request, + ) -> Result, Status> { + let rate_limits = self + .metadata_manager + .catalog_controller + .list_rate_limits() + .await?; + Ok(Response::new(ListRateLimitsResponse { rate_limits })) + } } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 7651f42295339..595b03bb3bba1 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -37,6 +37,7 @@ use risingwave_meta_model::{ use risingwave_pb::catalog::source::PbOptionalAssociatedTableId; use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion}; use risingwave_pb::catalog::{PbCreateType, PbTable}; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::relation::{PbRelationInfo, RelationInfo}; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, @@ -52,12 +53,12 @@ use risingwave_pb::stream_plan::update_mutation::PbMergeUpdate; use risingwave_pb::stream_plan::{ PbDispatcher, PbDispatcherType, PbFragmentTypeFlag, PbStreamActor, }; -use sea_orm::sea_query::{Expr, Query, SimpleExpr}; +use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr}; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, - JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, - TransactionTrait, + IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, + RelationTrait, TransactionTrait, }; use crate::barrier::{ReplaceTablePlan, Reschedule}; @@ -1269,7 +1270,6 @@ impl CatalogController { .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf())) .collect_vec(); - // TODO: limit source backfill? 
fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 { @@ -1285,9 +1285,11 @@ impl CatalogController { }); } if is_fs_source { - // scan all fragments for StreamFsFetch node if using fs connector + // in older versions, there's no fragment type flag for `FsFetch` node, + // so we just scan all fragments for StreamFsFetch node if using fs connector visit_stream_node(stream_node, |node| { if let PbNodeBody::StreamFsFetch(node) = node { + *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32; if let Some(node_inner) = &mut node.node_inner && node_inner.source_id == source_id as u32 { @@ -1305,9 +1307,10 @@ impl CatalogController { "source id should be used by at least one fragment" ); let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec(); - for (id, _, stream_node) in fragments { + for (id, fragment_type_mask, stream_node) in fragments { fragment::ActiveModel { fragment_id: Set(id), + fragment_type_mask: Set(fragment_type_mask), stream_node: Set(StreamNode::from(&stream_node)), ..Default::default() } @@ -1336,7 +1339,7 @@ impl CatalogController { // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments // return the actor_ids to be applied - pub async fn update_mv_rate_limit_by_job_id( + pub async fn update_backfill_rate_limit_by_job_id( &self, job_id: ObjectId, rate_limit: Option, @@ -1362,9 +1365,7 @@ impl CatalogController { fragments.retain_mut(|(_, fragment_type_mask, stream_node)| { let mut found = false; - if (*fragment_type_mask & PbFragmentTypeFlag::StreamScan as i32 != 0) - || (*fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0) - { + if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 { visit_stream_node(stream_node, |node| match node { PbNodeBody::StreamCdcScan(node) => { node.rate_limit = rate_limit; @@ -1374,11 +1375,9 @@ impl CatalogController { node.rate_limit = rate_limit; found = true; } - PbNodeBody::Source(node) => { - if let Some(inner) = node.source_inner.as_mut() { - inner.rate_limit = rate_limit; - found = true; - } + PbNodeBody::SourceBackfill(node) => { + node.rate_limit = rate_limit; + found = true; } _ => {} }); @@ -1734,4 +1733,107 @@ impl CatalogController { Ok(()) } + + /// Note: `FsFetch` created in old versions are not included. + /// Since this is only used for debugging, it should be fine. 
+ pub async fn list_rate_limits(&self) -> MetaResult> { + let inner = self.inner.read().await; + let txn = inner.db.begin().await?; + + let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find() + .select_only() + .columns([ + fragment::Column::FragmentId, + fragment::Column::JobId, + fragment::Column::FragmentTypeMask, + fragment::Column::StreamNode, + ]) + .filter(fragment_type_mask_intersects( + PbFragmentTypeFlag::rate_limit_fragments(), + )) + .into_tuple() + .all(&txn) + .await?; + + let mut rate_limits = Vec::new(); + for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments { + let mut stream_node = stream_node.to_protobuf(); + let mut rate_limit = None; + let mut node_name = None; + + visit_stream_node(&mut stream_node, |node| { + match node { + // source rate limit + PbNodeBody::Source(node) => { + if let Some(node_inner) = &mut node.source_inner { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node_inner.rate_limit; + node_name = Some("SOURCE"); + } + } + PbNodeBody::StreamFsFetch(node) => { + if let Some(node_inner) = &mut node.node_inner { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node_inner.rate_limit; + node_name = Some("FS_FETCH"); + } + } + // backfill rate limit + PbNodeBody::SourceBackfill(node) => { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node.rate_limit; + node_name = Some("SOURCE_BACKFILL"); + } + PbNodeBody::StreamScan(node) => { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node.rate_limit; + node_name = Some("STREAM_SCAN"); + } + PbNodeBody::StreamCdcScan(node) => { + debug_assert!( + rate_limit.is_none(), + "one fragment should only have 1 rate limit node" + ); + rate_limit = node.rate_limit; + node_name = Some("STREAM_CDC_SCAN"); + } + _ => {} + } + }); + + if let Some(rate_limit) = rate_limit { + rate_limits.push(RateLimitInfo { + fragment_id: fragment_id as u32, + job_id: job_id as u32, + fragment_type_mask: fragment_type_mask as u32, + rate_limit, + node_name: node_name.unwrap().to_string(), + }); + } + } + + Ok(rate_limits) + } +} + +fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr { + column + .binary(BinOper::Custom("&"), value) + .binary(BinOper::NotEqual, 0) +} + +fn fragment_type_mask_intersects(value: i32) -> SimpleExpr { + bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value) } diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index e684c4071dad8..ebc7903c1b0f9 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -24,6 +24,7 @@ use risingwave_pb::catalog::{PbSink, PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty; +use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo; use risingwave_pb::meta::table_fragments::{Fragment, PbFragment}; use risingwave_pb::stream_plan::{PbDispatchStrategy, StreamActor}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; @@ -558,14 +559,14 @@ impl MetadataManager { .collect()) } - pub async fn update_mv_rate_limit_by_table_id( + pub async fn update_backfill_rate_limit_by_table_id( 
&self, table_id: TableId, rate_limit: Option, ) -> MetaResult>> { let fragment_actors = self .catalog_controller - .update_mv_rate_limit_by_job_id(table_id.table_id as _, rate_limit) + .update_backfill_rate_limit_by_job_id(table_id.table_id as _, rate_limit) .await?; Ok(fragment_actors .into_iter() @@ -599,6 +600,11 @@ impl MetadataManager { pub fn cluster_id(&self) -> &ClusterId { self.cluster_controller.cluster_id() } + + pub async fn list_rate_limits(&self) -> MetaResult> { + let rate_limits = self.catalog_controller.list_rate_limits().await?; + Ok(rate_limits) + } } impl MetadataManager { diff --git a/src/prost/src/lib.rs b/src/prost/src/lib.rs index 5974a05664721..a4678df091270 100644 --- a/src/prost/src/lib.rs +++ b/src/prost/src/lib.rs @@ -302,6 +302,25 @@ impl stream_plan::StreamNode { } } +impl stream_plan::FragmentTypeFlag { + /// Fragments that may be affected by `BACKFILL_RATE_LIMIT`. + pub fn backfill_rate_limit_fragments() -> i32 { + stream_plan::FragmentTypeFlag::SourceScan as i32 + | stream_plan::FragmentTypeFlag::StreamScan as i32 + } + + /// Fragments that may be affected by `SOURCE_RATE_LIMIT`. + /// Note: for `FsFetch`, old fragments don't have this flag set, so don't use this to check. + pub fn source_rate_limit_fragments() -> i32 { + stream_plan::FragmentTypeFlag::Source as i32 | stream_plan::FragmentTypeFlag::FsFetch as i32 + } + + /// Note: this doesn't include `FsFetch` created in old versions. + pub fn rate_limit_fragments() -> i32 { + Self::backfill_rate_limit_fragments() | Self::source_rate_limit_fragments() + } +} + impl catalog::StreamSourceInfo { /// Refer to [`Self::cdc_source_job`] for details. pub fn is_shared(&self) -> bool { diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index fa0d38076abde..cb56344c32205 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,6 +25,7 @@ use async_trait::async_trait; use cluster_limit_service_client::ClusterLimitServiceClient; use either::Either; use futures::stream::BoxStream; +use list_rate_limits_response::RateLimitInfo; use lru::LruCache; use risingwave_common::catalog::{FunctionId, IndexId, SecretId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; @@ -1494,6 +1495,13 @@ impl MetaClient { self.inner.merge_compaction_group(req).await?; Ok(()) } + + /// List all rate limits for sources and backfills + pub async fn list_rate_limits(&self) -> Result> { + let request = ListRateLimitsRequest {}; + let resp = self.inner.list_rate_limits(request).await?; + Ok(resp.rate_limits) + } } #[async_trait] @@ -2044,6 +2052,7 @@ macro_rules! for_all_meta_rpc { ,{ stream_client, list_actor_splits, ListActorSplitsRequest, ListActorSplitsResponse } ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse } ,{ stream_client, recover, RecoverRequest, RecoverResponse } + ,{ stream_client, list_rate_limits, ListRateLimitsRequest, ListRateLimitsResponse } ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse } ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse } ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index ad3dff10efa32..3fe13f26d7bb8 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3473,7 +3473,7 @@ impl Parser<'_> { } else if let Some(rate_limit) = self.parse_alter_source_rate_limit(false)? 
{ AlterSourceOperation::SetSourceRateLimit { rate_limit } } else { - return self.expected("SCHEMA after SET"); + return self.expected("SCHEMA or SOURCE_RATE_LIMIT after SET"); } } else if self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) { let connector_schema = self.parse_schema()?.unwrap(); diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 19c83702a8373..13b27c5b90e42 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -160,9 +160,9 @@ impl FsFetchExecutor { batch: SplitBatch, rate_limit_rps: Option, ) -> StreamExecutorResult { - let stream = source_desc + let (stream, _) = source_desc .source - .build_stream(batch, column_ids, Arc::new(source_ctx)) + .build_stream(batch, column_ids, Arc::new(source_ctx), false) .await .map_err(StreamExecutorError::connector_error)?; Ok(apply_rate_limit(stream, rate_limit_rps).boxed()) diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 34f9eb12d692a..a6f4ac5cfbbd9 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -550,6 +550,33 @@ impl SourceBackfillExecutorInner { ) .await?; } + Mutation::Throttle(actor_to_apply) => { + if let Some(new_rate_limit) = + actor_to_apply.get(&self.actor_ctx.id) + && *new_rate_limit != self.rate_limit_rps + { + tracing::info!( + "updating rate limit from {:?} to {:?}", + self.rate_limit_rps, + *new_rate_limit + ); + self.rate_limit_rps = *new_rate_limit; + // rebuild reader + let (reader, _backfill_info) = self + .build_stream_source_reader( + &source_desc, + backfill_stage + .get_latest_unfinished_splits()?, + ) + .await?; + + backfill_stream = select_with_strategy( + input.by_ref().map(Either::Left), + reader.map(Either::Right), + select_strategy, + ); + } + } _ => {} } } @@ -589,7 +616,6 @@ impl SourceBackfillExecutorInner { .await?; if self.should_report_finished(&backfill_stage.states) { - tracing::debug!("progress finish"); self.progress.finish( barrier.epoch, backfill_stage.total_backfilled_rows(), diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index b15d14a2679ec..f32a5da734671 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -71,7 +71,7 @@ pub struct SourceExecutor { /// Rate limit in rows/s. rate_limit_rps: Option, - is_shared: bool, + is_shared_non_cdc: bool, } impl SourceExecutor { @@ -82,7 +82,7 @@ impl SourceExecutor { barrier_receiver: UnboundedReceiver, system_params: SystemParamsReaderRef, rate_limit_rps: Option, - is_shared: bool, + is_shared_non_cdc: bool, ) -> Self { Self { actor_ctx, @@ -91,7 +91,7 @@ impl SourceExecutor { barrier_receiver: Some(barrier_receiver), system_params, rate_limit_rps, - is_shared, + is_shared_non_cdc, } } @@ -116,11 +116,13 @@ impl SourceExecutor { })) } + /// If `seek_to_latest` is true, will also return the latest splits after seek. 
pub async fn build_stream_source_reader( &self, source_desc: &SourceDesc, state: ConnectorState, - ) -> StreamExecutorResult { + seek_to_latest: bool, + ) -> StreamExecutorResult<(BoxChunkSourceStream, Option>)> { let column_ids = source_desc .columns .iter() @@ -183,13 +185,16 @@ impl SourceExecutor { source_desc.source.config.clone(), schema_change_tx, ); - let stream = source_desc + let (stream, latest_splits) = source_desc .source - .build_stream(state, column_ids, Arc::new(source_ctx)) + .build_stream(state, column_ids, Arc::new(source_ctx), seek_to_latest) .await - .map_err(StreamExecutorError::connector_error); + .map_err(StreamExecutorError::connector_error)?; - Ok(apply_rate_limit(stream?, self.rate_limit_rps).boxed()) + Ok(( + apply_rate_limit(stream, self.rate_limit_rps).boxed(), + latest_splits, + )) } fn is_auto_schema_change_enable(&self) -> bool { @@ -367,10 +372,10 @@ impl SourceExecutor { ); // Replace the source reader with a new one of the new state. - let reader = self - .build_stream_source_reader(source_desc, Some(target_state.clone())) - .await? - .map_err(StreamExecutorError::connector_error); + let (reader, _) = self + .build_stream_source_reader(source_desc, Some(target_state.clone()), false) + .await?; + let reader = reader.map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); @@ -454,6 +459,7 @@ impl SourceExecutor { } core.split_state_store.init_epoch(barrier.epoch); + let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; for ele in &mut boot_state { if let Some(recover_state) = core @@ -462,56 +468,56 @@ impl SourceExecutor { .await? { *ele = recover_state; + // if state store is non-empty, we consider it's initialized. + is_uninitialized = false; } else { // This is a new split, not in state table. - if self.is_shared { - // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. - // It's highly probable that the work of scanning historical data cannot be shared, - // so don't waste work on it. - // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 - if ele.is_cdc_split() { - // shared CDC source already starts from latest. - continue; - } - match ele { - SplitImpl::Kafka(split) => { - split.seek_to_latest_offset(); - } - _ => unreachable!("only kafka source can be shared, got {:?}", ele), - } - } + // make sure it is written to state table later. + // Then even it receives no messages, we can observe it in state table. + core.updated_splits_in_epoch.insert(ele.id(), ele.clone()); } } // init in-memory split states with persisted state if any core.init_split_state(boot_state.clone()); - let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = Some(core); let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); - let source_chunk_reader = self - .build_stream_source_reader(&source_desc, recover_state) + let (source_chunk_reader, latest_splits) = self + .build_stream_source_reader( + &source_desc, + recover_state, + // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. + // It's highly probable that the work of scanning historical data cannot be shared, + // so don't waste work on it. 
+ // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 + // Note that shared CDC source is special. It already starts from latest. + self.is_shared_non_cdc && is_uninitialized, + ) .instrument_await("source_build_reader") - .await? - .map_err(StreamExecutorError::connector_error); - + .await?; + let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error); + if let Some(latest_splits) = latest_splits { + // make sure it is written to state table later. + // Then even it receives no messages, we can observe it in state table. + self.stream_source_core + .as_mut() + .unwrap() + .updated_splits_in_epoch + .extend(latest_splits.into_iter().map(|s| (s.id(), s))); + } // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); - // - For shared source, pause until there's a MV. // - If the first barrier requires us to pause on startup, pause the stream. - if (self.is_shared && is_uninitialized) || barrier.is_pause_on_startup() { - tracing::info!( - is_shared = self.is_shared, - is_uninitialized = is_uninitialized, - "source paused on startup" - ); + if barrier.is_pause_on_startup() { + tracing::info!("source paused on startup"); stream.pause_stream(); } @@ -554,14 +560,6 @@ impl SourceExecutor { let epoch = barrier.epoch; - if self.is_shared - && is_uninitialized - && barrier.has_more_downstream_fragments(self.actor_ctx.id) - { - stream.resume_stream(); - is_uninitialized = false; - } - if let Some(mutation) = barrier.mutation.as_deref() { match mutation { // XXX: Is it possible that the stream is self_paused, and we have pause mutation now? In this case, it will panic. @@ -598,7 +596,7 @@ impl SourceExecutor { if let Some(new_rate_limit) = actor_to_apply.get(&self.actor_ctx.id) && *new_rate_limit != self.rate_limit_rps { - tracing::debug!( + tracing::info!( "updating rate limit from {:?} to {:?}", self.rate_limit_rps, *new_rate_limit diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index 98746a672e43c..4d4786eea3bfa 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -232,7 +232,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { barrier_receiver, system_params, source.rate_limit, - is_shared, + is_shared && !source.with_properties.is_cdc_connector(), ) .boxed() } diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index dba8f5050627a..c860b8f430fa1 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -250,6 +250,7 @@ impl CreateMviewProgressReporter { if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state { return; } + tracing::debug!("progress finish"); self.update_inner( epoch, BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
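
# A minimal sketch tying the pieces above together, using only statements that
# already appear in the e2e tests of this patch (object names such as
# kafka_source, rl_mv2 and m1 come from those tests; actual output depends on
# the jobs running in the cluster):

# Inspect per-fragment rate limits through the new rw_rate_limit catalog.
query T
select name, node_name, fragment_type, rate_limit
from rw_rate_limit join rw_relations on table_id = id
order by name, node_name;

# SOURCE / FS_FETCH fragments are throttled via SOURCE_RATE_LIMIT.
statement ok
alter source kafka_source set source_rate_limit to 1000;

# SOURCE_BACKFILL / STREAM_SCAN fragments are throttled via BACKFILL_RATE_LIMIT.
statement ok
alter materialized view rl_mv2 set backfill_rate_limit = 2000;

# The new background_ddl column distinguishes background DDL jobs.
query I
select background_ddl from rw_catalog.rw_materialized_views where name = 'm1';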