Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/db_creation_runtime_crash #214

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 63 additions & 60 deletions v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl Plugin for InfluxDbBackend {
Ok(client) => client,
Err(e) => bail!("Error in creating client for InfluxDBv2 volume: {:?}", e),
};
match blockon_runtime(async { admin_client.ready().await }) {
match blockon_runtime(admin_client.ready()) {
Ok(res) => {
if !res {
bail!("InfluxDBv2 server is not ready! ")
Expand Down Expand Up @@ -255,11 +255,14 @@ pub struct InfluxDbVolume {
}

impl InfluxDbVolume {
async fn create_db(&self, org_id: &str, db: &str) -> Result<(), influxdb2::RequestError> {
async fn create_db(&self, org_id: &str, db: &str) -> ZResult<()> {
let post_bucket_options = PostBucketRequest::new(org_id.into(), db.into());
self.admin_client
.create_bucket(Some(post_bucket_options))
.await
let admin_client = self.admin_client.clone();
match await_task!(admin_client.create_bucket(Some(post_bucket_options)).await,) {
Ok(_) => tracing::info!("Created {db} Influx"),
Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e),
}
Ok(())
}
}

Expand Down Expand Up @@ -354,10 +357,7 @@ impl Volume for InfluxDbVolume {
Ok(db_exists) => {
if !db_exists && createdb {
// Try to create db using user credentials
match self.create_db(&storage_creds.org_id, &db).await {
Ok(_) => tracing::info!("Created {db} Influx"),
Err(e) => bail!("Failed to create InfluxDBv2 Storage : {:?}", e),
}
self.create_db(&storage_creds.org_id, &db).await?
} else if db_exists && createdb {
tracing::warn!("Database '{db}' already exists exists in Influx and config 'create_db'='true'");
}
Expand Down Expand Up @@ -499,18 +499,20 @@ impl InfluxDbStorage {

// get the value and if it exists then extract the timestamp from it
let query = Query::new(qs);
let query_result: Vec<ZenohPoint> = match self.client.query::<ZenohPoint>(Some(query)).await
{
Ok(result) => result,
Err(e) => {
tracing::error!(
"Couldn't get data from InfluxDBv2 database {} with error: {} ",
self.db_name,
e
);
return Ok(None);
}
};

let client = self.client.clone();
let query_result: Vec<ZenohPoint> =
match await_task!(client.query::<ZenohPoint>(Some(query)).await,) {
Ok(result) => result,
Err(e) => {
tracing::error!(
"Couldn't get data from InfluxDBv2 database {} with error: {} ",
self.db_name,
e
);
return Ok(None);
}
};

match query_result.first() {
Some(zp) => match Timestamp::from_str(&zp.timestamp) {
Expand Down Expand Up @@ -574,11 +576,10 @@ impl Storage for InfluxDbStorage {
.timestamp(influx_time) //converted timestamp to i64
.build()?];

match self
.client
.write(&self.db_name, stream::iter(zenoh_point))
.await
{
let client = self.client.clone();
let db_name = self.db_name.clone();

match await_task!(client.write(&db_name, stream::iter(zenoh_point)).await,) {
Ok(_) => Ok(StorageInsertionResult::Inserted),
Err(e) => bail!(
"Failed to put Value for {:?} in InfluxDBv2 storage : {}",
Expand Down Expand Up @@ -612,11 +613,14 @@ impl Storage for InfluxDbStorage {
"Delete {:?} with Influx query in InfluxDBv2 storage, Time Range: {:?} - {:?}, Predicate: {:?}",
measurement, start_timestamp, stop_timestamp, predicate
);
if let Err(e) = self
.client
.delete(&self.db_name, start_timestamp, stop_timestamp, predicate)
.await
{

let client = self.client.clone();
let db_name = self.db_name.clone();
if let Err(e) = await_task!(
client
.delete(&db_name, start_timestamp, stop_timestamp, predicate)
.await,
) {
bail!(
"Failed to delete points for measurement '{}' from InfluxDBv2 storage : {}",
measurement,
Expand All @@ -643,18 +647,15 @@ impl Storage for InfluxDbStorage {
stop_timestamp
);

if let Err(e) = self
.client
.write(&self.db_name, stream::iter(zenoh_point))
.await
{
let client = self.client.clone();
let db_name = self.db_name.clone();
if let Err(e) = await_task!(client.write(&db_name, stream::iter(zenoh_point)).await,) {
bail!(
"Failed to mark measurement {:?} as deleted : {} in InfluxDBv2 storage",
measurement,
e
)
}

// schedule_measurement_drop is used to schedule the drop of measurement later in the future, if it's empty
// influx 2.x doesn't support dropping measurements from the API
Ok(StorageInsertionResult::Deleted)
Expand Down Expand Up @@ -702,19 +703,19 @@ impl Storage for InfluxDbStorage {

let query = Query::new(query_string);
let client = self.client.clone();
let res = await_task!(client.query::<ZenohPoint>(Some(query)).await,);

let query_result: Vec<ZenohPoint> = match res {
Ok(result) => result,
Err(e) => {
tracing::error!(
"Couldn't get data from database {} in InfluxDBv2 storage with error: {} ",
self.db_name,
e
);
vec![]
}
};

let query_result: Vec<ZenohPoint> =
match await_task!(client.query::<ZenohPoint>(Some(query)).await,) {
Ok(result) => result,
Err(e) => {
tracing::error!(
"Couldn't get data from database {} in InfluxDBv2 storage with error: {} ",
self.db_name,
e
);
vec![]
}
};

let mut result: Vec<StoredData> = vec![];

Expand Down Expand Up @@ -796,16 +797,18 @@ impl Storage for InfluxDbStorage {

let query = Query::new(qs);

let vec_zpoint: Vec<ZenohPoint> = match self.client.query::<ZenohPoint>(Some(query)).await {
Ok(result) => result,
Err(e) => {
bail!(
"Couldn't get data from database {} in InfluxDBv2 storage with error: {} ",
self.db_name,
e
);
}
};
let client = self.client.clone();
let vec_zpoint: Vec<ZenohPoint> =
match await_task!(client.query::<ZenohPoint>(Some(query)).await,) {
Ok(result) => result,
Err(e) => {
bail!(
"Couldn't get data from database {} in InfluxDBv2 storage with error: {} ",
self.db_name,
e
);
}
};

for zp in vec_zpoint {
match (
Expand Down