diff --git a/Cargo.lock b/Cargo.lock index 72d0545709..1fad71fc53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1325,7 +1325,9 @@ dependencies = [ "ahash", "bytes", "futures", + "futures-util", "http", + "http-body", "hyper", "linkerd-app-core", "linkerd-app-test", @@ -1554,6 +1556,7 @@ dependencies = [ name = "linkerd-http-prom" version = "0.1.0" dependencies = [ + "bytes", "futures", "http", "http-body", diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index 97b2ca7337..10b41c8200 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -49,6 +49,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" } linkerd-tonic-watch = { path = "../../tonic-watch" } [dev-dependencies] +futures-util = "0.3" +http-body = "0.4" hyper = { version = "0.14", features = ["http1", "http2"] } tokio = { version = "1", features = ["macros", "sync", "time"] } tokio-rustls = "0.24" diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs index 3e2684bed1..af15735757 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs @@ -1,6 +1,7 @@ use crate::{BackendRef, ParentRef, RouteRef}; use linkerd_app_core::{metrics::prom, svc}; use linkerd_http_prom::{ + body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseBodyFamilies}, record_response::{self, NewResponseDuration, StreamLabel}, NewCountRequests, RequestCount, RequestCountFamilies, }; @@ -15,6 +16,7 @@ mod tests; pub struct RouteBackendMetrics { requests: RequestCountFamilies, responses: ResponseMetrics, + body_metrics: ResponseBodyFamilies, } type ResponseMetrics = record_response::ResponseMetrics< @@ -26,14 +28,24 @@ pub fn layer( metrics: &RouteBackendMetrics, ) -> impl svc::Layer< N, - Service = NewCountRequests< - ExtractRequestCount, - NewResponseDuration>, N>, + Service = NewRecordBodyData< + ExtractRecordBodyDataParams, + NewCountRequests< + ExtractRequestCount, + NewResponseDuration>, N>, + >, >, > + Clone where T: MkStreamLabel, N: svc::NewService, + NewRecordBodyData< + ExtractRecordBodyDataParams, + NewCountRequests< + ExtractRequestCount, + NewResponseDuration>, N>, + >, + >: svc::NewService, NewCountRequests< ExtractRequestCount, NewResponseDuration>, N>, @@ -44,12 +56,16 @@ where let RouteBackendMetrics { requests, responses, + body_metrics, } = metrics.clone(); + svc::layer::mk(move |inner| { use svc::Layer; - NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer( - NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone())) - .layer(inner), + NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer( + NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer( + NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone())) + .layer(inner), + ), ) }) } @@ -57,15 +73,20 @@ where #[derive(Clone, Debug)] pub struct ExtractRequestCount(RequestCountFamilies); +#[derive(Clone, Debug)] +pub struct ExtractRecordBodyDataParams(ResponseBodyFamilies); + // === impl RouteBackendMetrics === impl RouteBackendMetrics { pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator) -> Self { let requests = RequestCountFamilies::register(reg); let responses = record_response::ResponseMetrics::register(reg, histo); + let body_metrics = ResponseBodyFamilies::register(reg); Self { requests, responses, + body_metrics, } } @@ -83,6 +104,14 @@ impl RouteBackendMetrics { pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter { self.responses.get_statuses(l) } + + #[cfg(test)] + pub(crate) fn get_response_body_metrics( + &self, + l: &labels::RouteBackend, + ) -> linkerd_http_prom::body_data::response::BodyDataMetrics { + self.body_metrics.get(l) + } } impl Default for RouteBackendMetrics { @@ -90,6 +119,7 @@ impl Default for RouteBackendMetrics { Self { requests: Default::default(), responses: Default::default(), + body_metrics: Default::default(), } } } @@ -99,6 +129,7 @@ impl Clone for RouteBackendMetrics { Self { requests: self.requests.clone(), responses: self.responses.clone(), + body_metrics: self.body_metrics.clone(), } } } @@ -114,3 +145,17 @@ where .metrics(&labels::RouteBackend(t.param(), t.param(), t.param())) } } + +// === impl ExtractRecordBodyDataParams === + +impl svc::ExtractParam for ExtractRecordBodyDataParams +where + T: svc::Param + svc::Param + svc::Param, +{ + fn extract_param(&self, t: &T) -> BodyDataMetrics { + let Self(families) = self; + let labels = labels::RouteBackend(t.param(), t.param(), t.param()); + + families.get(&labels) + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs index 1ba6427e6f..dd0e68434b 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs @@ -5,9 +5,11 @@ use super::{ LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics, }; use crate::http::{concrete, logical::Concrete}; +use bytes::Buf; use linkerd_app_core::{ svc::{self, http::BoxBody, Layer, NewService}, transport::{Remote, ServerAddr}, + Error, }; use linkerd_proxy_client_policy as policy; @@ -116,6 +118,128 @@ async fn http_request_statuses() { assert_eq!(mixed.get(), 1); } +/// Tests that metrics count frames in the backend response body. +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn body_data_layer_records_frames() -> Result<(), Error> { + use http_body::Body; + use linkerd_app_core::proxy::http; + use linkerd_http_prom::body_data::response::BodyDataMetrics; + use tower::{Service, ServiceExt}; + + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::RouteBackendMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let backend_ref = crate::BackendRef(policy::Meta::new_default("backend")); + + let (mut svc, mut handle) = + mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref); + handle.allow(1); + + // Create a request. + let req = { + let empty = hyper::Body::empty(); + let body = BoxBody::new(empty); + http::Request::builder().method("DOOT").body(body).unwrap() + }; + + // Call the service once it is ready to accept a request. + tracing::info!("calling service"); + svc.ready().await.expect("ready"); + let call = svc.call(req); + let (req, send_resp) = handle.next_request().await.unwrap(); + debug_assert_eq!(req.method().as_str(), "DOOT"); + + // Acquire the counters for this backend. + tracing::info!("acquiring response body metrics"); + let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone()); + let BodyDataMetrics { + frames_total, + frames_bytes, + } = metrics.get_response_body_metrics(&labels); + + // Before we've sent a response, the counter should be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Create a response whose body is backed by a channel that we can send chunks to, send it. + tracing::info!("sending response"); + let mut resp_tx = { + let (tx, body) = hyper::Body::channel(); + let body = BoxBody::new(body); + let resp = http::Response::builder() + .status(http::StatusCode::IM_A_TEAPOT) + .body(body) + .unwrap(); + send_resp.send_response(resp); + tx + }; + + // Before we've sent any bytes, the counter should be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // On the client end, poll our call future and await the response. + tracing::info!("polling service future"); + let (parts, body) = call.await?.into_parts(); + debug_assert_eq!(parts.status, 418); + + let mut body = Box::pin(body); + + /// Returns the next chunk from a boxed body. + async fn read_chunk(body: &mut std::pin::Pin>) -> Result, Error> { + use std::task::{Context, Poll}; + let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref()); + let data = match body.as_mut().poll_data(&mut ctx) { + Poll::Ready(Some(Ok(d))) => d, + _ => panic!("next chunk should be ready"), + }; + let chunk = data.chunk().to_vec(); + Ok(chunk) + } + + { + // Send a chunk, confirm that our counters are incremented. + tracing::info!("sending first chunk"); + resp_tx.send_data("hello".into()).await?; + let chunk = read_chunk(&mut body).await?; + debug_assert_eq!("hello".as_bytes(), chunk, "should get same value back out"); + assert_eq!(frames_total.get(), 1); + assert_eq!(frames_bytes.get(), 5); + } + + { + // Send another chunk, confirm that our counters are incremented once more. + tracing::info!("sending second chunk"); + resp_tx.send_data(", world!".into()).await?; + let chunk = read_chunk(&mut body).await?; + debug_assert_eq!( + ", world!".as_bytes(), + chunk, + "should get same value back out" + ); + assert_eq!(frames_total.get(), 2); + assert_eq!(frames_bytes.get(), 5 + 8); + } + + { + // Close the body, show that the counters remain at the same values. + use std::task::{Context, Poll}; + tracing::info!("closing response body"); + drop(resp_tx); + let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref()); + match body.as_mut().poll_data(&mut ctx) { + Poll::Ready(None) => {} + _ => panic!("got unexpected poll result"), + }; + assert_eq!(frames_total.get(), 2); + assert_eq!(frames_bytes.get(), 5 + 8); + } + + Ok(()) +} + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn grpc_request_statuses_ok() { let _trace = linkerd_tracing::test::trace_init(); diff --git a/linkerd/http/prom/Cargo.toml b/linkerd/http/prom/Cargo.toml index 53c5a5496e..6f7ed8110d 100644 --- a/linkerd/http/prom/Cargo.toml +++ b/linkerd/http/prom/Cargo.toml @@ -13,6 +13,7 @@ Tower middleware for Prometheus metrics. test-util = [] [dependencies] +bytes = "1" futures = { version = "0.3", default-features = false } http = "0.2" http-body = "0.4" diff --git a/linkerd/http/prom/src/body_data.rs b/linkerd/http/prom/src/body_data.rs new file mode 100644 index 0000000000..237e811e36 --- /dev/null +++ b/linkerd/http/prom/src/body_data.rs @@ -0,0 +1,5 @@ +pub mod request; +pub mod response; + +mod body; +mod metrics; diff --git a/linkerd/http/prom/src/body_data/body.rs b/linkerd/http/prom/src/body_data/body.rs new file mode 100644 index 0000000000..47f8ffdda6 --- /dev/null +++ b/linkerd/http/prom/src/body_data/body.rs @@ -0,0 +1,80 @@ +use super::metrics::BodyDataMetrics; +use http::HeaderMap; +use http_body::SizeHint; +use pin_project::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +/// An instrumented body. +#[pin_project] +pub struct Body { + /// The inner body. + #[pin] + inner: B, + /// Metrics with which the inner body will be instrumented. + metrics: BodyDataMetrics, +} + +impl Body { + /// Returns a new, instrumented body. + pub(crate) fn new(body: B, metrics: BodyDataMetrics) -> Self { + Self { + inner: body, + metrics, + } + } +} + +impl http_body::Body for Body +where + B: http_body::Body, +{ + type Data = B::Data; + type Error = B::Error; + + /// Attempt to pull out the next data buffer of this stream. + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.project(); + let inner = this.inner; + let BodyDataMetrics { + frames_total, + frames_bytes, + } = this.metrics; + + let data = std::task::ready!(inner.poll_data(cx)); + + if let Some(Ok(data)) = data.as_ref() { + // We've polled and yielded a new chunk! Increment our telemetry. + // + // NB: We're careful to call `remaining()` rather than `chunk()`, which + // "can return a shorter slice (this allows non-continuous internal representation)." + let bytes = ::remaining(data) + .try_into() + .unwrap_or(u64::MAX); + frames_bytes.inc_by(bytes.into()); + frames_total.inc(); + } + + Poll::Ready(data) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + self.project().inner.poll_trailers(cx) + } + + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + fn size_hint(&self) -> SizeHint { + self.inner.size_hint() + } +} diff --git a/linkerd/http/prom/src/body_data/metrics.rs b/linkerd/http/prom/src/body_data/metrics.rs new file mode 100644 index 0000000000..8ae1872f33 --- /dev/null +++ b/linkerd/http/prom/src/body_data/metrics.rs @@ -0,0 +1,94 @@ +//! Prometheus counters for request and response bodies. + +use linkerd_metrics::prom::{self, Counter, Family, Registry}; + +/// Counters for response body frames. +#[derive(Clone, Debug)] +pub struct ResponseBodyFamilies { + /// Counts the number of response body frames. + resp_body_frames_total: Family, + /// Counts the total number of bytes in response body frames. + resp_body_frames_bytes: Family, +} + +/// Counters to instrument a request or response body. +#[derive(Clone, Debug, Default)] +pub struct BodyDataMetrics { + /// Counts the number of request body frames. + pub frames_total: Counter, + /// Counts the total number of bytes in request body frames. + pub frames_bytes: Counter, +} + +// === impl ResponseBodyFamilies === + +impl Default for ResponseBodyFamilies +where + L: Clone + std::hash::Hash + Eq, +{ + fn default() -> Self { + Self { + resp_body_frames_total: Default::default(), + resp_body_frames_bytes: Default::default(), + } + } +} + +impl ResponseBodyFamilies +where + L: prom::encoding::EncodeLabelSet + + std::fmt::Debug + + std::hash::Hash + + Eq + + Clone + + Send + + Sync + + 'static, +{ + const RESP_BODY_FRAMES_TOTAL_NAME: &'static str = "resp_body_frames_total"; + const RESP_BODY_FRAMES_TOTAL_HELP: &'static str = + "Counts the number of frames in response bodies."; + + const RESP_BODY_FRAMES_BYTES_NAME: &'static str = "resp_body_frames_bytes"; + const RESP_BODY_FRAMES_BYTES_HELP: &'static str = + "Counts the total number of bytes in response bodies."; + + /// Registers and returns a new family of body data metrics. + pub fn register(registry: &mut Registry) -> Self { + let resp_body_frames_total = Family::default(); + registry.register( + Self::RESP_BODY_FRAMES_TOTAL_NAME, + Self::RESP_BODY_FRAMES_TOTAL_HELP, + resp_body_frames_total.clone(), + ); + + let resp_body_frames_bytes = Family::default(); + registry.register_with_unit( + Self::RESP_BODY_FRAMES_BYTES_NAME, + Self::RESP_BODY_FRAMES_BYTES_HELP, + prom::Unit::Bytes, + resp_body_frames_bytes.clone(), + ); + + Self { + resp_body_frames_total, + resp_body_frames_bytes, + } + } + + /// Returns the [`BodyDataMetrics`] for the given label set. + pub fn get(&self, labels: &L) -> BodyDataMetrics { + let Self { + resp_body_frames_total, + resp_body_frames_bytes, + } = self; + + let frames_total = resp_body_frames_total.get_or_create(labels).clone(); + let frames_bytes = resp_body_frames_bytes.get_or_create(labels).clone(); + + BodyDataMetrics { + frames_total, + frames_bytes, + } + } +} diff --git a/linkerd/http/prom/src/body_data/request.rs b/linkerd/http/prom/src/body_data/request.rs new file mode 100644 index 0000000000..fb270395da --- /dev/null +++ b/linkerd/http/prom/src/body_data/request.rs @@ -0,0 +1 @@ +// TODO(kate): write a middleware for request body. diff --git a/linkerd/http/prom/src/body_data/response.rs b/linkerd/http/prom/src/body_data/response.rs new file mode 100644 index 0000000000..30d52bcc75 --- /dev/null +++ b/linkerd/http/prom/src/body_data/response.rs @@ -0,0 +1,105 @@ +//! Tower middleware to instrument response bodies. + +pub use super::metrics::{BodyDataMetrics, ResponseBodyFamilies}; + +use http::{Request, Response}; +use http_body::Body; +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service}; + +/// A [`NewService`] that creates [`RecordBodyData`] services. +#[derive(Clone, Debug)] +pub struct NewRecordBodyData { + /// The [`ExtractParam`] strategy for obtaining our parameters. + extract: X, + /// The inner [`NewService`]. + inner: N, +} + +/// Tracks body frames for an inner `S`-typed [`Service`]. +#[derive(Clone, Debug)] +pub struct RecordBodyData { + /// The inner [`Service`]. + inner: S, + /// The metrics to be affixed to the response body. + metrics: BodyDataMetrics, +} + +// === impl NewRecordBodyData === + +impl NewRecordBodyData { + /// Returns a [`Layer`] that tracks body chunks. + /// + /// This uses an `X`-typed [`ExtractParam`] implementation to extract service parameters + /// from a `T`-typed target. + pub fn layer_via(extract: X) -> impl Layer { + svc::layer::mk(move |inner| Self { + extract: extract.clone(), + inner, + }) + } +} + +impl NewService for NewRecordBodyData +where + X: ExtractParam, + N: NewService, +{ + type Service = RecordBodyData; + + fn new_service(&self, target: T) -> Self::Service { + let Self { extract, inner } = self; + + let metrics = extract.extract_param(&target); + let inner = inner.new_service(target); + + RecordBodyData { inner, metrics } + } +} + +// === impl RecordBodyData === + +impl Service> for RecordBodyData +where + S: Service, Response = Response>, + RespB: Body + Send + 'static, + RespB::Data: Send + 'static, + RespB::Error: Into, +{ + type Response = Response; + type Error = S::Error; + type Future = futures::future::MapOk< + S::Future, + Box) -> Self::Response + Send + 'static>, + >; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + use futures::TryFutureExt; + + let Self { inner, metrics } = self; + let metrics = metrics.clone(); + let instrument = Box::new(|resp| Self::instrument_response(resp, metrics)); + + inner.call(req).map_ok(instrument) + } +} + +impl RecordBodyData { + fn instrument_response(resp: Response, metrics: BodyDataMetrics) -> Response + where + B: Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + { + resp.map(|b| super::body::Body::new(b, metrics)) + .map(BoxBody::new) + } +} diff --git a/linkerd/http/prom/src/lib.rs b/linkerd/http/prom/src/lib.rs index 4f00f842b4..51ee223934 100644 --- a/linkerd/http/prom/src/lib.rs +++ b/linkerd/http/prom/src/lib.rs @@ -1,6 +1,7 @@ #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] #![forbid(unsafe_code)] +pub mod body_data; mod count_reqs; pub mod record_response;