feat(resharding): use resharding config in flat storage resharder (#12246)

This PR improves the way batches are handled in the background task that
splits a shard. In particular, I'm re-using the good old `batch_size`
and `batch_delay` to throttle processing.

Part of #12174
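
As a rough sketch of the throttling scheme described above — not the PR's actual code; the `ThrottleConfig` type and `run_throttled` helper are invented for illustration — the batch loop boils down to:

```rust
use std::time::Duration;

/// Stand-in for the two `ReshardingConfig` knobs the PR wires in.
struct ThrottleConfig {
    /// Commit and yield after roughly this many bytes of key-values.
    batch_size: usize,
    /// Pause between batches to leave resources for regular node operation.
    batch_delay: Duration,
}

/// Copies key-value pairs in size-bounded batches, sleeping between batches.
/// Returns how many batches were processed.
fn run_throttled(entries: &[(Vec<u8>, Vec<u8>)], config: &ThrottleConfig) -> usize {
    let mut iter = entries.iter();
    let mut num_batches_done = 0;
    let mut iter_exhausted = false;
    while !iter_exhausted {
        let mut processed_size = 0;
        // Fill one batch, metered by bytes rather than entry count.
        while processed_size < config.batch_size && !iter_exhausted {
            match iter.next() {
                Some((key, value)) => processed_size += key.len() + value.len(),
                None => iter_exhausted = true,
            }
        }
        // Here the real task commits the batch and checks for cancellation.
        num_batches_done += 1;
        if !iter_exhausted {
            std::thread::sleep(config.batch_delay);
        }
    }
    num_batches_done
}
```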
Trisfald authored Oct 18, 2024
1 parent 2c40e53 commit 3e1d923
Showing 3 changed files with 79 additions and 28 deletions.
99 changes: 71 additions & 28 deletions chain/chain/src/flat_storage_resharder.rs
@@ -4,7 +4,7 @@
use std::sync::{Arc, Mutex};

use near_chain_configs::ReshardingHandle;
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
use near_chain_primitives::Error;

use tracing::{error, info};
@@ -52,9 +52,14 @@ use std::fmt::{Debug, Formatter};
#[derive(Clone)]
pub struct FlatStorageResharder {
runtime: Arc<dyn RuntimeAdapter>,
/// The current active resharding event.
resharding_event: Arc<Mutex<Option<FlatStorageReshardingEventStatus>>>,
/// Sender responsible to convey requests to the dedicated resharding actor.
scheduler: Sender<FlatStorageSplitShardRequest>,
/// Controls cancellation of background processing.
pub controller: FlatStorageResharderController,
/// Configuration for resharding.
resharding_config: MutableConfigValue<ReshardingConfig>,
}

impl FlatStorageResharder {
@@ -64,13 +69,15 @@ impl FlatStorageResharder {
/// * `runtime`: runtime adapter
/// * `scheduler`: component used to schedule the background tasks
/// * `controller`: manages the execution of the background tasks
/// * `resharding_config`: configuration options
pub fn new(
runtime: Arc<dyn RuntimeAdapter>,
scheduler: Sender<FlatStorageSplitShardRequest>,
controller: FlatStorageResharderController,
resharding_config: MutableConfigValue<ReshardingConfig>,
) -> Self {
let resharding_event = Arc::new(Mutex::new(None));
Self { runtime, resharding_event, scheduler, controller }
Self { runtime, resharding_event, scheduler, controller, resharding_config }
}

/// Starts a resharding event.
@@ -237,10 +244,11 @@ impl FlatStorageResharder {
/// Task to perform the actual split of a flat storage shard. This may be a long operation time-wise.
///
/// Conceptually it simply copies each key-value pair from the parent shard to the correct child.
pub fn split_shard_task(&self) {
pub fn split_shard_task(&self) -> FlatStorageReshardingTaskStatus {
let task_status = self.split_shard_task_impl();
self.split_shard_task_postprocessing(task_status);
info!(target: "resharding", ?task_status, "flat storage shard split task finished");
task_status
}

/// Performs the bulk of [split_shard_task].
Expand All @@ -251,14 +259,16 @@ impl FlatStorageResharder {
return FlatStorageReshardingTaskStatus::Cancelled;
}

/// Determines after how many key-values the process stops to
/// commit changes and to check cancellation.
const BATCH_SIZE: usize = 10_000;
// Determines after how many bytes worth of key-values the process stops to commit changes
// and to check cancellation.
let batch_size = self.resharding_config.get().batch_size.as_u64() as usize;
// Delay between every batch.
let batch_delay = self.resharding_config.get().batch_delay.unsigned_abs();

let (parent_shard, status) = self
.get_parent_shard_and_status()
.expect("flat storage resharding event must be Split!");
info!(target: "resharding", ?parent_shard, ?status, "flat storage shard split task: starting key-values copy");
info!(target: "resharding", ?parent_shard, ?status, ?batch_delay, ?batch_size, "flat storage shard split task: starting key-values copy");

// Parent shard flat storage head must be on block height just before the new shard layout kicks
// in. This guarantees that all deltas have been applied and thus the state of all key-values is
@@ -269,21 +279,23 @@
// Prepare the store object for commits and the iterator over parent's flat storage.
let flat_store = self.runtime.store().flat_store();
let mut iter = flat_store.iter(parent_shard);
let mut batches_done = 0;
let mut num_batches_done: usize = 0;
let mut iter_exhausted = false;

loop {
let _span = tracing::debug_span!(
target: "resharding",
"split_shard_task_impl/batch",
batch_id = ?batches_done)
batch_id = ?num_batches_done)
.entered();
let mut store_update = flat_store.store_update();
let mut processed_size = 0;

// Process a `BATCH_SIZE` worth of key value pairs.
let mut iter_exhausted = false;
for _ in 0..BATCH_SIZE {
// Process a `batch_size` worth of key value pairs.
while processed_size < batch_size && !iter_exhausted {
match iter.next() {
Some(Ok((key, value))) => {
processed_size += key.len() + value.size();
if let Err(err) =
shard_split_handle_key_value(key, value, &mut store_update, &status)
{
@@ -307,7 +319,7 @@ impl FlatStorageResharder {
return FlatStorageReshardingTaskStatus::Failed;
}

batches_done += 1;
num_batches_done += 1;

// If `iter` is exhausted we can exit after the store commit.
if iter_exhausted {
@@ -316,8 +328,12 @@ impl FlatStorageResharder {
if self.controller.is_cancelled() {
return FlatStorageReshardingTaskStatus::Cancelled;
}

// Sleep between batches in order to throttle resharding and leave some resource for the
// regular node operation.
std::thread::sleep(batch_delay);
}
FlatStorageReshardingTaskStatus::Successful
FlatStorageReshardingTaskStatus::Successful { num_batches_done }
}

/// Performs post-processing of shard splitting after all key-values have been moved from parent to
@@ -339,7 +355,7 @@ impl FlatStorageResharder {

let mut store_update = flat_store.store_update();
match task_status {
FlatStorageReshardingTaskStatus::Successful => {
FlatStorageReshardingTaskStatus::Successful { .. } => {
// Split shard completed successfully.
// Parent flat storage can be deleted from the FlatStoreManager.
// If FlatStoreManager has no reference to the shard, delete it manually.
@@ -527,7 +543,7 @@ pub enum FlatStorageReshardingEventStatus {
/// Status of a flat storage resharding task.
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
pub enum FlatStorageReshardingTaskStatus {
Successful,
Successful { num_batches_done: usize },
Failed,
Cancelled,
}
@@ -608,14 +624,13 @@ mod tests {

#[derive(Default)]
struct DelayedScheduler {
test_scheduler: TestScheduler,
split_shard_request: Mutex<Option<FlatStorageSplitShardRequest>>,
}

impl DelayedScheduler {
fn call_split_shard_task(&self) {
fn call_split_shard_task(&self) -> FlatStorageReshardingTaskStatus {
let msg_guard = self.split_shard_request.lock().unwrap();
self.test_scheduler.send(msg_guard.clone().unwrap());
msg_guard.as_ref().unwrap().resharder.split_shard_task()
}
}

@@ -826,13 +841,13 @@ mod tests {
#[test]
fn simple_split_shard() {
init_test_logger();
// Perform resharding.
let sender = TestScheduler::default().into_multi_sender();
let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender);
let new_shard_layout = shard_layout_after_split();
let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);

assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
// Perform resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());

// Check flat storages of children contain the correct accounts and access keys.
let left_child = ShardUId { version: 3, shard_id: 2 };
@@ -883,17 +898,45 @@ mod tests {
);
}

/// Split shard task should run in batches.
#[test]
fn cancel_split_shard() {
fn split_shard_batching() {
init_test_logger();
let scheduler = Arc::new(DelayedScheduler::default());
let (chain, resharder) =
create_chain_and_resharder(simple_shard_layout(), scheduler.as_multi_sender());
let new_shard_layout = shard_layout_after_split();
let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);

// Tweak the resharding config to make smaller batches.
let mut config = resharder.resharding_config.get();
config.batch_size = bytesize::ByteSize(1);
resharder.resharding_config.update(config);

// Perform resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());

// Check that more than one batch has been processed.
let FlatStorageReshardingTaskStatus::Successful { num_batches_done } =
scheduler.call_split_shard_task()
else {
assert!(false);
return;
};
assert!(num_batches_done > 1);
}

#[test]
fn cancel_split_shard() {
init_test_logger();
let scheduler = Arc::new(DelayedScheduler::default());
let sender = scheduler.as_multi_sender();
let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender);
let (chain, resharder) =
create_chain_and_resharder(simple_shard_layout(), scheduler.as_multi_sender());
let new_shard_layout = shard_layout_after_split();
let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);

assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
// Perform resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
let (parent_shard, status) = resharder.get_parent_shard_and_status().unwrap();
let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = status;

@@ -1071,7 +1114,7 @@ mod tests {
store_update.commit().unwrap();

// Do resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());

// Check that flat storages of both children contain the delayed receipt.
for child_shard in [left_child_shard, right_child_shard] {
@@ -1135,7 +1178,7 @@ mod tests {
store_update.commit().unwrap();

// Do resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());

// Check that flat storages of both children contain the promise yield.
for child_shard in [left_child_shard, right_child_shard] {
@@ -1192,7 +1235,7 @@ mod tests {
store_update.commit().unwrap();

// Do resharding.
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok());
assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());

// Check that only the first child contain the buffered receipt.
assert_eq!(
1 change: 1 addition & 0 deletions chain/chain/src/resharding/manager.rs
@@ -44,6 +44,7 @@ impl ReshardingManager {
runtime_adapter,
resharding_sender.into_sender(),
FlatStorageResharderController::from_resharding_handle(resharding_handle.clone()),
resharding_config.clone(),
);
Self { store, epoch_manager, resharding_config, flat_storage_resharder, resharding_handle }
}
7 changes: 7 additions & 0 deletions core/primitives/src/state.rs
@@ -102,4 +102,11 @@ impl FlatStateValue {
Self::Inlined(value) => value.len(),
}
}

pub fn size(&self) -> usize {
match self {
Self::Ref(_) => size_of::<Self>(),
Self::Inlined(value) => size_of::<Self>() + value.capacity(),
}
}
}
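
The new `size()` above reports a value's in-memory footprint — the enum itself plus any heap capacity of inlined data — which is what byte-metered batching needs. A minimal standalone sketch of the same idea, using a simplified stand-in enum rather than the real `FlatStateValue`:

```rust
use std::mem::size_of;

// Simplified stand-in for `FlatStateValue`; the real type lives in
// core/primitives/src/state.rs and this variant layout is only illustrative.
enum Value {
    Ref([u8; 32]),    // fixed-size reference (hash-like), no heap data
    Inlined(Vec<u8>), // small values stored inline on the heap
}

impl Value {
    /// In-memory footprint: the enum itself plus any heap allocation.
    /// `capacity()` (rather than `len()`) is used so reserved-but-unused
    /// heap space is also counted.
    fn size(&self) -> usize {
        match self {
            Value::Ref(_) => size_of::<Self>(),
            Value::Inlined(v) => size_of::<Self>() + v.capacity(),
        }
    }
}

fn main() {
    let inlined = Value::Inlined(vec![0u8; 100]);
    // The inlined variant counts heap capacity on top of the enum size.
    assert!(inlined.size() >= size_of::<Value>() + 100);
    println!("inlined footprint: {} bytes", inlined.size());
}
```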
