From da692f1c17f1497108ca68c87dd7342eefe2e3f1 Mon Sep 17 00:00:00 2001 From: Charles Schleich Date: Mon, 9 Sep 2024 16:02:54 +0200 Subject: [PATCH 1/3] blockon dynamic plugin loading cause db creation runtime crash. get_all_entries, get and deletion exhibiting similar behaviour --- v2/src/lib.rs | 82 +++++++++++++++++++++++++-------------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/v2/src/lib.rs b/v2/src/lib.rs index 7e07e37..8d57746 100644 --- a/v2/src/lib.rs +++ b/v2/src/lib.rs @@ -214,7 +214,7 @@ impl Plugin for InfluxDbBackend { Ok(client) => client, Err(e) => bail!("Error in creating client for InfluxDBv2 volume: {:?}", e), }; - match blockon_runtime(async { admin_client.ready().await }) { + match blockon_runtime(admin_client.ready()) { Ok(res) => { if !res { bail!("InfluxDBv2 server is not ready! ") @@ -252,9 +252,7 @@ pub struct InfluxDbVolume { impl InfluxDbVolume { async fn create_db(&self, org_id: &str, db: &str) -> Result<(), influxdb2::RequestError> { let post_bucket_options = PostBucketRequest::new(org_id.into(), db.into()); - self.admin_client - .create_bucket(Some(post_bucket_options)) - .await + blockon_runtime(self.admin_client.create_bucket(Some(post_bucket_options))) } } @@ -479,18 +477,20 @@ impl InfluxDbStorage { // get the value and if it exists then extract the timestamp from it let query = Query::new(qs); - let query_result: Vec = match self.client.query::(Some(query)).await - { - Ok(result) => result, - Err(e) => { - tracing::error!( - "Couldn't get data from InfluxDBv2 database {} with error: {} ", - self.db_name, - e - ); - return Ok(None); - } - }; + + let client = self.client.clone(); + let query_result: Vec = + match await_task!(client.query::(Some(query)).await, client) { + Ok(result) => result, + Err(e) => { + tracing::error!( + "Couldn't get data from InfluxDBv2 database {} with error: {} ", + self.db_name, + e + ); + return Ok(None); + } + }; match query_result.first() { Some(zp) => match Timestamp::from_str(&zp.timestamp) { @@ -622,7 +622,6 @@ impl Storage for InfluxDbStorage { measurement.clone(), stop_timestamp ); - if let Err(e) = self .client .write(&self.db_name, stream::iter(zenoh_point)) @@ -634,7 +633,6 @@ impl Storage for InfluxDbStorage { e ) } - // schedule_measurement_drop is used to schedule the drop of measurement later in the future, if it's empty // influx 2.x doesn't support dropping measurements from the API Ok(StorageInsertionResult::Deleted) @@ -682,19 +680,19 @@ impl Storage for InfluxDbStorage { let query = Query::new(query_string); let client = self.client.clone(); - let res = await_task!(client.query::(Some(query)).await,); - - let query_result: Vec = match res { - Ok(result) => result, - Err(e) => { - tracing::error!( - "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", - self.db_name, - e - ); - vec![] - } - }; + + let query_result: Vec = + match await_task!(client.query::(Some(query)).await,) { + Ok(result) => result, + Err(e) => { + tracing::error!( + "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", + self.db_name, + e + ); + vec![] + } + }; let mut result: Vec = vec![]; @@ -776,16 +774,18 @@ impl Storage for InfluxDbStorage { let query = Query::new(qs); - let vec_zpoint: Vec = match self.client.query::(Some(query)).await { - Ok(result) => result, - Err(e) => { - bail!( - "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", - self.db_name, - e - ); - } - }; + let client = self.client.clone(); + let vec_zpoint: Vec = + match await_task!(client.query::(Some(query)).await,) { + Ok(result) => result, + Err(e) => { + bail!( + "Couldn't get data from database {} in InfluxDBv2 storage with error: {} ", + self.db_name, + e + ); + } + }; for zp in vec_zpoint { match ( From 23f165a038b10511917a34e4b46c50c5b33e1145 Mon Sep 17 00:00:00 2001 From: Charles Schleich Date: Thu, 12 Sep 2024 10:56:18 +0200 Subject: [PATCH 2/3] Add await_task! del, put, and replace blockon in create_db --- v2/src/lib.rs | 47 +++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/v2/src/lib.rs b/v2/src/lib.rs index 8d57746..ea88606 100644 --- a/v2/src/lib.rs +++ b/v2/src/lib.rs @@ -250,9 +250,14 @@ pub struct InfluxDbVolume { } impl InfluxDbVolume { - async fn create_db(&self, org_id: &str, db: &str) -> Result<(), influxdb2::RequestError> { + async fn create_db(&self, org_id: &str, db: &str) -> ZResult<()> { let post_bucket_options = PostBucketRequest::new(org_id.into(), db.into()); - blockon_runtime(self.admin_client.create_bucket(Some(post_bucket_options))) + let admin_client = self.admin_client.clone(); + match await_task!(admin_client.create_bucket(Some(post_bucket_options)).await,) { + Ok(_) => tracing::info!("Created {db} Influx"), + Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e), + } + Ok(()) } } @@ -332,10 +337,7 @@ impl Volume for InfluxDbVolume { Ok(db_exists) => { if !db_exists && createdb { // Try to create db using user credentials - match self.create_db(&creds.org_id, &db).await { - Ok(_) => tracing::info!("Created {db} Influx"), - Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e), - } + self.create_db(&creds.org_id, &db).await? } else if db_exists && createdb { tracing::warn!("Database '{db}' already exists exists in Influx and config 'create_db'='true'"); } @@ -480,7 +482,7 @@ impl InfluxDbStorage { let client = self.client.clone(); let query_result: Vec = - match await_task!(client.query::(Some(query)).await, client) { + match await_task!(client.query::(Some(query)).await,) { Ok(result) => result, Err(e) => { tracing::error!( @@ -554,11 +556,10 @@ impl Storage for InfluxDbStorage { .timestamp(influx_time) //converted timestamp to i64 .build()?]; - match self - .client - .write(&self.db_name, stream::iter(zenoh_point)) - .await - { + let client = self.client.clone(); + let db_name = self.db_name.clone(); + + match await_task!(client.write(&db_name, stream::iter(zenoh_point)).await,) { Ok(_) => Ok(StorageInsertionResult::Inserted), Err(e) => bail!( "Failed to put Value for {:?} in InfluxDBv2 storage : {}", @@ -592,11 +593,14 @@ impl Storage for InfluxDbStorage { "Delete {:?} with Influx query in InfluxDBv2 storage, Time Range: {:?} - {:?}, Predicate: {:?}", measurement, start_timestamp, stop_timestamp, predicate ); - if let Err(e) = self - .client - .delete(&self.db_name, start_timestamp, stop_timestamp, predicate) - .await - { + + let client = self.client.clone(); + let db_name = self.db_name.clone(); + if let Err(e) = await_task!( + client + .delete(&db_name, start_timestamp, stop_timestamp, predicate) + .await, + ) { bail!( "Failed to delete points for measurement '{}' from InfluxDBv2 storage : {}", measurement, @@ -622,11 +626,10 @@ impl Storage for InfluxDbStorage { measurement.clone(), stop_timestamp ); - if let Err(e) = self - .client - .write(&self.db_name, stream::iter(zenoh_point)) - .await - { + + let client = self.client.clone(); + let db_name = self.db_name.clone(); + if let Err(e) = await_task!(client.write(&db_name, stream::iter(zenoh_point)).await,) { bail!( "Failed to mark measurement {:?} as deleted : {} in InfluxDBv2 storage", measurement, From f854b8fd209bd6ff2f7982ed530f3d6f9ea3c982 Mon Sep 17 00:00:00 2001 From: Charles Schleich Date: Thu, 12 Sep 2024 11:05:22 +0200 Subject: [PATCH 3/3] merge fix, storage credentials --- v2/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/src/lib.rs b/v2/src/lib.rs index 76228bc..9260352 100644 --- a/v2/src/lib.rs +++ b/v2/src/lib.rs @@ -357,7 +357,7 @@ impl Volume for InfluxDbVolume { Ok(db_exists) => { if !db_exists && createdb { // Try to create db using user credentials - self.create_db(&creds.org_id, &db).await? + self.create_db(&storage_creds.org_id, &db).await? } else if db_exists && createdb { tracing::warn!("Database '{db}' already exists exists in Influx and config 'create_db'='true'"); }