diff --git a/Cargo.lock b/Cargo.lock index 8900920cfd..338c724708 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1918,7 +1918,9 @@ dependencies = [ "linkerd-metrics", "parking_lot", "tokio", + "tokio-test", "tower", + "tower-test", ] [[package]] diff --git a/linkerd/stack/metrics/Cargo.toml b/linkerd/stack/metrics/Cargo.toml index c0b83e89ce..7854005d13 100644 --- a/linkerd/stack/metrics/Cargo.toml +++ b/linkerd/stack/metrics/Cargo.toml @@ -11,3 +11,8 @@ linkerd-metrics = { path = "../../metrics" } parking_lot = "0.12" tower = { version = "0.4", default-features = false } tokio = { version = "1", features = ["time"] } + +[dev-dependencies] +tokio = { version = "1", features = ["macros"] } +tokio-test = "0.4" +tower-test = "0.4" diff --git a/linkerd/stack/metrics/src/service.rs b/linkerd/stack/metrics/src/service.rs index b302f02944..572f37c8c3 100644 --- a/linkerd/stack/metrics/src/service.rs +++ b/linkerd/stack/metrics/src/service.rs @@ -3,7 +3,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::time::Instant; +use tokio::time; /// A service that tracks metrics about its readiness. /// @@ -13,7 +13,10 @@ use tokio::time::Instant; pub struct TrackService { inner: S, metrics: Arc, - blocked_since: Option, + blocked_since: Option, + // Metrics is shared across all distinct services, so we use a separate + // tracker. + _tracker: Arc<()>, } impl TrackService { @@ -22,6 +25,19 @@ impl TrackService { inner, metrics, blocked_since: None, + _tracker: Arc::new(()), + } + } +} + +impl Clone for TrackService { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + metrics: self.metrics.clone(), + _tracker: self._tracker.clone(), + // The clone's block status is distinct. + blocked_since: None, } } } @@ -41,7 +57,7 @@ where // If the service was already pending, then add the time we // waited and reset blocked_since. This allows the value to be // updated even when we're "stuck" in pending. - let now = Instant::now(); + let now = time::Instant::now(); if let Some(t0) = self.blocked_since.take() { let not_ready = now.saturating_duration_since(t0); self.metrics.poll_millis.add(not_ready.as_millis() as u64); @@ -52,7 +68,7 @@ where Poll::Ready(Ok(())) => { self.metrics.ready_total.incr(); if let Some(t0) = self.blocked_since.take() { - let not_ready = Instant::now().saturating_duration_since(t0); + let not_ready = time::Instant::now().saturating_duration_since(t0); self.metrics.poll_millis.add(not_ready.as_millis() as u64); } Poll::Ready(Ok(())) @@ -60,7 +76,7 @@ where Poll::Ready(Err(e)) => { self.metrics.error_total.incr(); if let Some(t0) = self.blocked_since.take() { - let not_ready = Instant::now().saturating_duration_since(t0); + let not_ready = time::Instant::now().saturating_duration_since(t0); self.metrics.poll_millis.add(not_ready.as_millis() as u64); } Poll::Ready(Err(e)) @@ -76,6 +92,92 @@ where impl Drop for TrackService { fn drop(&mut self) { - self.metrics.drop_total.incr(); + if Arc::strong_count(&self._tracker) == 1 { + // If we're the last reference to the metrics, then we can + // increment the drop count. + self.metrics.drop_total.incr(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test(flavor = "current_thread")] + async fn clone_drop() { + let metrics = Arc::new(Metrics::default()); + + let (a, _a) = tower_test::mock::pair::<(), ()>(); + let a0 = TrackService::new(a, metrics.clone()); + let a1 = a0.clone(); + + let (b, _b) = tower_test::mock::pair::<(), ()>(); + let b0 = TrackService::new(b, metrics.clone()); + + drop(a1); + assert_eq!(metrics.drop_total.value(), 0.0, "Not dropped yet"); + + drop(b0); + assert_eq!( + metrics.drop_total.value(), + 1.0, + "Dropping distinct service is counted" + ); + + drop(a0); + assert_eq!( + metrics.drop_total.value(), + 2.0, + "Dropping last service clone counted" + ); + + assert_eq!( + metrics.create_total.value(), + 0.0, + "No creates by the service" + ); + } + + #[cfg(test)] + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn clone_poll_ready() { + let metrics = Arc::new(Metrics::default()); + let (mut svc0, mut handle) = tower_test::mock::spawn_with::<(), (), _, _>(|svc| { + TrackService::new(svc, metrics.clone()) + }); + + handle.allow(0); + tokio_test::assert_pending!(svc0.poll_ready()); + let mut svc1 = svc0.clone(); + assert!(svc0.get_ref().blocked_since.is_some()); + assert!(svc1.get_ref().blocked_since.is_none()); + + tokio_test::assert_pending!(svc1.poll_ready()); + assert!(svc0.get_ref().blocked_since.is_some()); + assert!(svc1.get_ref().blocked_since.is_some()); + + time::sleep(time::Duration::from_secs(1)).await; + handle.allow(2); + tokio_test::assert_ready_ok!(svc0.poll_ready()); + tokio_test::assert_ready_ok!(svc1.poll_ready()); + assert!(svc0.get_ref().blocked_since.is_none()); + assert!(svc1.get_ref().blocked_since.is_none()); + + assert_eq!( + metrics.ready_total.value(), + 2.0, + "Both clones should be counted discretely" + ); + assert_eq!( + metrics.not_ready_total.value(), + 2.0, + "Both clones should be counted discretely" + ); + assert_eq!( + metrics.poll_millis.value(), + 2000.0, + "Both clones should be counted discretely" + ); } }