Skip to content

Commit

Permalink
refactor(http/retry): PeekTrailersBody<B> uses BodyExt::frame()
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 23, 2025
1 parent 8551d5a commit 5d34dc0
Showing 1 changed file with 183 additions and 104 deletions.
287 changes: 183 additions & 104 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::{
future::{self, Either},
FutureExt,
};
use http::HeaderMap;
use http_body::Body;
use linkerd_http_box::BoxBody;
use pin_project::pin_project;
Expand All @@ -17,31 +18,34 @@ use std::{
/// If the first frame of the body stream was *not* a `TRAILERS` frame, this
/// behaves identically to a normal body.
#[pin_project(project = Projection)]
pub struct PeekTrailersBody<B: Body = BoxBody> {
/// The inner [`Body`].
pub enum PeekTrailersBody<B: Body = BoxBody> {
/// An empty body.
Empty,
/// A body that contains zero or one DATA frame.
///
/// This is the request or response body whose trailers are being peeked.
#[pin]
inner: B,

/// The first DATA frame received from the inner body, or an error that
/// occurred while polling for data.
/// This variant MAY have trailers that can be peeked.
Unary {
data: Option<Result<B::Data, B::Error>>,
trailers: Option<Result<HeaderMap, B::Error>>,
},
/// A body that (potentially) contains more than one DATA frame.
///
/// If this is `None`, then the body has completed without any DATA frames.
first_data: Option<Result<B::Data, B::Error>>,

/// The inner body's trailers, if it was terminated by a `TRAILERS` frame
/// after 0-1 DATA frames, or an error if polling for trailers failed.
/// This variant indicates that the inner body's trailers could not be observed.
Unknown {
first: Option<Result<B::Data, B::Error>>,
second: Option<Result<B::Data, B::Error>>,
/// The inner [`Body`].
#[pin]
inner: B,
},
/// A transparent, inert body.
///
/// Yes, this is a bit of a complex type, so let's break it down:
/// - the outer `Option` indicates whether any trailers were received by
/// `WithTrailers`; if it's `None`, then we don't *know* if the response
/// had trailers, as it is not yet complete.
/// - the inner `Result` and `Option` are the `Result` and `Option` returned
/// by `HttpBody::trailers` on the inner body. If this is `Ok(None)`, then
/// the body has terminated without trailers --- it is *known* to not have
/// trailers.
trailers: Option<Result<Option<http::HeaderMap>, B::Error>>,
/// This variant will not attempt to peek the inner body's trailers.
Passthru {
/// The inner [`Body`].
#[pin]
inner: B,
},
}

/// A future that yields a response instrumented with [`PeekTrailersBody<B>`].
Expand All @@ -60,9 +64,19 @@ impl<B: Body> PeekTrailersBody<B> {
/// This function will return `None` if the body's trailers could not be peeked, or if there
/// were no trailers included.
pub fn peek_trailers(&self) -> Option<&http::HeaderMap> {
self.trailers
.as_ref()
.and_then(|trls| trls.as_ref().ok()?.as_ref())
match self {
Self::Unary {
trailers: Some(Ok(trailers)),
..
} => Some(trailers),
Self::Unary {
trailers: None | Some(Err(_)),
..
}
| Self::Empty
| Self::Unknown { .. }
| Self::Passthru { .. } => None,
}
}

pub fn map_response(rsp: http::Response<B>) -> WithPeekTrailersBody<B>
Expand All @@ -76,14 +90,14 @@ impl<B: Body> PeekTrailersBody<B> {
// If the response isn't an HTTP version that has trailers, skip trying
// to read a trailers frame.
if let Version::HTTP_09 | Version::HTTP_10 | Version::HTTP_11 = rsp.version() {
return Either::Left(future::ready(Self::no_trailers(rsp)));
return Either::Left(future::ready(rsp.map(|inner| Self::Passthru { inner })));
}

// If the response doesn't have a body stream, also skip trying to read
// a trailers frame.
if rsp.is_end_stream() {
tracing::debug!("Skipping trailers for empty body");
return Either::Left(future::ready(Self::no_trailers(rsp)));
return Either::Left(future::ready(rsp.map(|_| Self::Empty)));
}

// Otherwise, return a future that tries to read the next frame.
Expand All @@ -94,55 +108,103 @@ impl<B: Body> PeekTrailersBody<B> {
}))
}

async fn read_body(mut body: B) -> Self
async fn read_body(body: B) -> Self
where
B: Send + Unpin,
B::Data: Send + Unpin,
B::Error: Send,
{
// XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame<T>`s.
// this can be removed when we upgrade to http-body 1.0.
use crate::compat::ForwardCompatibleBody;
let mut body = ForwardCompatibleBody::new(body);

// First, poll the body for its first frame.
tracing::debug!("Buffering first data frame");
let first_data = body.data().await;

// Now, inspect the frame yielded. If the body yielded a data frame, we will only peek
// the trailers if they are immediately available. If the body did not yield a data frame,
// we will poll a future to yield the trailers.
let trailers = if first_data.is_some() {
// The body has data; stop waiting for trailers. Peek to see if there's immediately a
// trailers frame, and grab it if so. Otherwise, bail.
//
// XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers`
// should only be called after `poll_data` returns `None`...but, in practice, I'm
// fairly sure that this just means that it *will not return `Ready`* until there are
// no data frames left, which is fine for us here, because we `now_or_never` it.
body.trailers().now_or_never()
} else {
// Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see
// if there's trailers...
let trls = body.trailers().await;
Some(trls)
let first_frame = body
.frame()
.map(|f| f.map(|r| r.map(Self::split_frame)))
.await;
let body: Self = match first_frame {
// The body has no frames. It is empty.
None => Self::Empty,
// The body yielded an error. We are done.
Some(Err(error)) => Self::Unary {
data: Some(Err(error)),
trailers: None,
},
// The body yielded a TRAILERS frame. We are done.
Some(Ok(Err(trailers))) => Self::Unary {
data: None,
trailers: Some(Ok(trailers)),
},
// The body yielded a DATA frame. Check for a second frame, without yielding again.
Some(Ok(Ok(first))) => {
if let Some(second) = body
.frame()
.map(|f| f.map(|r| r.map(Self::split_frame)))
.now_or_never()
{
// The second frame is available. Let's inspect it and determine what to do.
match second {
// The body is finished. There is not a TRAILERS frame.
None => Self::Unary {
data: Some(Ok(first)),
trailers: None,
},
// We immediately yielded a result, but it was an error. Alas!
Some(Err(error)) => Self::Unary {
data: Some(Ok(first)),
trailers: Some(Err(error)),
},
// We immediately yielded another frame, but it was a second DATA frame.
// We hold on to each frame, but we cannot wait for the TRAILERS.
Some(Ok(Ok(second))) => Self::Unknown {
first: Some(Ok(first)),
second: Some(Ok(second)),
inner: body.into_inner(),
},
// The body immediately yielded a second TRAILERS frame. Nice!
Some(Ok(Err(trailers))) => Self::Unary {
data: Some(Ok(first)),
trailers: Some(Ok(trailers)),
},
}
} else {
// If we are here, the second frame is not yet available. We cannot be sure
// that a second DATA frame is on the way, and we are no longer willing to
// await additional frames. There are no trailers to peek.
Self::Unknown {
first: None,
second: None,
inner: body.into_inner(),
}
}
}
};

if trailers.is_some() {
if body.peek_trailers().is_some() {
tracing::debug!("Buffered trailers frame");
}

Self {
inner: body,
first_data,
trailers,
}
body
}

/// Returns a response with an inert [`PeekTrailersBody<B>`].
/// Splits a `Frame<T>` into a `Result<T, E>`.
///
/// Frames do not expose their inner enums, and instead expose `into_data()` and
/// `into_trailers()` methods. This function will return `Ok(data)` if it is given a DATA
/// frame, and `Err(trailers)` if it is given a TRAILERS frame.
///
/// This will not peek the inner body's trailers.
fn no_trailers(rsp: http::Response<B>) -> http::Response<Self> {
rsp.map(|inner| Self {
inner,
first_data: None,
trailers: None,
})
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(frame: crate::compat::Frame<B::Data>) -> Result<B::Data, HeaderMap> {
frame
.into_data()
.map_err(|frame| match frame.into_trailers() {
Ok(trls) => trls,
// Safety: this is not reachable, we called `into_data()` above.
Err(_) => unreachable!("into_data() and `into_trailers()` both returned `Err(_)`"),
})
}
}

Expand All @@ -159,68 +221,85 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let Projection {
inner,
first_data,
trailers: _,
} = self.project();

if let Some(first_data) = first_data.take() {
return Poll::Ready(Some(first_data));
let this = self.project();
match this {
Projection::Empty => Poll::Ready(None),
Projection::Passthru { inner } => inner.poll_data(cx),
Projection::Unary { data, .. } => Poll::Ready(data.take()),
Projection::Unknown { .. } => todo!(),
}

inner.poll_data(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let Projection {
inner,
first_data: _,
trailers,
} = self.project();

if let Some(trailers) = trailers.take() {
return Poll::Ready(trailers);
let this = self.project();
match this {
Projection::Empty => Poll::Ready(Ok(None)),
Projection::Passthru { inner } => inner.poll_trailers(cx),
Projection::Unary { trailers, .. } => Poll::Ready(trailers.take().transpose()),
Projection::Unknown { inner, .. } => inner.poll_trailers(cx),
}

inner.poll_trailers(cx)
}

#[inline]
fn is_end_stream(&self) -> bool {
let Self {
inner,
first_data,
trailers,
} = self;

let trailers_finished = match trailers {
Some(Ok(Some(_)) | Err(_)) => false,
None | Some(Ok(None)) => true,
};

first_data.is_none() && trailers_finished && inner.is_end_stream()
match self {
Self::Empty => true,
Self::Passthru { inner } => inner.is_end_stream(),
Self::Unary {
data: None,
trailers: None,
} => true,
Self::Unary { .. } => false,
Self::Unknown {
inner,
first: None,
second: None,
} => inner.is_end_stream(),
Self::Unknown { .. } => false,
}
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
use bytes::Buf;

let mut hint = self.inner.size_hint();
// If we're holding onto a chunk of data, add its length to the inner
// `Body`'s size hint.
if let Some(Ok(chunk)) = self.first_data.as_ref() {
let buffered = chunk.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + buffered);
match self {
Self::Empty => http_body::SizeHint::new(),
Self::Passthru { inner } => inner.size_hint(),
Self::Unary {
data: Some(Ok(data)),
..
} => {
let size = data.remaining() as u64;
http_body::SizeHint::with_exact(size)
}
Self::Unary {
data: None | Some(Err(_)),
..
} => http_body::SizeHint::new(),
Self::Unknown {
first,
second,
inner,
} => {
// Add any frames we've buffered to the inner `Body`'s size hint.
let mut hint = inner.size_hint();
let mut add_to_hint = |frame: &Option<Result<B::Data, B::Error>>| {
if let Some(Ok(buf)) = frame {
let size = buf.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + size);
}
hint.set_lower(hint.lower() + size);
}
};
add_to_hint(first);
add_to_hint(second);
hint
}
hint.set_lower(hint.lower() + buffered);
}

hint
}
}

Expand Down

0 comments on commit 5d34dc0

Please sign in to comment.