Skip to content

Commit

Permalink
chore(http/retry): work in progress upgrade
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 8, 2025
1 parent 2c06812 commit 88aa1c5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,7 @@ dependencies = [
"futures",
"http 1.2.0",
"http-body",
"http-body-util",
"hyper",
"linkerd-error",
"linkerd-exp-backoff",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false
bytes = { workspace = true }
futures = { version = "0.3", default-features = false }
http-body = { workspace = true }
http-body-util = { workspace = true }
http = { workspace = true }
parking_lot = "0.12"
tokio = { version = "1", features = ["macros", "rt"] }
Expand Down
99 changes: 34 additions & 65 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::{
future::{self, Either},
FutureExt, TryFutureExt,
};
use http_body::Body;
use http_body::{Body, Frame};
use linkerd_http_box::BoxBody;
use linkerd_stack::Service;
use std::{
Expand All @@ -18,25 +18,11 @@ use std::{
/// behaves identically to a normal body.
pub struct PeekTrailersBody<B: Body = BoxBody> {
inner: B,

/// The first DATA frame received from the inner body, or an error that
/// The first frame received from the inner body, or an error that
/// occurred while polling for data.
///
/// If this is `None`, then the body has completed without any DATA frames.
first_data: Option<Result<B::Data, B::Error>>,

/// The inner body's trailers, if it was terminated by a `TRAILERS` frame
/// after 0 DATA frames, or an error if polling for trailers failed.
///
/// Yes, this is a bit of a complex type, so let's break it down:
/// - the outer `Option` indicates whether any trailers were received by
/// `WithTrailers`; if it's `None`, then we don't *know* if the response
/// had trailers, as it is not yet complete.
/// - the inner `Result` and `Option` are the `Result` and `Option` returned
/// by `HttpBody::trailers` on the inner body. If this is `Ok(None)`, then
/// the body has terminated without trailers --- it is *known* to not have
/// trailers.
trailers: Option<Result<Option<http::HeaderMap>, B::Error>>,
/// If this is `None`, then the body has completed without any frames.
frame: Option<Result<Frame<B::Data>, B::Error>>,
}

pub type WithPeekTrailersBody<B> = Either<
Expand All @@ -51,9 +37,10 @@ pub struct ResponseWithPeekTrailers<S>(pub(crate) S);

impl<B: Body> PeekTrailersBody<B> {
pub fn peek_trailers(&self) -> Option<&http::HeaderMap> {
self.trailers
self.frame
.as_ref()
.and_then(|trls| trls.as_ref().ok()?.as_ref())
.and_then(|f| f.as_ref().ok())
.and_then(Frame::trailers_ref)
}

pub fn map_response(rsp: http::Response<B>) -> WithPeekTrailersBody<B>
Expand Down Expand Up @@ -87,35 +74,26 @@ impl<B: Body> PeekTrailersBody<B> {
B::Data: Send + Unpin,
B::Error: Send,
{
let (parts, body) = rsp.into_parts();
let mut body = Self {
inner: body,
first_data: None,
trailers: None,
};

tracing::debug!("Buffering first data frame");
if let Some(data) = body.inner.data().await {
// The body has data; stop waiting for trailers.
body.first_data = Some(data);
} else {
// Okay, `poll_data` has returned `None`, so there are no data
// frames left. Let's see if there's trailers...
body.trailers = Some(body.inner.trailers().await);
}
if body.trailers.is_some() {
tracing::debug!("Buffered trailers frame");
use http_body_util::BodyExt;

let (parts, mut body) = rsp.into_parts();

tracing::debug!("Buffering first body frame");
let frame = body.frame().await;

if let Some(Ok(frame)) = &frame {
if frame.trailers_ref().is_some() {
tracing::debug!("Buffered trailers frame");
}
}

let body = Self { inner: body, frame };

http::Response::from_parts(parts, body)
}

fn no_trailers(rsp: http::Response<B>) -> http::Response<Self> {
rsp.map(|inner| Self {
inner,
first_data: None,
trailers: None,
})
rsp.map(|inner| Self { inner, frame: None })
}
}

Expand All @@ -128,33 +106,22 @@ where
type Data = B::Data;
type Error = B::Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.get_mut();
if let Some(first_data) = this.first_data.take() {
return Poll::Ready(Some(first_data));
}

Pin::new(&mut this.inner).poll_data(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.get_mut();
if let Some(trailers) = this.trailers.take() {
return Poll::Ready(trailers);
if let frame @ Some(_) = this.frame.take() {
return Poll::Ready(frame);
}

Pin::new(&mut this.inner).poll_trailers(cx)
Pin::new(&mut this.inner).poll_frame(cx)
}

#[inline]
fn is_end_stream(&self) -> bool {
self.first_data.is_none() && self.trailers.is_none() && self.inner.is_end_stream()
self.frame.is_none() && self.inner.is_end_stream()
}

#[inline]
Expand All @@ -164,12 +131,14 @@ where
let mut hint = self.inner.size_hint();
// If we're holding onto a chunk of data, add its length to the inner
// `Body`'s size hint.
if let Some(Ok(chunk)) = self.first_data.as_ref() {
let buffered = chunk.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + buffered);
if let Some(Ok(frame)) = self.frame.as_ref() {
if let Some(chunk) = frame.data_ref() {
let buffered = chunk.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + buffered);
}
hint.set_lower(hint.lower() + buffered);
}
hint.set_lower(hint.lower() + buffered);
}

hint
Expand Down
11 changes: 10 additions & 1 deletion linkerd/http/retry/src/replay.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use http::HeaderMap;
use http_body::{Body, SizeHint};
use http_body::{Body, Frame, SizeHint};
use linkerd_error::Error;
use linkerd_http_box::BoxBody;
use parking_lot::Mutex;
Expand Down Expand Up @@ -160,6 +160,14 @@ where
type Data = Data;
type Error = Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
todo!("XXX(kate): temporary stub");
}

/*
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -282,6 +290,7 @@ where
Poll::Ready(Ok(None))
}
*/

fn is_end_stream(&self) -> bool {
// if the initial body was EOS as soon as it was wrapped, then we are
Expand Down

0 comments on commit 88aa1c5

Please sign in to comment.