diff --git a/e2e_test/source_inline/kafka/shared_source.slt.serial b/e2e_test/source_inline/kafka/shared_source.slt.serial index 3397f90f081da..a812d8672d177 100644 --- a/e2e_test/source_inline/kafka/shared_source.slt.serial +++ b/e2e_test/source_inline/kafka/shared_source.slt.serial @@ -59,11 +59,14 @@ select count(*) from rw_internal_tables where name like '%s0%'; sleep 1s -# SourceExecutor's ingestion does not start (state table is empty), even after sleep +# SourceExecutor's starts from latest. system ok internal_table.mjs --name s0 --type source ---- -(empty) +0,"{""split_info"": {""partition"": 0, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +1,"{""split_info"": {""partition"": 1, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" +3,"{""split_info"": {""partition"": 3, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" statement ok @@ -72,12 +75,6 @@ create materialized view mv_1 as select * from s0; # Wait enough time to ensure SourceExecutor consumes all Kafka data. sleep 2s -# SourceExecutor's ingestion started, but it only starts from latest (offset 1). -system ok -internal_table.mjs --name s0 --type source ----- -(empty) - # SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark). # (meaning upstream already consumed offset 0, so we only need to backfill to offset 0) @@ -144,7 +141,7 @@ EOF sleep 2s -# SourceExecutor's finally got new data now. +# SourceExecutor's got new data. system ok internal_table.mjs --name s0 --type source ---- @@ -185,16 +182,6 @@ select v1, v2 from mv_1; 4 dd -# start_offset changed to 1 -system ok -internal_table.mjs --name s0 --type source ----- -0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" -3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}" - - # Transition from SourceCachingUp to Finished after consuming one upstream message. system ok internal_table.mjs --name mv_1 --type sourcebackfill @@ -334,6 +321,47 @@ internal_table.mjs --name s0 --type source # # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;" +# Test: rate limit and resume won't lose data + +statement ok +alter source s0 set source_rate_limit to 0; + + +system ok +cat < @@ -108,7 +108,7 @@ impl TryFromBTreeMap for P { } } -pub async fn create_split_reader( +pub async fn create_split_reader( prop: P, splits: Vec, parser_config: ParserConfig, @@ -375,6 +375,10 @@ pub trait SplitReader: Sized + Send { fn backfill_info(&self) -> HashMap { HashMap::new() } + + async fn seek_to_latest(&mut self) -> Result> { + Err(anyhow!("seek_to_latest is not supported for this connector").into()) + } } /// Information used to determine whether we should start and finish source backfill. diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index 3f3626449153a..9e99c7db9e5e8 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -58,7 +58,7 @@ pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String { format!("{}.{}", source_id, external_table_name) } -pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static { +pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static { const CDC_CONNECTOR_NAME: &'static str; fn source_type() -> CdcSourceType; } diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 1d7525bc7a613..e8e9eb556c3c5 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -191,7 +191,6 @@ impl SplitEnumerator for KafkaSplitEnumerator { partition, start_offset: start_offsets.remove(&partition).unwrap(), stop_offset: stop_offsets.remove(&partition).unwrap(), - hack_seek_to_latest: false, }) .collect(); @@ -299,7 +298,6 @@ impl KafkaSplitEnumerator { partition: *partition, start_offset: Some(start_offset), stop_offset: Some(stop_offset), - hack_seek_to_latest:false } }) .collect::>()) diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index b9523eca98b57..13523e00135ad 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -37,13 +37,15 @@ use crate::source::kafka::{ }; use crate::source::{ into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId, - SplitMetaData, SplitReader, + SplitImpl, SplitMetaData, SplitReader, }; pub struct KafkaSplitReader { consumer: StreamConsumer, offsets: HashMap, Option)>, backfill_info: HashMap, + splits: Vec, + sync_call_timeout: Duration, bytes_per_second: usize, max_num_messages: usize, parser_config: ParserConfig, @@ -110,12 +112,10 @@ impl SplitReader for KafkaSplitReader { let mut offsets = HashMap::new(); let mut backfill_info = HashMap::new(); - for split in splits { + for split in splits.clone() { offsets.insert(split.id(), (split.start_offset, split.stop_offset)); - if split.hack_seek_to_latest { - tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::End)?; - } else if let Some(offset) = split.start_offset { + if let Some(offset) = split.start_offset { tpl.add_partition_offset( split.topic.as_str(), split.partition, @@ -168,8 +168,10 @@ impl SplitReader for KafkaSplitReader { Ok(Self { consumer, offsets, + splits, backfill_info, bytes_per_second, + sync_call_timeout: properties.common.sync_call_timeout, max_num_messages, parser_config, source_ctx, @@ -185,6 +187,29 @@ impl SplitReader for KafkaSplitReader { fn backfill_info(&self) -> HashMap { self.backfill_info.clone() } + + async fn seek_to_latest(&mut self) -> Result> { + let mut latest_splits: Vec = Vec::new(); + let mut tpl = TopicPartitionList::with_capacity(self.splits.len()); + for mut split in self.splits.clone() { + // we can't get latest offset if we use Offset::End, so we just fetch watermark here. + let (_low, high) = self + .consumer + .fetch_watermarks( + split.topic.as_str(), + split.partition, + self.sync_call_timeout, + ) + .await?; + tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?; + split.start_offset = Some(high - 1); + latest_splits.push(split.into()); + } + self.consumer + .seek_partitions(tpl, self.sync_call_timeout) + .await?; + Ok(latest_splits) + } } impl KafkaSplitReader { diff --git a/src/connector/src/source/kafka/split.rs b/src/connector/src/source/kafka/split.rs index 791836ac2c858..fa969bb37111f 100644 --- a/src/connector/src/source/kafka/split.rs +++ b/src/connector/src/source/kafka/split.rs @@ -32,12 +32,6 @@ pub struct KafkaSplit { /// A better approach would be to make it **inclusive**. pub(crate) start_offset: Option, pub(crate) stop_offset: Option, - #[serde(skip)] - /// Used by shared source to hackily seek to the latest offset without fetching start offset first. - /// XXX: But why do we fetch low watermark for latest start offset..? - /// - /// When this is `true`, `start_offset` will be ignored. - pub(crate) hack_seek_to_latest: bool, } impl SplitMetaData for KafkaSplit { @@ -72,16 +66,10 @@ impl KafkaSplit { partition, start_offset, stop_offset, - hack_seek_to_latest: false, } } pub fn get_topic_and_partition(&self) -> (String, i32) { (self.topic.clone(), self.partition) } - - /// This should only be used for a fresh split, not persisted in state table yet. - pub fn seek_to_latest_offset(&mut self) { - self.hack_seek_to_latest = true; - } } diff --git a/src/connector/src/source/reader/reader.rs b/src/connector/src/source/reader/reader.rs index 89335f8f0d80e..d8b135f62420e 100644 --- a/src/connector/src/source/reader/reader.rs +++ b/src/connector/src/source/reader/reader.rs @@ -37,8 +37,8 @@ use crate::source::filesystem::opendal_source::{ use crate::source::filesystem::{FsPageItem, OpendalFsSplit}; use crate::source::{ create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column, - ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader, - WaitCheckpointTask, + ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitImpl, + SplitReader, WaitCheckpointTask, }; use crate::{dispatch_source_prop, WithOptionsSecResolved}; @@ -211,14 +211,17 @@ impl SourceReader { } /// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s). + /// + /// If `seek_to_latest` is true, will also return the latest splits after seek. pub async fn build_stream( &self, state: ConnectorState, column_ids: Vec, source_ctx: Arc, - ) -> ConnectorResult { + seek_to_latest: bool, + ) -> ConnectorResult<(BoxChunkSourceStream, Option>)> { let Some(splits) = state else { - return Ok(pending().boxed()); + return Ok((pending().boxed(), None)); }; let config = self.config.clone(); let columns = self.get_target_columns(column_ids)?; @@ -243,7 +246,7 @@ impl SourceReader { let support_multiple_splits = config.support_multiple_splits(); dispatch_source_prop!(config, prop, { - let readers = if support_multiple_splits { + let mut readers = if support_multiple_splits { tracing::debug!( "spawning connector split reader for multiple splits {:?}", splits @@ -268,7 +271,20 @@ impl SourceReader { .await? }; - Ok(select_all(readers.into_iter().map(|r| r.into_stream())).boxed()) + let latest_splits = if seek_to_latest { + let mut latest_splits = Vec::new(); + for reader in readers.iter_mut() { + latest_splits.extend(reader.seek_to_latest().await?); + } + Some(latest_splits) + } else { + None + }; + + Ok(( + select_all(readers.into_iter().map(|r| r.into_stream())).boxed(), + latest_splits, + )) }) } } diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index 13bbac436d367..8964eaecff45a 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -160,9 +160,9 @@ impl FsFetchExecutor { batch: SplitBatch, rate_limit_rps: Option, ) -> StreamExecutorResult { - let stream = source_desc + let (stream, _) = source_desc .source - .build_stream(batch, column_ids, Arc::new(source_ctx)) + .build_stream(batch, column_ids, Arc::new(source_ctx), false) .await .map_err(StreamExecutorError::connector_error)?; Ok(apply_rate_limit(stream, rate_limit_rps).boxed()) diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 9df74a719d465..6ada0f2b62ebb 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -609,7 +609,6 @@ impl SourceBackfillExecutorInner { .await?; if self.should_report_finished(&backfill_stage.states) { - tracing::debug!("progress finish"); self.progress.finish( barrier.epoch, backfill_stage.total_backfilled_rows(), diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index e0bbe3d1f6d97..6d394fe450ab9 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -116,11 +116,13 @@ impl SourceExecutor { })) } + /// If `seek_to_latest` is true, will also return the latest splits after seek. pub async fn build_stream_source_reader( &self, source_desc: &SourceDesc, state: ConnectorState, - ) -> StreamExecutorResult { + seek_to_latest: bool, + ) -> StreamExecutorResult<(BoxChunkSourceStream, Option>)> { let column_ids = source_desc .columns .iter() @@ -183,13 +185,16 @@ impl SourceExecutor { source_desc.source.config.clone(), schema_change_tx, ); - let stream = source_desc + let (stream, latest_splits) = source_desc .source - .build_stream(state, column_ids, Arc::new(source_ctx)) + .build_stream(state, column_ids, Arc::new(source_ctx), seek_to_latest) .await - .map_err(StreamExecutorError::connector_error); + .map_err(StreamExecutorError::connector_error)?; - Ok(apply_rate_limit(stream?, self.rate_limit_rps).boxed()) + Ok(( + apply_rate_limit(stream, self.rate_limit_rps).boxed(), + latest_splits, + )) } fn is_auto_schema_change_enable(&self) -> bool { @@ -367,10 +372,10 @@ impl SourceExecutor { ); // Replace the source reader with a new one of the new state. - let reader = self - .build_stream_source_reader(source_desc, Some(target_state.clone())) - .await? - .map_err(StreamExecutorError::connector_error); + let (reader, _) = self + .build_stream_source_reader(source_desc, Some(target_state.clone()), false) + .await?; + let reader = reader.map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); @@ -459,7 +464,7 @@ impl SourceExecutor { }; core.split_state_store.init_epoch(first_epoch).await?; - + let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; for ele in &mut boot_state { if let Some(recover_state) = core .split_state_store @@ -467,42 +472,46 @@ impl SourceExecutor { .await? { *ele = recover_state; + // if state store is non-empty, we consider it's initialized. + is_uninitialized = false; } else { // This is a new split, not in state table. - if self.is_shared { - // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. - // It's highly probable that the work of scanning historical data cannot be shared, - // so don't waste work on it. - // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 - if ele.is_cdc_split() { - // shared CDC source already starts from latest. - continue; - } - match ele { - SplitImpl::Kafka(split) => { - split.seek_to_latest_offset(); - } - _ => unreachable!("only kafka source can be shared, got {:?}", ele), - } - } + // make sure it is written to state table later. + // Then even it receives no messages, we can observe it in state table. + core.updated_splits_in_epoch.insert(ele.id(), ele.clone()); } } // init in-memory split states with persisted state if any core.init_split_state(boot_state.clone()); - let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0; // Return the ownership of `stream_source_core` to the source executor. self.stream_source_core = Some(core); let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state); tracing::debug!(state = ?recover_state, "start with state"); - let source_chunk_reader = self - .build_stream_source_reader(&source_desc, recover_state) + let (source_chunk_reader, latest_splits) = self + .build_stream_source_reader( + &source_desc, + recover_state, + // For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data. + // It's highly probable that the work of scanning historical data cannot be shared, + // so don't waste work on it. + // For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297 + self.is_shared && is_uninitialized, + ) .instrument_await("source_build_reader") - .await? - .map_err(StreamExecutorError::connector_error); - + .await?; + let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error); + if let Some(latest_splits) = latest_splits { + // make sure it is written to state table later. + // Then even it receives no messages, we can observe it in state table. + self.stream_source_core + .as_mut() + .unwrap() + .updated_splits_in_epoch + .extend(latest_splits.into_iter().map(|s| (s.id(), s))); + } // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); @@ -510,9 +519,8 @@ impl SourceExecutor { StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); let mut command_paused = false; - // - For shared source, pause until there's a MV. // - If the first barrier requires us to pause on startup, pause the stream. - if (self.is_shared && is_uninitialized) || is_pause_on_startup { + if is_pause_on_startup { tracing::info!( is_shared = self.is_shared, is_uninitialized = is_uninitialized, diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index dba8f5050627a..c860b8f430fa1 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -250,6 +250,7 @@ impl CreateMviewProgressReporter { if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state { return; } + tracing::debug!("progress finish"); self.update_inner( epoch, BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),