Skip to content

Commit

Permalink
core/: Concurrent dial attempts (#2248)
Browse files Browse the repository at this point in the history
Concurrently dial address candidates within a single dial attempt.

Main motivation for this feature is to increase success rate on hole punching
(see #1896 (comment)
for details). Though, as a nice side effect, as one would expect, it does
improve connection establishment time.

Cleanups and fixes done along the way:

- Merge `pool.rs` and `manager.rs`.

- Instead of manually implementing state machines in `task.rs` use
  `async/await`.

- Fix bug where `NetworkBehaviour::inject_connection_closed` is called without a
  previous `NetworkBehaviour::inject_connection_established` (see
  #2242).

- Return handler to behaviour on incoming connection limit error. Missed in
  #2242.
  • Loading branch information
mxinden authored Oct 14, 2021
1 parent c0d7d4a commit 40c5335
Show file tree
Hide file tree
Showing 36 changed files with 2,233 additions and 2,310 deletions.
9 changes: 9 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
- Report `ListenersEvent::Closed` when dropping a listener in `ListenersStream::remove_listener`,
return `bool` instead of `Result<(), ()>` (see [PR 2261]).

- Concurrently dial address candidates within a single dial attempt (see [PR 2248]) configured
via `Network::with_dial_concurrency_factor`.

- On success of a single address, provide errors of the thus far failed dials via
`NetworkEvent::ConnectionEstablished::outgoing`.

- On failure of all addresses, provide the errors via `NetworkEvent::DialError`.

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2213]: https://github.com/libp2p/rust-libp2p/pull/2213
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
Expand All @@ -47,6 +55,7 @@
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195
[PR 2107]: https://github.com/libp2p/rust-libp2p/pull/2107
[PR 2248]: https://github.com/libp2p/rust-libp2p/pull/2248
[PR 2261]: https://github.com/libp2p/rust-libp2p/pull/2261
[RFC0002]: https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md
[RFC0003]: https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md
Expand Down
84 changes: 62 additions & 22 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ pub(crate) mod handler;
mod listeners;
mod substream;

pub(crate) mod manager;
pub(crate) mod pool;

pub use error::{ConnectionError, PendingConnectionError};
pub use error::{
ConnectionError, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersEvent, ListenersStream};
pub use manager::ConnectionId;
pub use pool::{ConnectionCounters, ConnectionLimits};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use substream::{Close, Substream, SubstreamEndpoint};
Expand All @@ -40,6 +41,21 @@ use std::hash::Hash;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use substream::{Muxing, SubstreamEvent};

/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(usize);

impl ConnectionId {
/// Creates a `ConnectionId` from a non-negative integer.
///
/// This is primarily useful for creating connection IDs
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
ConnectionId(id)
}
}

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down Expand Up @@ -72,7 +88,40 @@ impl Endpoint {
}
}

/// The endpoint roles associated with a peer-to-peer connection.
/// The endpoint roles associated with a pending peer-to-peer connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PendingPoint {
/// The socket comes from a dialer.
///
/// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known.
Dialer,
/// The socket comes from a listener.
Listener {
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
},
}

impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { .. } => PendingPoint::Dialer,
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => PendingPoint::Listener {
local_addr,
send_back_addr,
},
}
}
}

/// The endpoint roles associated with an established peer-to-peer connection.
#[derive(PartialEq, Eq, Debug, Clone, Hash)]
pub enum ConnectedPoint {
/// We dialed the node.
Expand All @@ -84,7 +133,7 @@ pub enum ConnectedPoint {
Listener {
/// Local connection address.
local_addr: Multiaddr,
/// Stack of protocols used to send back data to the remote.
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
},
}
Expand Down Expand Up @@ -289,32 +338,23 @@ where
pub struct IncomingInfo<'a> {
/// Local connection address.
pub local_addr: &'a Multiaddr,
/// Stack of protocols used to send back data to the remote.
/// Address used to send back data to the remote.
pub send_back_addr: &'a Multiaddr,
}

impl<'a> IncomingInfo<'a> {
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Listener {
/// Builds the [`PendingPoint`] corresponding to the incoming connection.
pub fn to_pending_point(&self) -> PendingPoint {
PendingPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}

/// Borrowed information about an outgoing connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct OutgoingInfo<'a> {
pub address: &'a Multiaddr,
pub peer_id: Option<&'a PeerId>,
}

impl<'a> OutgoingInfo<'a> {
/// Builds a `ConnectedPoint` corresponding to the outgoing connection.
/// Builds the [`ConnectedPoint`] corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Dialer {
address: self.address.clone(),
ConnectedPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}
Expand Down
43 changes: 30 additions & 13 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use crate::connection::ConnectionLimit;
use crate::transport::TransportError;
use crate::Multiaddr;
use std::{fmt, io};

/// Errors that can occur in the context of an established `Connection`.
Expand All @@ -29,10 +30,6 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -45,9 +42,6 @@ where
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -60,16 +54,31 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}

/// Errors that can occur in the context of a pending outgoing `Connection`.
///
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
/// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single
/// connection.
pub type PendingOutboundConnectionError<TTransErr> =
PendingConnectionError<Vec<(Multiaddr, TransportError<TTransErr>)>>;

/// Errors that can occur in the context of a pending incoming `Connection`.
pub type PendingInboundConnectionError<TTransErr> =
PendingConnectionError<TransportError<TTransErr>>;

/// Errors that can occur in the context of a pending `Connection`.
#[derive(Debug)]
pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s).
Transport(TransportError<TTransErr>),
/// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(TTransErr),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// Pending connection attempt has been aborted.
Aborted,
Expand All @@ -85,14 +94,21 @@ pub enum PendingConnectionError<TTransErr> {

impl<TTransErr> fmt::Display for PendingConnectionError<TTransErr>
where
TTransErr: fmt::Display,
TTransErr: fmt::Display + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."),
PendingConnectionError::Transport(err) => {
write!(f, "Pending connection: Transport error: {}", err)
write!(
f,
"Pending connection: Transport error on connection: {}",
err
)
}
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
PendingConnectionError::InvalidPeerId => {
write!(f, "Pending connection: Invalid peer ID.")
Expand All @@ -108,9 +124,10 @@ where
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::Transport(_) => None,
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::Aborted => None,
PendingConnectionError::ConnectionLimit(..) => None,
}
}
}
Loading

0 comments on commit 40c5335

Please sign in to comment.