diff --git a/Cargo.toml b/Cargo.toml index 27e27a8f0..2db7f7e3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,5 +35,9 @@ actix-web-httpauth = { path = "./actix-web-httpauth" } # uncomment to quickly test against local actix-web repo # actix-http = { path = "../actix-web/actix-http" } # actix-router = { path = "../actix-web/actix-router" } -# actix-web = { path = "../actix-web" } +# actix-web = { path = "../actix-web/actix-web" } # awc = { path = "../actix-web/awc" } +actix-http = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" } +actix-router = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" } +actix-web = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" } +awc = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" } diff --git a/actix-ws/examples/chat.rs b/actix-ws/examples/chat.rs index 9d5c823df..440212832 100644 --- a/actix-ws/examples/chat.rs +++ b/actix-ws/examples/chat.rs @@ -82,7 +82,7 @@ async fn ws( loop { interval.tick().await; - if session2.ping(b"").await.is_err() { + if session2.ping(&b""[..]).await.is_err() { break; } @@ -97,7 +97,7 @@ async fn ws( while let Some(Ok(msg)) = stream.recv().await { match msg { AggregatedMessage::Ping(bytes) => { - if session.pong(&bytes).await.is_err() { + if session.pong(bytes).await.is_err() { return; } } diff --git a/actix-ws/src/lib.rs b/actix-ws/src/lib.rs index b3fd6a62b..c6f233e7d 100644 --- a/actix-ws/src/lib.rs +++ b/actix-ws/src/lib.rs @@ -40,7 +40,7 @@ pub use self::{ /// while let Some(Ok(msg)) = msg_stream.next().await { /// match msg { /// Message::Ping(bytes) => { -/// if session.pong(&bytes).await.is_err() { +/// if session.pong(bytes).await.is_err() { /// return; /// } /// } diff --git a/actix-ws/src/session.rs b/actix-ws/src/session.rs index ad210809e..8668626e3 100644 --- a/actix-ws/src/session.rs +++ b/actix-ws/src/session.rs @@ -98,16 +98,16 @@ impl Session { /// ```no_run /// # use actix_ws::Session; /// # async fn test(mut session: Session) { - /// if session.ping(b"").await.is_err() { + /// if session.ping(&b""[..]).await.is_err() { /// // session is closed /// } /// # } /// ``` - pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> { + pub async fn ping(&mut self, msg: impl Into) -> Result<(), Closed> { self.pre_check(); if let Some(inner) = self.inner.as_mut() { inner - .send(Message::Ping(Bytes::copy_from_slice(msg))) + .send(Message::Ping(msg.into())) .await .map_err(|_| Closed) } else { @@ -122,16 +122,16 @@ impl Session { /// # async fn test(mut session: Session, msg: Message) { /// match msg { /// Message::Ping(bytes) => { - /// let _ = session.pong(&bytes).await; + /// let _ = session.pong(bytes).await; /// } /// _ => (), /// } /// # } - pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> { + pub async fn pong(&mut self, msg: impl Into) -> Result<(), Closed> { self.pre_check(); if let Some(inner) = self.inner.as_mut() { inner - .send(Message::Pong(Bytes::copy_from_slice(msg))) + .send(Message::Pong(msg.into())) .await .map_err(|_| Closed) } else { diff --git a/actix-ws/src/stream.rs b/actix-ws/src/stream.rs index e95c17f43..aff4163ac 100644 --- a/actix-ws/src/stream.rs +++ b/actix-ws/src/stream.rs @@ -1,13 +1,14 @@ use std::{ collections::VecDeque, future::poll_fn, - io, mem, + io, pin::Pin, task::{Context, Poll}, }; -use actix_codec::{Decoder, Encoder}; +use actix_codec::Decoder; use actix_http::{ + big_bytes::BigBytes, ws::{Codec, Frame, Message, ProtocolError}, Payload, }; @@ -24,8 +25,7 @@ use crate::AggregatedMessageStream; /// Response body for a WebSocket. pub struct StreamingBody { session_rx: Receiver, - messages: VecDeque, - buf: BytesMut, + buf: BigBytes, codec: Codec, closing: bool, } @@ -34,8 +34,7 @@ impl StreamingBody { pub(super) fn new(session_rx: Receiver) -> Self { StreamingBody { session_rx, - messages: VecDeque::new(), - buf: BytesMut::new(), + buf: BigBytes::with_capacity(0), codec: Codec::new(), closing: false, } @@ -118,14 +117,12 @@ impl Stream for StreamingBody { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - if this.closing { - return Poll::Ready(None); - } - - loop { + while !this.closing { match Pin::new(&mut this.session_rx).poll_recv(cx) { Poll::Ready(Some(msg)) => { - this.messages.push_back(msg); + if let Err(err) = this.codec.encode_bigbytes(msg, &mut this.buf) { + return Poll::Ready(Some(Err(err.into()))); + } } Poll::Ready(None) => { this.closing = true; @@ -135,16 +132,18 @@ impl Stream for StreamingBody { } } - while let Some(msg) = this.messages.pop_front() { - if let Err(err) = this.codec.encode(msg, &mut this.buf) { - return Poll::Ready(Some(Err(err.into()))); - } + if let Some(bytes) = this.buf.pop_front() { + return Poll::Ready(Some(Ok(bytes))); } - if !this.buf.is_empty() { - return Poll::Ready(Some(Ok(mem::take(&mut this.buf).freeze()))); + if this.closing { + return Poll::Ready(None); } + // When we have a moment (pending) allow the BigBytes to release memory + // arbitrary 8KB (page size) + this.buf.clear(1024 * 8); + Poll::Pending } }