Skip to content

Commit

Permalink
fix: fix potential data loss for shared source
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Nov 19, 2024
1 parent ac6cb38 commit a29b177
Show file tree
Hide file tree
Showing 12 changed files with 153 additions and 86 deletions.
66 changes: 47 additions & 19 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,14 @@ select count(*) from rw_internal_tables where name like '%s0%';

sleep 1s

# SourceExecutor's ingestion does not start (state table is empty), even after sleep
# SourceExecutor's starts from latest.
system ok
internal_table.mjs --name s0 --type source
----
(empty)
0,"{""split_info"": {""partition"": 0, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


statement ok
Expand All @@ -72,12 +75,6 @@ create materialized view mv_1 as select * from s0;
# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 2s

# SourceExecutor's ingestion started, but it only starts from latest (offset 1).
system ok
internal_table.mjs --name s0 --type source
----
(empty)


# SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark).
# (meaning upstream already consumed offset 0, so we only need to backfill to offset 0)
Expand Down Expand Up @@ -144,7 +141,7 @@ EOF

sleep 2s

# SourceExecutor's finally got new data now.
# SourceExecutor's got new data.
system ok
internal_table.mjs --name s0 --type source
----
Expand Down Expand Up @@ -185,16 +182,6 @@ select v1, v2 from mv_1;
4 dd


# start_offset changed to 1
system ok
internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# Transition from SourceCachingUp to Finished after consuming one upstream message.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
Expand Down Expand Up @@ -334,6 +321,47 @@ internal_table.mjs --name s0 --type source
# # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;"


# Test: rate limit and resume won't lose data

statement ok
alter source s0 set source_rate_limit to 0;


system ok
cat <<EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
3 {"v1": 4, "v2": "d"}
EOF

sleep 2s

# no data goes in

query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
2 12
3 13
4 14

statement ok
alter source s0 set source_rate_limit to default;

sleep 3s

# data comes in
query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 13
2 13
3 14
4 15


statement ok
drop source s0 cascade;

Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ impl SourceExecutor {
ConnectorProperties::default(),
None,
));
let stream = self
let (stream, _) = self
.source
.build_stream(Some(self.split_list), self.column_ids, source_ctx)
.build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
.await?;

#[for_await]
Expand Down
8 changes: 6 additions & 2 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub trait TryFromBTreeMap: Sized + UnknownFields {
/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions {
pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug {
const SOURCE_NAME: &'static str;
type Split: SplitMetaData
+ TryFrom<SplitImpl, Error = crate::error::ConnectorError>
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
}
}

pub async fn create_split_reader<P: SourceProperties + std::fmt::Debug>(
pub async fn create_split_reader<P: SourceProperties>(
prop: P,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
Expand Down Expand Up @@ -375,6 +375,10 @@ pub trait SplitReader: Sized + Send {
fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
HashMap::new()
}

async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
Err(anyhow!("seek_to_latest is not supported for this connector").into())
}
}

/// Information used to determine whether we should start and finish source backfill.
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
format!("{}.{}", source_id, external_table_name)
}

pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {
pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
const CDC_CONNECTOR_NAME: &'static str;
fn source_type() -> CdcSourceType;
}
Expand Down
2 changes: 0 additions & 2 deletions src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ impl SplitEnumerator for KafkaSplitEnumerator {
partition,
start_offset: start_offsets.remove(&partition).unwrap(),
stop_offset: stop_offsets.remove(&partition).unwrap(),
hack_seek_to_latest: false,
})
.collect();

Expand Down Expand Up @@ -299,7 +298,6 @@ impl KafkaSplitEnumerator {
partition: *partition,
start_offset: Some(start_offset),
stop_offset: Some(stop_offset),
hack_seek_to_latest:false
}
})
.collect::<Vec<KafkaSplit>>())
Expand Down
35 changes: 30 additions & 5 deletions src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ use crate::source::kafka::{
};
use crate::source::{
into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
SplitImpl, SplitMetaData, SplitReader,
};

