Skip to content

Commit

Permalink
Improve invoker observability
Browse files Browse the repository at this point in the history
Add a few extra metrics to measure the number of queued commands
and the number of in-flight invocation tasks
  • Loading branch information
muhamadazmy committed Feb 25, 2025
1 parent e9d5373 commit ed73a84
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
7 changes: 6 additions & 1 deletion crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use input_command::{InputCommand, InvokeCommand};
use invocation_state_machine::InvocationStateMachine;
use invocation_task::InvocationTask;
use invocation_task::{InvocationTaskOutput, InvocationTaskOutputInner};
use metrics::counter;
use metric_definitions::{INVOKER_SEG_QUEUE_LEN, INVOKER_TASKS_IN_FLIGHT};
use metrics::{counter, gauge};
use restate_core::cancellation_watcher;
use restate_errors::warn_it;
use restate_invoker_api::{
Expand Down Expand Up @@ -346,6 +347,9 @@ where
where
F: Future<Output = ()>,
{
gauge!(INVOKER_SEG_QUEUE_LEN).set(segmented_input_queue.len() as f64);
gauge!(INVOKER_TASKS_IN_FLIGHT).set(self.invocation_tasks.len() as f64);

tokio::select! {
Some(cmd) = self.status_rx.recv() => {
let keys = cmd.payload();
Expand All @@ -363,6 +367,7 @@ where
// --- Spillable queue loading/offloading
InputCommand::Invoke(invoke_command) => {
counter!(INVOKER_ENQUEUE).increment(1);

segmented_input_queue.enqueue(invoke_command).await;
},
// --- Other commands (they don't go through the segment queue)
Expand Down
16 changes: 15 additions & 1 deletion crates/invoker-impl/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

/// Metric name of the counter incremented each time an invoke command is enqueued.
pub const INVOKER_ENQUEUE: &str = "restate.invoker.enqueue.total";
/// Metric name of the gauge reporting the number of invocations in the segmented input queue.
pub const INVOKER_SEG_QUEUE_LEN: &str = "restate.invoker.segmented_queue.len";
/// Metric name of the invocation-task counter (described with `Unit::Count`).
pub const INVOKER_INVOCATION_TASK: &str = "restate.invoker.invocation_task.total";
/// Metric name of the gauge reporting the concurrency slots still available to the invoker.
pub const INVOKER_AVAILABLE_SLOTS: &str = "restate.invoker.available_slots";
/// Metric name for the time taken to complete an invoker task, in seconds.
pub const INVOKER_TASK_DURATION: &str = "restate.invoker.task_duration.seconds";
/// Metric name of the gauge reporting the number of in-flight invoker tasks.
pub const INVOKER_TASKS_IN_FLIGHT: &str = "restate.invoker.inflight_tasks_count";

pub const TASK_OP_STARTED: &str = "started";
pub const TASK_OP_SUSPENDED: &str = "suspended";
Expand All @@ -29,6 +31,12 @@ pub(crate) fn describe_metrics() {
"Number of invocations that were added to the queue"
);

describe_gauge!(
INVOKER_SEG_QUEUE_LEN,
Unit::Count,
"Number of invocations in the queue"
);

describe_counter!(
INVOKER_INVOCATION_TASK,
Unit::Count,
Expand All @@ -45,5 +53,11 @@ pub(crate) fn describe_metrics() {
INVOKER_TASK_DURATION,
Unit::Seconds,
"Time taken to complete an invoker task"
)
);

describe_gauge!(
INVOKER_TASKS_IN_FLIGHT,
Unit::Count,
"Number of inflight invoker tasks"
);
}
5 changes: 4 additions & 1 deletion crates/invoker-impl/src/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ impl InvokerConcurrencyQuota {

pub(super) fn is_slot_available(&self) -> bool {
match self {
Self::Unlimited => true,
Self::Unlimited => {
gauge!(INVOKER_AVAILABLE_SLOTS).set(f64::INFINITY);
true
}
Self::Limited { available_slots } => {
gauge!(INVOKER_AVAILABLE_SLOTS).set(*available_slots as f64);
*available_slots > 0
Expand Down
5 changes: 5 additions & 0 deletions crates/queue/src/segmented_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ impl<T: Serialize + DeserializeOwned + Send + 'static> SegmentQueue<T> {
};
len
}

/// Returns the total number of records in the queue, obtained by summing
/// the length of every segment.
pub fn len(&self) -> usize {
    self.segments.iter().map(|segment| segment.len()).sum()
}
}

impl<T> SegmentQueue<T> {
Expand Down

0 comments on commit ed73a84

Please sign in to comment.