push_metrics panics inside tokio #453

Open
yds12 opened this issue Aug 23, 2022 · 5 comments

Comments

@yds12

yds12 commented Aug 23, 2022

Describe the bug
Calling push_metrics from an async function with tokio causes this error:

thread 'main' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.' .../.cargo/registry/src/github.com-.../tokio-1.20.1/src/runtime/blocking/shutdown.rs:51:21

To Reproduce
Take the push example, make main an async fn annotated with #[tokio::main], and add tokio = { version = "1.20", features = ["rt", "rt-multi-thread", "macros"] } to Cargo.toml. Running the example then produces the panic above.

Expected behavior
No panic, or an async-compatible alternative.
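
A possible workaround that leaves the crate untouched is to keep the blocking call off the runtime's threads. The sketch below uses only the standard library. The names `blocking_push` and `push_on_dedicated_thread` are hypothetical, and `blocking_push` merely stands in for a blocking call such as push_metrics. It has not been checked against the tokio versions mentioned in this thread.

```rust
use std::thread;

// Hypothetical stand-in for a blocking call such as `prometheus::push_metrics`.
// It only validates its input so the sketch runs without a Pushgateway.
fn blocking_push(job: &str, url: &str) -> Result<(), String> {
    if job.is_empty() || url.is_empty() {
        return Err("job and url must be non-empty".to_owned());
    }
    Ok(())
}

// Runs the blocking push on a dedicated OS thread, which has no async runtime
// context, and hands the result back to the caller.
fn push_on_dedicated_thread(job: String, url: String) -> Result<(), String> {
    thread::spawn(move || blocking_push(&job, &url))
        .join()
        // A panic inside the push thread is reported as an error string.
        .map_err(|_| "push thread panicked".to_owned())?
}
```

Calling `push_on_dedicated_thread("example_job".to_owned(), "127.0.0.1:9091".to_owned())` returns the push result. Note that `join` blocks the caller, so inside tokio the more usual route would be `tokio::task::spawn_blocking` followed by `.await`.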

@yds12
Author

yds12 commented Aug 23, 2022

It's quite simple to make it work: stop using reqwest::blocking::Client and add async and await in a few places (see patch below). Of course, that would be a breaking change and would force people to use async, so I don't know what you would like to do. Maybe put the async code behind a feature flag? If you are interested, I can send a PR for this.

diff --git a/examples/example_push.rs b/examples/example_push.rs
index 22d0195..c20f94b 100644
--- a/examples/example_push.rs
+++ b/examples/example_push.rs
@@ -26,7 +26,8 @@ lazy_static! {
 }
 
 #[cfg(feature = "push")]
-fn main() {
+#[tokio::main]
+async fn main() {
     let args: Vec<String> = env::args().collect();
     let program = args[0].clone();
 
@@ -63,6 +64,7 @@ fn main() {
                 password: "pass".to_owned(),
             }),
         )
+        .await
         .unwrap();
     }
 
diff --git a/src/push.rs b/src/push.rs
index 525b342..a3d039a 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -6,7 +6,7 @@ use std::hash::BuildHasher;
 use std::str::{self, FromStr};
 use std::time::Duration;
 
-use reqwest::blocking::Client;
+use reqwest::Client;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{Method, StatusCode, Url};
 
@@ -52,32 +52,32 @@ pub struct BasicAuthentication {
 /// Note that all previously pushed metrics with the same job and other grouping
 /// labels will be replaced with the metrics pushed by this call. (It uses HTTP
 /// method 'PUT' to push to the Pushgateway.)
-pub fn push_metrics<S: BuildHasher>(
+pub async fn push_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "PUT", basic_auth)
+    push(job, grouping, url, mfs, "PUT", basic_auth).await
 }
 
 /// `push_add_metrics` works like `push_metrics`, but only previously pushed
 /// metrics with the same name (and the same job and other grouping labels) will
 /// be replaced. (It uses HTTP method 'POST' to push to the Pushgateway.)
-pub fn push_add_metrics<S: BuildHasher>(
+pub async fn push_add_metrics<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     mfs: Vec<proto::MetricFamily>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push(job, grouping, url, mfs, "POST", basic_auth)
+    push(job, grouping, url, mfs, "POST", basic_auth).await
 }
 
 const LABEL_NAME_JOB: &str = "job";
 
-fn push<S: BuildHasher>(
+async fn push<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -160,7 +160,7 @@ fn push<S: BuildHasher>(
         builder = builder.basic_auth(username, Some(password));
     }
 
-    let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
+    let response = builder.send().await.map_err(|e| Error::Msg(format!("{}", e)))?;
 
     match response.status() {
         StatusCode::ACCEPTED => Ok(()),
@@ -173,7 +173,7 @@ fn push<S: BuildHasher>(
     }
 }
 
-fn push_from_collector<S: BuildHasher>(
+async fn push_from_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
@@ -187,31 +187,31 @@ fn push_from_collector<S: BuildHasher>(
     }
 
     let mfs = registry.gather();
-    push(job, grouping, url, mfs, method, basic_auth)
+    push(job, grouping, url, mfs, method, basic_auth).await
 }
 
 /// `push_collector` push metrics collected from the provided collectors. It is
 /// a convenient way to push only a few metrics.
-pub fn push_collector<S: BuildHasher>(
+pub async fn push_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "PUT", basic_auth).await
 }
 
 /// `push_add_collector` works like `push_add_metrics`, it collects from the
 /// provided collectors. It is a convenient way to push only a few metrics.
-pub fn push_add_collector<S: BuildHasher>(
+pub async fn push_add_collector<S: BuildHasher>(
     job: &str,
     grouping: HashMap<String, String, S>,
     url: &str,
     collectors: Vec<Box<dyn Collector>>,
     basic_auth: Option<BasicAuthentication>,
 ) -> Result<()> {
-    push_from_collector(job, grouping, url, collectors, "POST", basic_auth)
+    push_from_collector(job, grouping, url, collectors, "POST", basic_auth).await
 }
 
 const DEFAULT_GROUP_LABEL_PAIR: (&str, &str) = ("instance", "unknown");
@@ -264,8 +264,8 @@ mod tests {
         assert!(!map.is_empty());
     }
 
-    #[test]
-    fn test_push_bad_label_name() {
+    #[tokio::test]
+    async fn test_push_bad_label_name() {
         let table = vec![
             // Error message: "pushed metric {} already contains a job label"
             (LABEL_NAME_JOB, "job label"),
@@ -280,7 +280,7 @@ mod tests {
             m.set_label(from_vec!(vec![l]));
             let mut mf = proto::MetricFamily::new();
             mf.set_metric(from_vec!(vec![m]));
-            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None);
+            let res = push_metrics("test", hostname_grouping_key(), "mockurl", vec![mf], None).await;
             assert!(format!("{}", res.unwrap_err()).contains(case.1));
         }
     }
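
The feature-flag idea mentioned with this patch could be sketched as follows. This is an illustration under assumptions, not the crate's code: the feature name `async-push`, the simplified two-argument signature, the `String` error type, and the `check_job` helper are all hypothetical, and the HTTP request itself is elided.

```rust
// Hypothetical shared helper: validation that does not depend on the HTTP client.
fn check_job(job: &str) -> Result<(), String> {
    if job.contains('/') {
        return Err(format!("job contains '/': {}", job));
    }
    Ok(())
}

// Default build: the blocking signature stays unchanged.
#[cfg(not(feature = "async-push"))]
pub fn push_metrics(job: &str, url: &str) -> Result<(), String> {
    check_job(job)?;
    // The blocking HTTP request would go here.
    let _ = url;
    Ok(())
}

// Opt-in build (hypothetical `async-push` feature): same name, async signature.
#[cfg(feature = "async-push")]
pub async fn push_metrics(job: &str, url: &str) -> Result<(), String> {
    check_job(job)?;
    // The non-blocking HTTP request would go here, followed by `.await`.
    let _ = url;
    Ok(())
}
```

With the feature off, `push_metrics("job", "127.0.0.1:9091")` is an ordinary blocking call; with it on, the same call needs `.await`. Cargo unifies features across a dependency graph, so changing a signature under a flag can break other dependents. Exposing the async variant under a separate name may be the safer design.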

@lucab
Member

lucab commented Aug 24, 2022

Thanks for the report. Have you already tried the feature set described in #342 (comment)?

@yds12
Author

yds12 commented Aug 24, 2022

Hello, thanks for the reply. I just tried it, but the same error occurs.

@taj-p

taj-p commented Dec 25, 2022

FWIW, the original fix no longer works. It's failing on the latest tokio (1.23.0) as far as I can tell.

@maoertel

I wrote prometheus-push, a crate that handles the push functionality, so the prometheus crate does not necessarily have to take care of this. prometheus-push works blocking or non-blocking, with this crate or with "the other" prometheus_client crate. Alternatively, you can implement the traits it provides yourself and use it however you want.

For this crate and non-blocking reqwest, it is as simple as this:

[dependencies]
prometheus_push = { version = "<version>", default-features = false, features = ["with_reqwest", "prometheus_crate"] }

use prometheus::labels;
use prometheus_push::prometheus_crate::PrometheusMetricsPusher;
use reqwest::Client;
use url::Url;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let push_gateway: Url = Url::parse("<address to pushgateway>")?;
    let client = Client::new();
    let metrics_pusher = PrometheusMetricsPusher::from(client, &push_gateway)?;
    metrics_pusher
        .push_all(
            "<your push jobs name>",
            &labels! { "<label_name>" => "<label_value>" },
            prometheus::gather(),
        )
        .await?;

    Ok(())
}
