Skip to content

Commit

Permalink
feat(resharding): flat storage resharding children catchup (#12312)
Browse files Browse the repository at this point in the history
This PR adds a catchup step for the child flat storages created as a result of
a parent shard split.

In previous iterations, the two child shards were populated in a
background task (post-processing) from the flat storage of the parent at
height `last block of old shard layout`.

Because the task mentioned above takes a long time, and the children are
already active shards in the `first block of the new shard layout`, their flat
storages accumulate a lot of deltas in the meantime.

The catchup step applies these deltas in the background, then finalizes the
creation of the child flat storage, and possibly triggers a memtrie rebuild.

Part of #12174
  • Loading branch information
Trisfald authored Nov 4, 2024
1 parent c83205b commit cf6d2ef
Show file tree
Hide file tree
Showing 10 changed files with 696 additions and 123 deletions.
663 changes: 579 additions & 84 deletions chain/chain/src/flat_storage_resharder.rs

Large diffs are not rendered by default.

28 changes: 10 additions & 18 deletions chain/chain/src/resharding/event_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ pub struct ReshardingSplitShardParams {
/// The account at the boundary between the two children.
pub boundary_account: AccountId,
/// Hash of the last block having the old shard layout.
pub block_hash: CryptoHash,
/// The block before `block_hash`.
pub prev_block_hash: CryptoHash,
pub resharding_hash: CryptoHash,
}

impl ReshardingSplitShardParams {
Expand All @@ -44,15 +42,13 @@ impl ReshardingEventType {
///
/// # Args:
/// * `next_shard_layout`: the new shard layout
/// * `block_hash`: hash of the last block with the shard layout before `next_shard_layout`
/// * `prev_block_hash`: hash of the block preceding `block_hash`
/// * `resharding_hash`: hash of the last block with the shard layout before `next_shard_layout`
///
/// Returns a [ReshardingEventType] if exactly one resharding change is contained in
/// `next_shard_layout`, otherwise returns `None`.
pub fn from_shard_layout(
next_shard_layout: &ShardLayout,
block_hash: CryptoHash,
prev_block_hash: CryptoHash,
resharding_hash: CryptoHash,
) -> Result<Option<ReshardingEventType>, Error> {
let log_and_error = |err_msg: &str| {
error!(target: "resharding", ?next_shard_layout, err_msg);
Expand Down Expand Up @@ -103,8 +99,7 @@ impl ReshardingEventType {
left_child_shard,
right_child_shard,
boundary_account,
block_hash,
prev_block_hash,
resharding_hash,
}));
}
_ => {
Expand Down Expand Up @@ -139,7 +134,6 @@ mod tests {
#[test]
fn parse_event_type_from_shard_layout() {
let block = CryptoHash::hash_bytes(&[1]);
let prev_block = CryptoHash::hash_bytes(&[2]);

let s0 = ShardId::new(0);
let s1 = ShardId::new(1);
Expand All @@ -152,13 +146,13 @@ mod tests {
#[allow(deprecated)]
let layout_v0 = ShardLayout::v0(1, 0);
let layout_v1 = ShardLayout::v1_test();
assert!(ReshardingEventType::from_shard_layout(&layout_v0, block, prev_block).is_err());
assert!(ReshardingEventType::from_shard_layout(&layout_v1, block, prev_block).is_err());
assert!(ReshardingEventType::from_shard_layout(&layout_v0, block).is_err());
assert!(ReshardingEventType::from_shard_layout(&layout_v1, block).is_err());

// No resharding is ok.
let shards_split_map = BTreeMap::from([(s0, vec![s0])]);
let layout = ShardLayout::v2(vec![], vec![s0], Some(shards_split_map));
assert!(ReshardingEventType::from_shard_layout(&layout, block, prev_block)
assert!(ReshardingEventType::from_shard_layout(&layout, block)
.is_ok_and(|event| event.is_none()));

// Single split shard is ok.
Expand All @@ -169,16 +163,14 @@ mod tests {
Some(shards_split_map),
);

let event_type =
ReshardingEventType::from_shard_layout(&layout, block, prev_block).unwrap();
let event_type = ReshardingEventType::from_shard_layout(&layout, block).unwrap();
assert_eq!(
event_type,
Some(ReshardingEventType::SplitShard(ReshardingSplitShardParams {
parent_shard: ShardUId { version: 3, shard_id: 1 },
left_child_shard: ShardUId { version: 3, shard_id: 2 },
right_child_shard: ShardUId { version: 3, shard_id: 3 },
block_hash: block,
prev_block_hash: prev_block,
resharding_hash: block,
boundary_account: account!("pp")
}))
);
Expand All @@ -190,6 +182,6 @@ mod tests {
vec![s2, s3, s4, s5],
Some(shards_split_map),
);
assert!(ReshardingEventType::from_shard_layout(&layout, block, prev_block).is_err());
assert!(ReshardingEventType::from_shard_layout(&layout, block).is_err());
}
}
5 changes: 2 additions & 3 deletions chain/chain/src/resharding/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use super::event_type::{ReshardingEventType, ReshardingSplitShardParams};
use super::types::ReshardingSender;
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController};
use crate::types::RuntimeAdapter;
use near_async::messaging::IntoSender;
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
use near_chain_primitives::Error;
use near_epoch_manager::EpochManagerAdapter;
Expand Down Expand Up @@ -43,7 +42,7 @@ impl ReshardingManager {
let resharding_handle = ReshardingHandle::new();
let flat_storage_resharder = FlatStorageResharder::new(
runtime_adapter,
resharding_sender.into_sender(),
resharding_sender,
FlatStorageResharderController::from_resharding_handle(resharding_handle.clone()),
resharding_config.clone(),
);
Expand Down Expand Up @@ -84,7 +83,7 @@ impl ReshardingManager {
}

let resharding_event_type =
ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash, *prev_hash)?;
ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash)?;
match resharding_event_type {
Some(ReshardingEventType::SplitShard(split_shard_event)) => {
self.split_shard(
Expand Down
81 changes: 73 additions & 8 deletions chain/chain/src/resharding/resharding_actor.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,88 @@
use super::types::FlatStorageSplitShardRequest;
use near_async::messaging::{self, Handler};
use super::types::{
FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest,
};
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageReshardingTaskStatus};
use crate::ChainStore;
use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt};
use near_async::messaging::{self, Handler, HandlerWithContext};
use near_primitives::hash::CryptoHash;
use near_primitives::types::BlockHeight;
use near_store::{ShardUId, Store};
use time::Duration;

/// Dedicated actor for resharding V3.
pub struct ReshardingActor {}
pub struct ReshardingActor {
chain_store: ChainStore,
}

impl messaging::Actor for ReshardingActor {}

impl Handler<FlatStorageSplitShardRequest> for ReshardingActor {
fn handle(&mut self, msg: FlatStorageSplitShardRequest) {
self.handle_flat_storage_split_shard_request(msg);
match msg.resharder.split_shard_task() {
FlatStorageReshardingTaskStatus::Successful { .. } => {
// All good.
}
FlatStorageReshardingTaskStatus::Failed => {
panic!("impossible to recover from a flat storage split shard failure!")
}
FlatStorageReshardingTaskStatus::Cancelled => {
// The task has been cancelled. Nothing else to do.
}
}
}
}

/// Handles a [`FlatStorageShardCatchupRequest`] by scheduling the catchup work on the actor's
/// delayed action runner rather than executing it inline.
impl HandlerWithContext<FlatStorageShardCatchupRequest> for ReshardingActor {
fn handle(
&mut self,
msg: FlatStorageShardCatchupRequest,
ctx: &mut dyn DelayedActionRunner<Self>,
) {
// Shard catchup task is delayed and could get postponed several times. This must be
// done to cover the scenario in which catchup is triggered so fast that the initial
// state of the new flat storage is beyond the chain final tip.
ctx.run_later(
// Name given to the delayed action.
"ReshardingActor FlatStorageShardCatchup",
// Delay passed to `run_later`: 100 milliseconds.
Duration::milliseconds(100),
move |act, _| {
// Forward the fields of the request to the actor's catchup routine.
act.handle_flat_storage_shard_catchup(
msg.resharder,
msg.shard_uid,
msg.flat_head_block_hash,
);
},
);
}
}

/// Handler for [`MemtrieReloadRequest`]. Currently a placeholder: the message is received and
/// ignored.
impl Handler<MemtrieReloadRequest> for ReshardingActor {
fn handle(&mut self, _msg: MemtrieReloadRequest) {
// TODO
// Not implemented yet: the request is dropped without any action.
}
}

impl ReshardingActor {
pub fn new() -> Self {
Self {}
pub fn new(store: Store, genesis_height: BlockHeight) -> Self {
Self { chain_store: ChainStore::new(store, genesis_height, false) }
}

pub fn handle_flat_storage_split_shard_request(&mut self, msg: FlatStorageSplitShardRequest) {
msg.resharder.split_shard_task();
fn handle_flat_storage_shard_catchup(
&self,
resharder: FlatStorageResharder,
shard_uid: ShardUId,
flat_head_block_hash: CryptoHash,
) {
match resharder.shard_catchup_task(shard_uid, flat_head_block_hash, &self.chain_store) {
FlatStorageReshardingTaskStatus::Successful { .. } => {
// All good.
}
FlatStorageReshardingTaskStatus::Failed => {
panic!("impossible to recover from a flat storage shard catchup failure!")
}
FlatStorageReshardingTaskStatus::Cancelled => {
// The task has been cancelled. Nothing else to do.
}
}
}
}
23 changes: 22 additions & 1 deletion chain/chain/src/resharding/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::flat_storage_resharder::FlatStorageResharder;
use near_async::messaging::Sender;
use near_primitives::hash::CryptoHash;
use near_store::ShardUId;

/// Represents a request to start the split of a parent shard flat storage into two children flat
/// storages.
Expand All @@ -9,11 +11,30 @@ pub struct FlatStorageSplitShardRequest {
pub resharder: FlatStorageResharder,
}

/// Represents a request to start the catchup phase of a flat storage child shard.
#[derive(actix::Message, Clone, Debug)]
#[rtype(result = "()")]
pub struct FlatStorageShardCatchupRequest {
/// Resharder on which `shard_catchup_task` is invoked when the request is handled.
pub resharder: FlatStorageResharder,
/// The shard for which the catchup task is run.
pub shard_uid: ShardUId,
/// Block hash forwarded to the catchup task.
/// NOTE(review): presumably the flat head of the child flat storage at the time the request
/// was issued; confirm against the sender.
pub flat_head_block_hash: CryptoHash,
}

/// Represents a request to reload a Mem Trie for a shard after its Flat Storage resharding is
/// finished.
#[derive(actix::Message, Clone, Debug)]
#[rtype(result = "()")]
pub struct MemtrieReloadRequest {
/// The shard whose Mem Trie should be reloaded.
pub shard_uid: ShardUId,
}

/// A multi-sender for the FlatStorageResharder post processing API.
///
/// This is meant to be used to send messages to handle the post processing tasks needed for
/// resharding the flat storage. An example is splitting a shard.
#[derive(Clone, near_async::MultiSend, near_async::MultiSenderFrom)]
pub struct ReshardingSender {
pub flat_storage_split_shard_send: Sender<FlatStorageSplitShardRequest>,
pub flat_storage_split_shard_sender: Sender<FlatStorageSplitShardRequest>,
pub flat_storage_shard_catchup_sender: Sender<FlatStorageShardCatchupRequest>,
pub memtrie_reload_sender: Sender<MemtrieReloadRequest>,
}
3 changes: 2 additions & 1 deletion chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ pub fn setup(
));
let partial_witness_adapter = partial_witness_addr.with_auto_span_context();

let (resharding_sender_addr, _) = spawn_actix_actor(ReshardingActor::new());
let (resharding_sender_addr, _) =
spawn_actix_actor(ReshardingActor::new(store.clone(), chain_genesis.height));
let resharding_sender = resharding_sender_addr.with_auto_span_context();

let shards_manager_adapter_for_client = LateBoundSender::new();
Expand Down
4 changes: 1 addition & 3 deletions core/store/src/flat/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,7 @@ pub struct SplittingParentStatus {
/// The new shard layout.
pub shard_layout: ShardLayout,
/// Hash of the last block having the old shard layout.
pub block_hash: CryptoHash,
/// The block before `block_hash`.
pub prev_block_hash: CryptoHash,
pub resharding_hash: CryptoHash,
/// Parent's flat head state when the split began.
pub flat_head: BlockInfo,
}
Expand Down
3 changes: 2 additions & 1 deletion integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ impl TestLoopBuilder {
// We don't send messages to `GCActor` so adapter is not needed.
self.test_loop.register_actor_for_index(idx, gc_actor, None);

let resharding_actor = ReshardingActor::new();
let resharding_actor =
ReshardingActor::new(runtime_adapter.store().clone(), chain_genesis.height);

let future_spawner = self.test_loop.future_spawner();
let state_sync_dumper = StateSyncDumper {
Expand Down
3 changes: 2 additions & 1 deletion nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ pub fn start_with_config_and_synchronization(
config.client_config.archive,
));

let (resharding_sender_addr, _) = spawn_actix_actor(ReshardingActor::new());
let (resharding_sender_addr, _) =
spawn_actix_actor(ReshardingActor::new(runtime.store().clone(), chain_genesis.height));
let resharding_sender = resharding_sender_addr.with_auto_span_context();
let state_sync_runtime =
Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap());
Expand Down
6 changes: 3 additions & 3 deletions tools/protocol-schema-check/res/protocol_schema.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ FlatStateDeltaMetadata = 3401366797
FlatStateValue = 83834662
FlatStorageCreationStatus = 3717607657
FlatStorageReadyStatus = 677315221
FlatStorageReshardingStatus = 3937221189
FlatStorageStatus = 169572134
FlatStorageReshardingStatus = 743000213
FlatStorageStatus = 271501637
FunctionCallAction = 2405840012
FunctionCallError = 3652274053
FunctionCallPermission = 1517509673
Expand Down Expand Up @@ -243,7 +243,7 @@ SignedTransaction = 3898692301
SlashState = 3264273950
SlashedValidator = 2601657743
SnapshotHostInfo = 2890323952
SplittingParentStatus = 2820724267
SplittingParentStatus = 1188425274
StakeAction = 2002027105
StateChangeCause = 3890585134
StateHeaderKey = 1666317019
Expand Down

0 comments on commit cf6d2ef

Please sign in to comment.