Skip to content

Commit

Permalink
feat(app): work in progress body frame metrics
Browse files Browse the repository at this point in the history
🚧 woah there partner!

this commit is on a rolling work-in-progress branch. keep your hardhat
on!

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Oct 31, 2024
1 parent c268774 commit 044cc73
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,9 @@ dependencies = [
"ahash",
"bytes",
"futures",
"futures-util",
"http",
"http-body",
"hyper",
"linkerd-app-core",
"linkerd-app-test",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }

[dev-dependencies]
futures-util = "0.3"
http-body = "0.4"
hyper = { version = "0.14", features = ["http1", "http2"] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-rustls = "0.24"
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/src/http/logical/policy/route/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ where
}| concrete,
)
.push(filters::NewApplyFilters::<Self, _, _>::layer())
.check_new_service::<Self, http::Request<http::BoxBody>>()
.push(metrics::layer(&metrics))
.check_new_service::<Self, http::Request<http::BoxBody>>()
.push(svc::NewMapErr::layer_with(|t: &Self| {
let backend = t.params.concrete.backend_ref.clone();
move |source| {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{BackendRef, ParentRef, RouteRef};
use linkerd_app_core::{metrics::prom, svc};
use linkerd_http_prom::{
body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseMetricsFamilies},
record_response::{self, NewResponseDuration, StreamLabel},
NewCountRequests, RequestCount, RequestCountFamilies,
};
Expand All @@ -11,10 +12,12 @@ pub use linkerd_http_prom::record_response::MkStreamLabel;
#[cfg(test)]
mod tests;

// DEV(kate); the backend metrics structure where body data is introduced.
#[derive(Debug)]
pub struct RouteBackendMetrics<L: StreamLabel> {
requests: RequestCountFamilies<labels::RouteBackend>,
responses: ResponseMetrics<L>,
body_metrics: ResponseMetricsFamilies<labels::RouteBackend>, // DEV(kate); rename this `ResponseBodyFamilies`
}

type ResponseMetrics<L> = record_response::ResponseMetrics<
Expand All @@ -26,14 +29,24 @@ pub fn layer<T, N>(
metrics: &RouteBackendMetrics<T::StreamLabel>,
) -> impl svc::Layer<
N,
Service = NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Service = NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
>,
> + Clone
where
T: MkStreamLabel,
N: svc::NewService<T>,
NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
>: svc::NewService<T>,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Expand All @@ -44,28 +57,37 @@ where
let RouteBackendMetrics {
requests,
responses,
body_metrics,
} = metrics.clone();

svc::layer::mk(move |inner| {
use svc::Layer;
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer(
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
),
)
})
}

#[derive(Clone, Debug)]
pub struct ExtractRequestCount(RequestCountFamilies<labels::RouteBackend>);

#[derive(Clone, Debug)]
pub struct ExtractRecordBodyDataParams(ResponseMetricsFamilies<labels::RouteBackend>);

// === impl RouteBackendMetrics ===

impl<L: StreamLabel> RouteBackendMetrics<L> {
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
let requests = RequestCountFamilies::register(reg);
let responses = record_response::ResponseMetrics::register(reg, histo);
let body_metrics = ResponseMetricsFamilies::register(reg);
Self {
requests,
responses,
body_metrics,
}
}

Expand All @@ -83,13 +105,22 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
self.responses.get_statuses(l)
}

#[cfg(test)]
pub(crate) fn get_response_body_metrics(
&self,
l: &labels::RouteBackend,
) -> linkerd_http_prom::body_data::response::BodyDataMetrics {
self.body_metrics.get(l)
}
}

impl<L: StreamLabel> Default for RouteBackendMetrics<L> {
fn default() -> Self {
Self {
requests: Default::default(),
responses: Default::default(),
body_metrics: Default::default(),
}
}
}
Expand All @@ -99,6 +130,7 @@ impl<L: StreamLabel> Clone for RouteBackendMetrics<L> {
Self {
requests: self.requests.clone(),
responses: self.responses.clone(),
body_metrics: self.body_metrics.clone(),
}
}
}
Expand All @@ -114,3 +146,17 @@ where
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
}
}

// === impl ExtractRecordBodyDataParams ===

