Skip to content

Commit

Permalink
stack-metrics: Implement Clone for TrackService (#2524)
Browse files Browse the repository at this point in the history
The stack metrics module cannot currently wrap services that need to
implement Clone.

To fix this, we introduce an extra Arc that ensures we only count
drops when all clones have been dropped.
  • Loading branch information
olix0r authored Nov 17, 2023
1 parent b99c595 commit bc62514
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1918,7 +1918,9 @@ dependencies = [
"linkerd-metrics",
"parking_lot",
"tokio",
"tokio-test",
"tower",
"tower-test",
]

[[package]]
Expand Down
5 changes: 5 additions & 0 deletions linkerd/stack/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ linkerd-metrics = { path = "../../metrics" }
parking_lot = "0.12"
tower = { version = "0.4", default-features = false }
tokio = { version = "1", features = ["time"] }

[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
tokio-test = "0.4"
tower-test = "0.4"
114 changes: 108 additions & 6 deletions linkerd/stack/metrics/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::time::Instant;
use tokio::time;

/// A service that tracks metrics about its readiness.
///
Expand All @@ -13,7 +13,10 @@ use tokio::time::Instant;
pub struct TrackService<S> {
inner: S,
metrics: Arc<Metrics>,
blocked_since: Option<Instant>,
blocked_since: Option<time::Instant>,
// Metrics is shared across all distinct services, so we use a separate
// tracker.
_tracker: Arc<()>,
}

impl<S> TrackService<S> {
Expand All @@ -22,6 +25,19 @@ impl<S> TrackService<S> {
inner,
metrics,
blocked_since: None,
_tracker: Arc::new(()),
}
}
}

impl<S: Clone> Clone for TrackService<S> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
metrics: self.metrics.clone(),
_tracker: self._tracker.clone(),
// The clone's block status is distinct.
blocked_since: None,
}
}
}
Expand All @@ -41,7 +57,7 @@ where
// If the service was already pending, then add the time we
// waited and reset blocked_since. This allows the value to be
// updated even when we're "stuck" in pending.
let now = Instant::now();
let now = time::Instant::now();
if let Some(t0) = self.blocked_since.take() {
let not_ready = now.saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
Expand All @@ -52,15 +68,15 @@ where
Poll::Ready(Ok(())) => {
self.metrics.ready_total.incr();
if let Some(t0) = self.blocked_since.take() {
let not_ready = Instant::now().saturating_duration_since(t0);
let not_ready = time::Instant::now().saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
}
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
self.metrics.error_total.incr();
if let Some(t0) = self.blocked_since.take() {
let not_ready = Instant::now().saturating_duration_since(t0);
let not_ready = time::Instant::now().saturating_duration_since(t0);
self.metrics.poll_millis.add(not_ready.as_millis() as u64);
}
Poll::Ready(Err(e))
Expand All @@ -76,6 +92,92 @@ where

impl<S> Drop for TrackService<S> {
    /// Counts a drop once per distinct service, not once per clone.
    fn drop(&mut self) {
        // Every clone of a service holds the same `_tracker`, so a strong
        // count of 1 means this handle is the last one standing. Only then is
        // the drop recorded; dropping any earlier clone leaves the counter
        // untouched.
        //
        // NOTE(review): `Arc::strong_count` is a point-in-time read. If clones
        // can be dropped concurrently on different threads, two drops could
        // both observe a count above 1 and the drop would go unrecorded --
        // confirm whether that matters for this metric.
        if Arc::strong_count(&self._tracker) == 1 {
            self.metrics.drop_total.incr();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Drops are counted once per distinct service: dropping a clone while
    /// another clone is still alive must not bump `drop_total`.
    #[tokio::test(flavor = "current_thread")]
    async fn clone_drop() {
        let metrics = Arc::new(Metrics::default());

        // `a0` and `a1` are two handles to the same service.
        let (a, _a) = tower_test::mock::pair::<(), ()>();
        let a0 = TrackService::new(a, metrics.clone());
        let a1 = a0.clone();

        // `b0` is a separate service that reports into the same metrics.
        let (b, _b) = tower_test::mock::pair::<(), ()>();
        let b0 = TrackService::new(b, metrics.clone());

        drop(a1);
        assert_eq!(metrics.drop_total.value(), 0.0, "Not dropped yet");

        drop(b0);
        assert_eq!(
            metrics.drop_total.value(),
            1.0,
            "Dropping distinct service is counted"
        );

        drop(a0);
        assert_eq!(
            metrics.drop_total.value(),
            2.0,
            "Dropping last service clone counted"
        );

        assert_eq!(
            metrics.create_total.value(),
            0.0,
            "No creates by the service"
        );
    }

    /// Each clone tracks its own blocked interval, and both report into the
    /// shared metrics.
    // The runtime starts with the clock paused, so the one-second sleep below
    // is expected to advance the timer without a real wait.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn clone_poll_ready() {
        let metrics = Arc::new(Metrics::default());
        let (mut svc0, mut handle) = tower_test::mock::spawn_with::<(), (), _, _>(|svc| {
            TrackService::new(svc, metrics.clone())
        });

        // With no capacity allowed, the first poll is pending and only the
        // original handle starts timing; the clone begins unblocked.
        handle.allow(0);
        tokio_test::assert_pending!(svc0.poll_ready());
        let mut svc1 = svc0.clone();
        assert!(svc0.get_ref().blocked_since.is_some());
        assert!(svc1.get_ref().blocked_since.is_none());

        // Polling the clone starts its own, independent interval.
        tokio_test::assert_pending!(svc1.poll_ready());
        assert!(svc0.get_ref().blocked_since.is_some());
        assert!(svc1.get_ref().blocked_since.is_some());

        // After a second has elapsed, let both handles become ready.
        time::sleep(time::Duration::from_secs(1)).await;
        handle.allow(2);
        tokio_test::assert_ready_ok!(svc0.poll_ready());
        tokio_test::assert_ready_ok!(svc1.poll_ready());
        assert!(svc0.get_ref().blocked_since.is_none());
        assert!(svc1.get_ref().blocked_since.is_none());

        assert_eq!(
            metrics.ready_total.value(),
            2.0,
            "Both clones should be counted discretely"
        );
        assert_eq!(
            metrics.not_ready_total.value(),
            2.0,
            "Both clones should be counted discretely"
        );
        assert_eq!(
            metrics.poll_millis.value(),
            2000.0,
            "Both clones should be counted discretely"
        );
    }
}

0 comments on commit bc62514

Please sign in to comment.