Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(http/retry): use boxed bodies instead of hyper::Body #3515

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions linkerd/http/retry/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ mod tests {
fn empty_body_is_always_eos() {
// If the initial body was empty, every clone should always return
// `true` from `is_end_stream`.
let initial = ReplayBody::try_new(hyper::Body::empty(), 64 * 1024)
let initial = ReplayBody::try_new(BoxBody::empty(), 64 * 1024)
.expect("empty body can't be too large");
assert!(initial.is_end_stream());

Expand All @@ -785,7 +785,7 @@ mod tests {
async fn eos_only_when_fully_replayed() {
// Test that each clone of a body is not EOS until the data has been
// fully replayed.
let mut initial = ReplayBody::try_new(hyper::Body::from("hello world"), 64 * 1024)
let mut initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024)
.expect("body must not be too large");
let mut replay = initial.clone();

Expand Down Expand Up @@ -829,6 +829,9 @@ mod tests {
// initial body will complete, but the replay will immediately fail.
let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace");

// TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via
// `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`.
// alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`.
let (mut tx, body) = hyper::Body::channel();
let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large");
let mut replay = initial.clone();
Expand Down Expand Up @@ -859,6 +862,9 @@ mod tests {
// cap, we allow the request to continue, but stop buffering.
let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug");

// TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via
// `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`.
// alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`.
let (mut tx, body) = hyper::Body::channel();
let mut initial = ReplayBody::try_new(body, 8).expect("channel body must not be too large");
let mut replay = initial.clone();
Expand Down Expand Up @@ -889,11 +895,13 @@ mod tests {
#[test]
fn body_too_big() {
let max_size = 8;
let mk_body =
|sz: usize| -> hyper::Body { (0..sz).map(|_| "x").collect::<String>().into() };
let mk_body = |sz: usize| -> BoxBody {
let s = (0..sz).map(|_| "x").collect::<String>();
BoxBody::new(s)
};

assert!(
ReplayBody::try_new(hyper::Body::empty(), max_size).is_ok(),
ReplayBody::try_new(BoxBody::empty(), max_size).is_ok(),
"empty body is not too big"
);

Expand All @@ -907,6 +915,9 @@ mod tests {
"over-sized body is too big"
);

// TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via
// `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`.
// alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`.
let (_sender, body) = hyper::Body::channel();
assert!(
ReplayBody::try_new(body, max_size).is_ok(),
Expand All @@ -915,18 +926,25 @@ mod tests {
}

struct Test {
// Sends body data.
tx: Tx,
initial: ReplayBody<hyper::Body>,
replay: ReplayBody<hyper::Body>,
/// The "initial" body.
initial: ReplayBody<BoxBody>,
/// Replays the initial body.
replay: ReplayBody<BoxBody>,
/// An RAII guard for the tracing subscriber.
_trace: tracing::subscriber::DefaultGuard,
}

struct Tx(hyper::body::Sender);

impl Test {
fn new() -> Self {
let (tx, body) = hyper::Body::channel();
let initial = ReplayBody::try_new(body, 64 * 1024).expect("body too large");
// TODO(kate): see #8733. this `Body::channel` should become a `mpsc::channel`, via
// `http_body_util::StreamBody` and `tokio_stream::wrappers::ReceiverStream`.
// alternately, hyperium/http-body#140 adds a channel-backed body to `http-body-util`.
let (tx, rx) = hyper::Body::channel();
let initial = ReplayBody::try_new(BoxBody::new(rx), 64 * 1024).expect("body too large");
let replay = initial.clone();
Self {
tx: Tx(tx),
Expand Down
Loading