Skip to content

Commit c007976

Browse files
committed
Less granular message queue length metrics
This commit alters the message queue length metrics introduced by #2754 to be per-database, rather than per-client. This should limit the cardinality of these metrics, and better lines up with the labels of our other metrics. Because the metrics are now per-database rather than per-client, it's no longer correct to just drop the label when the client disconnects. Instead, care must be taken to decrement the metric by the number of messages which were waiting in the queue at the time of the disconnection. I've added comments to call attention to this complexity.
1 parent 4ec6c25 commit c007976

File tree

3 files changed

+77
-70
lines changed

3 files changed

+77
-70
lines changed

crates/client-api/src/routes/subscribe.rs

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -228,30 +228,23 @@ async fn ws_client_actor_inner(
228228

229229
let addr = client.module.info().database_identity;
230230

231-
let client_identity = client.sender().id.identity;
232-
let connection_id = client.sender().id.connection_id;
233-
234-
scopeguard::defer!(
235-
if let Err(e) = WORKER_METRICS
236-
.client_connection_incoming_queue_length
237-
.remove_label_values(&addr, &client_identity, &connection_id) {
238-
log::error!("Failed to `remove_label_values` for `client_connection_incoming_queue_length`: {e:?}");
239-
};
240-
241-
if let Err(e) = WORKER_METRICS
242-
.client_connection_outgoing_queue_length
243-
.remove_label_values(&addr, &client_identity, &connection_id) {
244-
log::error!("Failed to `remove_label_values` for `client_connection_outgoing_queue_length`: {e:?}");
245-
}
246-
);
247-
248-
let incoming_queue_length_metric = WORKER_METRICS
249-
.client_connection_incoming_queue_length
250-
.with_label_values(&addr, &client_identity, &connection_id);
251-
252-
let outgoing_queue_length_metric = WORKER_METRICS
253-
.client_connection_outgoing_queue_length
254-
.with_label_values(&addr, &client_identity, &connection_id);
231+
// Grab handles on the total incoming and outgoing queue length metrics,
232+
// which we'll increment and decrement as we push into and pull out of those queues.
233+
// Note that `total_outgoing_queue_length` is incremented separately,
234+
// by `ClientConnectionSender::send` in core/src/client/client_connection.rs;
235+
// we're only responsible for decrementing that one.
236+
// Also note that much care must be taken to clean up these metrics when the connection closes!
237+
// Any path which exits this function must decrement each of these metrics
238+
// by the number of messages still waiting in this client's queue,
239+
// or else they will grow without bound as clients disconnect, and be useless.
240+
let incoming_queue_length_metric = WORKER_METRICS.total_incoming_queue_length.with_label_values(&addr);
241+
let outgoing_queue_length_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&addr);
242+
243+
let clean_up_metrics = |message_queue: &VecDeque<(DataMessage, Instant)>,
244+
sendrx: &mpsc::Receiver<SerializableMessage>| {
245+
incoming_queue_length_metric.sub(message_queue.len() as _);
246+
outgoing_queue_length_metric.sub(sendrx.len() as _);
247+
};
255248

256249
loop {
257250
rx_buf.clear();
@@ -289,7 +282,10 @@ async fn ws_client_actor_inner(
289282
continue;
290283
}
291284
// the client sent us a close frame
292-
None => break,
285+
None => {
286+
clean_up_metrics(&message_queue, &sendrx);
287+
break
288+
},
293289
},
294290

295291
// If we have an outgoing message to send, send it off.
@@ -302,31 +298,31 @@ async fn ws_client_actor_inner(
302298
// even though the websocket RFC allows it. should we fork tungstenite?
303299
log::info!("dropping messages due to ws already being closed: {:?}", &rx_buf[..n]);
304300
} else {
305-
let send_all = async {
306-
for msg in rx_buf.drain(..n) {
307-
let workload = msg.workload();
308-
let num_rows = msg.num_rows();
309-
310-
let msg = datamsg_to_wsmsg(serialize(msg, client.config));
311-
312-
// These metrics should be updated together,
313-
// or not at all.
314-
if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
315-
WORKER_METRICS
316-
.websocket_sent_num_rows
317-
.with_label_values(&addr, &workload)
318-
.observe(num_rows as f64);
319-
WORKER_METRICS
320-
.websocket_sent_msg_size
321-
.with_label_values(&addr, &workload)
322-
.observe(msg.len() as f64);
323-
}
324-
// feed() buffers the message, but does not necessarily send it
325-
ws.feed(msg).await?;
301+
let send_all = async {
302+
for msg in rx_buf.drain(..n) {
303+
let workload = msg.workload();
304+
let num_rows = msg.num_rows();
305+
306+
let msg = datamsg_to_wsmsg(serialize(msg, client.config));
307+
308+
// These metrics should be updated together,
309+
// or not at all.
310+
if let (Some(workload), Some(num_rows)) = (workload, num_rows) {
311+
WORKER_METRICS
312+
.websocket_sent_num_rows
313+
.with_label_values(&addr, &workload)
314+
.observe(num_rows as f64);
315+
WORKER_METRICS
316+
.websocket_sent_msg_size
317+
.with_label_values(&addr, &workload)
318+
.observe(msg.len() as f64);
326319
}
327-
// now we flush all the messages to the socket
328-
ws.flush().await
329-
};
320+
// feed() buffers the message, but does not necessarily send it
321+
ws.feed(msg).await?;
322+
}
323+
// now we flush all the messages to the socket
324+
ws.flush().await
325+
};
330326
// Flush the websocket while continuing to poll the `handle_queue`,
331327
// to avoid deadlocks or delays due to enqueued futures holding resources.
332328
let send_all = also_poll(send_all, make_progress(&mut current_message));
@@ -375,6 +371,7 @@ async fn ws_client_actor_inner(
375371
} else {
376372
// the client never responded to our ping; drop them without trying to send them a Close
377373
log::warn!("client {} timed out", client.id);
374+
clean_up_metrics(&message_queue, &sendrx);
378375
break;
379376
}
380377
}