impl<T> svc::ExtractParam<BodyDataMetrics, T> for ExtractRecordBodyDataParams
where
T: svc::Param<ParentRef> + svc::Param<RouteRef> + svc::Param<BackendRef>,
{
fn extract_param(&self, t: &T) -> BodyDataMetrics {
let Self(families) = self;
let labels = labels::RouteBackend(t.param(), t.param(), t.param());

families.get(&labels)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use super::{
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
};
use crate::http::{concrete, logical::Concrete};
use bytes::Buf;
use linkerd_app_core::{
svc::{self, http::BoxBody, Layer, NewService},
transport::{Remote, ServerAddr},
Error,
};
use linkerd_proxy_client_policy as policy;

Expand Down Expand Up @@ -116,6 +118,121 @@ async fn http_request_statuses() {
assert_eq!(mixed.get(), 1);
}

// DEV(kate); test response body
#[allow(unused)]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn body_data_layer_records_frames() -> Result<(), Error> {
use http_body::Body;
use linkerd_app_core::proxy::http;
use linkerd_http_prom::body_data::response::BodyDataMetrics;
use tower::{Service, ServiceExt};

let _trace = linkerd_tracing::test::trace_init();

let metrics = super::RouteBackendMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let backend_ref = crate::BackendRef(policy::Meta::new_default("backend"));

let (mut svc, mut handle) =
mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref);
handle.allow(1);

// Create a request.
let req = {
let empty = hyper::Body::empty();
let body = BoxBody::new(empty);
http::Request::builder().body(body).unwrap()
};

// Call the service once it is ready to accept a request.
tracing::info!("calling service");
svc.ready().await.expect("ready");
let mut call = svc.call(req);
let (req, send_resp) = handle.next_request().await.unwrap();

// Acquire the counters for this backend.
tracing::info!("acquiring response body metrics");
let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone());
let BodyDataMetrics {
frames_total,
frames_bytes,
} = metrics.get_response_body_metrics(&labels);

// Before we've sent a response, the counter should be zero.
assert_eq!(frames_total.get(), 0);
assert_eq!(frames_bytes.get(), 0);

// Create a response whose body is backed by a channel that we can send chunks to, send it.
tracing::info!("sending response");
let mut resp_tx = {
let (tx, body) = hyper::Body::channel();
let body = BoxBody::new(body);
let resp = http::Response::builder()
.status(http::StatusCode::IM_A_TEAPOT)
.body(body)
.unwrap();
send_resp.send_response(resp);
tx
};

// Before we've sent any bytes, the counter should be zero.
assert_eq!(frames_total.get(), 0);
assert_eq!(frames_bytes.get(), 0);

// On the client end, poll our call future and await the response.
tracing::info!("polling service future");
let (parts, mut body) = call.await?.into_parts();
assert_eq!(parts.status, 418);

let mut body = Box::pin(body);

/// Returns the next chunk from a boxed body.
async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Result<Vec<u8>, Error> {
use std::task::{Context, Poll};
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
let data = match body.as_mut().poll_data(&mut ctx) {
Poll::Ready(Some(Ok(d))) => d,
_ => panic!("next chunk should be ready"),
};
let chunk = data.chunk().to_vec();
Ok(chunk)
}

{
// Send a chunk, confirm that our counters are incremented.
tracing::info!("sending first chunk");
resp_tx.send_data("hello".into()).await?;
let chunk = read_chunk(&mut body).await?;
debug_assert_eq!("hello".as_bytes(), chunk, "should get same value back out");
assert_eq!(frames_total.get(), 1);
}

{
// Send another chunk, confirm that our counters are incremented once more.
tracing::info!("sending second chunk");
resp_tx.send_data("world".into()).await?;
let chunk = read_chunk(&mut body).await?;
debug_assert_eq!("world".as_bytes(), chunk, "should get same value back out");
assert_eq!(frames_total.get(), 2);
}

{
// Close the body, show that the counters remain at the same values.
use std::task::{Context, Poll};
tracing::info!("closing response body");
drop(resp_tx);
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
match body.as_mut().poll_data(&mut ctx) {
Poll::Ready(None) => {}
other => panic!("got unexpected poll result"),
};
assert_eq!(frames_total.get(), 2);
}

Ok(())
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_request_statuses_ok() {
let _trace = linkerd_tracing::test::trace_init();
Expand Down Expand Up @@ -279,6 +396,7 @@ async fn grpc_request_statuses_error_body() {

// === Util ===

// DEV(kate); route backend mock
fn mock_http_route_backend_metrics(
metrics: &RouteBackendMetrics<LabelHttpRouteBackendRsp>,
parent_ref: &crate::ParentRef,
Expand Down
5 changes: 5 additions & 0 deletions linkerd/http/prom/src/body_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod request;
pub mod response;

mod body;
mod metrics;
72 changes: 72 additions & 0 deletions linkerd/http/prom/src/body_data/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use super::metrics::BodyDataMetrics;
use http::HeaderMap;
use http_body::SizeHint;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};

/// An instrumented body.
#[pin_project]
pub struct Body<B> {
/// The inner body.
#[pin]
inner: B,
/// Metrics with which the inner body will be instrumented.
metrics: BodyDataMetrics,
}

impl<B> Body<B> {
/// Returns a new, instrumented body.
pub(crate) fn new(body: B, metrics: BodyDataMetrics) -> Self {
Self {
inner: body,
metrics,
}
}
}

impl<B> http_body::Body for Body<B>
where
B: http_body::Body,
{
type Data = B::Data;
type Error = B::Error;

/// Attempt to pull out the next data buffer of this stream.
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.project();
let inner = this.inner;
let BodyDataMetrics {
frames_total,
frames_bytes: _, // DEV(kate); count bytes in body chunks.
} = this.metrics;

let data = std::task::ready!(inner.poll_data(cx));

if let Some(Ok(_)) = data.as_ref() {
frames_total.inc();
}

Poll::Ready(data)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
self.project().inner.poll_trailers(cx)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}
Loading

0 comments on commit 044cc73

Please sign in to comment.