Skip to content

Commit

Permalink
Add delete_sampling_policy api
Browse files Browse the repository at this point in the history
- Add mutation to timeseries.
- Delete timeseries from database by id.

Close: #351
  • Loading branch information
BLYKIM committed Apr 13, 2023
1 parent f4cb443 commit e3dedbf
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 5 deletions.
5 changes: 4 additions & 1 deletion src/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ pub struct Query(
);

#[derive(Default, MergedObject)]
pub struct Mutation(status::GigantoConfigMutation);
pub struct Mutation(
status::GigantoConfigMutation,
timeseries::TimeSeriesMutation,
);

#[derive(InputObject, Serialize)]
pub struct TimeRange {
Expand Down
32 changes: 30 additions & 2 deletions src/graphql/timeseries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{get_timestamp, load_connection, FromKeyValue};
use super::{get_timestamp, load_connection, network::key_prefix, FromKeyValue};
use crate::{
graphql::{RawEventFilter, TimeRange},
storage::Database,
storage::{lower_closed_bound_key, upper_open_bound_key, Database},
};
use async_graphql::{
connection::{query, Connection},
Expand All @@ -14,6 +14,9 @@ use std::{fmt::Debug, net::IpAddr};
#[derive(Default)]
pub(super) struct TimeSeriesQuery;

#[derive(Default)]
pub(super) struct TimeSeriesMutation;

// #[allow(clippy::module_name_repetitions)]
#[derive(InputObject)]
pub struct TimeSeriesFilter {
Expand Down Expand Up @@ -91,6 +94,31 @@ impl TimeSeriesQuery {
}
}

#[Object]
impl TimeSeriesMutation {
async fn delete_time_series<'ctx>(
&self,
ctx: &Context<'ctx>,
ids: Vec<String>,
) -> Result<String> {
let store = ctx.data::<Database>()?.periodic_time_series_store()?;
for id in ids {
// 각 id에 대한 TimeSeries 데이터 삭제
let prefix = key_prefix(&id);
let mut iter = store.boundary_iter(
&lower_closed_bound_key(&prefix, None),
&upper_open_bound_key(&prefix, None),
rocksdb::Direction::Forward,
);
while let Some(item) = iter.next() {
let (key, _) = item.map_err(|e| format!("failed to read database: {e}"))?;
if store.delete(&key).is_err() {}
}
}
Ok("deleted".to_string())
}
}

#[cfg(test)]
mod tests {
use crate::{graphql::TestSchema, storage::RawEventStore};
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ARG:
<CONFIG> A TOML config file
";

#[allow(clippy::too_many_lines)]
#[tokio::main]
async fn main() -> Result<()> {
let mut settings = if let Some(config_filename) = parse() {
Expand Down
4 changes: 2 additions & 2 deletions src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ where
if last_ts > ts {
continue;
}
if send_bytes(&mut sender, &buf).await.is_err(){
for r_key in channel_remove_keys{
if send_bytes(&mut sender, &buf).await.is_err() {
for r_key in channel_remove_keys {
stream_direct_channel
.write()
.await
Expand Down

0 comments on commit e3dedbf

Please sign in to comment.