crates/core/src/client/client_connection.rs

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ pub struct ClientConnectionSender {
7070
sendtx: mpsc::Sender<SerializableMessage>,
7171
abort_handle: AbortHandle,
7272
cancelled: AtomicBool,
73+
/// The `total_outgoing_queue_length` metric labeled with this database's `Identity`,
74+
/// which we'll increment whenever sending a message.
75+
///
76+
/// This metric will be decremented, and cleaned up,
77+
/// by `ws_client_actor_inner` in client-api/src/routes/subscribe.rs.
78+
/// Care must be taken not to increment it after the client has disconnected
79+
/// and performed its clean-up.
7380
sendtx_queue_size_metric: Option<IntGauge>,
7481
}
7582

@@ -116,21 +123,26 @@ impl ClientConnectionSender {
116123
return Err(ClientSendError::Cancelled);
117124
}
118125

119-
if let Some(metric) = &self.sendtx_queue_size_metric {
120-
metric.inc();
121-
}
122-
123-
self.sendtx.try_send(message).map_err(|e| match e {
124-
mpsc::error::TrySendError::Full(_) => {
126+
match self.sendtx.try_send(message) {
127+
Err(mpsc::error::TrySendError::Full(_)) => {
125128
// we've hit CLIENT_CHANNEL_CAPACITY messages backed up in
126129
// the channel, so forcibly kick the client
127130
tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded");
128131
self.abort_handle.abort();
129132
self.cancelled.store(true, Relaxed);
130-
ClientSendError::Cancelled
133+
return Err(ClientSendError::Cancelled);
131134
}
132-
mpsc::error::TrySendError::Closed(_) => ClientSendError::Disconnected,
133-
})?;
135+
Err(mpsc::error::TrySendError::Closed(_)) => return Err(ClientSendError::Disconnected),
136+
Ok(()) => {
137+
// If we successfully pushed a message into the queue, increment the queue size metric.
138+
// Don't do this before pushing because, if the client has disconnected,
139+
// it will already have performed its clean-up,
140+
// and so would never perform the corresponding `dec` to this `inc`.
141+
if let Some(metric) = &self.sendtx_queue_size_metric {
142+
metric.inc();
143+
}
144+
}
145+
}
134146

135147
Ok(())
136148
}
@@ -225,9 +237,7 @@ impl ClientConnection {
225237
})
226238
.abort_handle();
227239

228-
let sendtx_queue_size_metric = WORKER_METRICS
229-
.client_connection_outgoing_queue_length
230-
.with_label_values(&db, &id.identity, &id.connection_id);
240+
let sendtx_queue_size_metric = WORKER_METRICS.total_outgoing_queue_length.with_label_values(&db);
231241

232242
let sender = Arc::new(ClientConnectionSender {
233243
id,

crates/core/src/worker_metrics/mod.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -258,15 +258,15 @@ metrics_group!(
258258
#[labels(txn_type: WorkloadType, db: Identity)]
259259
pub bytes_sent_to_clients: IntCounterVec,
260260

261-
#[name = spacetime_client_connection_incoming_queue_length]
262-
#[help = "The number of client -> server WebSocket messages waiting in a client connection's incoming queue"]
263-
#[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)]
264-
pub client_connection_incoming_queue_length: IntGaugeVec,
265-
266-
#[name = spacetime_client_connection_outgoing_queue_length]
267-
#[help = "The number of server -> client WebSocket messages waiting in a client connection's outgoing queue"]
268-
#[labels(db: Identity, client_identity: Identity, connection_id: ConnectionId)]
269-
pub client_connection_outgoing_queue_length: IntGaugeVec,
261+
#[name = spacetime_total_incoming_queue_length]
262+
#[help = "The number of client -> server WebSocket messages waiting any client's incoming queue"]
263+
#[labels(db: Identity)]
264+
pub total_incoming_queue_length: IntGaugeVec,
265+
266+
#[name = spacetime_total_outgoing_queue_length]
267+
#[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"]
268+
#[labels(db: Identity)]
269+
pub total_outgoing_queue_length: IntGaugeVec,
270270
}
271271
);
272272

0 commit comments

Comments
 (0)