Add delete_sampling_policy api
- Add GraphQL API to delete timeseries from database by id.

Close: #351
BLYKIM committed Apr 19, 2023
1 parent 3984e48 commit 135f9d5
Showing 4 changed files with 87 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@ Versioning](https://semver.org/spec/v2.0.0.html).

- Add a publish API to return the source, raw_events
from the source, timestamps for REconverge.
- Add GraphQL API to delete timeseries from database by id.


## [0.9.0] - 2023-04-03

6 changes: 5 additions & 1 deletion src/graphql.rs
@@ -53,7 +53,10 @@ pub struct Query(
);

#[derive(Default, MergedObject)]
pub struct Mutation(status::GigantoConfigMutation);
pub struct Mutation(
    status::GigantoConfigMutation,
    timeseries::TimeSeriesMutation,
);

#[derive(InputObject, Serialize)]
pub struct TimeRange {
@@ -451,6 +454,7 @@ impl TestSchema {
            schema,
        }
    }

    async fn execute(&self, query: &str) -> async_graphql::Response {
        let request: async_graphql::Request = query.into();
        self.schema.execute(request).await
83 changes: 77 additions & 6 deletions src/graphql/timeseries.rs
@@ -1,7 +1,7 @@
use super::{get_timestamp, load_connection, FromKeyValue};
use super::{get_timestamp, load_connection, network::key_prefix, FromKeyValue};
use crate::{
    graphql::{RawEventFilter, TimeRange},
    storage::Database,
    storage::{lower_closed_bound_key, upper_open_bound_key, Database},
};
use async_graphql::{
    connection::{query, Connection},
@@ -14,6 +14,9 @@ use std::{fmt::Debug, net::IpAddr};
#[derive(Default)]
pub(super) struct TimeSeriesQuery;

#[derive(Default)]
pub(super) struct TimeSeriesMutation;

// #[allow(clippy::module_name_repetitions)]
#[derive(InputObject)]
pub struct TimeSeriesFilter {
@@ -91,6 +94,31 @@ impl TimeSeriesQuery {
    }
}

#[Object]
impl TimeSeriesMutation {
    #[allow(clippy::unused_async)]
    async fn delete_time_series<'ctx>(
        &self,
        ctx: &Context<'ctx>,
        ids: Vec<String>,
    ) -> Result<String> {
        let store = ctx.data::<Database>()?.periodic_time_series_store()?;
        for id in ids {
            let prefix = key_prefix(&id);
            let iter = store.boundary_iter(
                &lower_closed_bound_key(&prefix, None),
                &upper_open_bound_key(&prefix, None),
                rocksdb::Direction::Forward,
            );
            for item in iter {
                let (key, _) = item.map_err(|e| format!("failed to read database: {e}"))?;
                if store.delete(&key).is_err() {}
            }
        }
        Ok("deleted".to_string())
    }
}

#[cfg(test)]
mod tests {
    use crate::{graphql::TestSchema, storage::RawEventStore};
@@ -118,12 +146,12 @@ mod tests {
        let schema = TestSchema::new();
        let store = schema.db.periodic_time_series_store().unwrap();

        insert_time_series(&store, "src 1", 1, vec![0.0; 12]);
        insert_time_series(&store, "src 1", 2, vec![0.0; 12]);
        insert_time_series(&store, "id 1", 1, vec![0.0; 12]);
        insert_time_series(&store, "id 1", 2, vec![0.0; 12]);

        let query = r#"
        {
            periodicTimeSeries (filter: {id: "src 1"}, first: 1) {
            periodicTimeSeries (filter: {id: "id 1"}, first: 10) {
                edges {
                    node {
                        id
@@ -138,8 +166,51 @@ mod tests {
        let res = schema.execute(query).await;
        assert_eq!(
            res.data.to_string(),
            "{periodicTimeSeries: {edges: [{node: {id: \"src 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}],pageInfo: {hasPreviousPage: false}}}"
            "{periodicTimeSeries: {edges: [{node: {id: \"id 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}},{node: {id: \"id 1\",data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]}}],pageInfo: {hasPreviousPage: false}}}"
        );

        let mutation = r#"
        mutation {
            deleteTimeSeries (ids: ["id 1"])
        }
        "#;

        let res = schema.execute(mutation).await;
        assert_eq!(res.data.to_string(), "{deleteTimeSeries: \"deleted\"}");

        let query = r#"
        {
            periodicTimeSeries (filter: {id: "id 1"}, first: 10) {
                edges {
                    node {
                        id
                    }
                }
            }
        }"#;

        let res = schema.execute(query).await;
        assert_eq!(res.data.to_string(), "{periodicTimeSeries: {edges: []}}");
    }

    #[tokio::test]
    async fn delete_time_series() {
        let schema = TestSchema::new();
        let store = schema.db.periodic_time_series_store().unwrap();

        insert_time_series(&store, "id 1", 1, vec![0.0; 12]);
        insert_time_series(&store, "id 1", 2, vec![0.0; 12]);
        insert_time_series(&store, "id 2", 1, vec![0.0; 12]);
        insert_time_series(&store, "id 2", 2, vec![0.0; 12]);

        let mutation = r#"
        mutation {
            deleteTimeSeries (ids: ["id 1"])
        }
        "#;

        let res = schema.execute(mutation).await;
        assert_eq!(res.data.to_string(), "{deleteTimeSeries: \"deleted\"}");
    }

    fn insert_time_series(
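
The deleteTimeSeries mutation added above removes every periodic time series entry whose key starts with the requested id: key_prefix builds the id prefix, lower_closed_bound_key and upper_open_bound_key turn that prefix into a closed-lower/open-upper key range, and boundary_iter walks that range so each matching key can be deleted. Below is a minimal, self-contained sketch of the same prefix-range delete pattern, using a std BTreeMap in place of the RocksDB-backed store; the key layout (id bytes, a 0x00 separator, a big-endian timestamp) and the helper names are assumptions for illustration only, not giganto's actual storage code.

use std::collections::BTreeMap;
use std::ops::Bound;

// Assumed key layout: id bytes, a 0x00 separator, then a big-endian timestamp.
fn make_key(id: &str, timestamp: i64) -> Vec<u8> {
    let mut key = id.as_bytes().to_vec();
    key.push(0);
    key.extend_from_slice(&timestamp.to_be_bytes());
    key
}

// Prefix shared by every key that belongs to `id` (stand-in for `key_prefix`).
fn id_prefix(id: &str) -> Vec<u8> {
    let mut prefix = id.as_bytes().to_vec();
    prefix.push(0);
    prefix
}

// Smallest byte string greater than every key starting with `prefix`;
// incrementing the trailing 0x00 separator is sufficient for this layout.
fn upper_open_bound(prefix: &[u8]) -> Vec<u8> {
    let mut upper = prefix.to_vec();
    *upper.last_mut().expect("prefix is never empty") += 1;
    upper
}

// Delete every entry whose key starts with the prefix for `id`,
// mirroring the boundary_iter + delete loop in the mutation.
fn delete_by_id(store: &mut BTreeMap<Vec<u8>, Vec<f64>>, id: &str) {
    let prefix = id_prefix(id);
    let upper = upper_open_bound(&prefix);
    let keys: Vec<Vec<u8>> = store
        .range((Bound::Included(prefix), Bound::Excluded(upper)))
        .map(|(key, _)| key.clone())
        .collect();
    for key in keys {
        store.remove(&key);
    }
}

fn main() {
    let mut store = BTreeMap::new();
    store.insert(make_key("id 1", 1), vec![0.0; 12]);
    store.insert(make_key("id 1", 2), vec![0.0; 12]);
    store.insert(make_key("id 2", 1), vec![0.0; 12]);

    delete_by_id(&mut store, "id 1");
    assert_eq!(store.len(), 1); // only the "id 2" entry remains
}
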
6 changes: 3 additions & 3 deletions src/publish.rs
@@ -563,7 +563,7 @@ where
send_crusher_stream_start_message(&mut sender, id)
.await
.map_err(|e| anyhow!("Failed to write crusher start mesaage: {}", e))?;
info!("start cruhser's publish Stream : {:?}", record_type);
info!("start crusher's publish Stream : {:?}", record_type);

let iter = store.boundary_iter(
&lower_closed_bound_key(
@@ -598,8 +598,8 @@ where
if last_ts > ts {
continue;
}
if send_bytes(&mut sender, &buf).await.is_err(){
for r_key in channel_remove_keys{
if send_bytes(&mut sender, &buf).await.is_err() {
for r_key in channel_remove_keys {
stream_direct_channel
.write()
.await
