Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix(p2p): accept listener connection during bootstrap
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Mar 22, 2024
1 parent 5b6ddb8 commit 34a7c20
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 28 deletions.
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ impl<'a> NetworkBuilder<'a> {
pending_record_requests: HashMap::new(),
shutdown,
health_state: crate::runtime::HealthState {
bootpeer_connection_retries: 3,
successfully_connected_to_bootpeer: if self.known_peers.is_empty() {
bootnode_connection_retries: 3,
successfully_connected_to_bootnode: if self.known_peers.is_empty() {
// Node seems to be a boot node
Some(ConnectionId::new_unchecked(0))
} else {
Expand Down
61 changes: 43 additions & 18 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use libp2p::{multiaddr::Protocol, swarm::SwarmEvent};
use libp2p::{core::Endpoint, multiaddr::Protocol, swarm::SwarmEvent};
use tracing::{debug, error, info, warn};

use crate::{error::P2PError, event::ComposedEvent, Event, Runtime};
Expand Down Expand Up @@ -62,13 +62,13 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
error,
} if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.health_state.dialed_bootpeer.contains(&connection_id) =>
&& self.health_state.dialed_bootnode.contains(&connection_id) =>
{
warn!("Unable to connect to bootpeer {peer_id}: {error:?}");
self.health_state.dialed_bootpeer.remove(&connection_id);
if self.health_state.dialed_bootpeer.is_empty() {
warn!("Unable to connect to bootnode {peer_id}: {error:?}");
self.health_state.dialed_bootnode.remove(&connection_id);
if self.health_state.dialed_bootnode.is_empty() {
// We tried to connect to all bootnodes without success
error!("Unable to connect to any bootnode");
}
Expand Down Expand Up @@ -100,25 +100,49 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
num_established,
concurrent_dial_errors,
established_in,
} if self.health_state.dialed_bootpeer.contains(&connection_id) => {
info!("Successfully connected to bootpeer {peer_id}");
} if self.health_state.dialed_bootnode.contains(&connection_id) => {
info!("Successfully connected to bootnode {peer_id}");
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
self.health_state.successfully_connected_to_bootpeer = Some(connection_id);
_ = self.health_state.dialed_bootpeer.remove(&connection_id);
self.health_state.successfully_connected_to_bootnode = Some(connection_id);
_ = self.health_state.dialed_bootnode.remove(&connection_id);
}
}

SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
peer_id,
endpoint,
connection_id,
..
} => {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
if self
.health_state
.successfully_connected_to_bootnode
.is_none()
&& self.boot_peers.contains(&peer_id)
{
info!(
"Connection established with bootnode {peer_id} as {:?}",
endpoint.to_endpoint()
);

if endpoint.to_endpoint() == Endpoint::Listener {
if let Err(error) = self.swarm.dial(peer_id) {
error!(
"Unable to dial bootnode {peer_id} after incoming connection: \
{error}"
);
}
}
} else {
info!(
"Connection established with peer {peer_id} as {:?}",
endpoint.to_endpoint()
);
}

if self.swarm.connected_peers().count() >= self.config.minimum_cluster_size {
if let Err(error) = self.swarm.behaviour_mut().gossipsub.subscribe() {
Expand Down Expand Up @@ -164,8 +188,8 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
peer_id: Some(ref peer_id),
connection_id,
} if self.boot_peers.contains(peer_id) => {
info!("Dialing bootpeer {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootpeer.insert(connection_id);
info!("Dialing bootnode {peer_id} on connection: {connection_id}");
self.health_state.dialed_bootnode.insert(connection_id);
}

SwarmEvent::Dialing {
Expand All @@ -185,6 +209,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
SwarmEvent::ListenerError { listener_id, error } => {
error!("Unhandled ListenerError {listener_id:?} | {error}")
}

event => {
warn!("Unhandled SwarmEvent: {:?}", event);
}
Expand Down
10 changes: 5 additions & 5 deletions crates/topos-p2p/src/runtime/handle_event/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl EventHandler<Box<Event>> for Runtime {
{
if self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
{
warn!(
Expand Down Expand Up @@ -85,11 +85,11 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_none()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
match self.health_state.bootpeer_connection_retries.checked_sub(1) {
match self.health_state.bootnode_connection_retries.checked_sub(1) {
None => {
error!(
"Bootstrap query finished but unable to connect to bootnode, stopping"
Expand All @@ -103,7 +103,7 @@ impl EventHandler<Box<Event>> for Runtime {
{} more times",
new
);
self.health_state.bootpeer_connection_retries = new;
self.health_state.bootnode_connection_retries = new;
}
}
}
Expand All @@ -119,7 +119,7 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self
.health_state
.successfully_connected_to_bootpeer
.successfully_connected_to_bootnode
.is_some()
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-p2p/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ pub(crate) struct HealthState {
/// Indicates if the node is listening on any address
pub(crate) is_listening: bool,
/// Connection IDs of the dial attempts the node has made to bootnodes
pub(crate) dialed_bootpeer: HashSet<ConnectionId>,
pub(crate) dialed_bootnode: HashSet<ConnectionId>,
/// Indicates if the node has successfully connected to a bootnode
pub(crate) successfully_connected_to_bootpeer: Option<ConnectionId>,
pub(crate) successfully_connected_to_bootnode: Option<ConnectionId>,
/// Track the number of remaining retries to connect to any bootnode
pub(crate) bootpeer_connection_retries: usize,
pub(crate) bootnode_connection_retries: usize,
}

impl Runtime {
Expand Down

0 comments on commit 34a7c20

Please sign in to comment.