Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix multiple bugs: drop database, drop series, get all entries #223

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 75 additions & 21 deletions v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
use chrono::{NaiveDateTime, SecondsFormat};
use futures::prelude::*;
use influxdb2::{
api::buckets::ListBucketsRequest,
models::{DataPoint, PostBucketRequest, Query},
Client,
};
Expand Down Expand Up @@ -815,7 +816,13 @@ impl Storage for InfluxDbStorage {
OwnedKeyExpr::from_str(&zp.key_expr),
Timestamp::from_str(&zp.timestamp),
) {
(Ok(ke), Ok(ts)) => result.push((Some(ke), ts)),
(Ok(ke), Ok(ts)) => {
if ke.eq(&NONE_KEY_REF) {
result.push((None, ts))
} else {
result.push((Some(ke), ts))
}
}
(Err(err_ke), Err(err_ts)) => tracing::warn!(
"Failed to parse (OwnedKeyExpr,Timestamp):({:?},{:?}) Errors:({:?},{:?})",
zp.key_expr,
Expand Down Expand Up @@ -843,42 +850,89 @@ impl Storage for InfluxDbStorage {
impl Drop for InfluxDbStorage {
fn drop(&mut self) {
tracing::debug!("Closing InfluxDBv2 storage");
let db = match self.config.volume_cfg.get(PROP_STORAGE_DB) {
Some(serde_json::Value::String(s)) => s.to_string(),
_ => {
tracing::error!("no db was found");
return;
}
};

match self.on_closure {
OnClosure::DropDb => {
blockon_runtime(async move {
tracing::debug!("Close InfluxDBv2 storage, dropping database {}", db);
if let Err(e) = self.admin_client.delete_bucket(&db).await {
tracing::error!("Failed to drop InfluxDbv2 database '{}' : {}", db, e)
let db_name = self.db_name.clone();
let org = self.client.org.clone();
if let Err(e) = blockon_runtime(async {
tracing::debug!("Getting bucket ID for database {}", db_name);
let list_buckets_req = ListBucketsRequest {
after: None,
id: None,
limit: None,
name: Some(db_name.clone()),
offset: None,
org: None,
org_id: Some(org),
};
let response = self
.admin_client
.list_buckets(Some(list_buckets_req))
.await?;
if response.buckets.is_empty() {
bail!("Received empty bucket list from database");
}
});
if response.buckets.len() > 1 {
bail!("Influxdb2 bucket list contains more than one matching database");
}
let bucket_id = response.buckets[0]
.id
.clone()
.ok_or_else(|| zerror!("database bucket ID is None"))?;
tracing::debug!(
"Close InfluxDBv2 storage, dropping database {} with bucket ID {}",
db_name,
bucket_id
);
self.admin_client.delete_bucket(&bucket_id).await?;

Ok::<(), Error>(())
}) {
tracing::error!(
"Failed to drop InfluxDbv2 database '{}': {}",
self.db_name,
e
);
}
}
OnClosure::DropSeries => {
blockon_runtime(async move {
tracing::debug!(
"Close InfluxDBv2 storage, dropping all series from database {}",
db
self.db_name
);
// NOTE:
// InfluxDB2 MIN and MAX datetimes are as follows:
// - Min: 1677-09-21T00:12:43.145224194Z
// - Max: 2262-04-11T23:47:16.854775807Z
// `client.delete` function takes `NaiveDateTime` as parameters, internally converts them to string,
// but omits the nanoseconds. This is why `start` parameter is set to `1677-09-21T00:12:44.0Z` to be >= Min.
let start = NaiveDateTime::new(
chrono::NaiveDate::from_ymd_opt(1677, 9, 21)
.expect("Influxdb2 min date should be valid"),
chrono::NaiveTime::from_hms_opt(0, 12, 44)
.expect("Influxdb2 min date's time should be valid"),
);
let stop = NaiveDateTime::new(
chrono::NaiveDate::from_ymd_opt(2262, 4, 11)
.expect("Influxdb2 max date should be valid"),
chrono::NaiveTime::from_hms_opt(23, 47, 16)
.expect("Influxdb2 max date's time should be valid"),
);
let start = NaiveDateTime::MIN;
let stop = NaiveDateTime::MAX;
if let Err(e) = self.client.delete(&db, start, stop, None).await {
if let Err(e) = self.client.delete(&self.db_name, start, stop, None).await {
tracing::error!(
"Failed to drop all series from InfluxDbv2 database '{}' : {}",
db,
self.db_name,
e
)
}
});
}
OnClosure::DoNothing => {
tracing::debug!("Close InfluxDBv2 storage, keeping database {} as it is", db);
tracing::debug!(
"Close InfluxDBv2 storage, keeping database {} as it is",
self.db_name
);
}
}
}
Expand Down Expand Up @@ -956,7 +1010,7 @@ fn timerange_from_parameters(p: &str) -> ZResult<Option<String>> {
write_timeexpr(&mut result, t, 1);
}
TimeBound::Unbounded => {
result.push_str("start:1970-01-01T00:00:00Z");
result.push_str("start:0");
}
}
match stop {
Expand Down
Loading