From 7a7b01397194b72cb31c1326c2b9582d95eba3b3 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 4 Mar 2024 14:52:07 +0200 Subject: [PATCH] fix: Wrap potentially blocking code in exporter (#23) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # What ❔ Wraps potentially blocking metrics collection in `tokio::task::spawn_blocking`. ## Why ❔ - We already have some blocking I/O used in metric collectors (e.g., RocksDB metrics). - Some new metrics (e.g., table sizes for Postgres) would use non-blocking I/O, which could be converted into blocking calls using `Handle::block_on()`. Without this wrapper, `Handle::block_on()` would be called from within an async context, where it panics. --- crates/vise-exporter/src/exporter/mod.rs | 27 ++++++++++++++-------- crates/vise-exporter/src/exporter/tests.rs | 2 +- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/crates/vise-exporter/src/exporter/mod.rs b/crates/vise-exporter/src/exporter/mod.rs index 3cedec6..22f7885 100644 --- a/crates/vise-exporter/src/exporter/mod.rs +++ b/crates/vise-exporter/src/exporter/mod.rs @@ -34,13 +34,23 @@ struct MetricsExporterInner { } impl MetricsExporterInner { - fn render_body(&self) -> Body { + async fn render_body(&self) -> Body { let mut buffer = self.scrape_legacy_metrics(); let latency = EXPORTER_METRICS.scrape_latency[&Facade::Vise].start(); - let mut new_buffer = String::with_capacity(1_024); - self.registry.encode(&mut new_buffer, self.format).unwrap(); - // ^ `unwrap()` is safe; writing to a string never fails. + let registry = Arc::clone(&self.registry); + let format = self.format; + // `Registry::encode()` is blocking in the general case (specifically, if collectors are used; they may use + // blocking I/O etc.). We cannot make metric collection non-blocking because the underlying library only provides + // blocking interface for collectors.
+ let new_buffer = tokio::task::spawn_blocking(move || { + let mut new_buffer = String::with_capacity(1_024); + registry.encode(&mut new_buffer, format).unwrap(); + // ^ `unwrap()` is safe; writing to a string never fails. + new_buffer + }) + .await + .unwrap(); // propagate panics should they occur in the spawned blocking task let latency = latency.observe(); let scraped_size = new_buffer.len(); @@ -99,8 +109,7 @@ impl MetricsExporterInner { .collect() } - // TODO: consider using a streaming response? - fn render(&self) -> Response { + async fn render(&self) -> Response { let content_type = if matches!(self.format, Format::Prometheus) { Format::PROMETHEUS_CONTENT_TYPE } else { @@ -109,7 +118,7 @@ impl MetricsExporterInner { Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, content_type) - .body(self.render_body()) + .body(self.render_body().await) .unwrap() } } @@ -266,7 +275,7 @@ impl<'a> MetricsExporter<'a> { let inner = self.inner.clone(); future::ready(Ok::<_, hyper::Error>(service_fn(move |_| { let inner = inner.clone(); - async move { Ok::<_, hyper::Error>(inner.render()) } + async move { Ok::<_, hyper::Error>(inner.render().await) } }))) })); let local_addr = server.local_addr(); @@ -308,7 +317,7 @@ impl<'a> MetricsExporter<'a> { .method(Method::PUT) .uri(endpoint.clone()) .header(header::CONTENT_TYPE, Format::OPEN_METRICS_CONTENT_TYPE) - .body(self.inner.render_body()) + .body(self.inner.render_body().await) .expect("Failed creating Prometheus push gateway request"); match client.request(request).await { diff --git a/crates/vise-exporter/src/exporter/tests.rs b/crates/vise-exporter/src/exporter/tests.rs index 063d604..035bfd4 100644 --- a/crates/vise-exporter/src/exporter/tests.rs +++ b/crates/vise-exporter/src/exporter/tests.rs @@ -55,7 +55,7 @@ async fn legacy_and_modern_metrics_can_coexist() { let exporter = exporter.with_legacy_exporter(init_legacy_exporter); report_metrics(); - let response = exporter.inner.render(); + let 
response = exporter.inner.render().await; let response = body::to_bytes(response.into_body()).await; let response = response.expect("failed decoding response"); assert_scraped_payload_is_valid(&response);