Skip to content

Commit

Permalink
fix: Wrap potentially blocking code in exporter (#23)
Browse files Browse the repository at this point in the history
# What ❔

Wraps potentially blocking metrics collection in
`tokio::task::spawn_blocking`.

## Why ❔

- We already have some blocking I/O used in metric collectors (e.g.,
RocksDB metrics).
- Some new metrics (e.g., table sizes for Postgres) would use
non-blocking I/O, which could be converted into blocking calls using
`Handle::block_on()`. Without this wrapper, `Handle::block_on()` would be
invoked from within an async execution context, which panics.
  • Loading branch information
slowli authored Mar 4, 2024
1 parent d68926c commit 7a7b013
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
27 changes: 18 additions & 9 deletions crates/vise-exporter/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,23 @@ struct MetricsExporterInner {
}

impl MetricsExporterInner {
fn render_body(&self) -> Body {
async fn render_body(&self) -> Body {
let mut buffer = self.scrape_legacy_metrics();

let latency = EXPORTER_METRICS.scrape_latency[&Facade::Vise].start();
let mut new_buffer = String::with_capacity(1_024);
self.registry.encode(&mut new_buffer, self.format).unwrap();
// ^ `unwrap()` is safe; writing to a string never fails.
let registry = Arc::clone(&self.registry);
let format = self.format;
// `Registry::encode()` is blocking in the general case (specifically, if collectors are used; they may use
// blocking I/O etc.). We cannot make metric collection non-blocking because the underlying library only provides
// blocking interface for collectors.
let new_buffer = tokio::task::spawn_blocking(move || {
let mut new_buffer = String::with_capacity(1_024);
registry.encode(&mut new_buffer, format).unwrap();
// ^ `unwrap()` is safe; writing to a string never fails.
new_buffer
})
.await
.unwrap(); // propagate panics should they occur in the spawned blocking task

let latency = latency.observe();
let scraped_size = new_buffer.len();
Expand Down Expand Up @@ -99,8 +109,7 @@ impl MetricsExporterInner {
.collect()
}

// TODO: consider using a streaming response?
fn render(&self) -> Response<Body> {
async fn render(&self) -> Response<Body> {
let content_type = if matches!(self.format, Format::Prometheus) {
Format::PROMETHEUS_CONTENT_TYPE
} else {
Expand All @@ -109,7 +118,7 @@ impl MetricsExporterInner {
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type)
.body(self.render_body())
.body(self.render_body().await)
.unwrap()
}
}
Expand Down Expand Up @@ -266,7 +275,7 @@ impl<'a> MetricsExporter<'a> {
let inner = self.inner.clone();
future::ready(Ok::<_, hyper::Error>(service_fn(move |_| {
let inner = inner.clone();
async move { Ok::<_, hyper::Error>(inner.render()) }
async move { Ok::<_, hyper::Error>(inner.render().await) }
})))
}));
let local_addr = server.local_addr();
Expand Down Expand Up @@ -308,7 +317,7 @@ impl<'a> MetricsExporter<'a> {
.method(Method::PUT)
.uri(endpoint.clone())
.header(header::CONTENT_TYPE, Format::OPEN_METRICS_CONTENT_TYPE)
.body(self.inner.render_body())
.body(self.inner.render_body().await)
.expect("Failed creating Prometheus push gateway request");

match client.request(request).await {
Expand Down
2 changes: 1 addition & 1 deletion crates/vise-exporter/src/exporter/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn legacy_and_modern_metrics_can_coexist() {
let exporter = exporter.with_legacy_exporter(init_legacy_exporter);
report_metrics();

let response = exporter.inner.render();
let response = exporter.inner.render().await;
let response = body::to_bytes(response.into_body()).await;
let response = response.expect("failed decoding response");
assert_scraped_payload_is_valid(&response);
Expand Down

0 comments on commit 7a7b013

Please sign in to comment.