diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1fb8b46..e0d51c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@ Versioning](https://semver.org/spec/v2.0.0.html).
 - Add a publish API to return the source, raw_events from the source,
   timestamps for REconverge.
 
+- Add a GraphQL API to delete time series from the database by id.
+
 ## [0.9.0] - 2023-04-03
 
diff --git a/src/graphql.rs b/src/graphql.rs
index 5540482..e0925c4 100644
--- a/src/graphql.rs
+++ b/src/graphql.rs
@@ -53,7 +53,10 @@ pub struct Query(
 );
 
 #[derive(Default, MergedObject)]
-pub struct Mutation(status::GigantoConfigMutation);
+pub struct Mutation(
+    status::GigantoConfigMutation,
+    timeseries::TimeSeriesMutation,
+);
 
 #[derive(InputObject, Serialize)]
 pub struct TimeRange {
@@ -451,6 +454,7 @@ impl TestSchema {
             schema,
         }
     }
+
     async fn execute(&self, query: &str) -> async_graphql::Response {
         let request: async_graphql::Request = query.into();
         self.schema.execute(request).await
diff --git a/src/graphql/timeseries.rs b/src/graphql/timeseries.rs
index 6c92bda..3f73643 100644
--- a/src/graphql/timeseries.rs
+++ b/src/graphql/timeseries.rs
@@ -1,7 +1,7 @@
-use super::{get_timestamp, load_connection, FromKeyValue};
+use super::{get_timestamp, load_connection, network::key_prefix, FromKeyValue};
 use crate::{
     graphql::{RawEventFilter, TimeRange},
-    storage::Database,
+    storage::{lower_closed_bound_key, upper_open_bound_key, Database},
 };
 use async_graphql::{
     connection::{query, Connection},
@@ -14,6 +14,9 @@ use std::{fmt::Debug, net::IpAddr};
 #[derive(Default)]
 pub(super) struct TimeSeriesQuery;
 
+#[derive(Default)]
+pub(super) struct TimeSeriesMutation;
+
 // #[allow(clippy::module_name_repetitions)]
 #[derive(InputObject)]
 pub struct TimeSeriesFilter {
@@ -91,6 +94,31 @@ impl TimeSeriesQuery {
     }
 }
 
+#[Object]
+impl TimeSeriesMutation {
+    #[allow(clippy::unused_async)]
+    async fn delete_time_series<'ctx>(
+        &self,
+        ctx: &Context<'ctx>,
+        ids: Vec<String>,
+    ) -> Result<String> {
+        let store = ctx.data::<Database>()?.periodic_time_series_store()?;
+        for id in ids {
+            let prefix = key_prefix(&id);
+            let iter = store.boundary_iter(
+                &lower_closed_bound_key(&prefix, None),
+                &upper_open_bound_key(&prefix, None),
+                rocksdb::Direction::Forward,
+            );
+            for item in iter {
+                let (key, _) = item.map_err(|e| format!("failed to read database: {e}"))?;
+                if store.delete(&key).is_err() {}
+            }
+        }
+        Ok("deleted".to_string())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{graphql::TestSchema, storage::RawEventStore};
@@ -118,12 +146,12 @@ mod tests {
         let schema = TestSchema::new();
         let store = schema.db.periodic_time_series_store().unwrap();
 
-        insert_time_series(&store, "src 1", 1, vec![0.0; 12]);
-        insert_time_series(&store, "src 1", 2, vec![0.0; 12]);
+        insert_time_series(&store, "id 1", 1, vec![0.0; 12]);
+        insert_time_series(&store, "id 1", 2, vec![0.0; 12]);
 
         let query = r#"
         {
-            periodicTimeSeries (filter: {id: "src 1"}, first: 1) {
+            periodicTimeSeries (filter: {id: "id 1"}, first: 10) {
                 edges {
                     node {
                         id
@@ -138,8 +166,51 @@ mod tests {
         let res = schema.execute(query).await;
         assert_eq!(
             res.data.to_string(),
-            "{periodicTimeSeries: {edges: [{node: {id: \"src 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}],pageInfo: {hasPreviousPage: false}}}"
+            "{periodicTimeSeries: {edges: [{node: {id: \"id 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}},{node: {id: \"id 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}],pageInfo: {hasPreviousPage: false}}}"
         );
+
+        let mutation = r#"
+        mutation {
+            deleteTimeSeries (ids: ["id 1"])
+        }
+        "#;
+
+        let res = schema.execute(mutation).await;
+        assert_eq!(res.data.to_string(), "{deleteTimeSeries: \"deleted\"}");
+
+        let query = r#"
+        {
+            periodicTimeSeries (filter: {id: "id 1"}, first: 10) {
+                edges {
+                    node {
+                        id
+                    }
+                }
+            }
+        }"#;
+
+        let res = schema.execute(query).await;
+        assert_eq!(res.data.to_string(), "{periodicTimeSeries: {edges: []}}");
+    }
+
+    #[tokio::test]
+    async fn delete_time_series() {
+        let schema = TestSchema::new();
+        let store = schema.db.periodic_time_series_store().unwrap();
+
+        insert_time_series(&store, "id 1", 1, vec![0.0; 12]);
+        insert_time_series(&store, "id 1", 2, vec![0.0; 12]);
+        insert_time_series(&store, "id 2", 1, vec![0.0; 12]);
+        insert_time_series(&store, "id 2", 2, vec![0.0; 12]);
+
+        let mutation = r#"
+        mutation {
+            deleteTimeSeries (ids: ["id 1"])
+        }
+        "#;
+
+        let res = schema.execute(mutation).await;
+        assert_eq!(res.data.to_string(), "{deleteTimeSeries: \"deleted\"}");
     }
 
     fn insert_time_series(
diff --git a/src/publish.rs b/src/publish.rs
index f0c7d1f..d7e2a51 100644
--- a/src/publish.rs
+++ b/src/publish.rs
@@ -563,7 +563,7 @@ where
     send_crusher_stream_start_message(&mut sender, id)
         .await
         .map_err(|e| anyhow!("Failed to write crusher start mesaage: {}", e))?;
-    info!("start cruhser's publish Stream : {:?}", record_type);
+    info!("start crusher's publish Stream : {:?}", record_type);
 
     let iter = store.boundary_iter(
         &lower_closed_bound_key(
@@ -598,8 +598,8 @@ where
             if last_ts > ts {
                 continue;
             }
-            if send_bytes(&mut sender, &buf).await.is_err(){
-                for r_key in channel_remove_keys{
+            if send_bytes(&mut sender, &buf).await.is_err() {
+                for r_key in channel_remove_keys {
                     stream_direct_channel
                         .write()
                         .await
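Note: a minimal usage sketch of the mutation added above, mirroring the GraphQL requests in the tests (the id values are illustrative; every stored periodic time series whose key starts with one of the given ids is deleted):

    mutation {
        deleteTimeSeries(ids: ["id 1", "id 2"])
    }

As implemented in this diff, the mutation always resolves to the string "deleted", and errors from deleting individual keys are ignored.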