Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): use routed state part request in sync actor #12111

Merged
merged 19 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
507 changes: 166 additions & 341 deletions chain/client/src/sync/state.rs

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions chain/client/src/test_utils/peer_manager_mock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use near_network::types::SetChainInfo;
use near_network::types::{PeerManagerMessageRequest, PeerManagerMessageResponse};
use near_network::types::{
PeerManagerMessageRequest, PeerManagerMessageResponse, SetChainInfo, StateSyncEvent,
};

pub struct PeerManagerMock {
handle: Box<
Expand Down Expand Up @@ -37,3 +38,8 @@ impl actix::Handler<SetChainInfo> for PeerManagerMock {
type Result = ();
fn handle(&mut self, _msg: SetChainInfo, _ctx: &mut Self::Context) {}
}

/// Test-only stub: `PeerManagerMock` accepts `StateSyncEvent` messages but
/// deliberately does nothing with them, so client tests can run without a
/// real peer manager reacting to state sync progress.
impl actix::Handler<StateSyncEvent> for PeerManagerMock {
    type Result = ();
    // No-op: the mock intentionally drops the event.
    fn handle(&mut self, _msg: StateSyncEvent, _ctx: &mut Self::Context) {}
}
49 changes: 42 additions & 7 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::tcp;
use crate::types::{
ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests,
NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType,
SetChainInfo, SnapshotHostInfo, StatePartRequestBody, Tier3RequestBody,
SetChainInfo, SnapshotHostInfo, StatePartRequestBody, StateSyncEvent, Tier3RequestBody,
};
use ::time::ext::InstantExt as _;
use actix::fut::future::wrap_future;
Expand Down Expand Up @@ -389,7 +389,7 @@ impl PeerManagerActor {
}.await;

if let Err(ref err) = result {
tracing::info!(target: "network", err = format!("{:#}", err), "failed to connect to {}", request.peer_info);
tracing::info!(target: "network", err = format!("{:#}", err), "tier3 failed to connect to {}", request.peer_info);
}
}

Expand Down Expand Up @@ -693,7 +693,7 @@ impl PeerManagerActor {
}.await;

