diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index 5c01fafa..7f500d8a 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -18,6 +18,7 @@ rand = "0.8.3" static_assertions = "1" pin-project = "1.1.0" zerocopy = { version = "0.7.0", features = ["derive"] } +bytes = "1.5.0" [dev-dependencies] quickcheck = "1.0" diff --git a/yamux/src/connection.rs b/yamux/src/connection.rs index 9617f2de..277a9f9a 100644 --- a/yamux/src/connection.rs +++ b/yamux/src/connection.rs @@ -722,7 +722,7 @@ impl Active { shared.update_state(self.id, stream_id, State::RecvClosed); } shared.window = shared.window.saturating_sub(frame.body_len()); - shared.buffer.push(frame.into_body()); + shared.buffer.push(frame.into_body().into()); if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { if let Some(credit) = shared.next_window_update() { @@ -765,7 +765,7 @@ impl Active { return Action::Reset(Frame::new(header)); } shared.window = shared.window.saturating_sub(frame.body_len()); - shared.buffer.push(frame.into_body()); + shared.buffer.push(frame.into_body().into()); if let Some(w) = shared.reader.take() { w.wake() } diff --git a/yamux/src/frame.rs b/yamux/src/frame.rs index a7fa0e0c..65226a1d 100644 --- a/yamux/src/frame.rs +++ b/yamux/src/frame.rs @@ -11,6 +11,7 @@ pub mod header; mod io; +use bytes::{Buf, Bytes, BytesMut}; use futures::future::Either; use header::{Data, GoAway, Header, Ping, StreamId, WindowUpdate}; use std::{convert::TryInto, fmt::Debug, marker::PhantomData, num::TryFromIntError}; @@ -25,13 +26,13 @@ use self::header::HEADER_SIZE; /// The header can be zerocopy parsed into a Header struct by calling header()/header_mut(). #[derive(Clone, Debug, Eq, PartialEq)] pub struct Frame { - buffer: Vec, + buffer: BytesMut, _marker: PhantomData, } impl Frame { pub(crate) fn no_body(header: Header) -> Self { - let mut buffer = vec![0; HEADER_SIZE]; + let mut buffer = BytesMut::zeroed(HEADER_SIZE); header .write_to(&mut buffer) .expect("buffer is size of header"); @@ -43,21 +44,21 @@ impl Frame { } pub fn header(&self) -> &Header { - Ref::<_, Header>::new_from_prefix(self.buffer.as_slice()) + Ref::<_, Header>::new_from_prefix(self.buffer.as_ref()) .expect("buffer always holds a valid header") .0 .into_ref() } pub fn header_mut(&mut self) -> &mut Header { - Ref::<_, Header>::new_from_prefix(self.buffer.as_mut_slice()) + Ref::<_, Header>::new_from_prefix(self.buffer.as_mut()) .expect("buffer always holds a valid header") .0 .into_mut() } pub(crate) fn buffer(&self) -> &[u8] { - self.buffer.as_slice() + self.buffer.as_ref() } /// Introduce this frame to the right of a binary frame type. @@ -146,7 +147,7 @@ impl Frame { pub fn new(header: Header) -> Self { let total_buffer_size = HEADER_SIZE + header.body_len(); - let mut buffer = vec![0; total_buffer_size]; + let mut buffer = BytesMut::zeroed(total_buffer_size); header .write_to_prefix(&mut buffer) .expect("buffer always fits the header"); @@ -179,9 +180,9 @@ impl Frame { self.body().len() as u32 } - pub fn into_body(mut self) -> Vec { - // FIXME: Should we implement this more efficiently with `BytesMut`? I think that one would allow us to split of the body without allocating again .. - self.buffer.split_off(HEADER_SIZE) + pub fn into_body(mut self) -> Bytes { + self.buffer.advance(HEADER_SIZE); + self.buffer.freeze() } }