
Per-database incoming and outgoing queue length metrics #2773

Merged · 4 commits · May 28, 2025
75 changes: 50 additions & 25 deletions crates/client-api/src/routes/subscribe.rs
@@ -228,6 +228,24 @@ async fn ws_client_actor_inner(

let addr = client.module.info().database_identity;

// Grab handles on the total incoming and outgoing queue length metrics,
// which we'll increment and decrement as we push into and pull out of those queues.
// Note that `total_outgoing_queue_length` is incremented separately,
// by `ClientConnectionSender::send` in core/src/client/client_connection.rs;
// we're only responsible for decrementing that one.
// Also note that much care must be taken to clean up these metrics when the connection closes!
// Any path which exits this function must decrement each of these metrics
// by the number of messages still waiting in this client's queue,
// or else they will grow without bound as clients disconnect, and be useless.
let incoming_queue_length_metric = WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr);
let outgoing_queue_length_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&addr);

let clean_up_metrics = |message_queue: &VecDeque<(DataMessage, Instant)>,
sendrx: &mpsc::Receiver<SerializableMessage>| {
incoming_queue_length_metric.sub(message_queue.len() as _);
outgoing_queue_length_metric.sub(sendrx.len() as _);
};

loop {
rx_buf.clear();
enum Item {
@@ -236,6 +254,7 @@
}
if let MaybeDone::Gone = *current_message {
if let Some((message, timer)) = message_queue.pop_front() {
incoming_queue_length_metric.dec();
let client = client.clone();
let fut = async move { client.handle_message(message, timer).await };
current_message.set(MaybeDone::Future(fut));
@@ -263,43 +282,47 @@
continue;
}
// the client sent us a close frame
None => {
clean_up_metrics(&message_queue, &sendrx);
break
},
},

// If we have an outgoing message to send, send it off.
// No incoming `message` to handle, so `continue`.
Some(n) = sendrx.recv_many(&mut rx_buf, 32).map(|n| (n != 0).then_some(n)) => {
outgoing_queue_length_metric.sub(n as _);
if closed {
// TODO: this isn't great. when we receive a close request from the peer,
// tungstenite doesn't let us send any new messages on the socket,
// even though the websocket RFC allows it. should we fork tungstenite?
log::info!("dropping messages due to ws already being closed: {:?}", &rx_buf[..n]);
} else {
let send_all = async {
    for msg in rx_buf.drain(..n) {
        let workload = msg.workload();
        let num_rows = msg.num_rows();

        let msg = datamsg_to_wsmsg(serialize(msg, client.config));

        // These metrics should be updated together,
        // or not at all.
        if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
            WORKER_METRICS
                .websocket_sent_num_rows
                .with_label_values(&addr, &workload)
                .observe(num_rows as f64);
            WORKER_METRICS
                .websocket_sent_msg_size
                .with_label_values(&addr, &workload)
                .observe(msg.len() as f64);
        }
        // feed() buffers the message, but does not necessarily send it
        ws.feed(msg).await?;
    }
    // now we flush all the messages to the socket
    ws.flush().await
};
// Flush the websocket while continuing to poll the `handle_queue`,
// to avoid deadlocks or delays due to enqueued futures holding resources.
let send_all = also_poll(send_all, make_progress(&mut current_message));
@@ -348,6 +371,7 @@
} else {
// the client never responded to our ping; drop them without trying to send them a Close
log::warn!("client {} timed out", client.id);
clean_up_metrics(&message_queue, &sendrx);
break;
}
}
@@ -362,6 +386,7 @@
match message {
Item::Message(ClientMessage::Message(message)) => {
let timer = Instant::now();
incoming_queue_length_metric.inc();
message_queue.push_back((message, timer))
}
Item::HandleResult(res) => {
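To make the cleanup invariant described at the top of `ws_client_actor_inner` concrete, here is a minimal, self-contained sketch. It is illustrative only, not code from this PR: `Actor`, `push`, `pop`, and `clean_up` are made-up names, and a standalone `prometheus::IntGauge` stands in for the per-database gauges taken from `WORKER_METRICS`.

```rust
use std::collections::VecDeque;

use prometheus::IntGauge;

struct Actor {
    queue: VecDeque<String>,
    // Stands in for e.g. `total_incoming_queue_length` labeled with one database.
    queue_len: IntGauge,
}

impl Actor {
    fn push(&mut self, msg: String) {
        // Every push is paired with an `inc`...
        self.queue_len.inc();
        self.queue.push_back(msg);
    }

    fn pop(&mut self) -> Option<String> {
        // ...every pop with a `dec`...
        let msg = self.queue.pop_front();
        if msg.is_some() {
            self.queue_len.dec();
        }
        msg
    }

    // ...and every exit path (close frame, ping timeout, error) subtracts whatever
    // is still queued, mirroring `clean_up_metrics` above, so the gauge returns to 0.
    fn clean_up(&mut self) {
        self.queue_len.sub(self.queue.len() as i64);
        self.queue.clear();
    }
}

fn main() {
    let gauge = IntGauge::new("incoming_queue_length", "messages waiting").unwrap();
    let mut actor = Actor {
        queue: VecDeque::new(),
        queue_len: gauge,
    };
    actor.push("a".into());
    actor.push("b".into());
    actor.pop();
    actor.clean_up(); // "b" was never handled, but the gauge still ends at 0
    assert_eq!(actor.queue_len.get(), 0);
}
```

Without the `clean_up` step, every disconnect would leak whatever was left in the queue into the gauge, which is exactly the unbounded growth the comment warns about.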
36 changes: 30 additions & 6 deletions crates/core/src/client/client_connection.rs
@@ -16,7 +16,7 @@ use bytes::Bytes;
use bytestring::ByteString;
use derive_more::From;
use futures::prelude::*;
use prometheus::{Histogram, IntCounter};
use prometheus::{Histogram, IntCounter, IntGauge};
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe,
UnsubscribeMulti, WebsocketFormat,
@@ -91,6 +91,15 @@ pub struct ClientConnectionSender {
pub struct ClientConnectionMetrics {
pub websocket_request_msg_size: Histogram,
pub websocket_requests: IntCounter,

/// The `total_outgoing_queue_length` metric labeled with this database's `Identity`,
/// which we'll increment whenever sending a message.
///
/// This metric will be decremented, and cleaned up,
/// by `ws_client_actor_inner` in client-api/src/routes/subscribe.rs.
/// Care must be taken not to increment it after the client has disconnected
/// and performed its clean-up.
pub sendtx_queue_size: IntGauge,
}

impl ClientConnectionMetrics {
@@ -102,10 +111,14 @@ impl ClientConnectionMetrics {
let websocket_requests = WORKER_METRICS
.websocket_requests
.with_label_values(&database_identity, message_kind);
let sendtx_queue_size = WORKER_METRICS
.total_outgoing_queue_length
.with_label_values(&database_identity);

Self {
websocket_request_msg_size,
websocket_requests,
sendtx_queue_size,
}
}
}
@@ -126,6 +139,7 @@ impl ClientConnectionSender {
Ok(h) => h.spawn(async {}).abort_handle(),
Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(),
};

let cancelled = AtomicBool::new(false);
let sender = Self {
id,
@@ -150,17 +164,27 @@
if self.cancelled.load(Relaxed) {
return Err(ClientSendError::Cancelled);
}

match self.sendtx.try_send(message) {
    Err(mpsc::error::TrySendError::Full(_)) => {
        // we've hit CLIENT_CHANNEL_CAPACITY messages backed up in
        // the channel, so forcibly kick the client
        tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded");
        self.abort_handle.abort();
        self.cancelled.store(true, Relaxed);
        return Err(ClientSendError::Cancelled);
    }
    Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected),
    Ok(()) => {
        // If we successfully pushed a message into the queue, increment the queue size metric.
        // Don't do this before pushing: if the client has already disconnected,
        // it will have performed its clean-up,
        // and would never perform the `dec` matching this `inc`.
        if let Some(metrics) = &self.metrics {
            metrics.sendtx_queue_size.inc();
        }
    }
}

Ok(())
}
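The ordering in `send` above (increment only after `try_send` succeeds) can be shown with a small sketch using the plain `tokio` and `prometheus` crates; `send_and_count` is a hypothetical helper, not this crate's API.

```rust
use prometheus::IntGauge;
use tokio::sync::mpsc;

fn send_and_count(tx: &mpsc::Sender<String>, gauge: &IntGauge, msg: String) -> Result<(), &'static str> {
    match tx.try_send(msg) {
        Ok(()) => {
            // Increment only once the message is actually sitting in the queue.
            gauge.inc();
            Ok(())
        }
        // Nothing was queued in either error case, so the gauge is left untouched.
        Err(mpsc::error::TrySendError::Full(_)) => Err("channel full"),
        Err(mpsc::error::TrySendError::Closed(_)) => Err("client disconnected"),
    }
}

#[tokio::main]
async fn main() {
    let gauge = IntGauge::new("outgoing_queue_length", "messages waiting").unwrap();
    let (tx, mut rx) = mpsc::channel::<String>(16);

    send_and_count(&tx, &gauge, "hello".into()).unwrap();
    assert_eq!(gauge.get(), 1);

    // The receiving side performs the matching decrement as it drains the queue.
    rx.recv().await.unwrap();
    gauge.dec();
    assert_eq!(gauge.get(), 0);
}
```

Had the increment happened before `try_send`, a `Closed` error would leave the gauge permanently inflated, since the receiving actor has already run its cleanup and will never perform the matching `dec`.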
10 changes: 10 additions & 0 deletions crates/core/src/worker_metrics/mod.rs
@@ -257,6 +257,16 @@ metrics_group!(
#[help = "The cumulative number of bytes sent to clients"]
#[labels(txn_type: WorkloadType, db: Identity)]
pub bytes_sent_to_clients: IntCounterVec,

#[name = spacetime_total_incoming_queue_length]
#[help = "The number of client -> server WebSocket messages waiting any client's incoming queue"]
#[labels(db: Identity)]
pub total_incoming_queue_length: IntGaugeVec,

#[name = spacetime_total_outgoing_queue_length]
#[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"]
#[labels(db: Identity)]
pub total_outgoing_queue_length: IntGaugeVec,
}
);
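
For reference, this is roughly what one of these per-database gauge families looks like when built with the plain `prometheus` crate instead of the repo's `metrics_group!` macro. The registry setup and the string label are illustrative; in the PR the label is the database `Identity`.

```rust
use prometheus::{IntGaugeVec, Opts, Registry};

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();
    let outgoing = IntGaugeVec::new(
        Opts::new(
            "spacetime_total_outgoing_queue_length",
            "The number of server -> client WebSocket messages waiting in any client's outgoing queue",
        ),
        &["db"], // one time series per database
    )?;
    registry.register(Box::new(outgoing.clone()))?;

    // Each database identity gets its own gauge; senders `inc` it and the
    // websocket actor `sub`s it as messages are drained or the client leaves.
    let db_gauge = outgoing.with_label_values(&["<database identity>"]);
    db_gauge.inc();
    db_gauge.inc();
    db_gauge.dec();
    assert_eq!(db_gauge.get(), 1);
    Ok(())
}
```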
