Skip to content

Commit

Permalink
chore(http/metrics): upgrade to hyper 1.x
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 8, 2025
1 parent 82a93b0 commit 619777b
Showing 1 changed file with 35 additions and 34 deletions.
69 changes: 35 additions & 34 deletions linkerd/http/metrics/src/requests/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{Metrics, StatusMetrics};
use futures::{ready, TryFuture};
use http_body::Body;
use http_body::{Body, Frame};
use linkerd_error::Error;
use linkerd_http_classify::{ClassifyEos, ClassifyResponse};
use linkerd_metrics::NewMetrics;
Expand Down Expand Up @@ -266,12 +266,12 @@ where
self.inner.is_end_stream()
}

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project();
let frame = ready!(this.inner.poll_data(cx));
let frame = ready!(this.inner.poll_frame(cx));

if let Some(lock) = this.metrics.take() {
let now = Instant::now();
Expand All @@ -283,13 +283,6 @@ where
Poll::Ready(frame)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
self.project().inner.poll_trailers(cx)
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
Expand Down Expand Up @@ -408,38 +401,46 @@ where
self.inner.is_end_stream()
}

fn poll_data(
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let poll = ready!(self.as_mut().project().inner.poll_data(cx));
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
// Poll the body for the next frame.
let poll = ready!(self.as_mut().project().inner.poll_frame(cx));
let frame = poll.map(|opt| opt.map_err(|e| self.as_mut().measure_err(e.into())));

// Update latency metrics if we are tracking body latency.
if !(*self.as_mut().project().latency_recorded) {
self.record_latency();
self.as_mut().record_latency();
}

Poll::Ready(frame)
}

fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let trls = ready!(self.as_mut().project().inner.poll_trailers(cx))
.map_err(|e| self.as_mut().measure_err(e.into()))?;

if let Some(c) = self
.as_mut()
.project()
.classify
.take()
.map(|c| c.eos(trls.as_ref()))
{
self.record_class(c);
match &frame {
// Classify the stream if we have reached the end of the stream.
None => {
if let Some(classify) = self.as_mut().project().classify.take() {
let class = classify.eos(None);
self.record_class(class);
}
}
// Classify the stream if we have reached a trailers frame.
Some(Ok(frame)) => {
if let trls @ Some(_) = frame.trailers_ref() {
if let Some(classify) = self.as_mut().project().classify.take() {
let class = classify.eos(trls);
self.record_class(class);
}
}
}
// Classify the stream if we have reached an error.
Some(Err(error)) => {
if let Some(classify) = self.as_mut().project().classify.take() {
let class = classify.error(error);
self.record_class(class);
}
}
}

Poll::Ready(Ok(trls))
Poll::Ready(frame)
}

#[inline]
Expand Down

0 comments on commit 619777b

Please sign in to comment.