Skip to content

Commit 4ec6c25

Browse files
committed
Add client_connection_outgoing_queue_length metric
Like the previous metric added in this branch, it's per-client, so we'll use it for testing, but likely not merge it into master. I'll follow up in a separate PR with a version that's per-database instead.
1 parent 03e25c7 commit 4ec6c25

File tree

3 files changed

+33
-3
lines changed

3 files changed

+33
-3
lines changed

crates/client-api/src/routes/subscribe.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,22 @@ async fn ws_client_actor_inner(
237237
.remove_label_values(&addr, &client_identity, &connection_id) {
238238
log::error!("Failed to `remove_label_values` for `client_connection_incoming_queue_length`: {e:?}");
239239
};
240+
241+
if let Err(e) = WORKER_METRICS
242+
.client_connection_outgoing_queue_length
243+
.remove_label_values(&addr, &client_identity, &connection_id) {
244+
log::error!("Failed to `remove_label_values` for `client_connection_outgoing_queue_length`: {e:?}");
245+
}
240246
);
241247

242-
let queue_length_metric = WORKER_METRICS
248+
let incoming_queue_length_metric = WORKER_METRICS
243249
.client_connection_incoming_queue_length
244250
.with_label_values(&addr, &client_identity, &connection_id);
245251

252+
let outgoing_queue_length_metric = WORKER_METRICS
253+
.client_connection_outgoing_queue_length
254+
.with_label_values(&addr, &client_identity, &connection_id);
255+
246256
loop {
247257
rx_buf.clear();
248258
enum Item {
@@ -251,7 +261,7 @@ async fn ws_client_actor_inner(
251261
}
252262
if let MaybeDone::Gone = *current_message {
253263
if let Some((message, timer)) = message_queue.pop_front() {
254-
queue_length_metric.dec();
264+
incoming_queue_length_metric.dec();
255265
let client = client.clone();
256266
let fut = async move { client.handle_message(message, timer).await };
257267
current_message.set(MaybeDone::Future(fut));
@@ -285,6 +295,7 @@ async fn ws_client_actor_inner(
285295
// If we have an outgoing message to send, send it off.
286296
// No incoming `message` to handle, so `continue`.
287297
Some(n) = sendrx.recv_many(&mut rx_buf, 32).map(|n| (n != 0).then_some(n)) => {
298+
outgoing_queue_length_metric.sub(n as _);
288299
if closed {
289300
// TODO: this isn't great. when we receive a close request from the peer,
290301
// tungstenite doesn't let us send any new messages on the socket,
@@ -378,7 +389,7 @@ async fn ws_client_actor_inner(
378389
match message {
379390
Item::Message(ClientMessage::Message(message)) => {
380391
let timer = Instant::now();
381-
queue_length_metric.inc();
392+
incoming_queue_length_metric.inc();
382393
message_queue.push_back((message, timer))
383394
}
384395
Item::HandleResult(res) => {

crates/core/src/client/client_connection.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use bytes::Bytes;
1616
use bytestring::ByteString;
1717
use derive_more::From;
1818
use futures::prelude::*;
19+
use prometheus::IntGauge;
1920
use spacetimedb_client_api_messages::websocket::{
2021
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe,
2122
UnsubscribeMulti, WebsocketFormat,
@@ -69,6 +70,7 @@ pub struct ClientConnectionSender {
6970
sendtx: mpsc::Sender<SerializableMessage>,
7071
abort_handle: AbortHandle,
7172
cancelled: AtomicBool,
73+
sendtx_queue_size_metric: Option<IntGauge>,
7274
}
7375

7476
#[derive(Debug, thiserror::Error)]
@@ -87,13 +89,15 @@ impl ClientConnectionSender {
8789
Ok(h) => h.spawn(async {}).abort_handle(),
8890
Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(),
8991
};
92+
9093
(
9194
Self {
9295
id,
9396
config,
9497
sendtx,
9598
abort_handle,
9699
cancelled: AtomicBool::new(false),
100+
sendtx_queue_size_metric: None,
97101
},
98102
rx,
99103
)
@@ -111,6 +115,11 @@ impl ClientConnectionSender {
111115
if self.cancelled.load(Relaxed) {
112116
return Err(ClientSendError::Cancelled);
113117
}
118+
119+
if let Some(metric) = &self.sendtx_queue_size_metric {
120+
metric.inc();
121+
}
122+
114123
self.sendtx.try_send(message).map_err(|e| match e {
115124
mpsc::error::TrySendError::Full(_) => {
116125
// we've hit CLIENT_CHANNEL_CAPACITY messages backed up in
@@ -216,12 +225,17 @@ impl ClientConnection {
216225
})
217226
.abort_handle();
218227

228+
let sendtx_queue_size_metric = WORKER_METRICS
229+
.client_connection_outgoing_queue_length
230+
.with_label_values(&db, &id.identity, &id.connection_id);
231+
219232
let sender = Arc::new(ClientConnectionSender {
220233
id,
221234
config,
222235
sendtx,
223236
abort_handle,
224237
cancelled: AtomicBool::new(false),
238+
sendtx_queue_size_metric: Some(sendtx_queue_size_metric),
225239
});
226240
let this = Self {
227241
sender,

crates/core/src/worker_metrics/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ metrics_group!(
262262
#[help = "The number of client -> server WebSocket messages waiting in a client connection's incoming queue"]
263263
#[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)]
264264
pub client_connection_incoming_queue_length: IntGaugeVec,
265+
266+
#[name = spacetime_client_connection_outgoing_queue_length]
267+
#[help = "The number of server -> client WebSocket messages waiting in a client connection's outgoing queue"]
268+
#[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)]
269+
pub client_connection_outgoing_queue_length: IntGaugeVec,
265270
}
266271
);
267272

0 commit comments

Comments
 (0)