Skip to content

Commit

Permalink
Add delete_sampling_policy api
Browse files Browse the repository at this point in the history
- Add a GraphQL API that can delete timeseries from the database using an ID.

Issue: #351
  • Loading branch information
BLYKIM committed Jul 17, 2023
1 parent 03f2dc6 commit 2c357c5
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- Add a `deleteTimeSeries` GraphQL API that deletes periodic time series from the database matching a list of IDs.

### Changed

- Replaced `lazy_static` with the new `std::sync::OnceLock`.
Expand Down
6 changes: 5 additions & 1 deletion src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ pub struct Query(
);

#[derive(Default, MergedObject)]
pub struct Mutation(status::GigantoConfigMutation);
/// Root GraphQL mutation object, merging the mutation objects exposed by
/// each module into a single schema entry point.
pub struct Mutation(
    status::GigantoConfigMutation,
    timeseries::TimeSeriesMutation,
);

#[derive(InputObject, Serialize)]
pub struct TimeRange {
Expand Down Expand Up @@ -555,6 +558,7 @@ impl TestSchema {
schema,
}
}

async fn execute(&self, query: &str) -> async_graphql::Response {
let request: async_graphql::Request = query.into();
self.schema.execute(request).await
Expand Down
83 changes: 77 additions & 6 deletions src/graphql/timeseries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{get_timestamp, load_connection, FromKeyValue};
use super::{get_timestamp, load_connection, network::key_prefix, FromKeyValue};
use crate::{
graphql::{RawEventFilter, TimeRange},
storage::Database,
storage::{lower_closed_bound_key, upper_open_bound_key, Database},
};
use async_graphql::{
connection::{query, Connection},
Expand All @@ -14,6 +14,9 @@ use std::{fmt::Debug, net::IpAddr};
/// GraphQL query object for periodic time series data; resolvers are
/// provided in the `#[Object]` impl below.
#[derive(Default)]
pub(super) struct TimeSeriesQuery;

/// GraphQL mutation object for periodic time series data (currently only
/// deletion); resolvers are provided in the `#[Object]` impl below.
#[derive(Default)]
pub(super) struct TimeSeriesMutation;

// #[allow(clippy::module_name_repetitions)]
#[derive(InputObject)]
pub struct TimeSeriesFilter {
Expand Down Expand Up @@ -92,6 +95,31 @@ impl TimeSeriesQuery {
}
}

#[Object]
impl TimeSeriesMutation {
    /// Deletes every periodic time series entry whose key starts with the
    /// prefix derived from any of the given `ids`.
    ///
    /// Returns the string `"deleted"` on success.
    ///
    /// # Errors
    ///
    /// Returns an error if the store cannot be opened, an entry cannot be
    /// read, or a delete operation fails.
    #[allow(clippy::unused_async)] // `async` is required by the GraphQL resolver signature
    async fn delete_time_series<'ctx>(
        &self,
        ctx: &Context<'ctx>,
        ids: Vec<String>,
    ) -> Result<String> {
        let store = ctx.data::<Database>()?.periodic_time_series_store()?;
        for id in ids {
            // Iterate over all keys sharing this ID's prefix, i.e. every
            // timestamped entry recorded for the ID.
            let prefix = key_prefix(&id);
            let iter = store.boundary_iter(
                &lower_closed_bound_key(&prefix, None),
                &upper_open_bound_key(&prefix, None),
                rocksdb::Direction::Forward,
            );
            for item in iter {
                let (key, _) = item.map_err(|e| format!("failed to read database: {e}"))?;
                // Propagate delete failures instead of silently ignoring them
                // (the original `if ...is_err() {}` dropped the error), so the
                // caller learns the data may be only partially removed.
                store
                    .delete(&key)
                    .map_err(|e| format!("failed to delete timeseries: {e}"))?;
            }
        }
        Ok("deleted".to_string())
    }
}

#[cfg(test)]
mod tests {
use crate::{graphql::TestSchema, storage::RawEventStore};
Expand Down Expand Up @@ -119,12 +147,12 @@ mod tests {
let schema = TestSchema::new();
let store = schema.db.periodic_time_series_store().unwrap();

insert_time_series(&store, "src 1", 1, vec![0.0; 12]);
insert_time_series(&store, "src 1", 2, vec![0.0; 12]);
insert_time_series(&store, "id 1", 1, vec![0.0; 12]);
insert_time_series(&store, "id 1", 2, vec![0.0; 12]);

let query = r#"
{
periodicTimeSeries (filter: {id: "src 1"}, first: 1) {
periodicTimeSeries (filter: {id: "id 1"}, first: 10) {
edges {
node {
id
Expand All @@ -139,8 +167,51 @@ mod tests {
let res = schema.execute(query).await;
assert_eq!(
res.data.to_string(),
"{periodicTimeSeries: {edges: [{node: {id: \"src 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}],pageInfo: {hasPreviousPage: false}}}"
"{periodicTimeSeries: {edges: [{node: {id: \"id 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}},{node: {id: \"id 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}],pageInfo: {hasPreviousPage: false}}}"
);

let mutation = r#"
mutation {
deleteTimeSeries (ids: ["id 1"])
}
"#;

let res = schema.execute(mutation).await;
assert_eq!(res.data.to_string(), "{deleteTimeSeries: \"deleted\"}");

let query = r#"
{
periodicTimeSeries (filter: {id: "id 1"}, first: 10) {
edges {
node {
id
}
}
}
}"#;

let res = schema.execute(query).await;
assert_eq!(res.data.to_string(), "{periodicTimeSeries: {edges: []}}");
}

#[tokio::test]
async fn delete_time_series() {
    let schema = TestSchema::new();
    let store = schema.db.periodic_time_series_store().unwrap();

    // Two entries for each of two distinct IDs, so we can check that
    // deletion is scoped to the requested ID only.
    insert_time_series(&store, "id 1", 1, vec![0.0; 12]);
    insert_time_series(&store, "id 1", 2, vec![0.0; 12]);
    insert_time_series(&store, "id 2", 1, vec![0.0; 12]);
    insert_time_series(&store, "id 2", 2, vec![0.0; 12]);

    let mutation = r#"
    mutation {
        deleteTimeSeries (ids: ["id 1"])
    }
    "#;

    let res = schema.execute(mutation).await;
    assert_eq!(res.data.to_string(), "{deleteTimeSeries: \"deleted\"}");

    // The deleted ID must no longer be queryable...
    let query = r#"
    {
        periodicTimeSeries (filter: {id: "id 1"}, first: 10) {
            edges {
                node {
                    id
                }
            }
        }
    }"#;
    let res = schema.execute(query).await;
    assert_eq!(res.data.to_string(), "{periodicTimeSeries: {edges: []}}");

    // ...while entries for other IDs remain untouched.
    let query = r#"
    {
        periodicTimeSeries (filter: {id: "id 2"}, first: 10) {
            edges {
                node {
                    id
                }
            }
        }
    }"#;
    let res = schema.execute(query).await;
    assert_eq!(
        res.data.to_string(),
        "{periodicTimeSeries: {edges: [{node: {id: \"id 2\"}},{node: {id: \"id 2\"}}]}}"
    );
}

fn insert_time_series(
Expand Down

0 comments on commit 2c357c5

Please sign in to comment.