diff --git a/linkerd/http/metrics/src/requests/service.rs b/linkerd/http/metrics/src/requests/service.rs index ad1af7ecbc..c0247aeeed 100644 --- a/linkerd/http/metrics/src/requests/service.rs +++ b/linkerd/http/metrics/src/requests/service.rs @@ -1,6 +1,6 @@ use super::{Metrics, StatusMetrics}; use futures::{ready, TryFuture}; -use http_body::Body; +use http_body::{Body, Frame}; use linkerd_error::Error; use linkerd_http_classify::{ClassifyEos, ClassifyResponse}; use linkerd_metrics::NewMetrics; @@ -266,12 +266,12 @@ where self.inner.is_end_stream() } - fn poll_data( + fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll, Self::Error>>> { let this = self.project(); - let frame = ready!(this.inner.poll_data(cx)); + let frame = ready!(this.inner.poll_frame(cx)); if let Some(lock) = this.metrics.take() { let now = Instant::now(); @@ -283,13 +283,6 @@ where Poll::Ready(frame) } - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - self.project().inner.poll_trailers(cx) - } - #[inline] fn size_hint(&self) -> http_body::SizeHint { self.inner.size_hint() @@ -408,38 +401,46 @@ where self.inner.is_end_stream() } - fn poll_data( + fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - let poll = ready!(self.as_mut().project().inner.poll_data(cx)); + ) -> Poll, Self::Error>>> { + // Poll the body for the next frame. + let poll = ready!(self.as_mut().project().inner.poll_frame(cx)); let frame = poll.map(|opt| opt.map_err(|e| self.as_mut().measure_err(e.into()))); + // Update latency metrics if we are tracking body latency. if !(*self.as_mut().project().latency_recorded) { - self.record_latency(); + self.as_mut().record_latency(); } - Poll::Ready(frame) - } - - fn poll_trailers( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let trls = ready!(self.as_mut().project().inner.poll_trailers(cx)) - .map_err(|e| self.as_mut().measure_err(e.into()))?; - - if let Some(c) = self - .as_mut() - .project() - .classify - .take() - .map(|c| c.eos(trls.as_ref())) - { - self.record_class(c); + match &frame { + // Classify the stream if we have reached the end of the stream. + None => { + if let Some(classify) = self.as_mut().project().classify.take() { + let class = classify.eos(None); + self.record_class(class); + } + } + // Classify the stream if we have reached a trailers frame. + Some(Ok(frame)) => { + if let trls @ Some(_) = frame.trailers_ref() { + if let Some(classify) = self.as_mut().project().classify.take() { + let class = classify.eos(trls); + self.record_class(class); + } + } + } + // Classify the stream if we have reached an error. + Some(Err(error)) => { + if let Some(classify) = self.as_mut().project().classify.take() { + let class = classify.error(error); + self.record_class(class); + } + } } - Poll::Ready(Ok(trls)) + Poll::Ready(frame) } #[inline]