Skip to content

Commit

Permalink
refactor: migrate upgrades to {In,Out}boundConnectionUpgrade
Browse files Browse the repository at this point in the history
This needs to be done in one PR because we need to remove the blanket impl at the same time we add the new ones. I also chose to duplicate `SelectUpgrade` for the `libp2p::SwarmBuilder` because the `SelectUpgrade` within `libp2p-core` is really only meant to be used for protocol upgrade (which will go away at some point). This tiny bit of code duplication doesn't hurt and will help us in the future.

Resolves: libp2p#4521.

Pull-Request: libp2p#4695.
  • Loading branch information
thomaseizinger authored Oct 21, 2023
1 parent 1e3ffeb commit ed02a10
Show file tree
Hide file tree
Showing 28 changed files with 213 additions and 116 deletions.
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 0.41.0 - unreleased

- Remove blanket-impl of `{In,Out}boundUpgrade` for `{In,Out}boundConnectionUpgrade`.
See [PR 4695](https://github.com/libp2p/rust-libp2p/pull/4695).

## 0.40.1

Expand Down
28 changes: 0 additions & 28 deletions core/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,31 +157,3 @@ pub trait OutboundConnectionUpgrade<T>: UpgradeInfo {
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future;
}

// Blanket implementation for InboundConnectionUpgrade based on InboundUpgrade for backwards compatibility
impl<U, T> InboundConnectionUpgrade<T> for U
where
U: InboundUpgrade<T>,
{
type Output = <U as InboundUpgrade<T>>::Output;
type Error = <U as InboundUpgrade<T>>::Error;
type Future = <U as InboundUpgrade<T>>::Future;

fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future {
self.upgrade_inbound(socket, info)
}
}

// Blanket implementation for OutboundConnectionUpgrade based on OutboundUpgrade for backwards compatibility
impl<U, T> OutboundConnectionUpgrade<T> for U
where
U: OutboundUpgrade<T>,
{
type Output = <U as OutboundUpgrade<T>>::Output;
type Error = <U as OutboundUpgrade<T>>::Error;
type Future = <U as OutboundUpgrade<T>>::Future;

fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future {
self.upgrade_outbound(socket, info)
}
}
8 changes: 5 additions & 3 deletions core/tests/transport_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

use futures::prelude::*;
use libp2p_core::transport::{ListenerId, MemoryTransport, Transport};
use libp2p_core::upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::upgrade::{
self, InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo,
};
use libp2p_identity as identity;
use libp2p_mplex::MplexConfig;
use libp2p_noise as noise;
Expand All @@ -40,7 +42,7 @@ impl UpgradeInfo for HelloUpgrade {
}
}

impl<C> InboundUpgrade<C> for HelloUpgrade
impl<C> InboundConnectionUpgrade<C> for HelloUpgrade
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
Expand All @@ -58,7 +60,7 @@ where
}
}