if let Err(ref err) = result {
tracing::info!(target: "network", err = format!("{:#}", err), "failed to connect to {peer_info}");
tracing::info!(target: "network", err = format!("{:#}", err), "tier2 failed to connect to {peer_info}");
}
if state.peer_store.peer_connection_attempt(&clock, &peer_info.id, result).is_err() {
tracing::error!(target: "network", ?peer_info, "Failed to store connection attempt.");
Expand Down Expand Up @@ -892,6 +892,7 @@ impl PeerManagerActor {
shard_id,
part_id,
) {
tracing::debug!(target: "network", "requesting {sync_prev_prev_hash} {shard_id} {part_id} from {peer_id}");
success =
self.state.send_message_to_peer(
&self.clock,
Expand All @@ -917,7 +918,7 @@ impl PeerManagerActor {
NetworkResponses::RouteNotFound
}
}
NetworkRequests::SnapshotHostInfo { sync_hash, epoch_height, mut shards } => {
NetworkRequests::SnapshotHostInfo { sync_hash, mut epoch_height, mut shards } => {
if shards.len() > MAX_SHARDS_PER_SNAPSHOT_HOST_INFO {
tracing::warn!("PeerManager: Sending out a SnapshotHostInfo message with {} shards, \
this is more than the allowed limit. The list of shards will be truncated. \
Expand All @@ -935,18 +936,33 @@ impl PeerManagerActor {
// Sort the shards to keep things tidy
shards.sort();

let peer_id = self.state.config.node_id();

// Hacky workaround for test environments only.
VanBarbascu marked this conversation as resolved.
Show resolved Hide resolved
// When starting a chain from scratch the first two snapshots both have epoch height 1.
// The epoch height is used as a version number for SnapshotHostInfo and if duplicated,
// prevents the second snapshot from being advertised as new information to the network.
// To avoid this problem, we re-index the very first epoch with epoch_height=0.
if epoch_height == 1 && self.state.snapshot_hosts.get_host_info(&peer_id).is_none()
{
epoch_height = 0;
}

// Sign the information about the locally created snapshot using the keys in the
// network config before broadcasting it
let snapshot_host_info = SnapshotHostInfo::new(
let snapshot_host_info = Arc::new(SnapshotHostInfo::new(
self.state.config.node_id(),
sync_hash,
epoch_height,
shards,
&self.state.config.node_key,
);
));

// Insert our info to our own cache.
self.state.snapshot_hosts.insert_skip_verify(snapshot_host_info.clone());

self.state.tier2.broadcast_message(Arc::new(PeerMessage::SyncSnapshotHosts(
SyncSnapshotHosts { hosts: vec![snapshot_host_info.into()] },
SyncSnapshotHosts { hosts: vec![snapshot_host_info] },
)));
NetworkResponses::NoResponse
}
Expand Down Expand Up @@ -1260,6 +1276,25 @@ impl actix::Handler<WithSpanContext<PeerManagerMessageRequest>> for PeerManagerA
}
}

/// Handles `StateSyncEvent` notifications sent to the peer manager
/// (e.g. by the state sync actor after it processes a state part).
impl actix::Handler<WithSpanContext<StateSyncEvent>> for PeerManagerActor {
    type Result = ();
    #[perf]
    fn handle(
        &mut self,
        msg: WithSpanContext<StateSyncEvent>,
        _ctx: &mut Self::Context,
    ) -> Self::Result {
        // Enter the message's tracing span so this handling is attributed correctly.
        let (_span, msg) = handler_debug_span!(target: "network", msg);
        // Time the handling per message variant for the peer-manager metrics.
        let _timer =
            metrics::PEER_MANAGER_MESSAGES_TIME.with_label_values(&[(&msg).into()]).start_timer();
        match msg {
            StateSyncEvent::StatePartReceived(shard_id, part_id) => {
                // The part has been received; inform the snapshot hosts cache so
                // it can drop the per-part peer selection state for (shard_id, part_id).
                self.state.snapshot_hosts.part_received(shard_id, part_id);
            }
        }
    }
}

impl actix::Handler<GetDebugStatus> for PeerManagerActor {
type Result = DebugStatus;
#[perf]
Expand Down
10 changes: 9 additions & 1 deletion chain/network/src/snapshot_hosts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,19 @@ impl SnapshotHostsCache {
(newly_inserted_data, err)
}

/// Skips signature verification. Used only for the local node's own information.
pub fn insert_skip_verify(self: &Self, my_info: Arc<SnapshotHostInfo>) {
let _ = self.0.lock().try_insert(my_info);
}

pub fn get_hosts(&self) -> Vec<Arc<SnapshotHostInfo>> {
self.0.lock().hosts.iter().map(|(_, v)| v.clone()).collect()
}

pub(crate) fn get_host_info(&self, peer_id: &PeerId) -> Option<Arc<SnapshotHostInfo>> {
self.0.lock().hosts.peek(peer_id).cloned()
}

/// Given a state part request, selects a peer host to which the request should be sent.
pub fn select_host_for_part(
&self,
Expand All @@ -323,7 +332,6 @@ impl SnapshotHostsCache {
}

/// Triggered by state sync actor after processing a state part.
#[allow(dead_code)]
pub fn part_received(&self, shard_id: ShardId, part_id: u64) {
let mut inner = self.0.lock();
inner.peer_selector.remove(&(shard_id, part_id));
Expand Down
7 changes: 6 additions & 1 deletion chain/network/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::state_witness::{
};
use crate::types::{
NetworkRequests, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse,
SetChainInfo,
SetChainInfo, StateSyncEvent,
};
use near_async::actix::ActixResult;
use near_async::futures::{FutureSpawner, FutureSpawnerExt};
Expand Down Expand Up @@ -188,6 +188,10 @@ impl Handler<SetChainInfo> for TestLoopPeerManagerActor {
fn handle(&mut self, _msg: SetChainInfo) {}
}

/// Test-loop stub: `StateSyncEvent`s are accepted and ignored, since the
/// test-loop environment has no real network-layer part routing to update.
impl Handler<StateSyncEvent> for TestLoopPeerManagerActor {
    // Intentionally a no-op.
    fn handle(&mut self, _msg: StateSyncEvent) {}
}

impl Handler<PeerManagerMessageRequest> for TestLoopPeerManagerActor {
fn handle(&mut self, msg: PeerManagerMessageRequest) -> PeerManagerMessageResponse {
let PeerManagerMessageRequest::NetworkRequests(request) = msg else {
Expand Down Expand Up @@ -276,6 +280,7 @@ fn network_message_to_client_handler(
.send(EpochSyncResponseMessage { from_peer: my_peer_id.clone(), proof });
None
}
NetworkRequests::StateRequestPart { .. } => None,

_ => Some(request),
})
Expand Down
6 changes: 5 additions & 1 deletion chain/network/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::network_protocol::PeerInfo;
use crate::types::{
NetworkInfo, NetworkResponses, PeerManagerMessageRequest, PeerManagerMessageResponse,
SetChainInfo,
SetChainInfo, StateSyncEvent,
};
use crate::PeerManagerActor;
use actix::{Actor, ActorContext, Context, Handler};
Expand Down Expand Up @@ -259,6 +259,10 @@ impl CanSend<SetChainInfo> for MockPeerManagerAdapter {
fn send(&self, _msg: SetChainInfo) {}
}

/// Test stub: the mock adapter silently discards `StateSyncEvent`s so that
/// components under test can emit them without a live peer manager.
impl CanSend<StateSyncEvent> for MockPeerManagerAdapter {
    // Intentionally a no-op.
    fn send(&self, _msg: StateSyncEvent) {}
}

impl MockPeerManagerAdapter {
pub fn pop(&self) -> Option<PeerManagerMessageRequest> {
self.requests.write().unwrap().pop_front()
Expand Down
7 changes: 7 additions & 0 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ pub enum NetworkRequests {
EpochSyncResponse { route_back: CryptoHash, proof: CompressedEpochSyncProof },
}

/// Events about state sync progress that other components report to the
/// network layer (delivered via `PeerManagerAdapter::state_sync_event_sender`).
#[derive(Debug, actix::Message, strum::IntoStaticStr)]
#[rtype(result = "()")]
pub enum StateSyncEvent {
    /// A state part (shard id, part id) was received and processed; the
    /// network layer can stop routing requests for it.
    StatePartReceived(ShardId, u64),
}

/// Combines peer address info, chain.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FullPeerInfo {
Expand Down Expand Up @@ -404,6 +410,7 @@ pub struct PeerManagerAdapter {
pub async_request_sender: AsyncSender<PeerManagerMessageRequest, PeerManagerMessageResponse>,
pub request_sender: Sender<PeerManagerMessageRequest>,
pub set_chain_info_sender: Sender<SetChainInfo>,
pub state_sync_event_sender: Sender<StateSyncEvent>,
}

#[cfg(test)]
Expand Down
13 changes: 13 additions & 0 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub const DEFAULT_GC_NUM_EPOCHS_TO_KEEP: u64 = 5;
pub const DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL: u32 = 25;
pub const DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL: u32 = 5;

/// The default number of attempts to obtain a state part from peers in the network
/// before giving up and downloading it from external storage.
pub const DEFAULT_EXTERNAL_STORAGE_FALLBACK_THRESHOLD: u64 = 5;

/// Configuration for garbage collection.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
#[serde(default)]
Expand Down Expand Up @@ -77,6 +81,10 @@ fn default_num_concurrent_requests_during_catchup() -> u32 {
DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL
}

// Serde default for `ExternalStorageConfig::external_storage_fallback_threshold`:
// number of peer-request attempts per part before falling back to external storage.
fn default_external_storage_fallback_threshold() -> u64 {
    DEFAULT_EXTERNAL_STORAGE_FALLBACK_THRESHOLD
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct ExternalStorageConfig {
/// Location of state parts.
Expand All @@ -89,6 +97,10 @@ pub struct ExternalStorageConfig {
/// to reduce the performance impact of state sync.
#[serde(default = "default_num_concurrent_requests_during_catchup")]
pub num_concurrent_requests_during_catchup: u32,
/// The number of attempts the node will make to obtain a part from peers in
/// the network before it fetches from external storage.
#[serde(default = "default_external_storage_fallback_threshold")]
pub external_storage_fallback_threshold: u64,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
Expand Down Expand Up @@ -161,6 +173,7 @@ impl StateSyncConfig {
num_concurrent_requests: DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL,
num_concurrent_requests_during_catchup:
DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL,
external_storage_fallback_threshold: DEFAULT_EXTERNAL_STORAGE_FALLBACK_THRESHOLD,
}),
}
}
Expand Down
5 changes: 5 additions & 0 deletions integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ impl TestLoopBuilder {
location: external_storage_location,
num_concurrent_requests: 1,
num_concurrent_requests_during_catchup: 1,
// We go straight to storage here because the network layer basically
// doesn't exist in testloop. We could mock a bunch of stuff to make
// the clients transfer state parts "peer to peer" but we wouldn't really
// gain anything over having them dump parts to a tempdir.
external_storage_fallback_threshold: 0,
}),
};

Expand Down
1 change: 1 addition & 0 deletions integration-tests/src/tests/client/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ fn sync_state_dump() {
},
num_concurrent_requests: 1,
num_concurrent_requests_during_catchup: 1,
external_storage_fallback_threshold: 0,
});

let nearcore::NearNode {
Expand Down
Loading