diff --git a/v2/src/lib.rs b/v2/src/lib.rs index b0530fb..9260352 100644 --- a/v2/src/lib.rs +++ b/v2/src/lib.rs @@ -219,7 +219,7 @@ impl Plugin for InfluxDbBackend { Ok(client) => client, Err(e) => bail!("Error in creating client for InfluxDBv2 volume: {:?}", e), }; - match blockon_runtime(async { admin_client.ready().await }) { + match blockon_runtime(admin_client.ready()) { Ok(res) => { if !res { bail!("InfluxDBv2 server is not ready! ") @@ -255,11 +255,14 @@ pub struct InfluxDbVolume { } impl InfluxDbVolume { - async fn create_db(&self, org_id: &str, db: &str) -> Result<(), influxdb2::RequestError> { + async fn create_db(&self, org_id: &str, db: &str) -> ZResult<()> { let post_bucket_options = PostBucketRequest::new(org_id.into(), db.into()); - self.admin_client - .create_bucket(Some(post_bucket_options)) - .await + let admin_client = self.admin_client.clone(); + match await_task!(admin_client.create_bucket(Some(post_bucket_options)).await,) { + Ok(_) => tracing::info!("Created {db} Influx"), + Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e), + } + Ok(()) } } @@ -354,10 +357,7 @@ impl Volume for InfluxDbVolume { Ok(db_exists) => { if !db_exists && createdb { // Try to create db using user credentials - match self.create_db(&storage_creds.org_id, &db).await { - Ok(_) => tracing::info!("Created {db} Influx"), - Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e), - } + self.create_db(&storage_creds.org_id, &db).await? } else if db_exists && createdb { tracing::warn!("Database '{db}' already exists exists in Influx and config 'create_db'='true'"); } @@ -499,18 +499,20 @@ impl InfluxDbStorage { // get the value and if it exists then extract the timestamp from it let query = Query::new(qs); - let query_result: Vec = match self.client.query::(Some(query)).await - { - Ok(result) => result, - Err(e) => { - tracing::error!( - "Couldn't get data from InfluxDBv2 database {} with error: {} ", - self.db_name, - e - ); - return Ok(None); - } - }; + + let client = self.client.clone(); + let query_result: Vec = + match await_task!(client.query::(Some(query)).await,) { + Ok(result) => result, + Err(e) => { + tracing::error!( + "Couldn't get data from InfluxDBv2 database {} with error: {} ", + self.db_name, + e + ); + return Ok(None); + } + }; match query_result.first() { Some(zp) => match Timestamp::from_str(&zp.timestamp) { @@ -574,11 +576,10 @@ impl Storage for InfluxDbStorage { .timestamp(influx_time) //converted timestamp to i64 .build()?]; - match self - .client - .write(&self.db_name, stream::iter(zenoh_point)) - .await - { + let client = self.client.clone(); + let db_name = self.db_name.clone(); + + match await_task!(client.write(&db_name, stream::iter(zenoh_point)).await,) { Ok(_) => Ok(StorageInsertionResult::Inserted), Err(e) => bail!( "Failed to put Value for {:?} in InfluxDBv2 storage : {}", @@ -612,11 +613,14 @@ impl Storage for InfluxDbStorage { "Delete {:?} with Influx query in InfluxDBv2 storage, Time Range: {:?} - {:?}, Predicate: {:?}", measurement, start_timestamp, stop_timestamp, predicate ); - if let Err(e) = self - .client - .delete(&self.db_name, start_timestamp, stop_timestamp, predicate) - .await - { + + let client = self.client.clone(); + let db_name = self.db_name.clone(); + if let Err(e) = await_task!( + client + .delete(&db_name, start_timestamp, stop_timestamp, predicate) + .await, + ) { bail!( "Failed to delete points for measurement '{}' from InfluxDBv2 storage : {}", measurement, @@ -643,18 +647,15 @@ impl Storage for InfluxDbStorage { stop_timestamp ); - if let Err(e) = self - .client - .write(&self.db_name, stream::iter(zenoh_point)) - .await - { + let client = self.client.clone(); + let db_name = self.db_name.clone(); + if let Err(e) = await_task!(client.write(&db_name, stream::iter(zenoh_point)).await,) { bail!( "Failed to mark measurement {:?} as deleted : {} in InfluxDBv2 storage", measurement, e ) } - // schedule_measurement_drop is used to schedule the drop of measurement later in the future, if it's empty // influx 2.x doesn't support dropping measurements from the API Ok(StorageInsertionResult::Deleted) @@ -702,19 +703,19 @@ impl Storage for InfluxDbStorage { let query = Query::new(query_string); let client = self.client.clone(); - let res = await_task!(client.query::(Some(query)).await,); - - let query_result: Vec = match res { - Ok(result) => result, - Err(e) => { - tracing::error!( - "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", - self.db_name, - e - ); - vec![] - } - }; + + let query_result: Vec = + match await_task!(client.query::(Some(query)).await,) { + Ok(result) => result, + Err(e) => { + tracing::error!( + "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", + self.db_name, + e + ); + vec![] + } + }; let mut result: Vec = vec![]; @@ -796,16 +797,18 @@ impl Storage for InfluxDbStorage { let query = Query::new(qs); - let vec_zpoint: Vec = match self.client.query::(Some(query)).await { - Ok(result) => result, - Err(e) => { - bail!( - "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", - self.db_name, - e - ); - } - }; + let client = self.client.clone(); + let vec_zpoint: Vec = + match await_task!(client.query::(Some(query)).await,) { + Ok(result) => result, + Err(e) => { + bail!( + "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", + self.db_name, + e + ); + } + }; for zp in vec_zpoint { match (