From 5e99e142c5fd44eb0700a094979c2ef0143f018a Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Tue, 7 Jan 2025 00:00:00 +0000 Subject: [PATCH] chore(http/stream-timeouts): upgrade to hyper 1.x Signed-off-by: katelyn martin --- linkerd/http/stream-timeouts/src/lib.rs | 75 +++++++------------------ 1 file changed, 21 insertions(+), 54 deletions(-) diff --git a/linkerd/http/stream-timeouts/src/lib.rs b/linkerd/http/stream-timeouts/src/lib.rs index 90ffc99ea9..7d618bb5f1 100644 --- a/linkerd/http/stream-timeouts/src/lib.rs +++ b/linkerd/http/stream-timeouts/src/lib.rs @@ -3,6 +3,7 @@ //! See [`EnforceTimeouts`]. use futures::FutureExt; +use http_body::Frame; use linkerd_error::{Error, Result}; use linkerd_stack as svc; use parking_lot::RwLock; @@ -356,47 +357,31 @@ where type Data = B::Data; type Error = Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - let this = self.project(); - - if let Poll::Ready(res) = this.inner.poll_data(cx) { - if let Some(idle) = this.idle { - idle.reset(time::Instant::now()); - } - return Poll::Ready(res); - } - - if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { - // TODO telemetry - return Poll::Ready(Some(Err(Error::from(e)))); - } - - Poll::Pending - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let this = self.project(); + ) -> Poll, Self::Error>>> { + let mut this = self.project(); - if let Poll::Ready(res) = this.inner.poll_trailers(cx) { + // Poll for the next frame. + if let Poll::Ready(res) = this.inner.as_mut().poll_frame(cx) { let now = time::Instant::now(); if let Some(idle) = this.idle { idle.reset(now); } - if let Some(tx) = this.request_flushed.take() { - let _ = tx.send(now); + // Send a timestamp when the end of the stream is reached. + if this.inner.as_ref().is_end_stream() { + if let Some(tx) = this.request_flushed.take() { + let _ = tx.send(now); + } } return Poll::Ready(res); } + // Poll for a timeout error. if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { // TODO telemetry - return Poll::Ready(Err(Error::from(e))); + return Poll::Ready(Some(Err(Error::from(e)))); } Poll::Pending @@ -416,19 +401,22 @@ where type Data = B::Data; type Error = Error; - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - let this = self.project(); + ) -> Poll, Self::Error>>> { + let mut this = self.project(); - if let Poll::Ready(res) = this.inner.poll_data(cx) { + // Poll for the next frame. + if let Poll::Ready(res) = this.inner.as_mut().poll_frame(cx) { + let now = time::Instant::now(); if let Some(idle) = this.idle { - idle.reset(time::Instant::now()); + idle.reset(now); } return Poll::Ready(res); } + // Poll for a timeout error. if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { // TODO telemetry return Poll::Ready(Some(Err(Error::from(e)))); @@ -437,27 +425,6 @@ where Poll::Pending } - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let this = self.project(); - - if let Poll::Ready(res) = this.inner.poll_trailers(cx) { - if let Some(idle) = this.idle { - idle.reset(time::Instant::now()); - }; - return Poll::Ready(res); - } - - if let Poll::Ready(e) = poll_body_timeout(this.deadline, this.idle, cx) { - // TODO telemetry - return Poll::Ready(Err(Error::from(e))); - } - - Poll::Pending - } - fn is_end_stream(&self) -> bool { self.inner.is_end_stream() }