Skip to content

Commit

Permalink
fix(state sync): handle StateResponse on state_parts_future_spawner (#…
Browse files Browse the repository at this point in the history
…12205)

Before this PR, when a state part arrived via a network message from a
peer, it was validated and stored by the client actor. As a result, the
client actor could become slow to apply blocks causing the node to fall
behind the chain.

Instead, the state parts should be handled async on
`state_parts_future_spawner` the same way they are when downloaded from
external storage.

---------

Co-authored-by: Marcelo Diop-Gonzalez <[email protected]>
  • Loading branch information
saketh-are and marcelo-gonzalez authored Oct 11, 2024
1 parent 0febdb5 commit 2b022a6
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 56 deletions.
4 changes: 4 additions & 0 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,8 @@ impl Handler<StateResponse> for ClientActorInner {
shard_id,
state_response,
&mut self.client.chain,
self.state_parts_future_spawner.as_ref(),
self.client.runtime_adapter.clone(),
);
return;
}
Expand All @@ -637,6 +639,8 @@ impl Handler<StateResponse> for ClientActorInner {
shard_id,
state_response,
&mut self.client.chain,
self.state_parts_future_spawner.as_ref(),
self.client.runtime_adapter.clone(),
);
return;
}
Expand Down
175 changes: 119 additions & 56 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::sync::external::{
use borsh::BorshDeserialize;
use futures::{future, FutureExt};
use near_async::futures::{FutureSpawner, FutureSpawnerExt};
use near_async::messaging::{CanSend, SendAsync};
use near_async::messaging::SendAsync;
use near_async::time::{Clock, Duration, Utc};
use near_chain::chain::{ApplyStatePartsRequest, LoadMemtrieRequest};
use near_chain::near_chain_primitives;
Expand All @@ -40,7 +40,7 @@ use near_client_primitives::types::{
use near_epoch_manager::EpochManagerAdapter;
use near_network::types::{
HighestHeightPeerInfo, NetworkRequests, NetworkResponses, PeerManagerAdapter,
PeerManagerMessageRequest, StateSyncEvent,
PeerManagerMessageRequest,
};
use near_primitives::hash::CryptoHash;
use near_primitives::network::PeerId;
Expand Down Expand Up @@ -85,13 +85,20 @@ pub enum StateSyncFileDownloadResult {
StatePart { part_length: u64 },
}

#[derive(PartialEq, Eq)]
enum PartProvenance {
Peers,
External,
}

/// Signals that a state part was downloaded and saved to RocksDB.
/// Or failed to do so.
pub struct StateSyncGetFileResult {
sync_hash: CryptoHash,
shard_id: ShardId,
part_id: Option<PartId>,
result: Result<StateSyncFileDownloadResult, String>,
provenance: PartProvenance,
}

struct StateSyncExternal {
Expand Down Expand Up @@ -348,8 +355,13 @@ impl StateSync {
sync_hash: CryptoHash,
shard_sync: &mut HashMap<ShardId, ShardSyncDownload>,
) {
for StateSyncGetFileResult { sync_hash: msg_sync_hash, shard_id, part_id, result } in
self.state_parts_mpsc_rx.try_iter()
for StateSyncGetFileResult {
sync_hash: msg_sync_hash,
shard_id,
part_id,
result,
provenance,
} in self.state_parts_mpsc_rx.try_iter()
{
if msg_sync_hash != sync_hash {
tracing::debug!(target: "sync",
Expand Down Expand Up @@ -401,6 +413,7 @@ impl StateSync {
download,
file_type,
download_result,
provenance,
);
}
}
Expand Down Expand Up @@ -723,22 +736,24 @@ impl StateSync {
pub fn update_download_on_state_response_message(
&mut self,
shard_sync_download: &mut ShardSyncDownload,
hash: CryptoHash,
sync_hash: CryptoHash,
shard_id: ShardId,
state_response: ShardStateSyncResponse,
chain: &mut Chain,
state_parts_future_spawner: &dyn FutureSpawner,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) {
match shard_sync_download.status {
ShardSyncStatus::StateDownloadHeader => {
let header_download = shard_sync_download.get_header_download_mut().unwrap();
if let Some(header) = state_response.take_header() {
if !header_download.done {
match chain.set_state_header(shard_id, hash, header) {
match chain.set_state_header(shard_id, sync_hash, header) {
Ok(()) => {
header_download.done = true;
}
Err(err) => {
tracing::error!(target: "sync", %shard_id, %hash, ?err, "State sync set_state_header error");
tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "State sync set_state_header error");
header_download.error = true;
}
}
Expand All @@ -747,7 +762,7 @@ impl StateSync {
// No header found.
// It may happen because requested node couldn't build state response.
if !header_download.done {
tracing::info!(target: "sync", %shard_id, %hash, "state_response doesn't have header, should be re-requested");
tracing::info!(target: "sync", %shard_id, %sync_hash, "state_response doesn't have header, should be re-requested");
header_download.error = true;
}
}
Expand All @@ -757,27 +772,43 @@ impl StateSync {
let num_parts = shard_sync_download.downloads.len() as u64;
let (part_id, data) = part;
if part_id >= num_parts {
tracing::error!(target: "sync", %shard_id, %hash, part_id, "State sync received incorrect part_id, potential malicious peer");
tracing::error!(target: "sync", %shard_id, %sync_hash, part_id, "State sync received incorrect part_id, potential malicious peer");
return;
}
if !shard_sync_download.downloads[part_id as usize].done {
match chain.set_state_part(
shard_id,
hash,
PartId::new(part_id, num_parts),
&data,
) {
Ok(()) => {
tracing::debug!(target: "sync", %shard_id, %hash, part_id, "Received correct start part");
self.network_adapter
.send(StateSyncEvent::StatePartReceived(shard_id, part_id));
shard_sync_download.downloads[part_id as usize].done = true;
}
Err(err) => {
tracing::error!(target: "sync", %shard_id, %hash, part_id, ?err, "State sync set_state_part error");
shard_sync_download.downloads[part_id as usize].error = true;
let state_root = chain
.get_state_header(shard_id, sync_hash)
.unwrap()
.chunk_prev_state_root();
let runtime_adapter = runtime_adapter.clone();
let part_id = PartId { idx: part_id, total: num_parts };
let state_parts_mpsc_tx = self.state_parts_mpsc_tx.clone();
state_parts_future_spawner.spawn(
"update_download_on_state_response_message",
async move {
let result = try_validate_and_store_received_state_part(
part_id,
shard_id,
sync_hash,
state_root,
data,
runtime_adapter
);

match state_parts_mpsc_tx.send(StateSyncGetFileResult {
sync_hash,
shard_id,
part_id: Some(part_id),
result,
provenance: PartProvenance::Peers,
}) {
Ok(_) => tracing::debug!(target: "sync", %shard_id, ?part_id, "Download response sent to processing thread."),
Err(err) => {
tracing::error!(target: "sync", ?err, %shard_id, ?part_id, "Unable to send part download response to processing thread.");
},
}
}
}
);
}
}
}
Expand Down Expand Up @@ -1114,6 +1145,7 @@ fn request_header_from_external_storage(
shard_id,
part_id: None,
result,
provenance: PartProvenance::External,
}) {
Ok(_) => tracing::debug!(target: "sync", %shard_id, "Download header response sent to processing thread."),
Err(err) => {
Expand All @@ -1134,26 +1166,19 @@ async fn download_and_store_part_from_external_storage(
external: ExternalConnection,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) -> Result<StateSyncFileDownloadResult, String> {
external
.get_file(shard_id, &location, file_type)
.await
.map_err(|err| err.to_string())
.and_then(|data| {
info!(target: "sync", ?shard_id, ?part_id, "downloaded state part");
if runtime_adapter.validate_state_part(&state_root, part_id, &data) {
let mut store_update = runtime_adapter.store().store_update();
borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id.idx))
.and_then(|key| {
store_update.set(DBCol::StateParts, &key, &data);
store_update.commit()
})
.map_err(|err| format!("Failed to store a state part. err={err:?}, state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id:?}"))
.map(|_| data.len() as u64)
.map(|part_length| StateSyncFileDownloadResult::StatePart { part_length })
} else {
Err(format!("validate_state_part failed. state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id}"))
}
})
external.get_file(shard_id, &location, file_type).await.map_err(|err| err.to_string()).and_then(
|data| {
info!(target: "sync", ?shard_id, ?part_id, "downloaded state part");
try_validate_and_store_received_state_part(
part_id,
shard_id,
sync_hash,
state_root,
data,
runtime_adapter,
)
},
)
}
/// Starts an asynchronous network request to external storage to fetch the given state part.
fn request_part_from_external_storage(
Expand Down Expand Up @@ -1210,6 +1235,7 @@ fn request_part_from_external_storage(
shard_id,
part_id: Some(part_id),
result,
provenance: PartProvenance::External,
}) {
Ok(_) => tracing::debug!(target: "sync", %shard_id, ?part_id, "Download response sent to processing thread."),
Err(err) => {
Expand Down Expand Up @@ -1266,6 +1292,33 @@ fn request_part_from_peers(
);
}

/// Takes a received state part and attempts to validate and store the part.
/// Used both for parts downloaded from external storage and parts received from peers.
/// This process is slow and should only occur on state_parts_future_spawner.
/// Returns a result to be passed back via state_parts_mpsc_tx.
fn try_validate_and_store_received_state_part(
part_id: PartId,
shard_id: ShardId,
sync_hash: CryptoHash,
state_root: StateRoot,
data: Vec<u8>,
runtime_adapter: Arc<dyn RuntimeAdapter>,
) -> Result<StateSyncFileDownloadResult, String> {
if runtime_adapter.validate_state_part(&state_root, part_id, &data) {
let mut store_update = runtime_adapter.store().store_update();
borsh::to_vec(&StatePartKey(sync_hash, shard_id, part_id.idx))
.and_then(|key| {
store_update.set(DBCol::StateParts, &key, &data);
store_update.commit()
})
.map_err(|err| format!("Failed to store a state part. err={err:?}, state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id:?}"))
.map(|_| data.len() as u64)
.map(|part_length| StateSyncFileDownloadResult::StatePart { part_length })
} else {
Err(format!("validate_state_part failed. state_root={state_root:?}, part_id={part_id:?}, shard_id={shard_id}"))
}
}

/// Works around how data requests to external storage are done.
/// This function investigates if the response is valid and updates `done` and `error` appropriately.
/// If the response is successful, then the downloaded state file was written to the DB.
Expand All @@ -1275,23 +1328,28 @@ fn process_download_response(
download: Option<&mut DownloadStatus>,
file_type: String,
download_result: Result<u64, String>,
provenance: PartProvenance,
) {
match download_result {
Ok(data_len) => {
// No error, aka Success.
metrics::STATE_SYNC_EXTERNAL_PARTS_DONE
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
metrics::STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc_by(data_len);
if provenance == PartProvenance::External {
metrics::STATE_SYNC_EXTERNAL_PARTS_DONE
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
metrics::STATE_SYNC_EXTERNAL_PARTS_SIZE_DOWNLOADED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc_by(data_len);
}
download.map(|download| download.done = true);
}
// The request failed without reaching the external storage.
Err(err) => {
metrics::STATE_SYNC_EXTERNAL_PARTS_FAILED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
if provenance == PartProvenance::External {
metrics::STATE_SYNC_EXTERNAL_PARTS_FAILED
.with_label_values(&[&shard_id.to_string(), &file_type])
.inc();
}
tracing::debug!(target: "sync", ?err, %shard_id, %sync_hash, ?file_type, "Failed to get a file from external storage, will retry");
download.map(|download| download.done = false);
}
Expand Down Expand Up @@ -1379,6 +1437,9 @@ mod test {
};

run_actix(async {
let state_parts_future_spawner =
ActixArbiterHandleFutureSpawner(Arbiter::new().handle());

state_sync
.run(
&None,
Expand All @@ -1390,9 +1451,9 @@ mod test {
vec![new_shard_id_tmp(0)],
&noop().into_sender(),
&noop().into_sender(),
&ActixArbiterHandleFutureSpawner(Arbiter::new().handle()),
&state_parts_future_spawner,
false,
runtime,
runtime.clone(),
)
.unwrap();

Expand Down Expand Up @@ -1439,6 +1500,8 @@ mod test {
new_shard_id_tmp(0),
state_response,
&mut chain,
&state_parts_future_spawner,
runtime,
);

let download = new_shard_sync.get(&new_shard_id_tmp(0)).unwrap();
Expand Down

0 comments on commit 2b022a6

Please sign in to comment.