Skip to content

Commit

Permalink
chore(http/stream-timeouts): upgrade to hyper 1.x
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 7, 2025
1 parent a83e17c commit 5e99e14
Showing 1 changed file with 21 additions and 54 deletions.
75 changes: 21 additions & 54 deletions linkerd/http/stream-timeouts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! See [`EnforceTimeouts<S>`].
use futures::FutureExt;
use http_body::Frame;
use linkerd_error::{Error, Result};
use linkerd_stack as svc;
use parking_lot::RwLock;
Expand Down Expand Up @@ -356,47 +357,31 @@ where
type Data = B::Data;
type Error = Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.project();

if let Poll::Ready(res) = this.inner.poll_data(cx) {
if let Some(idle) = this.idle {
idle.reset(time::Instant::now());
}
return Poll::Ready(res);
}

if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) {
// TODO telemetry
return Poll::Ready(Some(Err(Error::from(e))));
}

Poll::Pending
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.project();
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let mut this = self.project();

if let Poll::Ready(res) = this.inner.poll_trailers(cx) {
// Poll for the next frame.
if let Poll::Ready(res) = this.inner.as_mut().poll_frame(cx) {
let now = time::Instant::now();
if let Some(idle) = this.idle {
idle.reset(now);
}
if let Some(tx) = this.request_flushed.take() {
let _ = tx.send(now);
// Send a timestamp when the end of the stream is reached.
if this.inner.as_ref().is_end_stream() {
if let Some(tx) = this.request_flushed.take() {
let _ = tx.send(now);
}
}
return Poll::Ready(res);
}

// Poll for a timeout error.
if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) {
// TODO telemetry
return Poll::Ready(Err(Error::from(e)));
return Poll::Ready(Some(Err(Error::from(e))));
}

Poll::Pending
Expand All @@ -416,19 +401,22 @@ where
type Data = B::Data;
type Error = Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.project();
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let mut this = self.project();

if let Poll::Ready(res) = this.inner.poll_data(cx) {
// Poll for the next frame.
if let Poll::Ready(res) = this.inner.as_mut().poll_frame(cx) {
let now = time::Instant::now();
if let Some(idle) = this.idle {
idle.reset(time::Instant::now());
idle.reset(now);
}
return Poll::Ready(res);
}

// Poll for a timeout error.
if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) {
// TODO telemetry
return Poll::Ready(Some(Err(Error::from(e))));
Expand All @@ -437,27 +425,6 @@ where
Poll::Pending
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.project();

if let Poll::Ready(res) = this.inner.poll_trailers(cx) {
if let Some(idle) = this.idle {
idle.reset(time::Instant::now());
};
return Poll::Ready(res);
}

if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) {
// TODO telemetry
return Poll::Ready(Err(Error::from(e)));
}

Poll::Pending
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}
Expand Down

0 comments on commit 5e99e14

Please sign in to comment.