impl<C> OutboundUpgrade<C> for HelloUpgrade
impl<C> OutboundConnectionUpgrade<C> for HelloUpgrade
where
C: AsyncWrite + AsyncRead + Send + Unpin + 'static,
{
Expand Down
1 change: 1 addition & 0 deletions libp2p/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::marker::PhantomData;

mod phase;
mod select_muxer;
mod select_security;

/// Build a [`Swarm`](libp2p_swarm::Swarm) by combining an identity, a set of
Expand Down
7 changes: 4 additions & 3 deletions libp2p/src/builder/phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ use swarm::*;
use tcp::*;
use websocket::*;

use super::select_muxer::SelectMuxerUpgrade;
use super::select_security::SelectSecurityUpgrade;
use super::SwarmBuilder;

use libp2p_core::{muxing::StreamMuxerBox, upgrade::SelectUpgrade, Transport};
use libp2p_core::{muxing::StreamMuxerBox, Transport};
use libp2p_identity::Keypair;

pub trait IntoSecurityUpgrade<C> {
Expand Down Expand Up @@ -94,15 +95,15 @@ where
U1: IntoMultiplexerUpgrade<C>,
U2: IntoMultiplexerUpgrade<C>,
{
type Upgrade = SelectUpgrade<U1::Upgrade, U2::Upgrade>;
type Upgrade = SelectMuxerUpgrade<U1::Upgrade, U2::Upgrade>;

fn into_multiplexer_upgrade(self) -> Self::Upgrade {
let (f1, f2) = self;

let u1 = f1.into_multiplexer_upgrade();
let u2 = f2.into_multiplexer_upgrade();

SelectUpgrade::new(u1, u2)
SelectMuxerUpgrade::new(u1, u2)
}
}

Expand Down
15 changes: 8 additions & 7 deletions libp2p/src/builder/phase/other_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::convert::Infallible;
use std::marker::PhantomData;
use std::sync::Arc;

use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
use libp2p_core::Transport;
#[cfg(feature = "relay")]
use libp2p_core::{InboundUpgrade, Negotiated, OutboundUpgrade, UpgradeInfo};
use libp2p_core::{Negotiated, UpgradeInfo};
#[cfg(feature = "relay")]
use libp2p_identity::PeerId;

Expand Down Expand Up @@ -119,19 +120,19 @@ impl<T: AuthenticatedMultiplexedTransport, Provider>
SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecError: std::error::Error + Send + Sync + 'static,
SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
SecUpgrade::Upgrade: InboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,

MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
MuxStream::Substream: Send + 'static,
MuxStream::Error: Send + Sync + 'static,
MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxError: std::error::Error + Send + Sync + 'static,
<<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
Expand Down
25 changes: 13 additions & 12 deletions libp2p/src/builder/phase/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::*;
use crate::SwarmBuilder;
#[cfg(all(not(target_arch = "wasm32"), feature = "websocket"))]
use libp2p_core::muxing::StreamMuxer;
use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
#[cfg(any(
feature = "relay",
all(not(target_arch = "wasm32"), feature = "websocket")
Expand Down Expand Up @@ -90,19 +91,19 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Quic
SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecError: std::error::Error + Send + Sync + 'static,
SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
SecUpgrade::Upgrade: InboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,

MuxStream: libp2p_core::muxing::StreamMuxer + Send + 'static,
MuxStream::Substream: Send + 'static,
MuxStream::Error: Send + Sync + 'static,
MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxError: std::error::Error + Send + Sync + 'static,
<<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
Expand Down Expand Up @@ -207,19 +208,19 @@ macro_rules! impl_quic_phase_with_websocket {
SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecError: std::error::Error + Send + Sync + 'static,
SecUpgrade: IntoSecurityUpgrade<$websocketStream>,
SecUpgrade::Upgrade: InboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<$websocketStream>>>::Future: Send,
SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<$websocketStream>, Output = (libp2p_identity::PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<$websocketStream>>>::Future: Send,
<<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecUpgrade as IntoSecurityUpgrade<$websocketStream>>::Upgrade as UpgradeInfo>::Info: Send,

MuxStream: StreamMuxer + Send + 'static,
MuxStream::Substream: Send + 'static,
MuxStream::Error: Send + Sync + 'static,
MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxError: std::error::Error + Send + Sync + 'static,
<<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
Expand Down
13 changes: 7 additions & 6 deletions libp2p/src/builder/phase/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::marker::PhantomData;

#[cfg(feature = "relay")]
use libp2p_core::muxing::StreamMuxerBox;
use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade};
#[cfg(feature = "relay")]
use libp2p_core::Transport;
#[cfg(any(feature = "relay", feature = "websocket"))]
Expand Down Expand Up @@ -59,19 +60,19 @@ impl<Provider, T: AuthenticatedMultiplexedTransport> SwarmBuilder<Provider, Rela
SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static,
SecError: std::error::Error + Send + Sync + 'static,
SecUpgrade: IntoSecurityUpgrade<libp2p_relay::client::Connection>,
SecUpgrade::Upgrade: InboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
SecUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>, Output = (PeerId, SecStream), Error = SecError> + Clone + Send + 'static,
<SecUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<SecUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<libp2p_relay::client::Connection>>>::Future: Send,
<<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<SecUpgrade as IntoSecurityUpgrade<libp2p_relay::client::Connection>>::Upgrade as UpgradeInfo>::Info: Send,

MuxStream: StreamMuxer + Send + 'static,
MuxStream::Substream: Send + 'static,
MuxStream::Error: Send + Sync + 'static,
MuxUpgrade: IntoMultiplexerUpgrade<SecStream>,
MuxUpgrade::Upgrade: InboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxUpgrade::Upgrade: InboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + OutboundConnectionUpgrade<Negotiated<SecStream>, Output = MuxStream, Error = MuxError> + Clone + Send + 'static,
<MuxUpgrade::Upgrade as InboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
<MuxUpgrade::Upgrade as OutboundConnectionUpgrade<Negotiated<SecStream>>>::Future: Send,
MuxError: std::error::Error + Send + Sync + 'static,
<<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send,
<<MuxUpgrade as IntoMultiplexerUpgrade<SecStream>>::Upgrade as UpgradeInfo>::Info: Send,
Expand Down
Loading

0 comments on commit ed02a10

Please sign in to comment.