Skip to content

Commit

Permalink
feat(http/retry): ReplayBody<B> polls for frames
Browse files Browse the repository at this point in the history
pr #3559 (dd4fbcd) refactored our trailer peeking body middleware to
model its buffering in terms of the `Frame<T>` type used in
`http-body`'s 1.0 release.

this commit performs a similar change for the other piece of body
middleware that super linkerd's retry facilities: `ReplayBody<B>`. the
inner body `B` is now wrapped in the `ForwardCompatibleBody<B>` adapter,
and we now poll it in terms of frames.

NB: polling the underlying in terms of frames has a subtle knock-on
effect regarding when we observe the trailers, in the liminal period
between this refactor and the subsequent upgrade to hyper 1.0, whilst we
must still implement the existing 0.4 interface for `Body` that includes
`poll_trailers()`.

see the comment above `replay_trailers` for more on this, describing why
we now initialize this to `true`. relatedly, this is why we now longer
delegate down to `B::poll_trailers` ourselves. it will have already been
called by our adapter.

`ReplayBody::is_end_stream()` now behaves identically when initially
polling a body compared to subsequent replays. this is fine, as
`is_end_stream()` is a hint that facilitates optimizations
(hyperium/http-body#143). we do still report the end properly, we just
won't be quite as prescient on the initial playthrough.

see:
- linkerd/linkerd2#8733.
- #3559

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Feb 6, 2025
1 parent a5eb1dc commit 43988c8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 26 deletions.
3 changes: 0 additions & 3 deletions linkerd/http/retry/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,17 @@ impl<B: Body> ForwardCompatibleBody<B> {
}

/// Returns `true` when the end of stream has been reached.
#[allow(unused, reason = "not yet used")]
pub(crate) fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

/// Returns the bounds on the remaining length of the stream.
#[allow(unused, reason = "not yet used")]
pub(crate) fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}

impl<B: Body + Unpin> ForwardCompatibleBody<B> {
#[allow(unused, reason = "not yet used")]
pub(crate) fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
81 changes: 58 additions & 23 deletions linkerd/http/retry/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct SharedState<B> {
struct BodyState<B> {
replay: Replay,
trailers: Option<HeaderMap>,
rest: B,
rest: crate::compat::ForwardCompatibleBody<B>,
is_completed: bool,

/// Maximum number of bytes to buffer.
Expand Down Expand Up @@ -104,13 +104,19 @@ impl<B: Body> ReplayBody<B> {
state: Some(BodyState {
replay: Default::default(),
trailers: None,
rest: body,
rest: crate::compat::ForwardCompatibleBody::new(body),
is_completed: false,
max_bytes: max_bytes + 1,
}),
// The initial `ReplayBody` has nothing to replay
// The initial `ReplayBody` has no data to replay.
replay_body: false,
replay_trailers: false,
// NOTE(kate): When polling the inner body in terms of frames, we will not yield
// `Ready(None)` from `Body::poll_data()` until we have reached the end of the
// underlying stream. Once we have migrated to `http-body` v1, this field will be
// initialized `false` thanks to the use of `Body::poll_frame()`, but for now we must
// initialize this to true; `poll_trailers()` will be called after the trailers have
// been observed previously, even for the initial body.
replay_trailers: true,
})
}

Expand Down Expand Up @@ -204,16 +210,33 @@ where
// Poll the inner body for more data. If the body has ended, remember
// that so that future clones will not try polling it again (as
// described above).
let data = {
let data: B::Data = {
use futures::{future::Either, ready};
// Poll the inner body for the next frame.
tracing::trace!("Polling initial body");
match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) {
Some(Ok(data)) => data,
Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
let poll = Pin::new(&mut state.rest).poll_frame(cx).map_err(Into::into);
let frame = match ready!(poll) {
// The body yielded a new frame.
Some(Ok(frame)) => frame,
// The body yielded an error.
Some(Err(error)) => return Poll::Ready(Some(Err(error))),
// The body has reached the end of the stream.
None => {
tracing::trace!("Initial body completed");
state.is_completed = true;
return Poll::Ready(None);
}
};
// Now, inspect the frame: was it a chunk of data, or a trailers frame?
match Self::split_frame(frame) {
Some(Either::Left(data)) => data,
Some(Either::Right(trailers)) => {
tracing::trace!("Initial body completed");
state.trailers = Some(trailers);
state.is_completed = true;
return Poll::Ready(None);
}
None => return Poll::Ready(None),
}
};

Expand All @@ -234,7 +257,7 @@ where
/// NOT be polled until the previous body has been dropped.
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
let this = self.get_mut();
let state = Self::acquire_state(&mut this.state, &this.shared.body);
Expand All @@ -251,20 +274,6 @@ where
}
}

// If the inner body has previously ended, don't poll it again.
if !state.rest.is_end_stream() {
return Pin::new(&mut state.rest)
.poll_trailers(cx)
.map_ok(|tlrs| {
// Record a copy of the inner body's trailers in the shared state.
if state.trailers.is_none() {
state.trailers.clone_from(&tlrs);
}
tlrs
})
.map_err(Into::into);
}

Poll::Ready(Ok(None))
}

Expand Down Expand Up @@ -334,6 +343,32 @@ impl<B> Drop for ReplayBody<B> {
}
}

impl<B: Body> ReplayBody<B> {
/// Splits a `Frame<T>` into a chunk of data or a header map.
///
/// Frames do not expose their inner enums, and instead expose `into_data()` and
/// `into_trailers()` methods. This function breaks the frame into either `Some(Left(data))`
/// if it is given a DATA frame, and `Some(Right(trailers))` if it is given a TRAILERS frame.
///
/// This returns `None` if an unknown frame is provided, that is neither.
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(
frame: crate::compat::Frame<B::Data>,
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
use {crate::compat::Frame, futures::future::Either};
match frame.into_data().map_err(Frame::into_trailers) {
Ok(data) => Some(Either::Left(data)),
Err(Ok(trailers)) => Some(Either::Right(trailers)),
Err(Err(_unknown)) => {
// It's possible that some sort of unknown frame could be encountered.
tracing::warn!("an unknown body frame has been buffered");
None
}
}
}
}

// === impl BodyState ===

impl<B> BodyState<B> {
Expand Down
8 changes: 8 additions & 0 deletions linkerd/http/retry/src/replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ async fn eos_only_when_fully_replayed() {
.expect("yields a frame")
.into_data()
.expect("yields a data frame");
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
assert!(initial.frame().await.is_none());
assert!(initial.is_end_stream());
assert!(!replay.is_end_stream());
drop(initial);
Expand Down Expand Up @@ -634,6 +636,12 @@ async fn size_hint_is_correct_across_replays() {

// Read the body, check the size hint again.
assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY));
let initial = {
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
let mut body = crate::compat::ForwardCompatibleBody::new(initial);
assert!(body.frame().await.is_none());
body.into_inner()
};
debug_assert!(initial.is_end_stream());
// TODO(kate): this currently misreports the *remaining* size of the body.
// let size = initial.size_hint();
Expand Down

0 comments on commit 43988c8

Please sign in to comment.