Skip to content

Commit

Permalink
refactor(relay): remove explicit stream tracking
Browse files Browse the repository at this point in the history
With libp2p#4306 resolved, we no longer need to carry around oneshots in the relay code that track our alive streams. This is now handled directly by `Connection`.

Pull-Request: libp2p#4744.
  • Loading branch information
thomaseizinger authored Oct 30, 2023
1 parent 1ce9fe0 commit 35a92df
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 77 deletions.
2 changes: 0 additions & 2 deletions protocols/relay/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,6 @@ impl NetworkBehaviour for Behaviour {
src_peer_id,
src_connection_id,
inbound_circuit_req,
dst_handler_notifier,
dst_stream,
dst_pending_data,
} => {
Expand All @@ -616,7 +615,6 @@ impl NetworkBehaviour for Behaviour {
circuit_id,
dst_peer_id: event_source,
inbound_circuit_req,
dst_handler_notifier,
dst_stream,
dst_pending_data,
}),
Expand Down
28 changes: 1 addition & 27 deletions protocols/relay/src/behaviour/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::protocol::{inbound_hop, outbound_stop};
use crate::{proto, HOP_PROTOCOL_NAME, STOP_PROTOCOL_NAME};
use bytes::Bytes;
use either::Either;
use futures::channel::oneshot::{self, Canceled};
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use futures::io::AsyncWriteExt;
use futures::stream::{FuturesUnordered, StreamExt};
Expand Down Expand Up @@ -79,7 +78,6 @@ pub enum In {
circuit_id: CircuitId,
dst_peer_id: PeerId,
inbound_circuit_req: inbound_hop::CircuitReq,
dst_handler_notifier: oneshot::Sender<()>,
dst_stream: Stream,
dst_pending_data: Bytes,
},
Expand Down Expand Up @@ -126,7 +124,6 @@ impl fmt::Debug for In {
circuit_id,
inbound_circuit_req: _,
dst_peer_id,
dst_handler_notifier: _,
dst_stream: _,
dst_pending_data: _,
} => f
Expand Down Expand Up @@ -195,7 +192,6 @@ pub enum Event {
src_peer_id: PeerId,
src_connection_id: ConnectionId,
inbound_circuit_req: inbound_hop::CircuitReq,
dst_handler_notifier: oneshot::Sender<()>,
dst_stream: Stream,
dst_pending_data: Bytes,
},
Expand Down Expand Up @@ -292,7 +288,6 @@ impl fmt::Debug for Event {
src_peer_id,
src_connection_id,
inbound_circuit_req: _,
dst_handler_notifier: _,
dst_stream: _,
dst_pending_data: _,
} => f
Expand Down Expand Up @@ -372,11 +367,6 @@ pub struct Handler {
PeerId,
Result<(), inbound_hop::UpgradeError>,
)>,
/// Tracks substreams lend out to other [`Handler`]s.
///
/// Contains a [`futures::future::Future`] for each lend out substream that
/// resolves once the substream is dropped.
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<()>>,
/// Futures relaying data for circuit between two peers.
circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>,

