Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for FUSE worker idle and total count #1264

Merged
merged 1 commit into from
Feb 11, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions mountpoint-s3/src/fuse/session.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io;

use anyhow::Context;
use const_format::formatcp;
use fuser::{Filesystem, Session, SessionUnmounter};
use tracing::{debug, error, trace, warn};

Expand Down Expand Up @@ -128,13 +129,19 @@ trait Work: Send + Sync + 'static {
FA: FnMut();
}

/// [WorkerPool] organizes a pool of workers, handling the spawning of new workers and registering the new handles with
/// the channel [WorkerPool::workers] for tear down.
///
/// The pool never grows beyond [WorkerPool::max_workers] workers.
#[derive(Debug)]
struct WorkerPool<W: Work> {
/// State shared between the pool and its worker threads: the work to run and the worker counters.
state: Arc<WorkerPoolState<W>>,
/// Channel into which the join handle of every spawned worker thread is sent.
workers: Sender<JoinHandle<W::Result>>,
/// Upper bound on the number of workers in the pool.
max_workers: usize,
}

/// Common prefix for the names of the FUSE worker metrics below.
const METRIC_NAME_PREFIX_WORKERS: &str = "fuse.mp_workers";
/// Name of the gauge that is set to the worker count each time a worker is added.
const METRIC_NAME_FUSE_WORKERS_TOTAL: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.total_count");
/// Name of the gauge tracking the idle worker count; it is updated alongside `idle_worker_count`.
const METRIC_NAME_FUSE_WORKERS_IDLE: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.idle_count");

#[derive(Debug)]
struct WorkerPoolState<W: Work> {
work: W,
Expand All @@ -143,6 +150,10 @@ struct WorkerPoolState<W: Work> {
}

impl<W: Work> WorkerPool<W> {
/// Start a new worker pool.
///
/// The worker pool will start with a small number of workers, and may eventually grow up to `max_workers`.
/// The `workers` sender is given the join handle of each spawned worker thread, so that the handles can be joined
/// when the pool is shutting down.
fn start(work: W, workers: Sender<JoinHandle<W::Result>>, max_workers: usize) -> anyhow::Result<()> {
assert!(max_workers > 0);

Expand All @@ -169,7 +180,7 @@ impl<W: Work> WorkerPool<W> {
/// Try to add a new worker.
/// Returns `Ok(false)` if the pool already has [`WorkerPool::max_workers`] workers.
fn try_add_worker(&self) -> anyhow::Result<bool> {
let Ok(i) = self
let Ok(old_count) = self
.state
.worker_count
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |i| {
Expand All @@ -182,11 +193,17 @@ impl<W: Work> WorkerPool<W> {
else {
return Ok(false);
};
self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst);

let new_count = old_count + 1;
let idle_worker_count = self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst) + 1;
metrics::gauge!(METRIC_NAME_FUSE_WORKERS_TOTAL).set(new_count as f64);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this also be set in start?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is set when start is called, since start will invoke this method.

metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).set(idle_worker_count as f64);

let worker_index = old_count;
let clone = (*self).clone();
let worker = thread::Builder::new()
.name(format!("fuse-worker-{i}"))
.spawn(move || clone.run(i))
.name(format!("fuse-worker-{worker_index}"))
.spawn(move || clone.run(worker_index))
.context("failed to spawn worker threads")?;
self.workers.send(worker).unwrap();
Ok(true)
Expand All @@ -198,6 +215,7 @@ impl<W: Work> WorkerPool<W> {
self.state.work.run(
|| {
let previous_idle_count = self.state.idle_worker_count.fetch_sub(1, Ordering::SeqCst);
metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).decrement(1);
if previous_idle_count == 1 {
// This was the only idle thread, try to spawn a new one.
if let Err(error) = self.try_add_worker() {
Expand All @@ -207,6 +225,7 @@ impl<W: Work> WorkerPool<W> {
},
|| {
self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst);
metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).increment(1);
},
)
}
Expand Down
Loading