From 60fe85436fdab640a878d5c1d9929d3ed3a13e03 Mon Sep 17 00:00:00 2001 From: chunningham Date: Thu, 6 Apr 2023 01:15:37 +1000 Subject: [PATCH] remove exchange-protocol subcrate --- Cargo.toml | 3 +- exchange-protocol/CHANGELOG.md | 190 ----- exchange-protocol/Cargo.toml | 33 - exchange-protocol/src/codec.rs | 83 -- exchange-protocol/src/handler.rs | 435 ---------- exchange-protocol/src/handler/protocol.rs | 189 ----- exchange-protocol/src/lib.rs | 952 ---------------------- exchange-protocol/tests/ping.rs | 388 --------- 8 files changed, 1 insertion(+), 2272 deletions(-) delete mode 100644 exchange-protocol/CHANGELOG.md delete mode 100644 exchange-protocol/Cargo.toml delete mode 100644 exchange-protocol/src/codec.rs delete mode 100644 exchange-protocol/src/handler.rs delete mode 100644 exchange-protocol/src/handler/protocol.rs delete mode 100644 exchange-protocol/src/lib.rs delete mode 100644 exchange-protocol/tests/ping.rs diff --git a/Cargo.toml b/Cargo.toml index e32cdc62..f438dbd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,8 +63,7 @@ path = "lib/" members = [ "lib", "sdk-wasm", - "sdk", - "exchange-protocol" + "sdk" ] exclude = [ diff --git a/exchange-protocol/CHANGELOG.md b/exchange-protocol/CHANGELOG.md deleted file mode 100644 index a97d2400..00000000 --- a/exchange-protocol/CHANGELOG.md +++ /dev/null @@ -1,190 +0,0 @@ -# 0.23.0 - -- Update to `libp2p-core` `v0.38.0`. - -- Update to `libp2p-swarm` `v0.41.0`. - -- Replace `RequestResponse`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods. - See [PR 3011]. - -- Replace `RequestResponseHandler`'s `ConnectionHandler` implemention `inject_*` methods - with the new `on_*` methods. See [PR 3085]. - -- Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090]. - -[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 -[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 -[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 - -# 0.22.0 - -- Bump rand to 0.8 and quickcheck to 1. See [PR 2857]. - -- Update to `libp2p-core` `v0.37.0`. - -- Update to `libp2p-swarm` `v0.40.0`. - -[PR 2857]: https://github.com/libp2p/rust-libp2p/pull/2857 - -# 0.21.0 - -- Update to `libp2p-swarm` `v0.39.0`. - -- Update to `libp2p-core` `v0.36.0`. - -# 0.20.0 - -- Update to `libp2p-swarm` `v0.38.0`. - -- Update to `libp2p-core` `v0.35.0`. - -# 0.19.0 - -- Update to `libp2p-core` `v0.34.0`. - -- Update to `libp2p-swarm` `v0.37.0`. - -# 0.18.0 - -- Update to `libp2p-core` `v0.33.0`. - -- Update to `libp2p-swarm` `v0.36.0`. - -# 0.17.0 - -- Update to `libp2p-swarm` `v0.35.0`. - -# 0.16.0 [2022-02-22] - -- Update to `libp2p-core` `v0.32.0`. - -- Update to `libp2p-swarm` `v0.34.0`. - -- Merge NetworkBehaviour's inject_\* paired methods (see PR 2445). - -[PR 2445]: https://github.com/libp2p/rust-libp2p/pull/2445 - -# 0.15.0 [2022-01-27] - -- Update dependencies. - -- Remove unused `lru` crate (see [PR 2358]). - -- Migrate to Rust edition 2021 (see [PR 2339]). - -[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339 -[PR 2358]: https://github.com/libp2p/rust-libp2p/pull/2358 - -# 0.14.0 [2021-11-16] - -- Use `instant` instead of `wasm-timer` (see [PR 2245]). - -- Update dependencies. - -[PR 2245]: https://github.com/libp2p/rust-libp2p/pull/2245 - -# 0.13.0 [2021-11-01] - -- Make default features of `libp2p-core` optional. - [PR 2181](https://github.com/libp2p/rust-libp2p/pull/2181) - -- Update dependencies. - -- Manually implement `Debug` for `RequestResponseHandlerEvent` and - `RequestProtocol`. See [PR 2183]. - -- Remove `RequestResponse::throttled` and the `throttled` module. - See [PR 2236]. - -[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 -[PR 2236]: https://github.com/libp2p/rust-libp2p/pull/2236 - -# 0.12.0 [2021-07-12] - -- Update dependencies. - -# 0.11.0 [2021-04-13] - -- Update `libp2p-swarm`. -- Implement `std::error::Error` for `InboundFailure` and `OutboundFailure` [PR - 2033](https://github.com/libp2p/rust-libp2p/pull/2033). - -# 0.10.0 [2021-03-17] - -- Update `libp2p-swarm`. - -- Close stream even when no response has been sent. - [PR 1987](https://github.com/libp2p/rust-libp2p/pull/1987). - -- Update dependencies. - -# 0.9.1 [2021-02-15] - -- Make `is_pending_outbound` return true on pending connection. - [PR 1928](https://github.com/libp2p/rust-libp2p/pull/1928). - -- Update dependencies. - -# 0.9.0 [2021-01-12] - -- Update dependencies. - -- Re-export `throttled`-specific response channel. [PR - 1902](https://github.com/libp2p/rust-libp2p/pull/1902). - -# 0.8.0 [2020-12-17] - -- Update `libp2p-swarm` and `libp2p-core`. - -- Emit `InboundFailure::ConnectionClosed` for inbound requests that failed due - to the underlying connection closing. - [PR 1886](https://github.com/libp2p/rust-libp2p/pull/1886). - -- Derive Clone for `InboundFailure` and `Outbound}Failure`. - [PR 1891](https://github.com/libp2p/rust-libp2p/pull/1891) - -# 0.7.0 [2020-12-08] - -- Refine emitted events for inbound requests, introducing - the `ResponseSent` event and the `ResponseOmission` - inbound failures. This effectively removes previous - support for one-way protocols without responses. - [PR 1867](https://github.com/libp2p/rust-libp2p/pull/1867). - -# 0.6.0 [2020-11-25] - -- Update `libp2p-swarm` and `libp2p-core`. - -# 0.5.0 [2020-11-09] - -- Update dependencies. - -# 0.4.0 [2020-10-16] - -- Update dependencies. - -# 0.3.0 [2020-09-09] - -- Add support for opt-in request-based flow-control to any - request-response protocol via `RequestResponse::throttled()`. - [PR 1726](https://github.com/libp2p/rust-libp2p/pull/1726). - -- Update `libp2p-swarm` and `libp2p-core`. - -# 0.2.0 [2020-08-18] - -- Fixed connection keep-alive, permitting connections to close due - to inactivity. -- Bump `libp2p-core` and `libp2p-swarm` dependencies. - -# 0.1.1 - -- Always properly `close()` the substream after sending requests and -responses in the `InboundUpgrade` and `OutboundUpgrade`. Otherwise this is -left to `RequestResponseCodec::write_request` and `RequestResponseCodec::write_response`, -which can be a pitfall and lead to subtle problems (see e.g. -https://github.com/libp2p/rust-libp2p/pull/1606). - -# 0.1.0 - -- Initial release. diff --git a/exchange-protocol/Cargo.toml b/exchange-protocol/Cargo.toml deleted file mode 100644 index 216a21df..00000000 --- a/exchange-protocol/Cargo.toml +++ /dev/null @@ -1,33 +0,0 @@ -[package] -name = "exchange-protocol" -edition = "2021" -rust-version = "1.65.0" -description = "Streaming Request/Response Protocols" -version = "0.1.0" -authors = ["Parity Technologies "] -license = "MIT" -keywords = ["peer-to-peer", "libp2p", "networking"] -categories = ["network-programming", "asynchronous"] - -[dependencies] -async-trait = "0.1" -bytes = "1" -futures = { default-features = false, version = "0.3", features = ["alloc", "std"] } -instant = "0.1.11" -libp2p = { default-features = false, version = "0.50.0" } -log = "0.4.11" -rand = "0.8" -smallvec = "1.6.1" -unsigned-varint = { version = "0.7", features = ["std", "futures"] } - -[dev-dependencies] -async-std = "1.6.2" -env_logger = "0.9.0" -rand = "0.8" - -# Passing arguments to the docsrs builder in order to properly document cfg's. -# More information: https://docs.rs/about/builds#cross-compiling -[package.metadata.docs.rs] -all-features = true -rustdoc-args = ["--cfg", "docsrs"] -rustc-args = ["--cfg", "docsrs"] diff --git a/exchange-protocol/src/codec.rs b/exchange-protocol/src/codec.rs deleted file mode 100644 index 3e7590b1..00000000 --- a/exchange-protocol/src/codec.rs +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -pub use libp2p::core::ProtocolName; - -use async_trait::async_trait; -use futures::prelude::*; -use std::io; - -/// A `RequestResponseCodec` defines the request and response types -/// for a [`RequestResponse`](crate::RequestResponse) protocol or -/// protocol family and how they are encoded / decoded on an I/O stream. -#[async_trait] -pub trait RequestResponseCodec { - /// The type of protocol(s) or protocol versions being negotiated. - type Protocol: ProtocolName + Send + Clone; - /// The type of inbound and outbound requests. - type Request: Send; - /// The type of inbound and outbound responses. - type Response: Send - where - T: AsyncRead + Send; - - /// Reads a request from the given I/O stream according to the - /// negotiated protocol. - async fn read_request( - &mut self, - protocol: &Self::Protocol, - io: T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send; - - /// Reads a response from the given I/O stream according to the - /// negotiated protocol. - async fn read_response( - &mut self, - protocol: &Self::Protocol, - io: T, - ) -> io::Result> - where - T: AsyncRead + Unpin + Send; - - /// Writes a request to the given I/O stream according to the - /// negotiated protocol. - async fn write_request( - &mut self, - protocol: &Self::Protocol, - io: T, - req: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send; - - /// Writes a response to the given I/O stream according to the - /// negotiated protocol. - async fn write_response( - &mut self, - protocol: &Self::Protocol, - io: T, - res: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - R: AsyncRead + Send; -} diff --git a/exchange-protocol/src/handler.rs b/exchange-protocol/src/handler.rs deleted file mode 100644 index 55a899dc..00000000 --- a/exchange-protocol/src/handler.rs +++ /dev/null @@ -1,435 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -mod protocol; - -use crate::codec::RequestResponseCodec; -use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; - -use libp2p::swarm::handler::{ - ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, - ListenUpgradeError, -}; -pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; - -use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; -use instant::Instant; -use libp2p::core::upgrade::{NegotiationError, UpgradeError}; -use libp2p::swarm::{ - handler::{ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive}, - NegotiatedSubstream, SubstreamProtocol, -}; -use smallvec::SmallVec; -use std::{ - collections::VecDeque, - fmt, io, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, - task::{Context, Poll}, - time::Duration, -}; - -/// A connection handler of a `RequestResponse` protocol. -#[doc(hidden)] -pub struct RequestResponseHandler -where - TCodec: RequestResponseCodec, - R: AsyncRead + Send, -{ - /// The supported inbound protocols. - inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, - /// The request/response message codec. - codec: TCodec, - /// The keep-alive timeout of idle connections. A connection is considered - /// idle if there are no outbound substreams. - keep_alive_timeout: Duration, - /// The timeout for inbound and outbound substreams (i.e. request - /// and response processing). - substream_timeout: Duration, - /// The current connection keep-alive. - keep_alive: KeepAlive, - /// A pending fatal error that results in the connection being closed. - pending_error: Option>, - /// Queue of events to emit in `poll()`. - pending_events: VecDeque>, - /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. - outbound: VecDeque>, - /// Inbound upgrades waiting for the incoming request. - inbound: FuturesUnordered< - BoxFuture< - 'static, - Result< - ( - (RequestId, TCodec::Request), - oneshot::Sender>, - ), - oneshot::Canceled, - >, - >, - >, - inbound_request_id: Arc, -} - -impl RequestResponseHandler -where - TCodec: RequestResponseCodec + Send + Clone + 'static, - R: AsyncRead + Send + 'static, -{ - pub(super) fn new( - inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, - codec: TCodec, - keep_alive_timeout: Duration, - substream_timeout: Duration, - inbound_request_id: Arc, - ) -> Self { - Self { - inbound_protocols, - codec, - keep_alive: KeepAlive::Yes, - keep_alive_timeout, - substream_timeout, - outbound: VecDeque::new(), - inbound: FuturesUnordered::new(), - pending_events: VecDeque::new(), - pending_error: None, - inbound_request_id, - } - } - - fn on_fully_negotiated_inbound( - &mut self, - FullyNegotiatedInbound { - protocol: sent, - info: request_id, - }: FullyNegotiatedInbound< - ::InboundProtocol, - ::InboundOpenInfo, - >, - ) { - if sent { - self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseSent(request_id)) - } else { - self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseOmission(request_id)) - } - } - - fn on_dial_upgrade_error( - &mut self, - DialUpgradeError { info, error }: DialUpgradeError< - ::OutboundOpenInfo, - ::OutboundProtocol, - >, - ) { - match error { - ConnectionHandlerUpgrErr::Timeout => { - self.pending_events - .push_back(RequestResponseHandlerEvent::OutboundTimeout(info)); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - // The remote merely doesn't support the protocol(s) we requested. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - // An event is reported to permit user code to react to the fact that - // the remote peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info), - ); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); - } - } - } - fn on_listen_upgrade_error( - &mut self, - ListenUpgradeError { info, error }: ListenUpgradeError< - ::InboundOpenInfo, - ::InboundProtocol, - >, - ) { - match error { - ConnectionHandlerUpgrErr::Timeout => self - .pending_events - .push_back(RequestResponseHandlerEvent::InboundTimeout(info)), - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { - // The local peer merely doesn't support the protocol(s) requested. - // This is no reason to close the connection, which may - // successfully communicate with other protocols already. - // An event is reported to permit user code to react to the fact that - // the local peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::InboundUnsupportedProtocols(info), - ); - } - _ => { - // Anything else is considered a fatal error or misbehaviour of - // the remote peer and results in closing the connection. - self.pending_error = Some(error); - } - } - } -} - -/// The events emitted by the [`RequestResponseHandler`]. -#[doc(hidden)] -pub enum RequestResponseHandlerEvent -where - TCodec: RequestResponseCodec, - R: AsyncRead + Send, -{ - /// A request has been received. - Request { - request_id: RequestId, - request: TCodec::Request, - sender: oneshot::Sender>, - }, - /// A response has been received. - Response { - request_id: RequestId, - response: TCodec::Response, - }, - /// A response to an inbound request has been sent. - ResponseSent(RequestId), - /// A response to an inbound request was omitted as a result - /// of dropping the response `sender` of an inbound `Request`. - ResponseOmission(RequestId), - /// An outbound request timed out while sending the request - /// or waiting for the response. - OutboundTimeout(RequestId), - /// An outbound request failed to negotiate a mutually supported protocol. - OutboundUnsupportedProtocols(RequestId), - /// An inbound request timed out while waiting for the request - /// or sending the response. - InboundTimeout(RequestId), - /// An inbound request failed to negotiate a mutually supported protocol. - InboundUnsupportedProtocols(RequestId), -} - -impl fmt::Debug - for RequestResponseHandlerEvent -where - R: Send, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RequestResponseHandlerEvent::Request { - request_id, - request: _, - sender: _, - } => f - .debug_struct("RequestResponseHandlerEvent::Request") - .field("request_id", request_id) - .finish(), - RequestResponseHandlerEvent::Response { - request_id, - response: _, - } => f - .debug_struct("RequestResponseHandlerEvent::Response") - .field("request_id", request_id) - .finish(), - RequestResponseHandlerEvent::ResponseSent(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::ResponseSent") - .field(request_id) - .finish(), - RequestResponseHandlerEvent::ResponseOmission(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::ResponseOmission") - .field(request_id) - .finish(), - RequestResponseHandlerEvent::OutboundTimeout(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::OutboundTimeout") - .field(request_id) - .finish(), - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::OutboundUnsupportedProtocols") - .field(request_id) - .finish(), - RequestResponseHandlerEvent::InboundTimeout(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::InboundTimeout") - .field(request_id) - .finish(), - RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::InboundUnsupportedProtocols") - .field(request_id) - .finish(), - } - } -} - -impl ConnectionHandler for RequestResponseHandler -where - TCodec: RequestResponseCodec + Send + Clone + 'static, - R: AsyncRead + Send + 'static, -{ - type InEvent = RequestProtocol; - type OutEvent = RequestResponseHandlerEvent; - type Error = ConnectionHandlerUpgrErr; - type InboundProtocol = ResponseProtocol; - type OutboundProtocol = RequestProtocol; - type OutboundOpenInfo = RequestId; - type InboundOpenInfo = RequestId; - - fn listen_protocol(&self) -> SubstreamProtocol { - // A channel for notifying the handler when the inbound - // upgrade received the request. - let (rq_send, rq_recv) = oneshot::channel(); - - // A channel for notifying the inbound upgrade when the - // response is sent. - let (rs_send, rs_recv) = oneshot::channel(); - - let request_id = RequestId(self.inbound_request_id.fetch_add(1, Ordering::Relaxed)); - - // By keeping all I/O inside the `ResponseProtocol` and thus the - // inbound substream upgrade via above channels, we ensure that it - // is all subject to the configured timeout without extra bookkeeping - // for inbound substreams as well as their timeouts and also make the - // implementation of inbound and outbound upgrades symmetric in - // this sense. - let proto = ResponseProtocol { - protocols: self.inbound_protocols.clone(), - codec: self.codec.clone(), - request_sender: rq_send, - response_receiver: rs_recv, - request_id, - }; - - // The handler waits for the request to come in. It then emits - // `RequestResponseHandlerEvent::Request` together with a - // `ResponseChannel`. - self.inbound - .push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); - - SubstreamProtocol::new(proto, request_id).with_timeout(self.substream_timeout) - } - - fn on_behaviour_event(&mut self, request: Self::InEvent) { - self.keep_alive = KeepAlive::Yes; - self.outbound.push_back(request); - } - - fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, RequestId, Self::OutEvent, Self::Error>> - { - // Check for a pending (fatal) error. - if let Some(err) = self.pending_error.take() { - // The handler will not be polled again by the `Swarm`. - return Poll::Ready(ConnectionHandlerEvent::Close(err)); - } - - // Drain pending events. - if let Some(event) = self.pending_events.pop_front() { - return Poll::Ready(ConnectionHandlerEvent::Custom(event)); - } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { - self.pending_events.shrink_to_fit(); - } - - // Check for inbound requests. - while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { - match result { - Ok(((id, rq), rs_sender)) => { - // We received an inbound request. - self.keep_alive = KeepAlive::Yes; - return Poll::Ready(ConnectionHandlerEvent::Custom( - RequestResponseHandlerEvent::Request { - request_id: id, - request: rq, - sender: rs_sender, - }, - )); - } - Err(oneshot::Canceled) => { - // The inbound upgrade has errored or timed out reading - // or waiting for the request. The handler is informed - // via `inject_listen_upgrade_error`. - } - } - } - - // Emit outbound requests. - if let Some(request) = self.outbound.pop_front() { - let info = request.request_id; - return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(request, info) - .with_timeout(self.substream_timeout), - }); - } - - debug_assert!(self.outbound.is_empty()); - - if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { - self.outbound.shrink_to_fit(); - } - - if self.inbound.is_empty() && self.keep_alive.is_yes() { - // No new inbound or outbound requests. However, we may just have - // started the latest inbound or outbound upgrade(s), so make sure - // the keep-alive timeout is preceded by the substream timeout. - let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout; - self.keep_alive = KeepAlive::Until(until); - } - - Poll::Pending - } - - fn on_connection_event( - &mut self, - event: ConnectionEvent< - Self::InboundProtocol, - Self::OutboundProtocol, - Self::InboundOpenInfo, - Self::OutboundOpenInfo, - >, - ) { - match event { - ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { - self.on_fully_negotiated_inbound(fully_negotiated_inbound) - } - ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { - protocol: response, - info: request_id, - }) => { - self.pending_events - .push_back(RequestResponseHandlerEvent::Response { - request_id, - response, - }); - } - ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { - self.on_dial_upgrade_error(dial_upgrade_error) - } - ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { - self.on_listen_upgrade_error(listen_upgrade_error) - } - ConnectionEvent::AddressChange(_) => {} - } - } -} diff --git a/exchange-protocol/src/handler/protocol.rs b/exchange-protocol/src/handler/protocol.rs deleted file mode 100644 index e7be4f37..00000000 --- a/exchange-protocol/src/handler/protocol.rs +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! The definition of a request/response protocol via inbound -//! and outbound substream upgrades. The inbound upgrade -//! receives a request and sends a response, whereas the -//! outbound upgrade send a request and receives a response. - -use crate::codec::RequestResponseCodec; -use crate::RequestId; - -use futures::{channel::oneshot, future::BoxFuture, prelude::*}; -use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p::swarm::NegotiatedSubstream; -use smallvec::SmallVec; -use std::{fmt, io}; - -/// The level of support for a particular protocol. -#[derive(Debug, Clone)] -pub enum ProtocolSupport { - /// The protocol is only supported for inbound requests. - Inbound, - /// The protocol is only supported for outbound requests. - Outbound, - /// The protocol is supported for inbound and outbound requests. - Full, -} - -impl ProtocolSupport { - /// Whether inbound requests are supported. - pub fn inbound(&self) -> bool { - match self { - ProtocolSupport::Inbound | ProtocolSupport::Full => true, - ProtocolSupport::Outbound => false, - } - } - - /// Whether outbound requests are supported. - pub fn outbound(&self) -> bool { - match self { - ProtocolSupport::Outbound | ProtocolSupport::Full => true, - ProtocolSupport::Inbound => false, - } - } -} - -/// Response substream upgrade protocol. -/// -/// Receives a request and sends a response. -#[derive(Debug)] -pub struct ResponseProtocol -where - TCodec: RequestResponseCodec, - R: AsyncRead + Send, -{ - pub(crate) codec: TCodec, - pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, - pub(crate) request_sender: oneshot::Sender<(RequestId, TCodec::Request)>, - pub(crate) response_receiver: oneshot::Receiver>, - pub(crate) request_id: RequestId, -} - -impl UpgradeInfo for ResponseProtocol -where - TCodec: RequestResponseCodec, - R: AsyncRead + Send, -{ - type Info = TCodec::Protocol; - type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; - - fn protocol_info(&self) -> Self::InfoIter { - self.protocols.clone().into_iter() - } -} - -impl InboundUpgrade for ResponseProtocol -where - TCodec: RequestResponseCodec + Send + 'static, - R: AsyncRead + Send + 'static, -{ - type Output = bool; - type Error = io::Error; - type Future = BoxFuture<'static, Result>; - - fn upgrade_inbound(mut self, io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future { - let (reader, mut writer) = io.split(); - async move { - let read = self.codec.read_request(&protocol, reader); - let request = read.await?; - match self.request_sender.send((self.request_id, request)) { - Ok(()) => {}, - Err(_) => panic!( - "Expect request receiver to be alive i.e. protocol handler to be alive.", - ), - } - - if let Ok(response) = self.response_receiver.await { - let write = self.codec.write_response(&protocol, &mut writer, response); - write.await?; - - writer.close().await?; - // Response was sent. Indicate to handler to emit a `ResponseSent` event. - Ok(true) - } else { - writer.close().await?; - // No response was sent. Indicate to handler to emit a `ResponseOmission` event. - Ok(false) - } - }.boxed() - } -} - -/// Request substream upgrade protocol. -/// -/// Sends a request and receives a response. -pub struct RequestProtocol -where - TCodec: RequestResponseCodec, -{ - pub(crate) codec: TCodec, - pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, - pub(crate) request_id: RequestId, - pub(crate) request: TCodec::Request, -} - -impl fmt::Debug for RequestProtocol -where - TCodec: RequestResponseCodec, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RequestProtocol") - .field("request_id", &self.request_id) - .finish() - } -} - -impl UpgradeInfo for RequestProtocol -where - TCodec: RequestResponseCodec, -{ - type Info = TCodec::Protocol; - type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; - - fn protocol_info(&self) -> Self::InfoIter { - self.protocols.clone().into_iter() - } -} - -impl OutboundUpgrade for RequestProtocol -where - TCodec: RequestResponseCodec + Send + 'static, -{ - type Output = TCodec::Response; - type Error = io::Error; - type Future = BoxFuture<'static, Result>; - - fn upgrade_outbound( - mut self, - mut io: NegotiatedSubstream, - protocol: Self::Info, - ) -> Self::Future { - async move { - let write = self.codec.write_request(&protocol, &mut io, self.request); - write.await?; - io.close().await?; - let read = self.codec.read_response(&protocol, io); - let response = read.await?; - Ok(response) - } - .boxed() - } -} diff --git a/exchange-protocol/src/lib.rs b/exchange-protocol/src/lib.rs deleted file mode 100644 index dd06d91e..00000000 --- a/exchange-protocol/src/lib.rs +++ /dev/null @@ -1,952 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Generic request/response protocols. -//! -//! ## General Usage -//! -//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic -//! request/response protocol or protocol family, whereby each request is -//! sent over a new substream on a connection. `RequestResponse` is generic -//! over the actual messages being sent, which are defined in terms of a -//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts -//! to providing an implementation of this trait which can then be -//! given to [`RequestResponse::new`]. Further configuration options are -//! available via the [`RequestResponseConfig`]. -//! -//! Requests are sent using [`RequestResponse::send_request`] and the -//! responses received as [`RequestResponseMessage::Response`] via -//! [`RequestResponseEvent::Message`]. -//! -//! Responses are sent using [`RequestResponse::send_response`] upon -//! receiving a [`RequestResponseMessage::Request`] via -//! [`RequestResponseEvent::Message`]. -//! -//! ## Protocol Families -//! -//! A single [`RequestResponse`] instance can be used with an entire -//! protocol family that share the same request and response types. -//! For that purpose, [`RequestResponseCodec::Protocol`] is typically -//! instantiated with a sum type. -//! -//! ## Limited Protocol Support -//! -//! It is possible to only support inbound or outbound requests for -//! a particular protocol. This is achieved by instantiating `RequestResponse` -//! with protocols using [`ProtocolSupport::Inbound`] or -//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol -//! family can be configured in this way. Such protocols will not be -//! advertised during inbound respectively outbound protocol negotiation -//! on the substreams. - -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] - -pub mod codec; -pub mod handler; - -pub use codec::{ProtocolName, RequestResponseCodec}; -pub use handler::ProtocolSupport; - -use futures::channel::oneshot; -use futures::io::AsyncRead; -use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent}; -use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; -use libp2p::swarm::{ - behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, - dial_opts::DialOpts, - IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, -}; -use smallvec::SmallVec; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - fmt, - sync::{atomic::AtomicU64, Arc}, - task::{Context, Poll}, - time::Duration, -}; - -/// An inbound request or response. -#[derive(Debug)] -pub enum RequestResponseMessage { - /// A request message. - Request { - /// The ID of this request. - request_id: RequestId, - /// The request message. - request: TRequest, - /// The channel waiting for the response. - /// - /// If this channel is dropped instead of being used to send a response - /// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`] - /// with [`InboundFailure::ResponseOmission`] is emitted. - channel: ResponseChannel, - }, - /// A response message. - Response { - /// The ID of the request that produced this response. - /// - /// See [`RequestResponse::send_request`]. - request_id: RequestId, - /// The response message. - response: TResponse, - }, -} - -/// The events emitted by a [`RequestResponse`] protocol. -#[derive(Debug)] -pub enum RequestResponseEvent { - /// An incoming message (request or response). - Message { - /// The peer who sent the message. - peer: PeerId, - /// The incoming message. - message: RequestResponseMessage, - }, - /// An outbound request failed. - OutboundFailure { - /// The peer to whom the request was sent. - peer: PeerId, - /// The (local) ID of the failed request. - request_id: RequestId, - /// The error that occurred. - error: OutboundFailure, - }, - /// An inbound request failed. - InboundFailure { - /// The peer from whom the request was received. - peer: PeerId, - /// The ID of the failed inbound request. - request_id: RequestId, - /// The error that occurred. - error: InboundFailure, - }, - /// A response to an inbound request has been sent. - /// - /// When this event is received, the response has been flushed on - /// the underlying transport connection. - ResponseSent { - /// The peer to whom the response was sent. - peer: PeerId, - /// The ID of the inbound request whose response was sent. - request_id: RequestId, - }, -} - -/// Possible failures occurring in the context of sending -/// an outbound request and receiving the response. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum OutboundFailure { - /// The request could not be sent because a dialing attempt failed. - DialFailure, - /// The request timed out before a response was received. - /// - /// It is not known whether the request may have been - /// received (and processed) by the remote peer. - Timeout, - /// The connection closed before a response was received. - /// - /// It is not known whether the request may have been - /// received (and processed) by the remote peer. - ConnectionClosed, - /// The remote supports none of the requested protocols. - UnsupportedProtocols, -} - -impl fmt::Display for OutboundFailure { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"), - OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"), - OutboundFailure::ConnectionClosed => { - write!(f, "Connection was closed before a response was received") - } - OutboundFailure::UnsupportedProtocols => { - write!(f, "The remote supports none of the requested protocols") - } - } - } -} - -impl std::error::Error for OutboundFailure {} - -/// Possible failures occurring in the context of receiving an -/// inbound request and sending a response. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum InboundFailure { - /// The inbound request timed out, either while reading the - /// incoming request or before a response is sent, e.g. if - /// [`RequestResponse::send_response`] is not called in a - /// timely manner. - Timeout, - /// The connection closed before a response could be send. - ConnectionClosed, - /// The local peer supports none of the protocols requested - /// by the remote. - UnsupportedProtocols, - /// The local peer failed to respond to an inbound request - /// due to the [`ResponseChannel`] being dropped instead of - /// being passed to [`RequestResponse::send_response`]. - ResponseOmission, -} - -impl fmt::Display for InboundFailure { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - InboundFailure::Timeout => { - write!(f, "Timeout while receiving request or sending response") - } - InboundFailure::ConnectionClosed => { - write!(f, "Connection was closed before a response could be sent") - } - InboundFailure::UnsupportedProtocols => write!( - f, - "The local peer supports none of the protocols requested by the remote" - ), - InboundFailure::ResponseOmission => write!( - f, - "The response channel was dropped without sending a response to the remote" - ), - } - } -} - -impl std::error::Error for InboundFailure {} - -/// A channel for sending a response to an inbound request. -/// -/// See [`RequestResponse::send_response`]. -#[derive(Debug)] -pub struct ResponseChannel { - sender: oneshot::Sender, -} - -impl ResponseChannel { - /// Checks whether the response channel is still open, i.e. - /// the `RequestResponse` behaviour is still waiting for a - /// a response to be sent via [`RequestResponse::send_response`] - /// and this response channel. - /// - /// If the response channel is no longer open then the inbound - /// request timed out waiting for the response. - pub fn is_open(&self) -> bool { - !self.sender.is_canceled() - } -} - -/// The ID of an inbound or outbound request. -/// -/// Note: [`RequestId`]'s uniqueness is only guaranteed between two -/// inbound and likewise between two outbound requests. There is no -/// uniqueness guarantee in a set of both inbound and outbound -/// [`RequestId`]s nor in a set of inbound or outbound requests -/// originating from different [`RequestResponse`] behaviours. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct RequestId(u64); - -impl fmt::Display for RequestId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -/// The configuration for a `RequestResponse` protocol. -#[derive(Debug, Clone)] -pub struct RequestResponseConfig { - request_timeout: Duration, - connection_keep_alive: Duration, -} - -impl Default for RequestResponseConfig { - fn default() -> Self { - Self { - connection_keep_alive: Duration::from_secs(10), - request_timeout: Duration::from_secs(10), - } - } -} - -impl RequestResponseConfig { - /// Sets the keep-alive timeout of idle connections. - pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self { - self.connection_keep_alive = v; - self - } - - /// Sets the timeout for inbound and outbound requests. - pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self { - self.request_timeout = v; - self - } -} - -/// A request/response protocol for some message codec. -pub struct RequestResponse -where - TCodec: RequestResponseCodec + Clone + Send + 'static, - R: AsyncRead + Send + 'static, -{ - /// The supported inbound protocols. - inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, - /// The supported outbound protocols. - outbound_protocols: SmallVec<[TCodec::Protocol; 2]>, - /// The next (local) request ID. - next_request_id: RequestId, - /// The next (inbound) request ID. - next_inbound_id: Arc, - /// The protocol configuration. - config: RequestResponseConfig, - /// The protocol codec for reading and writing requests and responses. - codec: TCodec, - /// Pending events to return from `poll`. - pending_events: VecDeque< - NetworkBehaviourAction< - RequestResponseEvent< - TCodec::Request, - TCodec::Response, - TCodec::Response, - >, - RequestResponseHandler, - >, - >, - /// The currently connected peers, their pending outbound and inbound responses and their known, - /// reachable addresses, if any. - connected: HashMap>, - /// Externally managed addresses via `add_address` and `remove_address`. - addresses: HashMap>, - /// Requests that have not yet been sent and are waiting for a connection - /// to be established. - pending_outbound_requests: HashMap; 10]>>, -} - -impl RequestResponse -where - TCodec: RequestResponseCodec + Clone + Send + 'static, - R: AsyncRead + Send + 'static, -{ - /// Creates a new `RequestResponse` behaviour for the given - /// protocols, codec and configuration. - pub fn new(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self - where - I: IntoIterator, - { - let mut inbound_protocols = SmallVec::new(); - let mut outbound_protocols = SmallVec::new(); - for (p, s) in protocols { - if s.inbound() { - inbound_protocols.push(p.clone()); - } - if s.outbound() { - outbound_protocols.push(p.clone()); - } - } - RequestResponse { - inbound_protocols, - outbound_protocols, - next_request_id: RequestId(1), - next_inbound_id: Arc::new(AtomicU64::new(1)), - config: cfg, - codec, - pending_events: VecDeque::new(), - connected: HashMap::new(), - pending_outbound_requests: HashMap::new(), - addresses: HashMap::new(), - } - } - - /// Initiates sending a request. - /// - /// If the targeted peer is currently not connected, a dialing - /// attempt is initiated and the request is sent as soon as a - /// connection is established. - /// - /// > **Note**: In order for such a dialing attempt to succeed, - /// > the `RequestResonse` protocol must either be embedded - /// > in another `NetworkBehaviour` that provides peer and - /// > address discovery, or known addresses of peers must be - /// > managed via [`RequestResponse::add_address`] and - /// > [`RequestResponse::remove_address`]. - pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { - let request_id = self.next_request_id(); - let request = RequestProtocol { - request_id, - codec: self.codec.clone(), - protocols: self.outbound_protocols.clone(), - request, - }; - - if let Some(request) = self.try_send_request(peer, request) { - let handler = self.new_handler(); - self.pending_events.push_back(NetworkBehaviourAction::Dial { - opts: DialOpts::peer_id(*peer).build(), - handler, - }); - self.pending_outbound_requests - .entry(*peer) - .or_default() - .push(request); - } - - request_id - } - - /// Initiates sending a response to an inbound request. - /// - /// If the [`ResponseChannel`] is already closed due to a timeout or the - /// connection being closed, the response is returned as an `Err` for - /// further handling. Once the response has been successfully sent on the - /// corresponding connection, [`RequestResponseEvent::ResponseSent`] is - /// emitted. In all other cases [`RequestResponseEvent::InboundFailure`] - /// will be or has been emitted. - /// - /// The provided `ResponseChannel` is obtained from an inbound - /// [`RequestResponseMessage::Request`]. - pub fn send_response( - &mut self, - ch: ResponseChannel>, - rs: TCodec::Response, - ) -> Result<(), TCodec::Response> { - ch.sender.send(rs) - } - - /// Adds a known address for a peer that can be used for - /// dialing attempts by the `Swarm`, i.e. is returned - /// by [`NetworkBehaviour::addresses_of_peer`]. - /// - /// Addresses added in this way are only removed by `remove_address`. - pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) { - self.addresses.entry(*peer).or_default().push(address); - } - - /// Removes an address of a peer previously added via `add_address`. - pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) { - let mut last = false; - if let Some(addresses) = self.addresses.get_mut(peer) { - addresses.retain(|a| a != address); - last = addresses.is_empty(); - } - if last { - self.addresses.remove(peer); - } - } - - /// Checks whether a peer is currently connected. - pub fn is_connected(&self, peer: &PeerId) -> bool { - if let Some(connections) = self.connected.get(peer) { - !connections.is_empty() - } else { - false - } - } - - /// Checks whether an outbound request to the peer with the provided - /// [`PeerId`] initiated by [`RequestResponse::send_request`] is still - /// pending, i.e. waiting for a response. - pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { - // Check if request is already sent on established connection. - let est_conn = self - .connected - .get(peer) - .map(|cs| { - cs.iter() - .any(|c| c.pending_inbound_responses.contains(request_id)) - }) - .unwrap_or(false); - // Check if request is still pending to be sent. - let pen_conn = self - .pending_outbound_requests - .get(peer) - .map(|rps| rps.iter().any(|rp| rp.request_id == *request_id)) - .unwrap_or(false); - - est_conn || pen_conn - } - - /// Checks whether an inbound request from the peer with the provided - /// [`PeerId`] is still pending, i.e. waiting for a response by the local - /// node through [`RequestResponse::send_response`]. - pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { - self.connected - .get(peer) - .map(|cs| { - cs.iter() - .any(|c| c.pending_outbound_responses.contains(request_id)) - }) - .unwrap_or(false) - } - - /// Returns the next request ID. - fn next_request_id(&mut self) -> RequestId { - let request_id = self.next_request_id; - self.next_request_id.0 += 1; - request_id - } - - /// Tries to send a request by queueing an appropriate event to be - /// emitted to the `Swarm`. If the peer is not currently connected, - /// the given request is return unchanged. - fn try_send_request( - &mut self, - peer: &PeerId, - request: RequestProtocol, - ) -> Option> { - if let Some(connections) = self.connected.get_mut(peer) { - if connections.is_empty() { - return Some(request); - } - let ix = (request.request_id.0 as usize) % connections.len(); - let conn = &mut connections[ix]; - conn.pending_inbound_responses.insert(request.request_id); - self.pending_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer, - handler: NotifyHandler::One(conn.id), - event: request, - }); - None - } else { - Some(request) - } - } - - /// Remove pending outbound response for the given peer and connection. - /// - /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. - /// Returns `false` otherwise. - fn remove_pending_outbound_response( - &mut self, - peer: &PeerId, - connection: ConnectionId, - request: RequestId, - ) -> bool { - self.get_connection_mut(peer, connection) - .map(|c| c.pending_outbound_responses.remove(&request)) - .unwrap_or(false) - } - - /// Remove pending inbound response for the given peer and connection. - /// - /// Returns `true` if the provided connection to the given peer is still - /// alive and the [`RequestId`] was previously present and is now removed. - /// Returns `false` otherwise. - fn remove_pending_inbound_response( - &mut self, - peer: &PeerId, - connection: ConnectionId, - request: &RequestId, - ) -> bool { - self.get_connection_mut(peer, connection) - .map(|c| c.pending_inbound_responses.remove(request)) - .unwrap_or(false) - } - - /// Returns a mutable reference to the connection in `self.connected` - /// corresponding to the given [`PeerId`] and [`ConnectionId`]. - fn get_connection_mut( - &mut self, - peer: &PeerId, - connection: ConnectionId, - ) -> Option<&mut Connection> { - self.connected - .get_mut(peer) - .and_then(|connections| connections.iter_mut().find(|c| c.id == connection)) - } - - fn on_address_change( - &mut self, - AddressChange { - peer_id, - connection_id, - new, - .. - }: AddressChange, - ) { - let new_address = match new { - ConnectedPoint::Dialer { address, .. } => Some(address.clone()), - ConnectedPoint::Listener { .. } => None, - }; - let connections = self - .connected - .get_mut(&peer_id) - .expect("Address change can only happen on an established connection."); - - let connection = connections - .iter_mut() - .find(|c| c.id == connection_id) - .expect("Address change can only happen on an established connection."); - connection.address = new_address; - } - - fn on_connection_established( - &mut self, - ConnectionEstablished { - peer_id, - connection_id, - endpoint, - other_established, - .. - }: ConnectionEstablished, - ) { - let address = match endpoint { - ConnectedPoint::Dialer { address, .. } => Some(address.clone()), - ConnectedPoint::Listener { .. } => None, - }; - self.connected - .entry(peer_id) - .or_default() - .push(Connection::new(connection_id, address)); - - if other_established == 0 { - if let Some(pending) = self.pending_outbound_requests.remove(&peer_id) { - for request in pending { - let request = self.try_send_request(&peer_id, request); - assert!(request.is_none()); - } - } - } - } - - fn on_connection_closed( - &mut self, - ConnectionClosed { - peer_id, - connection_id, - remaining_established, - .. - }: ConnectionClosed<::ConnectionHandler>, - ) { - let connections = self - .connected - .get_mut(&peer_id) - .expect("Expected some established connection to peer before closing."); - - let connection = connections - .iter() - .position(|c| c.id == connection_id) - .map(|p: usize| connections.remove(p)) - .expect("Expected connection to be established before closing."); - - debug_assert_eq!(connections.is_empty(), remaining_established == 0); - if connections.is_empty() { - self.connected.remove(&peer_id); - } - - for request_id in connection.pending_outbound_responses { - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { - peer: peer_id, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); - } - - for request_id in connection.pending_inbound_responses { - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { - peer: peer_id, - request_id, - error: OutboundFailure::ConnectionClosed, - }, - )); - } - } - - fn on_dial_failure( - &mut self, - DialFailure { peer_id, .. }: DialFailure<::ConnectionHandler>, - ) { - if let Some(peer) = peer_id { - // If there are pending outgoing requests when a dial failure occurs, - // it is implied that we are not connected to the peer, since pending - // outgoing requests are drained when a connection is established and - // only created when a peer is not connected when a request is made. - // Thus these requests must be considered failed, even if there is - // another, concurrent dialing attempt ongoing. - if let Some(pending) = self.pending_outbound_requests.remove(&peer) { - for request in pending { - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { - peer, - request_id: request.request_id, - error: OutboundFailure::DialFailure, - }, - )); - } - } - } - } -} - -impl NetworkBehaviour for RequestResponse -where - TCodec: RequestResponseCodec + Send + Clone + 'static, - R: AsyncRead + Send + 'static, -{ - type ConnectionHandler = RequestResponseHandler; - type OutEvent = RequestResponseEvent< - TCodec::Request, - TCodec::Response, - TCodec::Response, - >; - - fn new_handler(&mut self) -> Self::ConnectionHandler { - RequestResponseHandler::new( - self.inbound_protocols.clone(), - self.codec.clone(), - self.config.connection_keep_alive, - self.config.request_timeout, - self.next_inbound_id.clone(), - ) - } - - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { - let mut addresses = Vec::new(); - if let Some(connections) = self.connected.get(peer) { - addresses.extend(connections.iter().filter_map(|c| c.address.clone())) - } - if let Some(more) = self.addresses.get(peer) { - addresses.extend(more.into_iter().cloned()); - } - addresses - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(connection_established) => { - self.on_connection_established(connection_established) - } - FromSwarm::ConnectionClosed(connection_closed) => { - self.on_connection_closed(connection_closed) - } - FromSwarm::AddressChange(address_change) => self.on_address_change(address_change), - FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure), - FromSwarm::ListenFailure(_) => {} - FromSwarm::NewListener(_) => {} - FromSwarm::NewListenAddr(_) => {} - FromSwarm::ExpiredListenAddr(_) => {} - FromSwarm::ListenerError(_) => {} - FromSwarm::ListenerClosed(_) => {} - FromSwarm::NewExternalAddr(_) => {} - FromSwarm::ExpiredExternalAddr(_) => {} - } - } - - fn on_connection_handler_event( - &mut self, - peer: PeerId, - connection: ConnectionId, - event: <::Handler as - libp2p::swarm::ConnectionHandler>::OutEvent, - ) { - match event { - RequestResponseHandlerEvent::Response { - request_id, - response, - } => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); - debug_assert!( - removed, - "Expect request_id to be pending before receiving response.", - ); - - let message = RequestResponseMessage::Response { - request_id, - response, - }; - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::Message { peer, message }, - )); - } - RequestResponseHandlerEvent::Request { - request_id, - request, - sender, - } => { - let channel = ResponseChannel { sender }; - let message = RequestResponseMessage::Request { - request_id, - request, - channel, - }; - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::Message { peer, message }, - )); - - match self.get_connection_mut(&peer, connection) { - Some(connection) => { - let inserted = connection.pending_outbound_responses.insert(request_id); - debug_assert!(inserted, "Expect id of new request to be unknown."); - } - // Connection closed after `RequestResponseEvent::Request` has been emitted. - None => { - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { - peer, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); - } - } - } - RequestResponseHandlerEvent::ResponseSent(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); - debug_assert!( - removed, - "Expect request_id to be pending before response is sent." - ); - - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::ResponseSent { peer, request_id }, - )); - } - RequestResponseHandlerEvent::ResponseOmission(request_id) => { - let removed = self.remove_pending_outbound_response(&peer, connection, request_id); - debug_assert!( - removed, - "Expect request_id to be pending before response is omitted.", - ); - - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { - peer, - request_id, - error: InboundFailure::ResponseOmission, - }, - )); - } - RequestResponseHandlerEvent::OutboundTimeout(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); - debug_assert!( - removed, - "Expect request_id to be pending before request times out." - ); - - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { - peer, - request_id, - error: OutboundFailure::Timeout, - }, - )); - } - RequestResponseHandlerEvent::InboundTimeout(request_id) => { - // Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing - // out to receive the request and for timing out sending the response. In the former - // case the request is never added to `pending_outbound_responses` and thus one can - // not assert the request_id to be present before removing it. - self.remove_pending_outbound_response(&peer, connection, request_id); - - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { - peer, - request_id, - error: InboundFailure::Timeout, - }, - )); - } - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => { - let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); - debug_assert!( - removed, - "Expect request_id to be pending before failing to connect.", - ); - - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { - peer, - request_id, - error: OutboundFailure::UnsupportedProtocols, - }, - )); - } - RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => { - // Note: No need to call `self.remove_pending_outbound_response`, - // `RequestResponseHandlerEvent::Request` was never emitted for this request and - // thus request was never added to `pending_outbound_responses`. - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { - peer, - request_id, - error: InboundFailure::UnsupportedProtocols, - }, - )); - } - } - } - - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { - if let Some(ev) = self.pending_events.pop_front() { - return Poll::Ready(ev); - } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { - self.pending_events.shrink_to_fit(); - } - - Poll::Pending - } -} - -/// Internal threshold for when to shrink the capacity -/// of empty queues. If the capacity of an empty queue -/// exceeds this threshold, the associated memory is -/// released. -const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100; - -/// Internal information tracked for an established connection. -struct Connection { - id: ConnectionId, - address: Option, - /// Pending outbound responses where corresponding inbound requests have - /// been received on this connection and emitted via `poll` but have not yet - /// been answered. - pending_outbound_responses: HashSet, - /// Pending inbound responses for previously sent requests on this - /// connection. - pending_inbound_responses: HashSet, -} - -impl Connection { - fn new(id: ConnectionId, address: Option) -> Self { - Self { - id, - address, - pending_outbound_responses: Default::default(), - pending_inbound_responses: Default::default(), - } - } -} diff --git a/exchange-protocol/tests/ping.rs b/exchange-protocol/tests/ping.rs deleted file mode 100644 index 44bdaf05..00000000 --- a/exchange-protocol/tests/ping.rs +++ /dev/null @@ -1,388 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Integration tests for the `RequestResponse` network behaviour. - -use async_trait::async_trait; -use futures::{channel::mpsc, prelude::*, AsyncWriteExt}; -use libp2p::core::{ - identity, - muxing::StreamMuxerBox, - transport, - upgrade::{self, read_length_prefixed, write_length_prefixed}, - Multiaddr, PeerId, -}; -use libp2p::noise::NoiseAuthenticated; -use libp2p::request_response::*; -use libp2p::swarm::{Swarm, SwarmEvent}; -use libp2p::tcp; -use libp2p::core::Transport; -use rand::{self, Rng}; -use std::{io, iter}; - -#[test] -fn is_response_outbound() { - let _ = env_logger::try_init(); - let ping = Ping("ping".to_string().into_bytes()); - let offline_peer = PeerId::random(); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - - let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); - - let request_id1 = swarm1 - .behaviour_mut() - .send_request(&offline_peer, ping.clone()); - - match futures::executor::block_on(swarm1.select_next_some()) { - SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure { - peer, - request_id: req_id, - error: _error, - }) => { - assert_eq!(&offline_peer, &peer); - assert_eq!(req_id, request_id1); - } - e => panic!("Peer: Unexpected event: {:?}", e), - } - - let request_id2 = swarm1.behaviour_mut().send_request(&offline_peer, ping); - - assert!(!swarm1 - .behaviour() - .is_pending_outbound(&offline_peer, &request_id1)); - assert!(swarm1 - .behaviour() - .is_pending_outbound(&offline_peer, &request_id2)); -} - -/// Exercises a simple ping protocol. -#[test] -fn ping_protocol() { - let ping = Ping("ping".to_string().into_bytes()); - let pong = Pong("pong".to_string().into_bytes()); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - - let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); - - let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); - - let (mut tx, mut rx) = mpsc::channel::(1); - - let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - swarm1.listen_on(addr).unwrap(); - - let expected_ping = ping.clone(); - let expected_pong = pong.clone(); - - let peer1 = async move { - loop { - match swarm1.select_next_some().await { - SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(), - SwarmEvent::Behaviour(RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Request { - request, channel, .. - }, - }) => { - assert_eq!(&request, &expected_ping); - assert_eq!(&peer, &peer2_id); - swarm1 - .behaviour_mut() - .send_response(channel, pong.clone()) - .unwrap(); - } - SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { peer, .. }) => { - assert_eq!(&peer, &peer2_id); - } - SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), - _ => {} - } - } - }; - - let num_pings: u8 = rand::thread_rng().gen_range(1..100); - - let peer2 = async move { - let mut count = 0; - let addr = rx.next().await.unwrap(); - swarm2.behaviour_mut().add_address(&peer1_id, addr.clone()); - let mut req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); - assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); - - loop { - match swarm2.select_next_some().await { - SwarmEvent::Behaviour(RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Response { - request_id, - response, - }, - }) => { - count += 1; - assert_eq!(&response, &expected_pong); - assert_eq!(&peer, &peer1_id); - assert_eq!(req_id, request_id); - if count >= num_pings { - return; - } else { - req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); - } - } - SwarmEvent::Behaviour(e) => panic!("Peer2: Unexpected event: {:?}", e), - _ => {} - } - } - }; - - async_std::task::spawn(Box::pin(peer1)); - let () = async_std::task::block_on(peer2); -} - -#[test] -fn emits_inbound_connection_closed_failure() { - let ping = Ping("ping".to_string().into_bytes()); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - - let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); - - let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); - - let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - swarm1.listen_on(addr).unwrap(); - - futures::executor::block_on(async move { - while swarm1.next().now_or_never().is_some() {} - let addr1 = Swarm::listeners(&swarm1).next().unwrap(); - - swarm2.behaviour_mut().add_address(&peer1_id, addr1.clone()); - swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); - - // Wait for swarm 1 to receive request by swarm 2. - let _channel = loop { - futures::select!( - event = swarm1.select_next_some() => match event { - SwarmEvent::Behaviour(RequestResponseEvent::Message { - peer, - message: RequestResponseMessage::Request { request, channel, .. } - }) => { - assert_eq!(&request, &ping); - assert_eq!(&peer, &peer2_id); - break channel; - }, - SwarmEvent::Behaviour(ev) => panic!("Peer1: Unexpected event: {:?}", ev), - _ => {} - }, - event = swarm2.select_next_some() => { - if let SwarmEvent::Behaviour(ev) = event { - panic!("Peer2: Unexpected event: {:?}", ev); - } - } - ) - }; - - // Drop swarm 2 in order for the connection between swarm 1 and 2 to close. - drop(swarm2); - - loop { - match swarm1.select_next_some().await { - SwarmEvent::Behaviour(RequestResponseEvent::InboundFailure { - error: InboundFailure::ConnectionClosed, - .. - }) => break, - SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), - _ => {} - } - } - }); -} - -/// We expect the substream to be properly closed when response channel is dropped. -/// Since the ping protocol used here expects a response, the sender considers this -/// early close as a protocol violation which results in the connection being closed. -/// If the substream were not properly closed when dropped, the sender would instead -/// run into a timeout waiting for the response. -#[test] -fn emits_inbound_connection_closed_if_channel_is_dropped() { - let ping = Ping("ping".to_string().into_bytes()); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - - let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); - - let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); - - let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - swarm1.listen_on(addr).unwrap(); - - futures::executor::block_on(async move { - while swarm1.next().now_or_never().is_some() {} - let addr1 = Swarm::listeners(&swarm1).next().unwrap(); - - swarm2.behaviour_mut().add_address(&peer1_id, addr1.clone()); - swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); - - // Wait for swarm 1 to receive request by swarm 2. - let event = loop { - futures::select!( - event = swarm1.select_next_some() => { - if let SwarmEvent::Behaviour(RequestResponseEvent::Message { - peer, - message: RequestResponseMessage::Request { request, channel, .. } - }) = event { - assert_eq!(&request, &ping); - assert_eq!(&peer, &peer2_id); - - drop(channel); - continue; - } - }, - event = swarm2.select_next_some() => { - if let SwarmEvent::Behaviour(ev) = event { - break ev; - } - }, - ) - }; - - let error = match event { - RequestResponseEvent::OutboundFailure { error, .. } => error, - e => panic!("unexpected event from peer 2: {:?}", e), - }; - - assert_eq!(error, OutboundFailure::ConnectionClosed); - }); -} - -fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { - let id_keys = identity::Keypair::generate_ed25519(); - let peer_id = id_keys.public().to_peer_id(); - - ( - peer_id, - tcp::async_io::Transport::new(tcp::Config::default().nodelay(true)) - .upgrade(upgrade::Version::V1) - .authenticate(NoiseAuthenticated::xx(&id_keys).unwrap()) - .multiplex(libp2p::yamux::YamuxConfig::default()) - .boxed(), - ) -} - -// Simple Ping-Pong Protocol - -#[derive(Debug, Clone)] -struct PingProtocol(); -#[derive(Clone)] -struct PingCodec(); -#[derive(Debug, Clone, PartialEq, Eq)] -struct Ping(Vec); -#[derive(Debug, Clone, PartialEq, Eq)] -struct Pong(Vec); - -impl ProtocolName for PingProtocol { - fn protocol_name(&self) -> &[u8] { - "/ping/1".as_bytes() - } -} - -#[async_trait] -impl RequestResponseCodec for PingCodec { - type Protocol = PingProtocol; - type Request = Ping; - type Response = Pong; - - async fn read_request(&mut self, _: &PingProtocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let vec = read_length_prefixed(io, 1024).await?; - - if vec.is_empty() { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok(Ping(vec)) - } - - async fn read_response(&mut self, _: &PingProtocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let vec = read_length_prefixed(io, 1024).await?; - - if vec.is_empty() { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok(Pong(vec)) - } - - async fn write_request( - &mut self, - _: &PingProtocol, - io: &mut T, - Ping(data): Ping, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - write_length_prefixed(io, data).await?; - io.close().await?; - - Ok(()) - } - - async fn write_response( - &mut self, - _: &PingProtocol, - io: &mut T, - Pong(data): Pong, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - write_length_prefixed(io, data).await?; - io.close().await?; - - Ok(()) - } -}