Skip to content

Commit

Permalink
fix(state sync): delete get_cached_state_parts (#12197)
Browse files Browse the repository at this point in the history
In testnet we observed `get_cached_state_parts` consistently taking ~58
seconds to run.

Returning the list of cached state parts is a relic of the old
decentralized state sync design. We no longer need this at all.
  • Loading branch information
saketh-are authored Oct 9, 2024
1 parent d0a33c3 commit 2d5dd96
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 107 deletions.
40 changes: 2 additions & 38 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::{
Provenance,
};
use crate::{metrics, DoomslugThresholdMode};
use borsh::BorshDeserialize;
use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -69,8 +68,8 @@ use near_primitives::sharding::{
};
use near_primitives::state_part::PartId;
use near_primitives::state_sync::{
get_num_state_parts, BitArray, CachedParts, ReceiptProofResponse, RootProof,
ShardStateSyncResponseHeader, ShardStateSyncResponseHeaderV2, StateHeaderKey, StatePartKey,
get_num_state_parts, ReceiptProofResponse, RootProof, ShardStateSyncResponseHeader,
ShardStateSyncResponseHeaderV2, StateHeaderKey, StatePartKey,
};
use near_primitives::stateless_validation::state_witness::{
ChunkStateWitness, ChunkStateWitnessSize,
Expand Down Expand Up @@ -3864,41 +3863,6 @@ impl Chain {

Ok((make_snapshot, delete_snapshot))
}

/// Describes which state parts are present in `DBCol::StateParts` for one shard at `sync_hash`.
///
/// Iterates the store over the key range that starts at `StatePartKey(sync_hash, shard_id, 0)`
/// and ends before `StatePartKey(sync_hash, shard_id + 1, 0)`. Every key found is decoded, counted,
/// and its part id is marked in a `BitArray` sized by `num_parts`.
///
/// Returns:
/// * `CachedParts::NoParts` when the scan yields no keys,
/// * `CachedParts::AllParts` when the number of keys equals `num_parts`,
/// * `CachedParts::BitArray` with the marked part ids otherwise.
///
/// Errors from serializing the range bounds, from the store iterator, or from decoding a key
/// are propagated to the caller.
pub fn get_cached_state_parts(
    &self,
    sync_hash: CryptoHash,
    shard_id: ShardId,
    num_parts: u64,
) -> Result<CachedParts, Error> {
    let _span = tracing::debug_span!(target: "chain", "get_cached_state_parts").entered();
    // DBCol::StateParts is keyed by StatePartKey: (BlockHash || ShardId || PartId (u64)).
    // The scan covers [first key of `shard_id`, first key of `shard_id + 1`).
    let range_start = borsh::to_vec(&StatePartKey(sync_hash, shard_id, 0))?;
    let range_end = borsh::to_vec(&StatePartKey(sync_hash, shard_id + 1, 0))?;

    let mut found_parts = 0;
    let mut present = BitArray::new(num_parts);
    // The iterator is built inside the `for` expression so every temporary it borrows from
    // stays alive for the whole loop.
    for entry in self.chain_store.store().iter_range(
        DBCol::StateParts,
        Some(&range_start),
        Some(&range_end),
    ) {
        // Only the key is needed; the stored part bytes are ignored.
        let raw_key = entry?.0;
        let decoded = StatePartKey::try_from_slice(&raw_key)?;
        found_parts += 1;
        present.set_bit(decoded.2);
    }

    let summary = match found_parts {
        0 => CachedParts::NoParts,
        count if count == num_parts => CachedParts::AllParts,
        _ => CachedParts::BitArray(present),
    };
    Ok(summary)
}
}

/// This method calculates the congestion info for the genesis chunks. It uses
Expand Down
30 changes: 2 additions & 28 deletions chain/client/src/view_client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,17 +1358,6 @@ impl Handler<StateRequestHeader> for ViewClientActorInner {
};
let state_response = match header {
Some(header) => {
let num_parts = header.num_state_parts();
let cached_parts = match self
.chain
.get_cached_state_parts(sync_hash, shard_id, num_parts)
{
Ok(cached_parts) => Some(cached_parts),
Err(err) => {
tracing::error!(target: "sync", ?err, ?sync_hash, shard_id, "Failed to get cached state parts");
None
}
};
let header = match header {
ShardStateSyncResponseHeader::V2(inner) => inner,
_ => {
Expand All @@ -1381,7 +1370,7 @@ impl Handler<StateRequestHeader> for ViewClientActorInner {
ShardStateSyncResponse::V3(ShardStateSyncResponseV3 {
header: Some(header),
part: None,
cached_parts,
cached_parts: None,
can_generate,
})
}
Expand Down Expand Up @@ -1444,26 +1433,11 @@ impl Handler<StateRequestPart> for ViewClientActorInner {
None
}
};
let num_parts = part.as_ref().and_then(|_| match self.chain.get_state_response_header(shard_id, sync_hash) {
Ok(header) => Some(header.num_state_parts()),
Err(err) => {
tracing::error!(target: "sync", ?err, ?sync_hash, shard_id, "Failed to get num state parts");
None
}
});
let cached_parts = num_parts.and_then(|num_parts|
match self.chain.get_cached_state_parts(sync_hash, shard_id, num_parts) {
Ok(cached_parts) => Some(cached_parts),
Err(err) => {
tracing::error!(target: "sync", ?err, ?sync_hash, shard_id, "Failed to get cached state parts");
None
}
});
let can_generate = part.is_some();
let state_response = ShardStateSyncResponse::V3(ShardStateSyncResponseV3 {
header: None,
part,
cached_parts,
cached_parts: None,
can_generate,
});
let info =
Expand Down
5 changes: 1 addition & 4 deletions core/primitives/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,8 @@ pub struct ShardStateSyncResponseV2 {
pub struct ShardStateSyncResponseV3 {
pub header: Option<ShardStateSyncResponseHeaderV2>,
pub part: Option<(u64, Vec<u8>)>,
/// Parts that can be provided **cheaply**.
// Can be `None` only if both `header` and `part` are `None`.
// TODO(saketh): deprecate unused fields cached_parts and can_generate
pub cached_parts: Option<CachedParts>,
/// Whether the node can provide parts for this epoch of this shard.
/// Assumes that a node can either provide all state parts or no state parts.
pub can_generate: bool,
}

Expand Down
43 changes: 6 additions & 37 deletions integration-tests/src/tests/client/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use near_o11y::testonly::{init_integration_logger, init_test_logger};
use near_o11y::WithSpanContextExt;
use near_primitives::shard_layout::ShardUId;
use near_primitives::state_part::PartId;
use near_primitives::state_sync::{CachedParts, StatePartKey};
use near_primitives::state_sync::StatePartKey;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{BlockId, BlockReference, EpochId, EpochReference};
use near_primitives::utils::MaybeValidated;
Expand Down Expand Up @@ -858,35 +858,21 @@ fn test_state_sync_headers() {
None => return ControlFlow::Continue(()),
};
let state_response = state_response_info.take_state_response();
let cached_parts = state_response.cached_parts().clone();
let can_generate = state_response.can_generate();
assert!(state_response.part().is_none());
if let Some(_header) = state_response.take_header() {
if !can_generate {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got header but cannot generate"
);
return ControlFlow::Continue(());
}
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got header"
);
tracing::info!(?sync_hash, shard_id, can_generate, "got header");
} else {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got no header"
);
tracing::info!(?sync_hash, shard_id, can_generate, "got no header");
return ControlFlow::Continue(());
}

Expand Down Expand Up @@ -915,36 +901,19 @@ fn test_state_sync_headers() {
let part = state_response.part().clone();
assert!(state_response.take_header().is_none());
if let Some((part_id, _part)) = part {
if !can_generate
|| cached_parts != Some(CachedParts::AllParts)
|| part_id != 0
{
if !can_generate || cached_parts != None || part_id != 0 {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
part_id,
"got part but shard info is unexpected"
);
return ControlFlow::Continue(());
}
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
part_id,
"got part"
);
tracing::info!(?sync_hash, shard_id, can_generate, part_id, "got part");
} else {
tracing::info!(
?sync_hash,
shard_id,
?cached_parts,
can_generate,
"got no part"
);
tracing::info!(?sync_hash, shard_id, can_generate, "got no part");
return ControlFlow::Continue(());
}
}
Expand Down

0 comments on commit 2d5dd96

Please sign in to comment.