Skip to content

Commit

Permalink
Add delete_sampling_policy API
Browse files Browse the repository at this point in the history
- Add GraphQL API to delete timeseries from database by id.

Close: #351
  • Loading branch information
BLYKIM committed Apr 19, 2023
1 parent f4cb443 commit 50fccd9
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 6 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Add GraphQL API to delete timeseries from database by id.

## [0.9.0] - 2023-04-03

### Added
Expand Down
5 changes: 4 additions & 1 deletion src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ pub struct Query(
);

#[derive(Default, MergedObject)]
pub struct Mutation(status::GigantoConfigMutation);
pub struct Mutation(
status::GigantoConfigMutation,
timeseries::TimeSeriesMutation,
);

#[derive(InputObject, Serialize)]
pub struct TimeRange {
Expand Down
32 changes: 30 additions & 2 deletions src/graphql/timeseries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{get_timestamp, load_connection, FromKeyValue};
use super::{get_timestamp, load_connection, network::key_prefix, FromKeyValue};
use crate::{
graphql::{RawEventFilter, TimeRange},
storage::Database,
storage::{lower_closed_bound_key, upper_open_bound_key, Database},
};
use async_graphql::{
connection::{query, Connection},
Expand All @@ -14,6 +14,9 @@ use std::{fmt::Debug, net::IpAddr};
/// Root GraphQL object exposing timeseries read queries; resolvers are
/// provided by the `#[Object]` impl for this type below.
#[derive(Default)]
pub(super) struct TimeSeriesQuery;

/// Root GraphQL object exposing timeseries mutations (e.g. deletion by
/// sampling-policy id); resolvers are provided by the `#[Object]` impl below.
#[derive(Default)]
pub(super) struct TimeSeriesMutation;

// #[allow(clippy::module_name_repetitions)]
#[derive(InputObject)]
pub struct TimeSeriesFilter {
Expand Down Expand Up @@ -91,6 +94,31 @@ impl TimeSeriesQuery {
}
}

#[Object]
impl TimeSeriesMutation {
    /// Deletes every timeseries entry whose database key starts with one of
    /// the given sampling-policy `ids`.
    ///
    /// Returns `"deleted"` when all matching entries were removed.
    ///
    /// # Errors
    ///
    /// Returns an error if the store cannot be opened, an entry cannot be
    /// read, or a key cannot be deleted.
    // async_graphql requires resolvers to be `async` even without awaits.
    #[allow(clippy::unused_async)]
    async fn delete_time_series<'ctx>(
        &self,
        ctx: &Context<'ctx>,
        ids: Vec<String>,
    ) -> Result<String> {
        let store = ctx.data::<Database>()?.periodic_time_series_store()?;
        for id in ids {
            // Scan every key sharing this id's prefix within the store.
            let prefix = key_prefix(&id);
            let iter = store.boundary_iter(
                &lower_closed_bound_key(&prefix, None),
                &upper_open_bound_key(&prefix, None),
                rocksdb::Direction::Forward,
            );
            for item in iter {
                let (key, _) = item.map_err(|e| format!("failed to read database: {e}"))?;
                // Propagate delete failures instead of silently ignoring them
                // (the previous `if ...is_err() {}` reported success even when
                // deletion failed), so the caller knows the delete was incomplete.
                store
                    .delete(&key)
                    .map_err(|e| format!("failed to delete timeseries: {e}"))?;
            }
        }
        Ok("deleted".to_string())
    }
}

#[cfg(test)]
mod tests {
use crate::{graphql::TestSchema, storage::RawEventStore};
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ARG:
<CONFIG> A TOML config file
";

#[allow(clippy::too_many_lines)]
#[tokio::main]
async fn main() -> Result<()> {
let mut settings = if let Some(config_filename) = parse() {
Expand Down
6 changes: 3 additions & 3 deletions src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ where
send_crusher_stream_start_message(&mut sender, id)
.await
.map_err(|e| anyhow!("Failed to write crusher start mesaage: {}", e))?;
info!("start cruhser's publish Stream : {:?}", record_type);
info!("start crusher's publish Stream : {:?}", record_type);

let iter = store.boundary_iter(
&lower_closed_bound_key(
Expand Down Expand Up @@ -598,8 +598,8 @@ where
if last_ts > ts {
continue;
}
if send_bytes(&mut sender, &buf).await.is_err(){
for r_key in channel_remove_keys{
if send_bytes(&mut sender, &buf).await.is_err() {
for r_key in channel_remove_keys {
stream_direct_channel
.write()
.await
Expand Down

0 comments on commit 50fccd9

Please sign in to comment.