fix(log-store): rebuild log store iter when exists for a timeout (#17009)
wenym1 authored May 30, 2024
1 parent 0d4d253 commit 4e64389
Showing 2 changed files with 252 additions and 21 deletions.
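What the change does, in brief: consuming historical log data previously held a single state-store iterator open for the entire scan, which pins a Hummock version (and the objects it references) for as long as the stream lives. The new `AutoRebuildStateStoreReadIter` bundles the iterator with its end bound, epoch, and read options plus a `should_rebuild` closure; when the closure fires (in `read_persisted_log_store`: after 10 minutes, checked once every 100 items), the wrapper drops the old iterator and reopens the scan from the next pending key, so the caller observes one uninterrupted sequence. A minimal sketch of the rebuild-and-resume step over a plain `BTreeMap` (a toy stand-in; all names are illustrative, not from the codebase):

use std::collections::BTreeMap;
use std::ops::Bound::{self, Included, Unbounded};

// A fresh range scan over the toy "store", standing in for `state_store.iter(..)`.
fn open_scan(
    store: &BTreeMap<Vec<u8>, u64>,
    start: Bound<Vec<u8>>,
) -> impl Iterator<Item = (&Vec<u8>, &u64)> + '_ {
    store.range((start, Unbounded))
}

// Scan all keys, tearing the iterator down and reopening it whenever
// `should_rebuild` returns true, without skipping or repeating a key.
fn scan_with_rebuild(
    store: &BTreeMap<Vec<u8>, u64>,
    mut should_rebuild: impl FnMut() -> bool,
) -> Vec<Vec<u8>> {
    let mut iter = open_scan(store, Unbounded);
    let mut out = Vec::new();
    loop {
        if should_rebuild() {
            // Peek the next key from the old iterator, then reopen the scan
            // from that key *inclusive*, dropping the old iterator.
            let Some((key, _)) = iter.next() else { break };
            let resume = key.clone();
            iter = open_scan(store, Included(resume.clone()));
            // Same snapshot, so the first key of the new scan must be
            // exactly the key just peeked.
            let (k, _) = iter.next().expect("resume key must still be there");
            assert_eq!(*k, resume);
            out.push(resume);
        } else {
            match iter.next() {
                Some((k, _)) => out.push(k.clone()),
                None => break,
            }
        }
    }
    out
}

fn main() {
    let store: BTreeMap<_, _> = (0u8..10).map(|i| (vec![i], i as u64)).collect();
    let mut calls = 0;
    // Rebuild on every 4th call; the output is still all 10 keys, in order.
    let keys = scan_with_rebuild(&store, move || {
        calls += 1;
        calls % 4 == 0
    });
    assert_eq!(keys.len(), 10);
}

This mirrors the invariant asserted in `try_next` below: because the reopened scan reads the same snapshot, the key peeked from the old iterator must be the first key yielded by the new one.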
271 changes: 251 additions & 20 deletions src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -13,12 +13,14 @@
// limitations under the License.

use std::future::Future;
use std::ops::Bound;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::pin::Pin;
use std::time::Duration;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::CacheContext;
use futures::future::{try_join_all, BoxFuture};
use futures::{FutureExt, TryFutureExt};
@@ -31,11 +33,14 @@ use risingwave_common::util::epoch::EpochExt;
use risingwave_connector::sink::log_store::{
ChunkId, LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset,
};
use risingwave_hummock_sdk::key::prefixed_range_with_vnode;
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, FullKey, TableKey, TableKeyRange};
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_storage::error::StorageResult;
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::store::{PrefetchOptions, ReadOptions};
use risingwave_storage::StateStore;
use risingwave_storage::store::{
PrefetchOptions, ReadOptions, StateStoreIterItemRef, StateStoreRead,
};
use risingwave_storage::{StateStore, StateStoreIter};
use tokio::sync::watch;
use tokio::time::sleep;
use tokio_stream::StreamExt;
@@ -113,7 +118,7 @@ pub struct KvLogStoreReader<S: StateStore> {
first_write_epoch: Option<u64>,

/// `Some` means consuming historical log data
state_store_stream: Option<Pin<Box<LogStoreItemMergeStream<S::Iter>>>>,
state_store_stream: Option<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>,

/// Store the future that attempts to read a flushed stream chunk.
/// This is for cancellation safety. Since it is possible that the future of `next_item`
Expand Down Expand Up @@ -180,12 +185,141 @@ impl<S: StateStore> KvLogStoreReader<S> {
}
}

struct AutoRebuildStateStoreReadIter<S: StateStoreRead, F> {
state_store: S,
iter: S::Iter,
// Called to decide whether to rebuild the iter. Once it returns true, the closure should reset its own state.
should_rebuild: F,
end_bound: Bound<TableKey<Bytes>>,
epoch: HummockEpoch,
options: ReadOptions,
}

impl<S: StateStoreRead, F: FnMut() -> bool> AutoRebuildStateStoreReadIter<S, F> {
async fn new(
state_store: S,
should_rebuild: F,
range: TableKeyRange,
epoch: HummockEpoch,
options: ReadOptions,
) -> StorageResult<Self> {
let (start_bound, end_bound) = range;
let iter = state_store
.iter((start_bound, end_bound.clone()), epoch, options.clone())
.await?;
Ok(Self {
state_store,
iter,
should_rebuild,
end_bound,
epoch,
options,
})
}
}

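// `impl Trait` in a type alias (the nightly `type_alias_impl_trait` feature, already
// used in this crate, e.g. `LogStoreItemMergeStream` in serde.rs): gives the closure
// type produced by `iter_with_timeout_rebuild` a name usable in struct fields such
// as `KvLogStoreReader::state_store_stream`.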
type TimeoutAutoRebuildIter<S: StateStoreRead> =
AutoRebuildStateStoreReadIter<S, impl FnMut() -> bool + Send>;

async fn iter_with_timeout_rebuild<S: StateStoreRead>(
state_store: S,
range: TableKeyRange,
epoch: HummockEpoch,
options: ReadOptions,
timeout: Duration,
) -> StorageResult<TimeoutAutoRebuildIter<S>> {
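// Amortize the timeout check: consult the clock only once every
// `CHECK_TIMEOUT_PERIOD` calls; all other calls just bump counters.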
const CHECK_TIMEOUT_PERIOD: usize = 100;
// Wrap the counters in a non-Copy struct so the closure's `move` capture is a true
// move, not a silent copy of a primitive usize (see the detached sketch after this function).
struct Count(usize);
let mut check_count = Count(0);
let mut total_count = Count(0);
let mut curr_iter_item_count = Count(0);
let mut start_time = Instant::now();
let initial_start_time = start_time;
AutoRebuildStateStoreReadIter::new(
state_store,
move || {
check_count.0 += 1;
curr_iter_item_count.0 += 1;
total_count.0 += 1;
if check_count.0 == CHECK_TIMEOUT_PERIOD {
check_count.0 = 0;
if start_time.elapsed() > timeout {
let prev_iter_item_count = curr_iter_item_count.0;
curr_iter_item_count.0 = 0;
start_time = Instant::now();
info!(
table_id = options.table_id.table_id,
iter_exist_time_secs = initial_start_time.elapsed().as_secs(),
prev_iter_item_count,
total_iter_item_count = total_count.0,
"kv log store iter is rebuilt"
);
true
} else {
false
}
} else {
false
}
},
range,
epoch,
options,
)
.await
}
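A note on the `Count` wrapper above: `usize` is `Copy`, so a `move` capture of a bare counter silently copies it, leaving a second, never-updated instance behind in the enclosing scope. Wrapping the counters in a non-`Copy` struct makes the capture a true move, so any later use of the original is a compile error rather than a stale read. A detached sketch of the pitfall (illustrative only, not from the codebase):

fn main() {
    // Copy capture: the closure mutates its own copy of `n`.
    let mut n: usize = 0;
    let mut bump = move || {
        n += 1;
        n
    };
    assert_eq!(bump(), 1);
    assert_eq!(bump(), 2);
    // The outer `n` is still readable and still 0 -- a silent hazard.
    assert_eq!(n, 0);

    // Non-Copy wrapper: the closure takes ownership.
    struct Count(usize);
    let mut c = Count(0);
    let mut bump2 = move || {
        c.0 += 1;
        c.0
    };
    assert_eq!(bump2(), 1);
    assert_eq!(bump2(), 2);
    // Reading `c.0` here would not compile: `c` was moved into `bump2`.
}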

impl<S: StateStoreRead, F: FnMut() -> bool + Send> StateStoreIter
for AutoRebuildStateStoreReadIter<S, F>
{
async fn try_next(&mut self) -> StorageResult<Option<StateStoreIterItemRef<'_>>> {
let should_rebuild = (self.should_rebuild)();
if should_rebuild {
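// Advance the old iter to find the resume key; if it is already
// exhausted, there is nothing to rebuild.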
let Some((key, _value)) = self.iter.try_next().await? else {
return Ok(None);
};
let key: FullKey<&[u8]> = key;
let range_start = Bytes::copy_from_slice(key.user_key.table_key.as_ref());
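// Reopen the scan from the resume key (inclusive) at the same epoch;
// assigning to `self.iter` below drops the old iter and whatever
// version it pins.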
let new_iter = self
.state_store
.iter(
(
Included(TableKey(range_start.clone())),
self.end_bound.clone(),
),
self.epoch,
self.options.clone(),
)
.await?;
self.iter = new_iter;
let item: Option<StateStoreIterItemRef<'_>> = self.iter.try_next().await?;
if let Some((key, value)) = item {
assert_eq!(
key.user_key.table_key.0,
range_start.as_ref(),
"the first key should be the previous key"
);
Ok(Some((key, value)))
} else {
unreachable!(
"the first key should be the previous key {:?}, but get None",
range_start
)
}
} else {
self.iter.try_next().await
}
}
}

impl<S: StateStore> KvLogStoreReader<S> {
fn read_persisted_log_store(
&self,
last_persisted_epoch: Option<u64>,
) -> impl Future<Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<S::Iter>>>>> + Send
{
) -> impl Future<
Output = LogStoreResult<Pin<Box<LogStoreItemMergeStream<TimeoutAutoRebuildIter<S>>>>>,
> + Send {
let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch {
// start from the next epoch of last_persisted_epoch
Included(
@@ -210,19 +344,21 @@ impl<S: StateStore> KvLogStoreReader<S> {
);
let state_store = self.state_store.clone();
async move {
state_store
.iter(
key_range,
HummockEpoch::MAX,
ReadOptions {
// This stream lives for a long time, so the prefetch object's connection may break. Use a short-connection prefetch instead.
prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow),
table_id,
..Default::default()
},
)
.await
// rebuild the iter every 10 minutes to avoid pinning a hummock version for too long
iter_with_timeout_rebuild(
state_store,
key_range,
HummockEpoch::MAX,
ReadOptions {
// This stream lives for a long time, so the prefetch object's connection may break. Use a short-connection prefetch instead.
prefetch_options: PrefetchOptions::prefetch_for_small_range_scan(),
cache_policy: CachePolicy::Fill(CacheContext::LruPriorityLow),
table_id,
..Default::default()
},
Duration::from_secs(10 * 60),
)
.await
}
}));

@@ -500,3 +636,98 @@ impl<S: StateStore> LogReader for KvLogStoreReader<S> {
Ok((true, Some((**self.serde.vnodes()).clone())))
}
}

#[cfg(test)]
mod tests {
use std::ops::Bound::Unbounded;

use bytes::Bytes;
use itertools::Itertools;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::hummock::iterator::test_utils::{
iterator_test_table_key_of, iterator_test_value_of,
};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{ReadOptions, StateStoreRead, StateStoreWrite, WriteOptions};
use risingwave_storage::StateStoreIter;

use crate::common::log_store_impl::kv_log_store::reader::AutoRebuildStateStoreReadIter;
use crate::common::log_store_impl::kv_log_store::test_utils::TEST_TABLE_ID;

#[tokio::test]
async fn test_auto_rebuild_iter() {
let state_store = MemoryStateStore::new();
let key_count = 100;
let pairs = (0..key_count)
.map(|i| {
let key = iterator_test_table_key_of(i);
let value = iterator_test_value_of(i);
(TableKey(Bytes::from(key)), StorageValue::new_put(value))
})
.collect_vec();
let epoch = test_epoch(1);
state_store
.ingest_batch(
pairs.clone(),
vec![],
WriteOptions {
epoch,
table_id: TEST_TABLE_ID,
},
)
.unwrap();

async fn validate(
mut kv_iter: impl Iterator<Item = (TableKey<Bytes>, StorageValue)>,
mut iter: impl StateStoreIter,
) {
while let Some((key, value)) = iter.try_next().await.unwrap() {
let (k, v) = kv_iter.next().unwrap();
assert_eq!(key.user_key.table_key, k.to_ref());
assert_eq!(v.user_value.as_deref(), Some(value));
}
assert!(kv_iter.next().is_none());
}

let read_options = ReadOptions {
table_id: TEST_TABLE_ID,
..Default::default()
};

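// Baseline: a plain state-store iter should yield all pairs in order.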
let kv_iter = pairs.clone().into_iter();
let iter = state_store
.iter((Unbounded, Unbounded), epoch, read_options.clone())
.await
.unwrap();
validate(kv_iter, iter).await;

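// Same scan through AutoRebuildStateStoreReadIter, rebuilding on every 8th call.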
let kv_iter = pairs.clone().into_iter();
let mut count = 0;
let count_mut_ref = &mut count;
let rebuild_period = 8;
let mut rebuild_count = 0;
let rebuild_count_mut_ref = &mut rebuild_count;
let iter = AutoRebuildStateStoreReadIter::new(
state_store,
move || {
*count_mut_ref += 1;
if *count_mut_ref % rebuild_period == 0 {
*rebuild_count_mut_ref += 1;
true
} else {
false
}
},
(Unbounded, Unbounded),
epoch,
read_options,
)
.await
.unwrap();
validate(kv_iter, iter).await;
assert_eq!(count, key_count + 1); // with an extra call on the last None
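// 101 closure calls (one per `try_next`, including the final None); rebuilds
// fire on every 8th call: 8, 16, ..., 96 => 12 = 100 / 8 rebuilds.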
assert_eq!(rebuild_count, key_count / rebuild_period);
}
}
2 changes: 1 addition & 1 deletion src/stream/src/common/log_store_impl/kv_log_store/serde.rs
@@ -544,7 +544,7 @@ impl<S: StateStoreReadIter> LogStoreRowOpStream<S> {
}
}

pub(crate) type LogStoreItemMergeStream<S: StateStoreReadIter + 'static> =
pub(crate) type LogStoreItemMergeStream<S: StateStoreReadIter> =
impl Stream<Item = LogStoreResult<(u64, KvLogStoreItem)>>;
pub(crate) fn merge_log_store_item_stream<S: StateStoreReadIter>(
iters: Vec<S>,