Expand Down Expand Up @@ -411,7 +401,6 @@ impl Handler {
reservation_request_future: Default::default(),
circuit_accept_futures: Default::default(),
circuit_deny_futures: Default::default(),
alive_lend_out_substreams: Default::default(),
circuits: Default::default(),
active_reservation: Default::default(),
pending_connect_requests: Default::default(),
Expand Down Expand Up @@ -442,12 +431,9 @@ impl Handler {
.pop_front()
.expect("opened a stream without a pending stop command");

let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx);

if self
.workers
.try_push(outbound_stop::connect(stream, stop_command, tx).map(Either::Right))
.try_push(outbound_stop::connect(stream, stop_command).map(Either::Right))
.is_err()
{
log::warn!("Dropping outbound stream because we are at capacity")
Expand Down Expand Up @@ -587,7 +573,6 @@ impl ConnectionHandler for Handler {
circuit_id,
dst_peer_id,
inbound_circuit_req,
dst_handler_notifier,
dst_stream,
dst_pending_data,
} => {
Expand All @@ -600,7 +585,6 @@ impl ConnectionHandler for Handler {
src_stream,
src_pending_data,
dst_peer_id,
dst_handler_notifier,
dst_stream,
dst_pending_data,
})
Expand Down Expand Up @@ -693,7 +677,6 @@ impl ConnectionHandler for Handler {
src_peer_id: circuit.src_peer_id,
src_connection_id: circuit.src_connection_id,
inbound_circuit_req: circuit.inbound_circuit_req,
dst_handler_notifier: circuit.dst_handler_notifier,
dst_stream: circuit.dst_stream,
dst_pending_data: circuit.dst_pending_data,
},
Expand Down Expand Up @@ -761,7 +744,6 @@ impl ConnectionHandler for Handler {
mut src_stream,
src_pending_data,
dst_peer_id,
dst_handler_notifier,
mut dst_stream,
dst_pending_data,
} = parts;
Expand All @@ -785,8 +767,6 @@ impl ConnectionHandler for Handler {
)
.await?;

// Inform destination handler that the stream to the destination is dropped.
drop(dst_handler_notifier);
Ok(())
}
.map(move |r| (circuit_id, dst_peer_id, r))
Expand Down Expand Up @@ -870,11 +850,6 @@ impl ConnectionHandler for Handler {
None => {}
}

// Check lend out substreams.
while let Poll::Ready(Some(Err(Canceled))) =
self.alive_lend_out_substreams.poll_next_unpin(cx)
{}

// Check keep alive status.
if self.active_reservation.is_none() {
if self.idle_at.is_none() {
Expand Down Expand Up @@ -925,7 +900,6 @@ struct CircuitParts {
src_stream: Stream,
src_pending_data: Bytes,
dst_peer_id: PeerId,
dst_handler_notifier: oneshot::Sender<()>,
dst_stream: Stream,
dst_pending_data: Bytes,
}
20 changes: 2 additions & 18 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::protocol::{self, inbound_stop, outbound_hop};
use bytes::Bytes;
use either::Either;
use futures::channel::mpsc::Receiver;
use futures::channel::oneshot;
use futures::future::{BoxFuture, FutureExt};
use futures::io::{AsyncRead, AsyncWrite};
use futures::ready;
Expand Down Expand Up @@ -386,22 +385,13 @@ pub(crate) enum ConnectionState {
Operational {
read_buffer: Bytes,
substream: Stream,
/// "Drop notifier" pattern to signal to the transport that the connection has been dropped.
///
/// This is flagged as "dead-code" by the compiler because we never read from it here.
/// However, it is actual use is to trigger the `Canceled` error in the `Transport` when this `Sender` is dropped.
#[allow(dead_code)]
drop_notifier: oneshot::Sender<void::Void>,
},
}

impl Unpin for ConnectionState {}

impl ConnectionState {
pub(crate) fn new_inbound(
circuit: inbound_stop::Circuit,
drop_notifier: oneshot::Sender<void::Void>,
) -> Self {
pub(crate) fn new_inbound(circuit: inbound_stop::Circuit) -> Self {
ConnectionState::InboundAccepting {
accept: async {
let (substream, read_buffer) = circuit
Expand All @@ -411,22 +401,16 @@ impl ConnectionState {
Ok(ConnectionState::Operational {
read_buffer,
substream,
drop_notifier,
})
}
.boxed(),
}
}

pub(crate) fn new_outbound(
substream: Stream,
read_buffer: Bytes,
drop_notifier: oneshot::Sender<void::Void>,
) -> Self {
pub(crate) fn new_outbound(substream: Stream, read_buffer: Bytes) -> Self {
ConnectionState::Operational {
substream,
read_buffer,
drop_notifier,
}
}
}
Expand Down
24 changes: 1 addition & 23 deletions protocols/relay/src/priv_client/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,6 @@ pub struct Handler {

reservation: Reservation,

/// Tracks substreams lent out to the transport.
///
/// Contains a [`futures::future::Future`] for each lend out substream that
/// resolves once the substream is dropped.
alive_lend_out_substreams: FuturesUnordered<oneshot::Receiver<void::Void>>,

open_circuit_futs:
futures_bounded::FuturesSet<Result<inbound_stop::Circuit, inbound_stop::FatalUpgradeError>>,

Expand All @@ -177,7 +171,6 @@ impl Handler {
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
),
reservation: Reservation::None,
alive_lend_out_substreams: Default::default(),
open_circuit_futs: futures_bounded::FuturesSet::new(
STREAM_TIMEOUT,
MAX_CONCURRENT_STREAMS_PER_CONNECTION,
Expand Down Expand Up @@ -413,9 +406,7 @@ impl ConnectionHandler for Handler {
let src_peer_id = circuit.src_peer_id();
let limit = circuit.limit();

let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx);
let connection = super::ConnectionState::new_inbound(circuit, tx);
let connection = super::ConnectionState::new_inbound(circuit);

pending_msgs.push_back(
transport::ToListenerMsg::IncomingRelayedConnection {
Expand Down Expand Up @@ -471,15 +462,6 @@ impl ConnectionHandler for Handler {
// Send errors to transport.
while let Poll::Ready(Some(())) = self.send_error_futs.poll_next_unpin(cx) {}

// Check status of lend out substreams.
loop {
match self.alive_lend_out_substreams.poll_next_unpin(cx) {
Poll::Ready(Some(Err(oneshot::Canceled))) => {}
Poll::Ready(Some(Ok(v))) => void::unreachable(v),
Poll::Ready(None) | Poll::Pending => break,
}
}

Poll::Pending
}

Expand Down Expand Up @@ -526,17 +508,13 @@ impl ConnectionHandler for Handler {
}
}
outbound_hop::OutboundStreamInfo::CircuitConnection(cmd) => {
let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx);

if self
.outbound_circuits
.try_push(
outbound_hop::handle_connection_message_response(
stream,
self.remote_peer_id,
cmd,
tx,
)
.map_ok(Either::Right),
)
Expand Down
4 changes: 1 addition & 3 deletions protocols/relay/src/protocol/outbound_hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use futures::prelude::*;
use futures_timer::Delay;
use log::debug;
use thiserror::Error;
use void::Void;

use libp2p_core::Multiaddr;
use libp2p_identity::PeerId;
Expand Down Expand Up @@ -183,7 +182,6 @@ pub(crate) async fn handle_connection_message_response(
protocol: Stream,
remote_peer_id: PeerId,
con_command: Command,
tx: oneshot::Sender<Void>,
) -> Result<Result<Option<Circuit>, CircuitFailedReason>, FatalUpgradeError> {
let msg = proto::HopMessage {
type_pb: proto::HopMessageType::CONNECT,
Expand Down Expand Up @@ -259,7 +257,7 @@ pub(crate) async fn handle_connection_message_response(
);

match con_command.send_back.send(Ok(priv_client::Connection {
state: priv_client::ConnectionState::new_outbound(io, read_buffer.freeze(), tx),
state: priv_client::ConnectionState::new_outbound(io, read_buffer.freeze()),
})) {
Ok(()) => Ok(Ok(Some(Circuit { limit }))),
Err(_) => {
Expand Down
4 changes: 0 additions & 4 deletions protocols/relay/src/protocol/outbound_stop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::time::Duration;

use asynchronous_codec::{Framed, FramedParts};
use bytes::Bytes;
use futures::channel::oneshot::{self};
use futures::prelude::*;
use thiserror::Error;

Expand Down Expand Up @@ -77,7 +76,6 @@ pub enum FatalUpgradeError {
pub(crate) async fn connect(
io: Stream,
stop_command: PendingConnect,
tx: oneshot::Sender<()>,
) -> Result<Result<Circuit, CircuitFailed>, FatalUpgradeError> {
let msg = proto::StopMessage {
type_pb: proto::StopMessageType::CONNECT,
Expand Down Expand Up @@ -164,7 +162,6 @@ pub(crate) async fn connect(
src_peer_id: stop_command.src_peer_id,
src_connection_id: stop_command.src_connection_id,
inbound_circuit_req: stop_command.inbound_circuit_req,
dst_handler_notifier: tx,
dst_stream: io,
dst_pending_data: read_buffer.freeze(),
}))
Expand All @@ -175,7 +172,6 @@ pub(crate) struct Circuit {
pub(crate) src_peer_id: PeerId,
pub(crate) src_connection_id: ConnectionId,
pub(crate) inbound_circuit_req: inbound_hop::CircuitReq,
pub(crate) dst_handler_notifier: oneshot::Sender<()>,
pub(crate) dst_stream: Stream,
pub(crate) dst_pending_data: Bytes,
}
Expand Down

0 comments on commit 35a92df

Please sign in to comment.