pub struct KafkaSplitReader {
consumer: StreamConsumer<RwConsumerContext>,
offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
backfill_info: HashMap<SplitId, BackfillInfo>,
splits: Vec<KafkaSplit>,
sync_call_timeout: Duration,
bytes_per_second: usize,
max_num_messages: usize,
parser_config: ParserConfig,
Expand Down Expand Up @@ -110,12 +112,10 @@ impl SplitReader for KafkaSplitReader {

let mut offsets = HashMap::new();
let mut backfill_info = HashMap::new();
for split in splits {
for split in splits.clone() {
offsets.insert(split.id(), (split.start_offset, split.stop_offset));

if split.hack_seek_to_latest {
tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::End)?;
} else if let Some(offset) = split.start_offset {
if let Some(offset) = split.start_offset {
tpl.add_partition_offset(
split.topic.as_str(),
split.partition,
Expand Down Expand Up @@ -168,8 +168,10 @@ impl SplitReader for KafkaSplitReader {
Ok(Self {
consumer,
offsets,
splits,
backfill_info,
bytes_per_second,
sync_call_timeout: properties.common.sync_call_timeout,
max_num_messages,
parser_config,
source_ctx,
Expand All @@ -185,6 +187,29 @@ impl SplitReader for KafkaSplitReader {
fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
self.backfill_info.clone()
}

async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
let mut latest_splits: Vec<SplitImpl> = Vec::new();
let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
for mut split in self.splits.clone() {
// we can't get latest offset if we use Offset::End, so we just fetch watermark here.
let (_low, high) = self
.consumer
.fetch_watermarks(
split.topic.as_str(),
split.partition,
self.sync_call_timeout,
)
.await?;
tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
split.start_offset = Some(high - 1);
latest_splits.push(split.into());
}
self.consumer
.seek_partitions(tpl, self.sync_call_timeout)
.await?;
Ok(latest_splits)
}
}

impl KafkaSplitReader {
Expand Down
12 changes: 0 additions & 12 deletions src/connector/src/source/kafka/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ pub struct KafkaSplit {
/// A better approach would be to make it **inclusive**. <https://github.com/risingwavelabs/risingwave/pull/16257>
pub(crate) start_offset: Option<i64>,
pub(crate) stop_offset: Option<i64>,
#[serde(skip)]
/// Used by shared source to hackily seek to the latest offset without fetching start offset first.
/// XXX: But why do we fetch low watermark for latest start offset..?
///
/// When this is `true`, `start_offset` will be ignored.
pub(crate) hack_seek_to_latest: bool,
}

impl SplitMetaData for KafkaSplit {
Expand Down Expand Up @@ -72,16 +66,10 @@ impl KafkaSplit {
partition,
start_offset,
stop_offset,
hack_seek_to_latest: false,
}
}

pub fn get_topic_and_partition(&self) -> (String, i32) {
(self.topic.clone(), self.partition)
}

/// This should only be used for a fresh split, not persisted in state table yet.
pub fn seek_to_latest_offset(&mut self) {
self.hack_seek_to_latest = true;
}
}
28 changes: 22 additions & 6 deletions src/connector/src/source/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use crate::source::filesystem::opendal_source::{
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{
create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column,
ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader,
WaitCheckpointTask,
ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitImpl,
SplitReader, WaitCheckpointTask,
};
use crate::{dispatch_source_prop, WithOptionsSecResolved};

Expand Down Expand Up @@ -211,14 +211,17 @@ impl SourceReader {
}

/// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s).
///
/// If `seek_to_latest` is true, will also return the latest splits after seek.
pub async fn build_stream(
&self,
state: ConnectorState,
column_ids: Vec<ColumnId>,
source_ctx: Arc<SourceContext>,
) -> ConnectorResult<BoxChunkSourceStream> {
seek_to_latest: bool,
) -> ConnectorResult<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> {
let Some(splits) = state else {
return Ok(pending().boxed());
return Ok((pending().boxed(), None));
};
let config = self.config.clone();
let columns = self.get_target_columns(column_ids)?;
Expand All @@ -243,7 +246,7 @@ impl SourceReader {

let support_multiple_splits = config.support_multiple_splits();
dispatch_source_prop!(config, prop, {
let readers = if support_multiple_splits {
let mut readers = if support_multiple_splits {
tracing::debug!(
"spawning connector split reader for multiple splits {:?}",
splits
Expand All @@ -268,7 +271,20 @@ impl SourceReader {
.await?
};

Ok(select_all(readers.into_iter().map(|r| r.into_stream())).boxed())
let latest_splits = if seek_to_latest {
let mut latest_splits = Vec::new();
for reader in readers.iter_mut() {
latest_splits.extend(reader.seek_to_latest().await?);
}
Some(latest_splits)
} else {
None
};

Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
latest_splits,
))
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/fetch_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
batch: SplitBatch,
rate_limit_rps: Option<u32>,
) -> StreamExecutorResult<BoxChunkSourceStream> {
let stream = source_desc
let (stream, _) = source_desc
.source
.build_stream(batch, column_ids, Arc::new(source_ctx))
.build_stream(batch, column_ids, Arc::new(source_ctx), false)
.await
.map_err(StreamExecutorError::connector_error)?;
Ok(apply_rate_limit(stream, rate_limit_rps).boxed())
Expand Down
1 change: 0 additions & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.await?;

if self.should_report_finished(&backfill_stage.states) {
tracing::debug!("progress finish");
self.progress.finish(
barrier.epoch,
backfill_stage.total_backfilled_rows(),
Expand Down
Loading

0 comments on commit a29b177

Please sign in to comment.