diff --git a/mountpoint-s3/src/fuse/session.rs b/mountpoint-s3/src/fuse/session.rs index 028267a7f..4bb582df9 100644 --- a/mountpoint-s3/src/fuse/session.rs +++ b/mountpoint-s3/src/fuse/session.rs @@ -1,6 +1,7 @@ use std::io; use anyhow::Context; +use const_format::formatcp; use fuser::{Filesystem, Session, SessionUnmounter}; use tracing::{debug, error, trace, warn}; @@ -128,6 +129,8 @@ trait Work: Send + Sync + 'static { FA: FnMut(); } +/// [WorkerPool] organizes a pool of workers, handling the spawning of new workers and registering the new handles with +/// the channel [WorkerPool::workers] for tear down. #[derive(Debug)] struct WorkerPool { state: Arc>, @@ -135,6 +138,10 @@ struct WorkerPool { max_workers: usize, } +const METRIC_NAME_PREFIX_WORKERS: &str = "fuse.mp_workers"; +const METRIC_NAME_FUSE_WORKERS_TOTAL: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.total_count"); +const METRIC_NAME_FUSE_WORKERS_IDLE: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.idle_count"); + #[derive(Debug)] struct WorkerPoolState { work: W, @@ -143,6 +150,10 @@ struct WorkerPoolState { } impl WorkerPool { + /// Start a new worker pool. + /// + /// The worker pool will start with a small number of workers, and may eventually grow up to `max_workers`. + /// The `workers` argument consumes the worker thread handles to be joined when the pool is shutting down. fn start(work: W, workers: Sender>, max_workers: usize) -> anyhow::Result<()> { assert!(max_workers > 0); @@ -169,7 +180,7 @@ impl WorkerPool { /// Try to add a new worker. /// Returns `Ok(false)` if there are already [`WorkerPool::max_workers`]. fn try_add_worker(&self) -> anyhow::Result { - let Ok(i) = self + let Ok(old_count) = self .state .worker_count .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |i| { @@ -182,11 +193,17 @@ impl WorkerPool { else { return Ok(false); }; - self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst); + + let new_count = old_count + 1; + let idle_worker_count = self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst) + 1; + metrics::gauge!(METRIC_NAME_FUSE_WORKERS_TOTAL).set(new_count as f64); + metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).set(idle_worker_count as f64); + + let worker_index = old_count; let clone = (*self).clone(); let worker = thread::Builder::new() - .name(format!("fuse-worker-{i}")) - .spawn(move || clone.run(i)) + .name(format!("fuse-worker-{worker_index}")) + .spawn(move || clone.run(worker_index)) .context("failed to spawn worker threads")?; self.workers.send(worker).unwrap(); Ok(true) @@ -198,6 +215,7 @@ impl WorkerPool { self.state.work.run( || { let previous_idle_count = self.state.idle_worker_count.fetch_sub(1, Ordering::SeqCst); + metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).decrement(1); if previous_idle_count == 1 { // This was the only idle thread, try to spawn a new one. if let Err(error) = self.try_add_worker() { @@ -207,6 +225,7 @@ impl WorkerPool { }, || { self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst); + metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).increment(1); }, ) }