From 6a321e77ae7d7075fb62ae660845c95c46265b8e Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 11:23:07 +0900 Subject: [PATCH 01/10] refactor(body): remove Incoming::channel helper --- src/body/incoming.rs | 24 ++++++++++-------------- src/proto/h1/dispatch.rs | 3 ++- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index e0beba6108..e5a34b8116 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -100,15 +100,6 @@ const WANT_PENDING: usize = 1; const WANT_READY: usize = 2; impl Incoming { - /// Create a `Body` stream with an associated sender half. - /// - /// Useful when wanting to stream chunks from another thread. - #[inline] - #[cfg(test)] - pub(crate) fn channel() -> (Sender, Incoming) { - Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) - } - #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { let (data_tx, data_rx) = mpsc::channel(0); @@ -482,7 +473,11 @@ mod tests { eq(Incoming::empty(), SizeHint::with_exact(0), "empty"); - eq(Incoming::channel().1, SizeHint::new(), "channel"); + eq( + Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false).1, + SizeHint::new(), + "channel", + ); eq( Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, @@ -494,7 +489,7 @@ mod tests { #[cfg(not(miri))] #[tokio::test] async fn channel_abort() { - let (tx, mut rx) = Incoming::channel(); + let (tx, mut rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.abort(); @@ -505,7 +500,8 @@ mod tests { #[cfg(all(not(miri), feature = "http1"))] #[tokio::test] async fn channel_abort_when_buffer_is_full() { - let (mut tx, mut rx) = Incoming::channel(); + let (mut tx, mut rx) = + Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.try_send_data("chunk 1".into()).expect("send 1"); // buffer is full, but can still send abort @@ -527,7 +523,7 @@ mod tests { #[cfg(feature = "http1")] #[test] fn channel_buffers_one() { - let (mut tx, _rx) = Incoming::channel(); + let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.try_send_data("chunk 1".into()).expect("send 1"); @@ -539,7 +535,7 @@ mod tests { #[cfg(not(miri))] #[tokio::test] async fn channel_empty() { - let (_, mut rx) = Incoming::channel(); + let (_, mut rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); assert!(rx.frame().await.is_none()); } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 3a4faf0487..6c59dea2e6 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -764,7 +764,8 @@ mod tests { assert!(dispatcher.poll().is_pending()); let body = { - let (mut tx, body) = IncomingBody::channel(); + let (mut tx, body) = + IncomingBody::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.try_send_data("".into()).unwrap(); body }; From c63f7376eb18f4657e7a5b61aeeaf792ab1f9fea Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 12:13:28 +0900 Subject: [PATCH 02/10] refactor(body): rename Incoming new_channel constructor with channel --- src/body/incoming.rs | 22 ++++++++++------------ src/proto/h1/dispatch.rs | 7 +++---- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index e5a34b8116..d2a3717046 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -101,7 +101,7 @@ const WANT_READY: usize = 2; impl Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] - pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { + pub(crate) fn channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { let (data_tx, data_rx) = mpsc::channel(0); let (trailers_tx, trailers_rx) = oneshot::channel(); @@ -474,13 +474,13 @@ mod tests { eq(Incoming::empty(), SizeHint::with_exact(0), "empty"); eq( - Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false).1, + Incoming::channel(DecodedLength::CHUNKED, /*wanter =*/ false).1, SizeHint::new(), "channel", ); eq( - Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, + Incoming::channel(DecodedLength::new(4), /*wanter =*/ false).1, SizeHint::with_exact(4), "channel with length", ); @@ -489,7 +489,7 @@ mod tests { #[cfg(not(miri))] #[tokio::test] async fn channel_abort() { - let (tx, mut rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); + let (tx, mut rx) = Incoming::channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.abort(); @@ -500,8 +500,7 @@ mod tests { #[cfg(all(not(miri), feature = "http1"))] #[tokio::test] async fn channel_abort_when_buffer_is_full() { - let (mut tx, mut rx) = - Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); + let (mut tx, mut rx) = Incoming::channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.try_send_data("chunk 1".into()).expect("send 1"); // buffer is full, but can still send abort @@ -523,7 +522,7 @@ mod tests { #[cfg(feature = "http1")] #[test] fn channel_buffers_one() { - let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); + let (mut tx, _rx) = Incoming::channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.try_send_data("chunk 1".into()).expect("send 1"); @@ -535,14 +534,14 @@ mod tests { #[cfg(not(miri))] #[tokio::test] async fn channel_empty() { - let (_, mut rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); + let (_, mut rx) = Incoming::channel(DecodedLength::CHUNKED, /*wanter =*/ false); assert!(rx.frame().await.is_none()); } #[test] fn channel_ready() { - let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); + let (mut tx, _rx) = Incoming::channel(DecodedLength::CHUNKED, /*wanter = */ false); let mut tx_ready = tokio_test::task::spawn(tx.ready()); @@ -551,8 +550,7 @@ mod tests { #[test] fn channel_wanter() { - let (mut tx, mut rx) = - Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); + let (mut tx, mut rx) = Incoming::channel(DecodedLength::CHUNKED, /*wanter = */ true); let mut tx_ready = tokio_test::task::spawn(tx.ready()); let mut rx_data = tokio_test::task::spawn(rx.frame()); @@ -573,7 +571,7 @@ mod tests { #[test] fn channel_notices_closure() { - let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); + let (mut tx, rx) = Incoming::channel(DecodedLength::CHUNKED, /*wanter = */ true); let mut tx_ready = tokio_test::task::spawn(tx.ready()); diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 6c59dea2e6..5543d78a51 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -261,8 +261,7 @@ where let body = match body_len { DecodedLength::ZERO => IncomingBody::empty(), other => { - let (tx, rx) = - IncomingBody::new_channel(other, wants.contains(Wants::EXPECT)); + let (tx, rx) = IncomingBody::channel(other, wants.contains(Wants::EXPECT)); self.body_tx = Some(tx); rx } @@ -733,7 +732,7 @@ mod tests { let _dispatcher = tokio::spawn(async move { dispatcher.await }); let body = { - let (mut tx, body) = IncomingBody::new_channel(DecodedLength::new(4), false); + let (mut tx, body) = IncomingBody::channel(DecodedLength::new(4), false); tx.try_send_data("reee".into()).unwrap(); body }; @@ -765,7 +764,7 @@ mod tests { let body = { let (mut tx, body) = - IncomingBody::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false); + IncomingBody::channel(DecodedLength::CHUNKED, /*wanter =*/ false); tx.try_send_data("".into()).unwrap(); body }; From 6a6b99f75617e050c8251ed163ecf3d6ee717a2c Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 12:15:56 +0900 Subject: [PATCH 03/10] refactor(body): remove unused Sender::send_data test helper --- src/body/incoming.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index d2a3717046..fe8b76ce31 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -355,16 +355,6 @@ impl Sender { futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await } - /// Send data on data channel when it is ready. - #[cfg(test)] - #[allow(unused)] - pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { - self.ready().await?; - self.data_tx - .try_send(Ok(chunk)) - .map_err(|_| crate::Error::new_closed()) - } - /// Send trailers on trailers channel. #[allow(unused)] pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { From 054bbb900cded16c08a6e5ad1184a1a73a2d9a15 Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 12:25:42 +0900 Subject: [PATCH 04/10] refactor(body): move Sender test helper to test module --- src/body/incoming.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/body/incoming.rs b/src/body/incoming.rs index fe8b76ce31..75ac420d27 100644 --- a/src/body/incoming.rs +++ b/src/body/incoming.rs @@ -350,11 +350,6 @@ impl Sender { } } - #[cfg(test)] - async fn ready(&mut self) -> crate::Result<()> { - futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await - } - /// Send trailers on trailers channel. #[allow(unused)] pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { @@ -384,11 +379,6 @@ impl Sender { .map_err(|err| err.into_inner().expect("just sent Ok")) } - #[cfg(test)] - pub(crate) fn abort(mut self) { - self.send_error(crate::Error::new_body_write_aborted()); - } - pub(crate) fn send_error(&mut self, err: crate::Error) { let _ = self .data_tx @@ -424,6 +414,16 @@ mod tests { use super::{Body, DecodedLength, Incoming, Sender, SizeHint}; use http_body_util::BodyExt; + impl Sender { + async fn ready(&mut self) -> crate::Result<()> { + futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await + } + + fn abort(mut self) { + self.send_error(crate::Error::new_body_write_aborted()); + } + } + #[test] fn test_size_of() { // These are mostly to help catch *accidentally* increasing From f6e33a75da431de1b47599696f71b8aed80198b6 Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 18:30:05 +0900 Subject: [PATCH 05/10] reractor(body): change incoming module directory --- src/body/{incoming.rs => incoming/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/body/{incoming.rs => incoming/mod.rs} (100%) diff --git a/src/body/incoming.rs b/src/body/incoming/mod.rs similarity index 100% rename from src/body/incoming.rs rename to src/body/incoming/mod.rs From 9872591a8138210f1307297e57b1f9e85240b1c2 Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 18:56:15 +0900 Subject: [PATCH 06/10] refactor(body): move channel based incoming body implementation to ChanBody type --- src/body/incoming/channel.rs | 182 ++++++++++++++++++++++++++++++ src/body/incoming/mod.rs | 209 +++++------------------------------ 2 files changed, 208 insertions(+), 183 deletions(-) create mode 100644 src/body/incoming/channel.rs diff --git a/src/body/incoming/channel.rs b/src/body/incoming/channel.rs new file mode 100644 index 0000000000..c4a32d6715 --- /dev/null +++ b/src/body/incoming/channel.rs @@ -0,0 +1,182 @@ +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures_channel::{mpsc, oneshot}; +use futures_util::ready; +use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver +use http::HeaderMap; +use http_body::{Frame, SizeHint}; + +use crate::body::DecodedLength; +use crate::common::watch; + +type BodySender = mpsc::Sender>; +type TrailersSender = oneshot::Sender; + +const WANT_PENDING: usize = 1; +const WANT_READY: usize = 2; + +pub(super) struct ChanBody { + content_length: DecodedLength, + want_tx: watch::Sender, + data_rx: mpsc::Receiver>, + trailers_rx: oneshot::Receiver, +} + +impl ChanBody { + pub(super) fn new(content_length: DecodedLength, wanter: bool) -> (Sender, Self) { + let (data_tx, data_rx) = mpsc::channel(0); + let (trailers_tx, trailers_rx) = oneshot::channel(); + + // If wanter is true, `Sender::poll_ready()` won't becoming ready + // until the `Body` has been polled for data once. + let want = if wanter { WANT_PENDING } else { WANT_READY }; + + let (want_tx, want_rx) = watch::channel(want); + + let tx = Sender { + want_rx, + data_tx, + trailers_tx: Some(trailers_tx), + }; + let rx = Self { + content_length, + want_tx, + data_rx, + trailers_rx, + }; + + (tx, rx) + } + + pub(super) fn poll_frame( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, crate::Error>>> { + let Self { + content_length: ref mut len, + ref mut data_rx, + ref mut want_tx, + ref mut trailers_rx, + } = self; + + want_tx.send(WANT_READY); + + if !data_rx.is_terminated() { + if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) { + len.sub_if(chunk.len() as u64); + return Poll::Ready(Some(Ok(Frame::data(chunk)))); + } + } + + // check trailers after data is terminated + match ready!(Pin::new(trailers_rx).poll(cx)) { + Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))), + Err(_) => Poll::Ready(None), + } + } + + pub(super) fn is_end_stream(&self) -> bool { + self.content_length == DecodedLength::ZERO + } + + pub(super) fn size_hint(&self) -> SizeHint { + super::opt_len(self.content_length) + } +} + +/// A sender half created through [`Body::channel()`]. +/// +/// Useful when wanting to stream chunks from another thread. +/// +/// ## Body Closing +/// +/// Note that the request body will always be closed normally when the sender is dropped (meaning +/// that the empty terminating chunk will be sent to the remote). If you desire to close the +/// connection with an incomplete response (e.g. in the case of an error during asynchronous +/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion. +/// +/// [`Body::channel()`]: struct.Body.html#method.channel +/// [`Sender::abort()`]: struct.Sender.html#method.abort +#[must_use = "Sender does nothing unless sent on"] +pub(crate) struct Sender { + want_rx: watch::Receiver, + data_tx: BodySender, + trailers_tx: Option, +} + +impl Sender { + /// Check to see if this `Sender` can send more data. + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Check if the receiver end has tried polling for the body yet + ready!(self.poll_want(cx)?); + self.data_tx + .poll_ready(cx) + .map_err(|_| crate::Error::new_closed()) + } + + pub(crate) fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll> { + match self.want_rx.load(cx) { + WANT_READY => Poll::Ready(Ok(())), + WANT_PENDING => Poll::Pending, + watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), + unexpected => unreachable!("want_rx value: {}", unexpected), + } + } + + /// Send trailers on trailers channel. + #[allow(unused)] + pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { + let tx = match self.trailers_tx.take() { + Some(tx) => tx, + None => return Err(crate::Error::new_closed()), + }; + tx.send(trailers).map_err(|_| crate::Error::new_closed()) + } + + /// Try to send data on this channel. + /// + /// # Errors + /// + /// Returns `Err(Bytes)` if the channel could not (currently) accept + /// another `Bytes`. + /// + /// # Note + /// + /// This is mostly useful for when trying to send from some other thread + /// that doesn't have an async context. If in an async context, prefer + /// `send_data()` instead. + pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { + self.data_tx + .try_send(Ok(chunk)) + .map_err(|err| err.into_inner().expect("just sent Ok")) + } + + pub(crate) fn send_error(&mut self, err: crate::Error) { + let _ = self + .data_tx + // clone so the send works even if buffer is full + .clone() + .try_send(Err(err)); + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + #[derive(Debug)] + struct Open; + #[derive(Debug)] + struct Closed; + + let mut builder = f.debug_tuple("Sender"); + match self.want_rx.peek() { + watch::CLOSED => builder.field(&Closed), + _ => builder.field(&Open), + }; + + builder.finish() + } +} diff --git a/src/body/incoming/mod.rs b/src/body/incoming/mod.rs index 75ac420d27..5d9defb20c 100644 --- a/src/body/incoming/mod.rs +++ b/src/body/incoming/mod.rs @@ -1,38 +1,28 @@ -use std::fmt; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use std::future::Future; +mod channel; + +use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use futures_channel::{mpsc, oneshot}; -#[cfg(all( - any(feature = "http1", feature = "http2"), - any(feature = "client", feature = "server") -))] +#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use futures_util::ready; +use http_body::{Body, Frame, SizeHint}; + #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver +use self::channel::ChanBody; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use http::HeaderMap; -use http_body::{Body, Frame, SizeHint}; +pub(crate) use self::channel::Sender; #[cfg(all( any(feature = "http1", feature = "http2"), any(feature = "client", feature = "server") ))] use super::DecodedLength; -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -use crate::common::watch; #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] use crate::proto::h2::ping; -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -type BodySender = mpsc::Sender>; -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -type TrailersSender = oneshot::Sender; - /// A stream of `Bytes`, used when receiving bodies from the network. /// /// Note that Users should not instantiate this struct directly. When working with the hyper client, @@ -56,12 +46,7 @@ pub struct Incoming { enum Kind { Empty, #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] - Chan { - content_length: DecodedLength, - want_tx: watch::Sender, - data_rx: mpsc::Receiver>, - trailers_rx: oneshot::Receiver, - }, + Chan(ChanBody), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] H2 { content_length: DecodedLength, @@ -73,57 +58,11 @@ enum Kind { Ffi(crate::ffi::UserBody), } -/// A sender half created through [`Body::channel()`]. -/// -/// Useful when wanting to stream chunks from another thread. -/// -/// ## Body Closing -/// -/// Note that the request body will always be closed normally when the sender is dropped (meaning -/// that the empty terminating chunk will be sent to the remote). If you desire to close the -/// connection with an incomplete response (e.g. in the case of an error during asynchronous -/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion. -/// -/// [`Body::channel()`]: struct.Body.html#method.channel -/// [`Sender::abort()`]: struct.Sender.html#method.abort -#[must_use = "Sender does nothing unless sent on"] -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -pub(crate) struct Sender { - want_rx: watch::Receiver, - data_tx: BodySender, - trailers_tx: Option, -} - -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -const WANT_PENDING: usize = 1; -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -const WANT_READY: usize = 2; - impl Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] pub(crate) fn channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { - let (data_tx, data_rx) = mpsc::channel(0); - let (trailers_tx, trailers_rx) = oneshot::channel(); - - // If wanter is true, `Sender::poll_ready()` won't becoming ready - // until the `Body` has been polled for data once. - let want = if wanter { WANT_PENDING } else { WANT_READY }; - - let (want_tx, want_rx) = watch::channel(want); - - let tx = Sender { - want_rx, - data_tx, - trailers_tx: Some(trailers_tx), - }; - let rx = Incoming::new(Kind::Chan { - content_length, - want_tx, - data_rx, - trailers_rx, - }); - - (tx, rx) + let (tx, chan) = ChanBody::new(content_length, wanter); + (tx, Incoming::new(Kind::Chan(chan))) } fn new(kind: Kind) -> Incoming { @@ -201,27 +140,7 @@ impl Body for Incoming { match self.kind { Kind::Empty => Poll::Ready(None), #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] - Kind::Chan { - content_length: ref mut len, - ref mut data_rx, - ref mut want_tx, - ref mut trailers_rx, - } => { - want_tx.send(WANT_READY); - - if !data_rx.is_terminated() { - if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) { - len.sub_if(chunk.len() as u64); - return Poll::Ready(Some(Ok(Frame::data(chunk)))); - } - } - - // check trailers after data is terminated - match ready!(Pin::new(trailers_rx).poll(cx)) { - Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))), - Err(_) => Poll::Ready(None), - } - } + Kind::Chan(ref mut body) => body.poll_frame(cx), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { ref mut data_done, @@ -273,7 +192,7 @@ impl Body for Incoming { match self.kind { Kind::Empty => true, #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] - Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, + Kind::Chan(ref body) => body.is_end_stream(), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), #[cfg(feature = "ffi")] @@ -282,22 +201,10 @@ impl Body for Incoming { } fn size_hint(&self) -> SizeHint { - #[cfg(all( - any(feature = "http1", feature = "http2"), - any(feature = "client", feature = "server") - ))] - fn opt_len(decoded_length: DecodedLength) -> SizeHint { - if let Some(content_length) = decoded_length.into_opt() { - SizeHint::with_exact(content_length) - } else { - SizeHint::default() - } - } - match self.kind { Kind::Empty => SizeHint::with_exact(0), #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] - Kind::Chan { content_length, .. } => opt_len(content_length), + Kind::Chan(ref body) => body.size_hint(), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] Kind::H2 { content_length, .. } => opt_len(content_length), #[cfg(feature = "ffi")] @@ -306,6 +213,18 @@ impl Body for Incoming { } } +#[cfg(all( + any(feature = "http1", feature = "http2"), + any(feature = "client", feature = "server") +))] +fn opt_len(decoded_length: DecodedLength) -> SizeHint { + if let Some(content_length) = decoded_length.into_opt() { + SizeHint::with_exact(content_length) + } else { + SizeHint::default() + } +} + impl fmt::Debug for Incoming { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[derive(Debug)] @@ -330,82 +249,6 @@ impl fmt::Debug for Incoming { } } -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -impl Sender { - /// Check to see if this `Sender` can send more data. - pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // Check if the receiver end has tried polling for the body yet - ready!(self.poll_want(cx)?); - self.data_tx - .poll_ready(cx) - .map_err(|_| crate::Error::new_closed()) - } - - fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.want_rx.load(cx) { - WANT_READY => Poll::Ready(Ok(())), - WANT_PENDING => Poll::Pending, - watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), - unexpected => unreachable!("want_rx value: {}", unexpected), - } - } - - /// Send trailers on trailers channel. - #[allow(unused)] - pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { - let tx = match self.trailers_tx.take() { - Some(tx) => tx, - None => return Err(crate::Error::new_closed()), - }; - tx.send(trailers).map_err(|_| crate::Error::new_closed()) - } - - /// Try to send data on this channel. - /// - /// # Errors - /// - /// Returns `Err(Bytes)` if the channel could not (currently) accept - /// another `Bytes`. - /// - /// # Note - /// - /// This is mostly useful for when trying to send from some other thread - /// that doesn't have an async context. If in an async context, prefer - /// `send_data()` instead. - #[cfg(feature = "http1")] - pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { - self.data_tx - .try_send(Ok(chunk)) - .map_err(|err| err.into_inner().expect("just sent Ok")) - } - - pub(crate) fn send_error(&mut self, err: crate::Error) { - let _ = self - .data_tx - // clone so the send works even if buffer is full - .clone() - .try_send(Err(err)); - } -} - -#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] -impl fmt::Debug for Sender { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - #[derive(Debug)] - struct Open; - #[derive(Debug)] - struct Closed; - - let mut builder = f.debug_tuple("Sender"); - match self.want_rx.peek() { - watch::CLOSED => builder.field(&Closed), - _ => builder.field(&Open), - }; - - builder.finish() - } -} - #[cfg(test)] mod tests { use std::mem; From b5a2248e8aa0a9865f1127af74869cebbad21222 Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 20:23:47 +0900 Subject: [PATCH 07/10] refactor(body): move channel h2 incoming body implementation to H2Body type --- src/body/incoming/h2.rs | 88 ++++++++++++++++++++++++++++++++++++++++ src/body/incoming/mod.rs | 77 ++++++----------------------------- 2 files changed, 100 insertions(+), 65 deletions(-) create mode 100644 src/body/incoming/h2.rs diff --git a/src/body/incoming/h2.rs b/src/body/incoming/h2.rs new file mode 100644 index 0000000000..767441916b --- /dev/null +++ b/src/body/incoming/h2.rs @@ -0,0 +1,88 @@ +use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures_util::ready; +use http_body::{Frame, SizeHint}; + +use crate::body::DecodedLength; +use crate::proto::h2::ping; + +pub(super) struct H2Body { + content_length: DecodedLength, + data_done: bool, + ping: ping::Recorder, + recv: h2::RecvStream, +} + +impl H2Body { + pub(super) fn new( + recv: h2::RecvStream, + mut content_length: DecodedLength, + ping: ping::Recorder, + ) -> Self { + // If the stream is already EOS, then the "unknown length" is clearly + // actually ZERO. + if !content_length.is_exact() && recv.is_end_stream() { + content_length = DecodedLength::ZERO; + } + + Self { + data_done: false, + ping, + content_length, + recv, + } + } + + pub(super) fn poll_frame( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, crate::Error>>> { + let Self { + ref mut data_done, + ref ping, + recv: ref mut h2, + content_length: ref mut len, + } = self; + + if !*data_done { + match ready!(h2.poll_data(cx)) { + Some(Ok(bytes)) => { + let _ = h2.flow_control().release_capacity(bytes.len()); + len.sub_if(bytes.len() as u64); + ping.record_data(bytes.len()); + return Poll::Ready(Some(Ok(Frame::data(bytes)))); + } + Some(Err(e)) => { + return match e.reason() { + // These reasons should cause the body reading to stop, but not fail it. + // The same logic as for `Read for H2Upgraded` is applied here. + Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None), + _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + }; + } + None => { + *data_done = true; + // fall through to trailers + } + } + } + + // after data, check trailers + match ready!(h2.poll_trailers(cx)) { + Ok(t) => { + ping.record_non_data(); + Poll::Ready(Ok(t.map(Frame::trailers)).transpose()) + } + Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), + } + } + + pub(super) fn is_end_stream(&self) -> bool { + self.recv.is_end_stream() + } + + pub(super) fn size_hint(&self) -> SizeHint { + super::opt_len(self.content_length) + } +} diff --git a/src/body/incoming/mod.rs b/src/body/incoming/mod.rs index 5d9defb20c..ec8c75f3f1 100644 --- a/src/body/incoming/mod.rs +++ b/src/body/incoming/mod.rs @@ -1,13 +1,13 @@ #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] mod channel; +#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] +mod h2; use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] -use futures_util::ready; use http_body::{Body, Frame, SizeHint}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] @@ -15,6 +15,9 @@ use self::channel::ChanBody; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] pub(crate) use self::channel::Sender; +#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] +use self::h2::H2Body; + #[cfg(all( any(feature = "http1", feature = "http2"), any(feature = "client", feature = "server") @@ -48,12 +51,7 @@ enum Kind { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Chan(ChanBody), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - H2 { - content_length: DecodedLength, - data_done: bool, - ping: ping::Recorder, - recv: h2::RecvStream, - }, + H2(H2Body), #[cfg(feature = "ffi")] Ffi(crate::ffi::UserBody), } @@ -81,22 +79,11 @@ impl Incoming { #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] pub(crate) fn h2( - recv: h2::RecvStream, - mut content_length: DecodedLength, + recv: ::h2::RecvStream, + content_length: DecodedLength, ping: ping::Recorder, ) -> Self { - // If the stream is already EOS, then the "unknown length" is clearly - // actually ZERO. - if !content_length.is_exact() && recv.is_end_stream() { - content_length = DecodedLength::ZERO; - } - - Incoming::new(Kind::H2 { - data_done: false, - ping, - content_length, - recv, - }) + Incoming::new(Kind::H2(H2Body::new(recv, content_length, ping))) } #[cfg(feature = "ffi")] @@ -142,47 +129,7 @@ impl Body for Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Kind::Chan(ref mut body) => body.poll_frame(cx), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - Kind::H2 { - ref mut data_done, - ref ping, - recv: ref mut h2, - content_length: ref mut len, - } => { - if !*data_done { - match ready!(h2.poll_data(cx)) { - Some(Ok(bytes)) => { - let _ = h2.flow_control().release_capacity(bytes.len()); - len.sub_if(bytes.len() as u64); - ping.record_data(bytes.len()); - return Poll::Ready(Some(Ok(Frame::data(bytes)))); - } - Some(Err(e)) => { - return match e.reason() { - // These reasons should cause the body reading to stop, but not fail it. - // The same logic as for `Read for H2Upgraded` is applied here. - Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => { - Poll::Ready(None) - } - _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))), - }; - } - None => { - *data_done = true; - // fall through to trailers - } - } - } - - // after data, check trailers - match ready!(h2.poll_trailers(cx)) { - Ok(t) => { - ping.record_non_data(); - Poll::Ready(Ok(t.map(Frame::trailers)).transpose()) - } - Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), - } - } - + Kind::H2(ref mut body) => body.poll_frame(cx), #[cfg(feature = "ffi")] Kind::Ffi(ref mut body) => body.poll_data(cx), } @@ -194,7 +141,7 @@ impl Body for Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Kind::Chan(ref body) => body.is_end_stream(), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), + Kind::H2(ref body) => body.is_end_stream(), #[cfg(feature = "ffi")] Kind::Ffi(..) => false, } @@ -206,7 +153,7 @@ impl Body for Incoming { #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] Kind::Chan(ref body) => body.size_hint(), #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] - Kind::H2 { content_length, .. } => opt_len(content_length), + Kind::H2(ref body) => body.size_hint(), #[cfg(feature = "ffi")] Kind::Ffi(..) => SizeHint::default(), } From 46d4d433b9276b9f424d3465a483ce84d703ae9a Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 20:36:02 +0900 Subject: [PATCH 08/10] refactor(body): refactor incoming body feature config --- src/body/incoming/mod.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/body/incoming/mod.rs b/src/body/incoming/mod.rs index ec8c75f3f1..bf19784942 100644 --- a/src/body/incoming/mod.rs +++ b/src/body/incoming/mod.rs @@ -57,12 +57,6 @@ enum Kind { } impl Incoming { - #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))] - pub(crate) fn channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { - let (tx, chan) = ChanBody::new(content_length, wanter); - (tx, Incoming::new(Kind::Chan(chan))) - } - fn new(kind: Kind) -> Incoming { Incoming { kind } } @@ -71,13 +65,17 @@ impl Incoming { pub(crate) fn empty() -> Incoming { Incoming::new(Kind::Empty) } +} - #[cfg(feature = "ffi")] - pub(crate) fn ffi() -> Incoming { - Incoming::new(Kind::Ffi(crate::ffi::UserBody::new())) +#[cfg(any(feature = "client", feature = "server"))] +impl Incoming { + #[cfg(feature = "http1")] + pub(crate) fn channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { + let (tx, chan) = ChanBody::new(content_length, wanter); + (tx, Incoming::new(Kind::Chan(chan))) } - #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] + #[cfg(feature = "http2")] pub(crate) fn h2( recv: ::h2::RecvStream, content_length: DecodedLength, @@ -85,8 +83,14 @@ impl Incoming { ) -> Self { Incoming::new(Kind::H2(H2Body::new(recv, content_length, ping))) } +} + +#[cfg(feature = "ffi")] +impl Incoming { + pub(crate) fn ffi() -> Incoming { + Incoming::new(Kind::Ffi(crate::ffi::UserBody::new())) + } - #[cfg(feature = "ffi")] pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { match self.kind { Kind::Ffi(ref mut body) => return body, From 0746207218ad950358ca964d2d141fb3e001703e Mon Sep 17 00:00:00 2001 From: tottoto Date: Wed, 20 Mar 2024 20:42:39 +0900 Subject: [PATCH 09/10] refactor(body): remove opt_len helper function --- src/body/incoming/channel.rs | 5 ++++- src/body/incoming/h2.rs | 5 ++++- src/body/incoming/mod.rs | 12 ------------ 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/body/incoming/channel.rs b/src/body/incoming/channel.rs index c4a32d6715..8515c84872 100644 --- a/src/body/incoming/channel.rs +++ b/src/body/incoming/channel.rs @@ -84,7 +84,10 @@ impl ChanBody { } pub(super) fn size_hint(&self) -> SizeHint { - super::opt_len(self.content_length) + self.content_length + .into_opt() + .map(SizeHint::with_exact) + .unwrap_or_default() } } diff --git a/src/body/incoming/h2.rs b/src/body/incoming/h2.rs index 767441916b..4606927088 100644 --- a/src/body/incoming/h2.rs +++ b/src/body/incoming/h2.rs @@ -83,6 +83,9 @@ impl H2Body { } pub(super) fn size_hint(&self) -> SizeHint { - super::opt_len(self.content_length) + self.content_length + .into_opt() + .map(SizeHint::with_exact) + .unwrap_or_default() } } diff --git a/src/body/incoming/mod.rs b/src/body/incoming/mod.rs index bf19784942..9396917228 100644 --- a/src/body/incoming/mod.rs +++ b/src/body/incoming/mod.rs @@ -164,18 +164,6 @@ impl Body for Incoming { } } -#[cfg(all( - any(feature = "http1", feature = "http2"), - any(feature = "client", feature = "server") -))] -fn opt_len(decoded_length: DecodedLength) -> SizeHint { - if let Some(content_length) = decoded_length.into_opt() { - SizeHint::with_exact(content_length) - } else { - SizeHint::default() - } -} - impl fmt::Debug for Incoming { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[derive(Debug)] From 31cff092bc10625f624c04371b38b4e9e31d3d24 Mon Sep 17 00:00:00 2001 From: tottoto Date: Thu, 21 Mar 2024 18:15:11 +0900 Subject: [PATCH 10/10] reafctor(body): refactor Debug implementation for Incoming --- src/body/incoming/mod.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/body/incoming/mod.rs b/src/body/incoming/mod.rs index 9396917228..d05c860924 100644 --- a/src/body/incoming/mod.rs +++ b/src/body/incoming/mod.rs @@ -172,17 +172,11 @@ impl fmt::Debug for Incoming { struct Empty; let mut builder = f.debug_tuple("Body"); - match self.kind { - Kind::Empty => builder.field(&Empty), - #[cfg(any( - all( - any(feature = "http1", feature = "http2"), - any(feature = "client", feature = "server") - ), - feature = "ffi" - ))] - _ => builder.field(&Streaming), - }; + if matches!(self.kind, Kind::Empty) { + builder.field(&Empty); + } else { + builder.field(&Streaming); + } builder.finish() }