Skip to content

Commit

Permalink
stack-metrics: Implement Clone for TrackService
Browse files Browse the repository at this point in the history
The stack metrics module cannot currently wrap services that need to
implement Clone.

To fix this, we introduce an extra Arc that ensures we only count
drops when all clones have been dropped.
  • Loading branch information
olix0r committed Nov 16, 2023
1 parent 9f7e7ac commit e5ac14d
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1918,7 +1918,9 @@ dependencies = [
"linkerd-metrics",
"parking_lot",
"tokio",
"tokio-test",
"tower",
"tower-test",
]

[[package]]
Expand Down
5 changes: 5 additions & 0 deletions linkerd/stack/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ linkerd-metrics = { path = "../../metrics" }
parking_lot = "0.12"
tower = { version = "0.4", default-features = false }
tokio = { version = "1", features = ["time"] }

[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
tokio-test = "0.4"
tower-test = "0.4"
108 changes: 102 additions & 6 deletions linkerd/stack/metrics/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::time::Instant;
use tokio::time;

/// A service that tracks metrics about its readiness.
///
Expand All @@ -13,15 +13,30 @@ use tokio::time::Instant;
pub struct TrackService<S> {
inner: S,
metrics: Arc<Metrics>,
blocked_since: Option<Instant>,
blocked_since: Option<time::Instant>,
_tracker: Arc<()>,
}

impl<S> TrackService<S> {
    /// Wraps `inner` so that its readiness is recorded into `metrics`.
    ///
    /// The returned service starts with no blocked timestamp and with a
    /// freshly allocated tracker, i.e. it is not related to any other
    /// `TrackService` until it is cloned.
    pub(crate) fn new(inner: S, metrics: Arc<Metrics>) -> Self {
        let _tracker = Arc::new(());
        let blocked_since = None;
        Self {
            inner,
            metrics,
            blocked_since,
            _tracker,
        }
    }
}

impl<S: Clone> Clone for TrackService<S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
metrics: self.metrics.clone(),
_tracker: self._tracker.clone(),
// The clone's block status is distinct.
blocked_since: None,
}
}
}
Expand All @@ -41,7 +56,7 @@ where
// If the service was already pending, then add the time we
// waited and reset blocked_since. This allows the value to be
// updated even when we're "stuck" in pending.
let now = Instant::now();
let now = time::Instant::now();
if let Some(t0) = self.blocked_since.take() {
let not_ready = now.saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
Expand All @@ -52,15 +67,15 @@ where
Poll::Ready(Ok(())) => {
self.metrics.ready_total.incr();
if let Some(t0) = self.blocked_since.take() {
let not_ready = Instant::now().saturating_duration_since(t0);
let not_ready = time::Instant::now().saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
}
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
self.metrics.error_total.incr();
if let Some(t0) = self.blocked_since.take() {
let not_ready = Instant::now().saturating_duration_since(t0);
let not_ready = time::Instant::now().saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
}
Poll::Ready(Err(e))
Expand All @@ -76,6 +91,87 @@ where

impl<S> Drop for TrackService<S> {
    /// Records a drop only when the last clone of a service goes away.
    fn drop(&mut self) {
        // `_tracker` is shared by every clone of this service. A strong count
        // of one therefore means no other clone is holding it, so this is the
        // drop that should be counted. Dropping any earlier clone must leave
        // `drop_total` untouched.
        //
        // NOTE(review): `Arc::strong_count` is only a snapshot. If clones can
        // be dropped concurrently on different threads, each may observe a
        // count above one and the drop would go uncounted -- confirm whether
        // that can happen for the services wrapped here.
        if Arc::strong_count(&self._tracker) == 1 {
            self.metrics.drop_total.incr();
        }
    }
}

#[cfg(test)]
#[tokio::test(flavor = "current_thread")]
async fn clone_drop() {
    // Checks that `drop_total` counts services rather than individual clones.
    let metrics = Arc::new(Metrics::default());

    // One service together with a clone of it...
    let (first_mock, _first_handle) = tower_test::mock::pair::<(), ()>();
    let first = TrackService::new(first_mock, Arc::clone(&metrics));
    let first_clone = first.clone();

    // ...and a second, unrelated service reporting to the same metrics.
    let (second_mock, _second_handle) = tower_test::mock::pair::<(), ()>();
    let second = TrackService::new(second_mock, Arc::clone(&metrics));

    let drops = || metrics.drop_total.value();

    // The original of the first service is still alive.
    drop(first_clone);
    assert_eq!(drops(), 0.0, "Not dropped yet");

    // The second service was never cloned, so its drop counts immediately.
    drop(second);
    assert_eq!(drops(), 1.0, "Dropping distinct service is counted");

    // Nothing else refers to the first service any more.
    drop(first);
    assert_eq!(drops(), 2.0, "Dropping last service clone counted");

    let creates = metrics.create_total.value();
    assert_eq!(creates, 0.0, "No creates by the service");
}

// Checks that a service and its clone keep separate `blocked_since` state
// while both contributing to the same shared metrics.
#[cfg(test)]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn clone_poll_ready() {
let metrics = Arc::new(Metrics::default());
let (mut svc0, mut handle) =
tower_test::mock::spawn_with::<(), (), _, _>(|svc| TrackService::new(svc, metrics.clone()));

// With no requests allowed, the first poll is expected to be pending, which
// starts the blocked timer on `svc0` only; the fresh clone starts unset.
handle.allow(0);
tokio_test::assert_pending!(svc0.poll_ready());
let mut svc1 = svc0.clone();
assert!(svc0.get_ref().blocked_since.is_some());
assert!(svc1.get_ref().blocked_since.is_none());

// Polling the clone starts its own timer and leaves the original's in place.
tokio_test::assert_pending!(svc1.poll_ready());
assert!(svc0.get_ref().blocked_since.is_some());
assert!(svc1.get_ref().blocked_since.is_some());

// Let one second pass on the test clock, then allow two requests so that
// both services become ready and clear their timers.
time::sleep(time::Duration::from_secs(1)).await;
handle.allow(2);
tokio_test::assert_ready_ok!(svc0.poll_ready());
tokio_test::assert_ready_ok!(svc1.poll_ready());
assert!(svc0.get_ref().blocked_since.is_none());
assert!(svc1.get_ref().blocked_since.is_none());

// Each clone contributed one ready, one not-ready and one second of blocked
// time, hence the totals of 2, 2 and 2000 ms below.
assert_eq!(
metrics.ready_total.value(),
2.0,
"Both clones should be counted discretely"
);
assert_eq!(
metrics.not_ready_total.value(),
2.0,
"Both clones should be counted discretely"
);
assert_eq!(
metrics.poll_millis.value(),
2000.0,
"Both clones should be counted discretely"
);
}

0 comments on commit e5ac14d

Please sign in to comment.