Skip to content

Commit

Permalink
refactor(http/retry): outline ForwardCompatibleBody<B> (#3614)
Browse files Browse the repository at this point in the history
* refactor(http/retry): outline `ForwardCompatibleBody<B>`

in #3559 (4b53081), we introduced a backported `Frame<T>` type, and a
`ForwardCompatibleBody<B>` type that allows us to interact with a
`http_body::Body` circa 0.4.6 in terms of frame-based interfaces that
match those of the 1.0 interface.

see linkerd/linkerd2#8733 for more information on upgrading hyper.

in #3559, we narrowly added this as an internal submodule of the
`linkerd-http-retry` library. these facilities however, would have
utility in other places such as `linkerd-app-core`.

this commit pulls these compatibility shims out into a
`linkerd-http-body-compat` library so that they can be imported and
reused elsewhere.

Signed-off-by: katelyn martin <[email protected]>

* nit(http/body-compat): tidy `combinators` imports

Signed-off-by: katelyn martin <[email protected]>

---------

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn authored Feb 13, 2025
1 parent e0a7121 commit faa42fd
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 40 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,14 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-http-body-compat"
version = "0.1.0"
dependencies = [
"http",
"http-body",
]

[[package]]
name = "linkerd-http-box"
version = "0.1.0"
Expand Down Expand Up @@ -1813,6 +1821,7 @@ dependencies = [
"hyper",
"linkerd-error",
"linkerd-exp-backoff",
"linkerd-http-body-compat",
"linkerd-http-box",
"linkerd-metrics",
"linkerd-mock-http-body",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"linkerd/error-respond",
"linkerd/exp-backoff",
"linkerd/http/access-log",
"linkerd/http/body-compat",
"linkerd/http/box",
"linkerd/http/classify",
"linkerd/http/executor",
Expand Down
11 changes: 11 additions & 0 deletions linkerd/http/body-compat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "linkerd-http-body-compat"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
http = { workspace = true }
http-body = { workspace = true }
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::{
task::{Context, Poll},
};

pub(crate) use self::frame::Frame;
pub use self::frame::Frame;

mod frame;

#[derive(Debug)]
pub(crate) struct ForwardCompatibleBody<B> {
pub struct ForwardCompatibleBody<B> {
inner: B,
data_finished: bool,
trailers_finished: bool,
Expand All @@ -21,7 +21,7 @@ pub(crate) struct ForwardCompatibleBody<B> {
// === impl ForwardCompatibleBody ===

impl<B: Body> ForwardCompatibleBody<B> {
pub(crate) fn new(body: B) -> Self {
pub fn new(body: B) -> Self {
if body.is_end_stream() {
Self {
inner: body,
Expand All @@ -37,28 +37,28 @@ impl<B: Body> ForwardCompatibleBody<B> {
}
}

pub(crate) fn into_inner(self) -> B {
pub fn into_inner(self) -> B {
self.inner
}

/// Returns a future that resolves to the next frame.
pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> {
pub fn frame(&mut self) -> combinators::Frame<'_, B> {
combinators::Frame(self)
}

/// Returns `true` when the end of stream has been reached.
pub(crate) fn is_end_stream(&self) -> bool {
pub fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

/// Returns the bounds on the remaining length of the stream.
pub(crate) fn size_hint(&self) -> SizeHint {
pub fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}

impl<B: Body + Unpin> ForwardCompatibleBody<B> {
pub(crate) fn poll_frame(
pub fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<B::Data>, B::Error>>> {
Expand All @@ -78,14 +78,13 @@ impl<B: Body + Unpin> ForwardCompatibleBody<B> {
///
/// [frame]: https://docs.rs/http-body-util/0.1.2/http_body_util/combinators/struct.Frame.html
mod combinators {
use core::future::Future;
use core::pin::Pin;
use core::task;
use http_body::Body;
use std::ops::Not;
use std::task::ready;

use super::ForwardCompatibleBody;
use core::{future::Future, pin::Pin, task};
use http_body::Body;
use std::{
ops::Not,
task::{ready, Context, Poll},
};

#[must_use = "futures don't do anything unless polled"]
#[derive(Debug)]
Expand All @@ -95,7 +94,7 @@ mod combinators {
impl<T: Body + Unpin> Future for Frame<'_, T> {
type Output = Option<Result<super::Frame<T::Data>, T::Error>>;

fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let Self(ForwardCompatibleBody {
inner,
data_finished,
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ thiserror = "2"
linkerd-http-box = { path = "../box" }
linkerd-error = { path = "../../error" }
linkerd-exp-backoff = { path = "../../exp-backoff" }
linkerd-http-body-compat = { path = "../body-compat" }
linkerd-metrics = { path = "../../metrics" }
linkerd-stack = { path = "../../stack" }

Expand Down
2 changes: 0 additions & 2 deletions linkerd/http/retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
pub mod peek_trailers;
pub mod replay;

mod compat;

pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody};
pub use tower::retry::budget::Budget;

Expand Down
6 changes: 3 additions & 3 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<B: Body> PeekTrailersBody<B> {
{
// XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame<T>`s.
// this can be removed when we upgrade to http-body 1.0.
use crate::compat::ForwardCompatibleBody;
use linkerd_http_body_compat::ForwardCompatibleBody;
let mut body = ForwardCompatibleBody::new(body);

// First, poll the body for its first frame.
Expand Down Expand Up @@ -220,9 +220,9 @@ impl<B: Body> PeekTrailersBody<B> {
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(
frame: crate::compat::Frame<B::Data>,
frame: linkerd_http_body_compat::Frame<B::Data>,
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
use {crate::compat::Frame, futures::future::Either};
use {futures::future::Either, linkerd_http_body_compat::Frame};
match frame.into_data().map_err(Frame::into_trailers) {
Ok(data) => Some(Either::Left(data)),
Err(Ok(trailers)) => Some(Either::Right(trailers)),
Expand Down
8 changes: 4 additions & 4 deletions linkerd/http/retry/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct SharedState<B> {
struct BodyState<B> {
replay: Replay,
trailers: Option<HeaderMap>,
rest: crate::compat::ForwardCompatibleBody<B>,
rest: linkerd_http_body_compat::ForwardCompatibleBody<B>,
is_completed: bool,

/// Maximum number of bytes to buffer.
Expand Down Expand Up @@ -104,7 +104,7 @@ impl<B: Body> ReplayBody<B> {
state: Some(BodyState {
replay: Default::default(),
trailers: None,
rest: crate::compat::ForwardCompatibleBody::new(body),
rest: linkerd_http_body_compat::ForwardCompatibleBody::new(body),
is_completed: false,
max_bytes: max_bytes + 1,
}),
Expand Down Expand Up @@ -368,9 +368,9 @@ impl<B: Body> ReplayBody<B> {
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(
frame: crate::compat::Frame<B::Data>,
frame: linkerd_http_body_compat::Frame<B::Data>,
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
use {crate::compat::Frame, futures::future::Either};
use {futures::future::Either, linkerd_http_body_compat::Frame};
match frame.into_data().map_err(Frame::into_trailers) {
Ok(data) => Some(Either::Left(data)),
Err(Ok(trailers)) => Some(Either::Right(trailers)),
Expand Down
30 changes: 15 additions & 15 deletions linkerd/http/retry/src/replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn replays_trailers() {
drop(tx);

let read_trailers = |body: ReplayBody<_>| async move {
let mut body = crate::compat::ForwardCompatibleBody::new(body);
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body);
let _ = body
.frame()
.await
Expand Down Expand Up @@ -126,8 +126,8 @@ async fn replays_trailers_only() {
replay,
_trace,
} = Test::new();
let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);

let mut tlrs = HeaderMap::new();
tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap());
Expand Down Expand Up @@ -332,8 +332,8 @@ async fn eos_only_when_fully_replayed() {
.expect("body must not be too large");
let replay = initial.clone();

let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);

// Read the initial body, show that the replay does not consider itself to have reached the
// end-of-stream. Then drop the initial body, show that the replay is still not done.
Expand Down Expand Up @@ -374,7 +374,7 @@ async fn eos_only_when_fully_replayed() {
drop(replay);

// Read the second replay body.
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2);
replay2
.frame()
.await
Expand All @@ -396,8 +396,8 @@ async fn eos_only_when_fully_replayed_with_trailers() {
.expect("body must not be too large");
let replay = initial.clone();

let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);

// Read the initial body, show that the replay does not consider itself to have reached the
// end-of-stream. Then drop the initial body, show that the replay is still not done.
Expand Down Expand Up @@ -450,7 +450,7 @@ async fn eos_only_when_fully_replayed_with_trailers() {
drop(replay);

// Read the second replay body.
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2);
replay2
.frame()
.await
Expand Down Expand Up @@ -508,7 +508,7 @@ async fn caps_buffer() {

// The request's replay should error, since we discarded the buffer when
// we hit the cap.
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);
let err = replay
.frame()
.await
Expand Down Expand Up @@ -554,7 +554,7 @@ async fn caps_across_replays() {
drop(replay);

// The second replay will fail, though, because the buffer was discarded.
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2);
let err = replay2
.frame()
.await
Expand Down Expand Up @@ -638,7 +638,7 @@ async fn size_hint_is_correct_across_replays() {
assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY));
let initial = {
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
let mut body = crate::compat::ForwardCompatibleBody::new(initial);
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
assert!(body.frame().await.is_none());
body.into_inner()
};
Expand All @@ -661,7 +661,7 @@ async fn size_hint_is_correct_across_replays() {
assert_eq!(chunk(&mut replay).await.as_deref(), Some(BODY));
// let replay = {
// // TODO(kate): the replay doesn't report ending until it has (not) yielded trailers.
// let mut body = crate::compat::ForwardCompatibleBody::new(replay);
// let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);
// assert!(body.frame().await.is_none());
// body.into_inner()
// };
Expand Down Expand Up @@ -770,7 +770,7 @@ where
T: http_body::Body + Unpin,
{
tracing::trace!("waiting for a body chunk...");
let chunk = crate::compat::ForwardCompatibleBody::new(body)
let chunk = linkerd_http_body_compat::ForwardCompatibleBody::new(body)
.frame()
.await
.expect("yields a result")
Expand All @@ -788,7 +788,7 @@ where
B: http_body::Body + Unpin,
B::Error: std::fmt::Debug,
{
let mut body = crate::compat::ForwardCompatibleBody::new(body);
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body);
let mut data = String::new();
let mut trailers = None;

Expand Down

0 comments on commit faa42fd

Please sign in to comment.