Skip to content

Commit

Permalink
refactor(storage): pass table ids to state store sync (#17332)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jun 26, 2024
1 parent 994e48f commit 8d7a2ac
Show file tree
Hide file tree
Showing 25 changed files with 261 additions and 157 deletions.
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message InjectBarrierRequest {
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_send = 3;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
}

message BarrierCompleteResponse {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ impl ControlStreamManager {
barrier: Some(barrier),
actor_ids_to_send,
actor_ids_to_collect,
table_ids_to_sync: command_context
.info
.existing_table_ids()
.iter()
.map(|table_id| table_id.table_id)
.collect(),
},
),
),
Expand Down
5 changes: 3 additions & 2 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::ops::Bound;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::catalog::TableId;
use risingwave_common::util::addr::HostAddr;
use risingwave_common_service::observer_manager::{Channel, NotificationClient, ObserverError};
use risingwave_hummock_sdk::key::TableKey;
Expand Down Expand Up @@ -137,10 +138,10 @@ impl ReplayRead for GlobalReplayImpl {

#[async_trait::async_trait]
impl ReplayStateStore for GlobalReplayImpl {
async fn sync(&self, id: u64) -> Result<usize> {
async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize> {
let result: SyncResult = self
.store
.sync(id)
.sync(id, table_ids.into_iter().map(TableId::new).collect())
.await
.map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
Ok(result.sync_size)
Expand Down
4 changes: 4 additions & 0 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,10 @@ pub(crate) mod tests {
)
.await;

global_storage
.wait_version(hummock_manager_ref.get_current_version().await)
.await;

let vnode = VirtualNode::from_index(1);
for index in 0..kv_count {
epoch.inc_epoch();
Expand Down
13 changes: 6 additions & 7 deletions src/storage/hummock_test/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,16 @@ use risingwave_hummock_sdk::key::prefixed_range_with_vnode;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::hummock::{CachePolicy, HummockStorage};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions,
WriteOptions,
StateStoreRead, WriteOptions,
};
use risingwave_storage::StateStore;

use crate::local_state_store_test_utils::LocalStateStoreTestExt;
use crate::test_utils::{
gen_key_from_bytes, with_hummock_storage_v2, HummockStateStoreTestTrait, TestIngestBatch,
};
use crate::test_utils::{gen_key_from_bytes, with_hummock_storage_v2, TestIngestBatch};

macro_rules! assert_count_range_scan {
($storage:expr, $vnode:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{
Expand Down Expand Up @@ -104,7 +103,7 @@ macro_rules! assert_count_backward_range_scan {
}

async fn test_snapshot_inner(
hummock_storage: impl HummockStateStoreTestTrait,
hummock_storage: HummockStorage,
mock_hummock_meta_client: Arc<MockHummockMetaClient>,
enable_sync: bool,
enable_commit: bool,
Expand Down Expand Up @@ -235,7 +234,7 @@ async fn test_snapshot_inner(
}

async fn test_snapshot_range_scan_inner(
hummock_storage: impl HummockStateStoreTestTrait,
hummock_storage: HummockStorage,
mock_hummock_meta_client: Arc<MockHummockMetaClient>,
enable_sync: bool,
enable_commit: bool,
Expand Down
28 changes: 8 additions & 20 deletions src/storage/hummock_test/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ use risingwave_common_service::observer_manager::ObserverManager;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::TableKey;
pub use risingwave_hummock_sdk::key::{gen_key_from_bytes, gen_key_from_str};
#[cfg(test)]
use risingwave_hummock_sdk::SyncResult;
use risingwave_meta::hummock::test_utils::{
register_table_ids_to_compaction_group, setup_compute_env,
};
Expand Down Expand Up @@ -114,24 +112,6 @@ impl<S: LocalStateStore> TestIngestBatch for S {
}
}

#[cfg(test)]
#[async_trait::async_trait]
pub(crate) trait HummockStateStoreTestTrait: StateStore {
#[allow(dead_code)]
fn get_pinned_version(&self) -> PinnedVersion;
async fn seal_and_sync_epoch(&self, epoch: u64) -> StorageResult<SyncResult> {
self.seal_epoch(epoch, true);
self.sync(epoch).await
}
}

#[cfg(test)]
impl HummockStateStoreTestTrait for HummockStorage {
fn get_pinned_version(&self) -> PinnedVersion {
self.get_pinned_version()
}
}

pub async fn with_hummock_storage_v2(
table_id: TableId,
) -> (HummockStorage, Arc<MockHummockMetaClient>) {
Expand Down Expand Up @@ -234,13 +214,20 @@ pub struct HummockTestEnv {
}

impl HummockTestEnv {
async fn wait_version_sync(&self) {
self.storage
.wait_version(self.manager.get_current_version().await)
.await
}

pub async fn register_table_id(&self, table_id: TableId) {
register_tables_with_id_for_test(
self.storage.filter_key_extractor_manager(),
&self.manager,
&[table_id.table_id()],
)
.await;
self.wait_version_sync().await;
}

pub async fn register_table(&self, table: PbTable) {
Expand All @@ -250,6 +237,7 @@ impl HummockTestEnv {
&[table],
)
.await;
self.wait_version_sync().await;
}

// Seal, sync and commit a epoch.
Expand Down
16 changes: 14 additions & 2 deletions src/storage/hummock_trace/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::env;
use std::fs::{create_dir_all, OpenOptions};
use std::io::BufWriter;
Expand All @@ -23,6 +24,7 @@ use std::sync::LazyLock;
use bincode::{Decode, Encode};
use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::meta::SubscribeResponse;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -286,8 +288,18 @@ impl TraceSpan {
)
}

pub fn new_sync_span(epoch: u64, storage_type: StorageType) -> MayTraceSpan {
Self::new_global_op(Operation::Sync(epoch), storage_type)
pub fn new_sync_span(
epoch: u64,
table_ids: &HashSet<TableId>,
storage_type: StorageType,
) -> MayTraceSpan {
Self::new_global_op(
Operation::Sync(
epoch,
table_ids.iter().map(|table_id| table_id.table_id).collect(),
),
storage_type,
)
}

pub fn new_seal_span(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_trace/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub enum Operation {
IterNext(RecordId),

/// Sync operation of Hummock.
Sync(u64),
Sync(u64, Vec<u32>),

/// Seal operation of Hummock.
Seal(u64, bool),
Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_trace/src/replay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub trait ReplayWrite {
#[cfg_attr(test, automock)]
#[async_trait::async_trait]
pub trait ReplayStateStore {
async fn sync(&self, id: u64) -> Result<usize>;
async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize>;
fn seal_epoch(&self, epoch_id: u64, is_checkpoint: bool);
async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
Expand Down Expand Up @@ -146,7 +146,7 @@ mock! {
}
#[async_trait::async_trait]
impl ReplayStateStore for GlobalReplayInterface{
async fn sync(&self, id: u64) -> Result<usize>;
async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize>;
fn seal_epoch(&self, epoch_id: u64, is_checkpoint: bool);
async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64,
) -> Result<u64>;
Expand Down
6 changes: 3 additions & 3 deletions src/storage/hummock_trace/src/replay/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ mod tests {
let mut non_local: Vec<Result<Record>> = vec![
(12, Operation::Seal(seal_id, seal_checkpoint)),
(12, Operation::Finish),
(13, Operation::Sync(sync_id)),
(13, Operation::Sync(sync_id, vec![1, 2, 3])),
(
13,
Operation::Result(OperationResult::Sync(TraceResult::Ok(0))),
Expand Down Expand Up @@ -247,9 +247,9 @@ mod tests {

mock_replay
.expect_sync()
.with(predicate::eq(sync_id))
.with(predicate::eq(sync_id), predicate::eq(vec![1, 2, 3]))
.times(1)
.returning(|_| Ok(0));
.returning(|_, _| Ok(0));

mock_replay
.expect_seal_epoch()
Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_trace/src/replay/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ impl ReplayWorker {
panic!("expect iter result, but got {:?}", res);
}
}
Operation::Sync(epoch_id) => {
Operation::Sync(epoch_id, table_ids) => {
assert_eq!(storage_type, StorageType::Global);
let sync_result = replay.sync(epoch_id).await.unwrap();
let sync_result = replay.sync(epoch_id, table_ids).await.unwrap();
let res = res_rx.recv().await.expect("recv result failed");
if let OperationResult::Sync(expected) = res {
assert_eq!(TraceResult::Ok(sync_result), expected, "sync failed");
Expand Down
11 changes: 8 additions & 3 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::pin::pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
Expand Down Expand Up @@ -466,14 +466,15 @@ impl HummockEventHandler {
&mut self,
new_sync_epoch: HummockEpoch,
sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
table_ids: HashSet<TableId>,
) {
debug!(
"awaiting for epoch to be synced: {}, max_synced_epoch: {}",
new_sync_epoch,
self.uploader.max_synced_epoch()
);
self.uploader
.start_sync_epoch(new_sync_epoch, sync_result_sender);
.start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids);
}

async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) {
Expand Down Expand Up @@ -729,8 +730,9 @@ impl HummockEventHandler {
HummockEvent::SyncEpoch {
new_sync_epoch,
sync_result_sender,
table_ids,
} => {
self.handle_sync_epoch(new_sync_epoch, sync_result_sender);
self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids);
}
HummockEvent::Clear(_, _) => {
unreachable!("clear is handled in separated async context")
Expand Down Expand Up @@ -917,6 +919,7 @@ impl SyncedData {

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::Poll;
Expand Down Expand Up @@ -1188,12 +1191,14 @@ mod tests {
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch1,
sync_result_sender: tx1,
table_ids: HashSet::from_iter([TEST_TABLE_ID]),
});
assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
let (tx2, mut rx2) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch2,
sync_result_sender: tx2,
table_ids: HashSet::from_iter([TEST_TABLE_ID]),
});
assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

Expand Down
6 changes: 4 additions & 2 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use parking_lot::{RwLock, RwLockReadGuard};
Expand Down Expand Up @@ -61,6 +61,7 @@ pub enum HummockEvent {
SyncEpoch {
new_sync_epoch: HummockEpoch,
sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
table_ids: HashSet<TableId>,
},

/// Clear shared buffer and reset all states
Expand Down Expand Up @@ -109,7 +110,8 @@ impl HummockEvent {
HummockEvent::SyncEpoch {
new_sync_epoch,
sync_result_sender: _,
} => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch),
table_ids,
} => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids),

HummockEvent::Clear(_, prev_epoch) => format!("Clear {:?}", prev_epoch),

Expand Down
Loading

0 comments on commit 8d7a2ac

Please sign in to comment.