From 3e1d9232b99f76ada21d314f04f1c127223c389f Mon Sep 17 00:00:00 2001
From: Andrea
Date: Fri, 18 Oct 2024 12:05:52 +0200
Subject: [PATCH] feat(resharding): use resharding config in flat storage resharder (#12246)

This PR improves the way batches are handled in the background task that
splits a shard. In particular, I'm re-using the good old `batch_size` and
`batch_delay` to throttle processing.

Part of #12174
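
Roughly, the copy loop now works like the sketch below (simplified Rust for
illustration only: `ThrottleConfig`, `copy_in_batches` and `is_cancelled` are
stand-ins, not the real nearcore types):

```rust
use std::time::Duration;

/// Illustrative stand-in for the relevant resharding config fields.
struct ThrottleConfig {
    /// Maximum bytes worth of key-values copied per batch.
    batch_size: usize,
    /// Pause between batches.
    batch_delay: Duration,
}

/// Illustrative stand-in for the cancellation check done through the controller.
fn is_cancelled() -> bool {
    false
}

/// Copies key-value pairs in size-bounded batches and returns how many batches were committed.
fn copy_in_batches(
    config: &ThrottleConfig,
    mut kv_iter: impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
) -> usize {
    let mut num_batches_done = 0;
    let mut iter_exhausted = false;
    loop {
        let mut processed_size = 0;
        // Fill one batch: stop once `batch_size` bytes of key-values have been staged
        // or the parent iterator ran out of entries.
        while processed_size < config.batch_size && !iter_exhausted {
            match kv_iter.next() {
                Some((key, value)) => {
                    processed_size += key.len() + value.len();
                    // ... stage the key-value pair for the proper child shard ...
                }
                None => iter_exhausted = true,
            }
        }
        // ... commit the staged batch to storage here ...
        num_batches_done += 1;
        if iter_exhausted || is_cancelled() {
            break;
        }
        // Throttle: leave room for regular node operation between batches.
        std::thread::sleep(config.batch_delay);
    }
    num_batches_done
}

fn main() {
    let config = ThrottleConfig { batch_size: 500_000, batch_delay: Duration::from_millis(100) };
    let entries = (0u32..8).map(|i| (i.to_le_bytes().to_vec(), vec![0u8; 100]));
    println!("batches committed: {}", copy_in_batches(&config, entries));
}
```

Since the values are read through `MutableConfigValue<ReshardingConfig>`, they
can be tweaked dynamically; the new `split_shard_batching` test uses that to
force tiny batches and assert that more than one batch gets committed.
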
---
 chain/chain/src/flat_storage_resharder.rs | 99 ++++++++++++++++-------
 chain/chain/src/resharding/manager.rs     |  1 +
 core/primitives/src/state.rs              |  7 ++
 3 files changed, 79 insertions(+), 28 deletions(-)

diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs
index bf27f2df98a..616c80b1ee9 100644
--- a/chain/chain/src/flat_storage_resharder.rs
+++ b/chain/chain/src/flat_storage_resharder.rs
@@ -4,7 +4,7 @@
 use std::sync::{Arc, Mutex};
 
-use near_chain_configs::ReshardingHandle;
+use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
 use near_chain_primitives::Error;
 use tracing::{error, info};
@@ -52,9 +52,14 @@ use std::fmt::{Debug, Formatter};
 #[derive(Clone)]
 pub struct FlatStorageResharder {
     runtime: Arc<dyn RuntimeAdapter>,
+    /// The current active resharding event.
     resharding_event: Arc<Mutex<Option<FlatStorageReshardingEventStatus>>>,
+    /// Sender responsible for conveying requests to the dedicated resharding actor.
     scheduler: Sender<FlatStorageSplitShardRequest>,
+    /// Controls cancellation of background processing.
     pub controller: FlatStorageResharderController,
+    /// Configuration for resharding.
+    resharding_config: MutableConfigValue<ReshardingConfig>,
 }
 
 impl FlatStorageResharder {
@@ -64,13 +69,15 @@ impl FlatStorageResharder {
     /// * `runtime`: runtime adapter
     /// * `scheduler`: component used to schedule the background tasks
     /// * `controller`: manages the execution of the background tasks
+    /// * `resharding_config`: configuration options
     pub fn new(
         runtime: Arc<dyn RuntimeAdapter>,
         scheduler: Sender<FlatStorageSplitShardRequest>,
         controller: FlatStorageResharderController,
+        resharding_config: MutableConfigValue<ReshardingConfig>,
     ) -> Self {
         let resharding_event = Arc::new(Mutex::new(None));
-        Self { runtime, resharding_event, scheduler, controller }
+        Self { runtime, resharding_event, scheduler, controller, resharding_config }
     }
 
     /// Starts a resharding event.
@@ -237,10 +244,11 @@ impl FlatStorageResharder {
     /// Task to perform the actual split of a flat storage shard. This may be a long operation time-wise.
     ///
     /// Conceptually it simply copies each key-value pair from the parent shard to the correct child.
-    pub fn split_shard_task(&self) {
+    pub fn split_shard_task(&self) -> FlatStorageReshardingTaskStatus {
         let task_status = self.split_shard_task_impl();
         self.split_shard_task_postprocessing(task_status);
         info!(target: "resharding", ?task_status, "flat storage shard split task finished");
+        task_status
     }
 
     /// Performs the bulk of [split_shard_task].
@@ -251,14 +259,16 @@ impl FlatStorageResharder {
             return FlatStorageReshardingTaskStatus::Cancelled;
         }
 
-        /// Determines after how many key-values the process stops to
-        /// commit changes and to check cancellation.
-        const BATCH_SIZE: usize = 10_000;
+        // Determines after how many bytes worth of key-values the process stops to commit changes
+        // and to check cancellation.
+        let batch_size = self.resharding_config.get().batch_size.as_u64() as usize;
+        // Delay between every batch.
+        let batch_delay = self.resharding_config.get().batch_delay.unsigned_abs();
 
         let (parent_shard, status) = self
             .get_parent_shard_and_status()
             .expect("flat storage resharding event must be Split!");
-        info!(target: "resharding", ?parent_shard, ?status, "flat storage shard split task: starting key-values copy");
+        info!(target: "resharding", ?parent_shard, ?status, ?batch_delay, ?batch_size, "flat storage shard split task: starting key-values copy");
 
         // Parent shard flat storage head must be on block height just before the new shard layout kicks
         // in. This guarantees that all deltas have been applied and thus the state of all key-values is
@@ -269,21 +279,23 @@ impl FlatStorageResharder {
         // Prepare the store object for commits and the iterator over parent's flat storage.
         let flat_store = self.runtime.store().flat_store();
         let mut iter = flat_store.iter(parent_shard);
-        let mut batches_done = 0;
+        let mut num_batches_done: usize = 0;
+        let mut iter_exhausted = false;
 
         loop {
            let _span = tracing::debug_span!(
                 target: "resharding",
                 "split_shard_task_impl/batch",
-                batch_id = ?batches_done)
+                batch_id = ?num_batches_done)
             .entered();
             let mut store_update = flat_store.store_update();
+            let mut processed_size = 0;
 
-            // Process a `BATCH_SIZE` worth of key value pairs.
-            let mut iter_exhausted = false;
-            for _ in 0..BATCH_SIZE {
+            // Process a `batch_size` worth of key value pairs.
+            while processed_size < batch_size && !iter_exhausted {
                 match iter.next() {
                     Some(Ok((key, value))) => {
+                        processed_size += key.len() + value.size();
                         if let Err(err) =
                             shard_split_handle_key_value(key, value, &mut store_update, &status)
                         {
@@ -307,7 +319,7 @@ impl FlatStorageResharder {
                 return FlatStorageReshardingTaskStatus::Failed;
             }
 
-            batches_done += 1;
+            num_batches_done += 1;
 
             // If `iter`` is exhausted we can exit after the store commit.
             if iter_exhausted {
@@ -316,8 +328,12 @@ impl FlatStorageResharder {
             if self.controller.is_cancelled() {
                 return FlatStorageReshardingTaskStatus::Cancelled;
             }
+
+            // Sleep between batches in order to throttle resharding and leave some resources for
+            // regular node operation.
+            std::thread::sleep(batch_delay);
         }
-        FlatStorageReshardingTaskStatus::Successful
+        FlatStorageReshardingTaskStatus::Successful { num_batches_done }
     }
 
     /// Performs post-processing of shard splitting after all key-values have been moved from parent to
@@ -339,7 +355,7 @@ impl FlatStorageResharder {
         let mut store_update = flat_store.store_update();
 
         match task_status {
-            FlatStorageReshardingTaskStatus::Successful => {
+            FlatStorageReshardingTaskStatus::Successful { .. } => {
                 // Split shard completed successfully.
                 // Parent flat storage can be deleted from the FlatStoreManager.
                 // If FlatStoreManager has no reference to the shard, delete it manually.
@@ -527,7 +543,7 @@ pub enum FlatStorageReshardingEventStatus {
 /// Status of a flat storage resharding task.
 #[derive(Clone, Debug, Copy, Eq, PartialEq)]
 pub enum FlatStorageReshardingTaskStatus {
-    Successful,
+    Successful { num_batches_done: usize },
     Failed,
     Cancelled,
 }
@@ -608,14 +624,13 @@ mod tests {
 
     #[derive(Default)]
     struct DelayedScheduler {
-        test_scheduler: TestScheduler,
         split_shard_request: Mutex<Option<FlatStorageSplitShardRequest>>,
     }
 
     impl DelayedScheduler {
-        fn call_split_shard_task(&self) {
+        fn call_split_shard_task(&self) -> FlatStorageReshardingTaskStatus {
             let msg_guard = self.split_shard_request.lock().unwrap();
-            self.test_scheduler.send(msg_guard.clone().unwrap());
+            msg_guard.as_ref().unwrap().resharder.split_shard_task()
         }
     }
 
@@ -826,13 +841,13 @@ mod tests {
     #[test]
     fn simple_split_shard() {
         init_test_logger();
-        // Perform resharding.
         let sender = TestScheduler::default().into_multi_sender();
         let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender);
         let new_shard_layout = shard_layout_after_split();
         let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);
 
-        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
+        // Perform resharding.
+        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
 
         // Check flat storages of children contain the correct accounts and access keys.
         let left_child = ShardUId { version: 3, shard_id: 2 };
@@ -883,17 +898,45 @@ mod tests {
         );
     }
 
+    /// Split shard task should run in batches.
     #[test]
-    fn cancel_split_shard() {
+    fn split_shard_batching() {
         init_test_logger();
+        let scheduler = Arc::new(DelayedScheduler::default());
+        let (chain, resharder) =
+            create_chain_and_resharder(simple_shard_layout(), scheduler.as_multi_sender());
+        let new_shard_layout = shard_layout_after_split();
+        let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);
+
+        // Tweak the resharding config to make smaller batches.
+        let mut config = resharder.resharding_config.get();
+        config.batch_size = bytesize::ByteSize(1);
+        resharder.resharding_config.update(config);
+        // Perform resharding.
+        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
+
+        // Check that more than one batch has been processed.
+        let FlatStorageReshardingTaskStatus::Successful { num_batches_done } =
+            scheduler.call_split_shard_task()
+        else {
+            assert!(false);
+            return;
+        };
+        assert!(num_batches_done > 1);
+    }
+
+    #[test]
+    fn cancel_split_shard() {
+        init_test_logger();
         let scheduler = Arc::new(DelayedScheduler::default());
-        let sender = scheduler.as_multi_sender();
-        let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender);
+        let (chain, resharder) =
+            create_chain_and_resharder(simple_shard_layout(), scheduler.as_multi_sender());
         let new_shard_layout = shard_layout_after_split();
         let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);
 
-        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
+        // Perform resharding.
+        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
 
         let (parent_shard, status) = resharder.get_parent_shard_and_status().unwrap();
         let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = status;
@@ -1071,7 +1114,7 @@ mod tests {
         store_update.commit().unwrap();
 
         // Do resharding.
-        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
+        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
 
         // Check that flat storages of both children contain the delayed receipt.
         for child_shard in [left_child_shard, right_child_shard] {
@@ -1135,7 +1178,7 @@ mod tests {
         store_update.commit().unwrap();
 
         // Do resharding.
-        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
+        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
 
         // Check that flat storages of both children contain the promise yield.
         for child_shard in [left_child_shard, right_child_shard] {
@@ -1192,7 +1235,7 @@ mod tests {
         store_update.commit().unwrap();
 
         // Do resharding.
-        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
+        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
 
         // Check that only the first child contain the buffered receipt.
         assert_eq!(
diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs
index edccd82d88a..01ca5ac6112 100644
--- a/chain/chain/src/resharding/manager.rs
+++ b/chain/chain/src/resharding/manager.rs
@@ -44,6 +44,7 @@ impl ReshardingManager {
             runtime_adapter,
             resharding_sender.into_sender(),
             FlatStorageResharderController::from_resharding_handle(resharding_handle.clone()),
+            resharding_config.clone(),
         );
         Self { store, epoch_manager, resharding_config, flat_storage_resharder, resharding_handle }
     }
 }
diff --git a/core/primitives/src/state.rs b/core/primitives/src/state.rs
index dcfd030e07e..8c7da64bb84 100644
--- a/core/primitives/src/state.rs
+++ b/core/primitives/src/state.rs
@@ -102,4 +102,11 @@ impl FlatStateValue {
             Self::Inlined(value) => value.len(),
         }
     }
+
+    pub fn size(&self) -> usize {
+        match self {
+            Self::Ref(_) => size_of::<Self>(),
+            Self::Inlined(value) => size_of::<Self>() + value.capacity(),
+        }
+    }
 }