fix(resharding): wait until child flat storages are split to take snapshots (#12589)

`test_resharding_v3_shard_shuffling_slower_post_processing_tasks`
exposes a bug that can be triggered if child flat storages are not split
after a resharding by the time we want to take a state snapshot. Then
the state snapshot code will fail because the flat storage is not ready,
but will not retry. To fix it, we add a `want_snapshot` field that will
be set when we decide to take a state snapshot. We also add a
`split_in_progress` field to the `FlatStorageManager` that will be set
to `true` when a resharding is started, and back to `false` when it's
finished and the catchup code has progressed to a height close to the
desired snapshot height. The state snapshot code will wait until
`split_in_progress` is `false` to proceed, and the flat storage catchup
code will wait until `want_snapshot` is cleared if it has already
advanced to the desired snapshot hash, so that we don't advance past the
point that was wanted by the state snapshot. The first of these waits is the one
needed to fix the actual test failure, but the second is also required.
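
The intended interaction can be summarized with the following self-contained sketch. The names are illustrative only; the actual change goes through `FlatStorageManager` helpers (`want_snapshot()`, `snapshot_taken()`, and a resharding catchup-height check), as the diff below shows.

```rust
use std::sync::Mutex;

/// Illustrative stand-in for the coordination state; not the real FlatStorageManager.
#[derive(Default)]
struct SnapshotReshardingState {
    /// Set while a flat storage split + catchup is running.
    split_in_progress: bool,
    /// Height the pending state snapshot wants to stop at, if any.
    want_snapshot: Option<u64>,
}

#[derive(Default)]
struct Manager {
    state: Mutex<SnapshotReshardingState>,
}

impl Manager {
    /// State snapshot side: only proceed once the resharding split is done.
    fn can_take_snapshot(&self) -> bool {
        !self.state.lock().unwrap().split_in_progress
    }

    /// Catchup side: don't advance past the height a pending snapshot wants.
    fn catchup_may_advance_to(&self, height: u64) -> bool {
        match self.state.lock().unwrap().want_snapshot {
            Some(snapshot_height) => height <= snapshot_height,
            None => true,
        }
    }
}

fn main() {
    let manager = Manager::default();
    {
        let mut state = manager.state.lock().unwrap();
        state.split_in_progress = true;
        state.want_snapshot = Some(100);
    }
    assert!(!manager.can_take_snapshot()); // snapshot waits for the split to finish
    assert!(manager.catchup_may_advance_to(100)); // catchup may reach the wanted height
    assert!(!manager.catchup_may_advance_to(101)); // but must not go past it
}
```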

We implement this waiting by rescheduling the message sends for later. A
Condvar would be a very natural choice, but it unfortunately doesn't seem to
work in testloop: actors that normally run on different threads are put on the
same thread, so a thread blocked on a Condvar would never be woken up.
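
As a toy illustration of this rescheduling pattern (everything here is made up for the example; it is not the nearcore API), the handler simply re-enqueues the message when its precondition isn't met:

```rust
use std::collections::VecDeque;

/// Toy single-threaded "event loop": a queue of messages delivered one at a time.
struct SnapshotActor {
    /// How many more deliveries until the resharding split is considered done.
    split_ticks_remaining: u32,
    queue: VecDeque<&'static str>,
}

impl SnapshotActor {
    fn handle_create_snapshot(&mut self, msg: &'static str) {
        if self.split_ticks_remaining > 0 {
            // Not ready yet: child flat storages are still being split.
            // Re-enqueue the same message instead of blocking on a Condvar.
            self.split_ticks_remaining -= 1;
            self.queue.push_back(msg);
            println!("postponed {msg}");
        } else {
            println!("creating snapshot for {msg}");
        }
    }
}

fn main() {
    let mut actor = SnapshotActor {
        split_ticks_remaining: 2,
        queue: VecDeque::from(["CreateSnapshotRequest"]),
    };
    while let Some(msg) = actor.queue.pop_front() {
        actor.handle_create_snapshot(msg);
    }
}
```

In the actual change this is done with `DelayedActionRunner::run_later` and a one-second delay, as in the handlers in the diff below.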

Here we also change the behavior of the old `set_flat_state_updates_mode()`,
which used to refuse to proceed if the update mode was already set to the
requested value. That check appears to be an artifact of the original state
snapshot implementation in #9090, where the extra logic was added because the
function had another caller at the time (`inline_flat_state_values()`, added in
#9037). That function has since been deleted, so the state snapshot code is now
the only user of `set_flat_state_updates_mode()`.
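
For reference, the dropped check behaved roughly like this (illustrative sketch only; the new code tracks the desired state through `want_snapshot()` / `snapshot_taken()` instead):

```rust
/// Old behavior (sketch): a redundant call was reported as a failure.
fn set_flat_state_updates_mode_old(current: &mut bool, enable_updates: bool) -> bool {
    if *current == enable_updates {
        return false; // the caller logged an error and gave up
    }
    *current = enable_updates;
    true
}

fn main() {
    let mut updates_enabled = true;
    assert!(!set_flat_state_updates_mode_old(&mut updates_enabled, true)); // redundant call "fails"
    assert!(set_flat_state_updates_mode_old(&mut updates_enabled, false)); // real transition succeeds
}
```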
marcelo-gonzalez authored Dec 15, 2024
1 parent 0a48754 commit 8cae8cc
Showing 18 changed files with 430 additions and 194 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions chain/chain/Cargo.toml
@@ -10,6 +10,7 @@ workspace = true

[dependencies]
actix.workspace = true
anyhow.workspace = true
borsh.workspace = true
bytesize.workspace = true
chrono.workspace = true
27 changes: 26 additions & 1 deletion chain/chain/src/chain.rs
@@ -3838,6 +3838,24 @@ impl Chain {
)))
}

    fn min_chunk_prev_height(&self, block: &Block) -> Result<BlockHeight, Error> {
        let mut ret = None;
        for chunk in block.chunks().iter_raw() {
            let prev_height = if chunk.prev_block_hash() == &CryptoHash::default() {
                0
            } else {
                let prev_header = self.get_block_header(chunk.prev_block_hash())?;
                prev_header.height()
            };
            if let Some(min_height) = ret {
                ret = Some(std::cmp::min(min_height, prev_height));
            } else {
                ret = Some(prev_height);
            }
        }
        Ok(ret.unwrap_or(0))
    }

/// Function to create or delete a snapshot if necessary.
/// TODO: this function calls head() inside of start_process_block_impl(), consider moving this to be called right after HEAD gets updated
fn process_snapshot(&mut self) -> Result<(), Error> {
@@ -3847,14 +3865,21 @@
SnapshotAction::MakeSnapshot(prev_hash) => {
let prev_block = self.get_block(&prev_hash)?;
let prev_prev_hash = prev_block.header().prev_hash();
let min_chunk_prev_height = self.min_chunk_prev_height(&prev_block)?;
let epoch_height =
self.epoch_manager.get_epoch_height_from_prev_block(prev_prev_hash)?;
let shard_layout =
&self.epoch_manager.get_shard_layout_from_prev_block(prev_prev_hash)?;
let shard_uids = shard_layout.shard_uids().enumerate().collect();

let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback;
make_snapshot_callback(*prev_prev_hash, epoch_height, shard_uids, prev_block);
make_snapshot_callback(
*prev_prev_hash,
min_chunk_prev_height,
epoch_height,
shard_uids,
prev_block,
);
}
SnapshotAction::DeleteSnapshot => {
let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback;
172 changes: 87 additions & 85 deletions chain/chain/src/flat_storage_resharder.rs

Large diffs are not rendered by default.

64 changes: 42 additions & 22 deletions chain/chain/src/resharding/resharding_actor.rs
@@ -1,13 +1,11 @@
use super::types::{
FlatStorageShardCatchupRequest, FlatStorageSplitShardRequest, MemtrieReloadRequest,
};
use crate::flat_storage_resharder::{
FlatStorageResharder, FlatStorageReshardingSchedulableTaskResult,
FlatStorageReshardingTaskResult,
};
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageReshardingTaskResult};
use crate::ChainStore;
use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt};
use near_async::messaging::{self, Handler, HandlerWithContext};
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::BlockHeight;
use near_store::Store;
use time::Duration;
@@ -29,20 +27,13 @@ impl HandlerWithContext<FlatStorageSplitShardRequest> for ReshardingActor {
}
}

impl Handler<FlatStorageShardCatchupRequest> for ReshardingActor {
fn handle(&mut self, msg: FlatStorageShardCatchupRequest) {
match msg.resharder.shard_catchup_task(
msg.shard_uid,
msg.flat_head_block_hash,
&self.chain_store,
) {
FlatStorageReshardingTaskResult::Successful { .. } => {
// All good.
}
FlatStorageReshardingTaskResult::Failed => {
panic!("impossible to recover from a flat storage shard catchup failure!")
}
}
impl HandlerWithContext<FlatStorageShardCatchupRequest> for ReshardingActor {
fn handle(
&mut self,
msg: FlatStorageShardCatchupRequest,
ctx: &mut dyn DelayedActionRunner<Self>,
) {
self.handle_flat_storage_catchup(msg.resharder, msg.shard_uid, ctx);
}
}

@@ -66,16 +57,16 @@ impl ReshardingActor {
// becomes final. If the resharding block is not yet final, the task will exit early with
// `Postponed` status and it must be rescheduled.
match resharder.split_shard_task(&self.chain_store) {
FlatStorageReshardingSchedulableTaskResult::Successful { .. } => {
FlatStorageReshardingTaskResult::Successful { .. } => {
// All good.
}
FlatStorageReshardingSchedulableTaskResult::Failed => {
FlatStorageReshardingTaskResult::Failed => {
panic!("impossible to recover from a flat storage split shard failure!")
}
FlatStorageReshardingSchedulableTaskResult::Cancelled => {
FlatStorageReshardingTaskResult::Cancelled => {
// The task has been cancelled. Nothing else to do.
}
FlatStorageReshardingSchedulableTaskResult::Postponed => {
FlatStorageReshardingTaskResult::Postponed => {
// The task must be retried later.
ctx.run_later(
"ReshardingActor FlatStorageSplitShard",
@@ -87,4 +78,33 @@
}
}
}

    fn handle_flat_storage_catchup(
        &self,
        resharder: FlatStorageResharder,
        shard_uid: ShardUId,
        ctx: &mut dyn DelayedActionRunner<Self>,
    ) {
        match resharder.shard_catchup_task(shard_uid, &self.chain_store) {
            FlatStorageReshardingTaskResult::Successful { .. } => {
                // All good.
            }
            FlatStorageReshardingTaskResult::Failed => {
                panic!("impossible to recover from a flat storage shard catchup failure!")
            }
            FlatStorageReshardingTaskResult::Cancelled => {
                // The task has been cancelled. Nothing else to do.
            }
            FlatStorageReshardingTaskResult::Postponed => {
                // The task must be retried later.
                ctx.run_later(
                    "ReshardingActor FlatStorageCatchup",
                    Duration::milliseconds(1000),
                    move |act, ctx| {
                        act.handle_flat_storage_catchup(resharder, shard_uid, ctx);
                    },
                );
            }
        }
    }
}
2 changes: 0 additions & 2 deletions chain/chain/src/resharding/types.rs
@@ -1,6 +1,5 @@
use crate::flat_storage_resharder::FlatStorageResharder;
use near_async::messaging::Sender;
use near_primitives::hash::CryptoHash;
use near_store::ShardUId;

/// Represents a request to start the split of a parent shard flat storage into two children flat
@@ -17,7 +16,6 @@ pub struct FlatStorageSplitShardRequest {
pub struct FlatStorageShardCatchupRequest {
pub resharder: FlatStorageResharder,
pub shard_uid: ShardUId,
pub flat_head_block_hash: CryptoHash,
}

/// Represents a request to reload a Mem Trie for a shard after its Flat Storage resharding is
132 changes: 103 additions & 29 deletions chain/chain/src/state_snapshot_actor.rs
@@ -1,11 +1,13 @@
use near_async::messaging::{Actor, CanSend, Handler, Sender};
use near_async::futures::{DelayedActionRunner, DelayedActionRunnerExt};
use near_async::messaging::{Actor, CanSend, Handler, HandlerWithContext, Sender};
use near_async::time::Duration;
use near_async::{MultiSend, MultiSenderFrom};
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
use near_performance_metrics_macros::perf;
use near_primitives::block::Block;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::{EpochHeight, ShardIndex};
use near_primitives::types::{BlockHeight, EpochHeight, ShardIndex};
use near_store::flat::FlatStorageManager;
use near_store::ShardTries;
use std::sync::Arc;
@@ -44,13 +46,15 @@ pub struct DeleteAndMaybeCreateSnapshotRequest {
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct CreateSnapshotRequest {
/// prev_hash of the last processed block.
/// equal to self.block.header().prev_hash()
prev_block_hash: CryptoHash,
/// Min height of chunk.prev_block_hash() for each chunk in `block`
min_chunk_prev_height: BlockHeight,
/// epoch height associated with prev_block_hash
epoch_height: EpochHeight,
/// Shards that need to be present in the snapshot.
shard_indexes_and_uids: Vec<(ShardIndex, ShardUId)>,
/// Last block of the prev epoch.
/// prev block of the "sync_hash" block.
block: Block,
}

@@ -59,6 +63,7 @@ impl std::fmt::Debug for CreateSnapshotRequest {
f.debug_struct("CreateSnapshotRequest")
.field("block_hash", self.block.hash())
.field("prev_block_hash", &self.prev_block_hash)
.field("min_chunk_prev_height", &self.min_chunk_prev_height)
.field("epoch_height", &self.epoch_height)
.field(
"shard_uids",
@@ -85,19 +90,78 @@ impl StateSnapshotActor {
}
}

pub fn handle_create_snapshot_request(&mut self, msg: CreateSnapshotRequest) {
tracing::debug!(target: "state_snapshot", ?msg);
    /// Returns true if we shouldn't yet try to create a snapshot because a flat storage resharding
    /// is in progress.
    fn should_wait_for_resharding_split(
        &self,
        min_chunk_prev_height: BlockHeight,
        shard_indexes_and_uids: &[(ShardIndex, ShardUId)],
    ) -> anyhow::Result<bool> {
        let shard_uids = shard_indexes_and_uids.iter().map(|(_idx, uid)| *uid);
        let Some(min_height) =
            self.flat_storage_manager.resharding_catchup_height_reached(shard_uids)?
        else {
            // No flat storage split + catchup is in progress, ok to proceed
            return Ok(false);
        };
        let Some(min_height) = min_height else {
            // storage split + catchup is in progress and not all shards have reached the catchup phase yet. Can't proceed
            return Ok(true);
        };
        // Proceed if the catchup code is already reasonably close to being finished. This is not a correctness issue,
        // as this line of code could just be replaced with Ok(false), and things would work. But in that case, if there are for
        // some reason lots of deltas to apply (e.g. the sync hash is 1000s of blocks past the start of the epoch because of missed
        // chunks), then we'll duplicate a lot of work that's being done by the resharding catchup code. So we might as well just
        // come back later after most of that work has already been done.
        Ok(min_height + 10 < min_chunk_prev_height)
    }

pub fn handle_create_snapshot_request(
&mut self,
msg: CreateSnapshotRequest,
ctx: &mut dyn DelayedActionRunner<Self>,
) {
let should_wait = match self.should_wait_for_resharding_split(
msg.min_chunk_prev_height,
&msg.shard_indexes_and_uids,
) {
Ok(s) => s,
Err(err) => {
tracing::error!(target: "state_snapshot", ?err, "State Snapshot Actor failed to check resharding status. Not making snapshot");
return;
}
};
// TODO: instead of resending the same message over and over, wait on a Condvar.
// This would require making testloop work with Condvars that normally are meant to be woken up by another thread
if should_wait {
tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Postpone CreateSnapshotRequest");
ctx.run_later(
"ReshardingActor FlatStorageSplitShard",
Duration::seconds(1),
move |act, ctx| {
act.handle_create_snapshot_request(msg, ctx);
},
);
return;
}

let CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block } =
msg;
tracing::debug!(target: "state_snapshot", prev_block_hash=?&msg.prev_block_hash, "Handle CreateSnapshotRequest");
let CreateSnapshotRequest {
prev_block_hash,
epoch_height,
shard_indexes_and_uids,
block,
..
} = msg;
let res =
self.tries.create_state_snapshot(prev_block_hash, &shard_indexes_and_uids, &block);

// Unlocking flat state head can be done asynchronously in state_snapshot_actor.
// The next flat storage update will bring flat storage to latest head.
if !self.flat_storage_manager.set_flat_state_updates_mode(true) {
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to unlock flat state updates");
}
// TODO(resharding): check what happens if two calls to want_snapshot() are made before this point,
// which can happen with short epochs if a state snapshot takes longer than the rest of the epoch to complete.
// TODO(resharding): this can actually be called sooner, just after the rocksdb checkpoint is made.
self.flat_storage_manager.snapshot_taken();
match res {
Ok(res_shard_uids) => {
let Some(res_shard_uids) = res_shard_uids else {
@@ -126,10 +190,10 @@ impl Handler<DeleteAndMaybeCreateSnapshotRequest> for StateSnapshotActor {
}
}

impl Handler<CreateSnapshotRequest> for StateSnapshotActor {
impl HandlerWithContext<CreateSnapshotRequest> for StateSnapshotActor {
#[perf]
fn handle(&mut self, msg: CreateSnapshotRequest) {
self.handle_create_snapshot_request(msg)
fn handle(&mut self, msg: CreateSnapshotRequest, ctx: &mut dyn DelayedActionRunner<Self>) {
self.handle_create_snapshot_request(msg, ctx)
}
}

@@ -142,7 +206,7 @@ pub struct StateSnapshotSenderForStateSnapshot {
pub struct StateSnapshotSenderForClient(Sender<DeleteAndMaybeCreateSnapshotRequest>);

type MakeSnapshotCallback = Arc<
dyn Fn(CryptoHash, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> ()
dyn Fn(CryptoHash, BlockHeight, EpochHeight, Vec<(ShardIndex, ShardUId)>, Block) -> ()
+ Send
+ Sync
+ 'static,
@@ -156,28 +220,38 @@ pub struct SnapshotCallbacks {
}

/// Sends a request to make a state snapshot.
// TODO: remove the `prev_block_hash` argument. It's just block.header().prev_hash()
pub fn get_make_snapshot_callback(
sender: StateSnapshotSenderForClient,
flat_storage_manager: FlatStorageManager,
) -> MakeSnapshotCallback {
Arc::new(move |prev_block_hash, epoch_height, shard_indexes_and_uids, block| {
tracing::info!(
Arc::new(
move |prev_block_hash,
min_chunk_prev_height,
epoch_height,
shard_indexes_and_uids,
block| {
tracing::info!(
target: "state_snapshot",
?prev_block_hash,
?shard_indexes_and_uids,
"make_snapshot_callback sends `DeleteAndMaybeCreateSnapshotRequest` to state_snapshot_addr");
// We need to stop flat head updates synchronously in the client thread.
// Async update in state_snapshot_actor and potentially lead to flat head progressing beyond prev_block_hash
if !flat_storage_manager.set_flat_state_updates_mode(false) {
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_indexes_and_uids, "Failed to lock flat state updates");
return;
}
let create_snapshot_request =
CreateSnapshotRequest { prev_block_hash, epoch_height, shard_indexes_and_uids, block };
sender.send(DeleteAndMaybeCreateSnapshotRequest {
create_snapshot_request: Some(create_snapshot_request),
});
})
// We need to stop flat head updates synchronously in the client thread.
// Async update in state_snapshot_actor can potentially lead to flat head progressing beyond prev_block_hash
// This also prevents post-resharding flat storage catchup from advancing past `prev_block_hash`
flat_storage_manager.want_snapshot(min_chunk_prev_height);
let create_snapshot_request = CreateSnapshotRequest {
prev_block_hash,
min_chunk_prev_height,
epoch_height,
shard_indexes_and_uids,
block,
};
sender.send(DeleteAndMaybeCreateSnapshotRequest {
create_snapshot_request: Some(create_snapshot_request),
});
},
)
}

/// Sends a request to delete a state snapshot.
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/test_env_builder.rs
@@ -588,7 +588,7 @@ impl TestEnvBuilder {
None => TEST_SEED,
};
let tries = runtime.get_tries();
let make_snapshot_callback = Arc::new(move |prev_block_hash, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| {
let make_snapshot_callback = Arc::new(move |prev_block_hash, _min_chunk_prev_height, _epoch_height, shard_uids: Vec<(ShardIndex, ShardUId)>, block| {
tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback");
tries.delete_state_snapshot();
tries.create_state_snapshot(prev_block_hash, &shard_uids, &block).unwrap();
