diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9cd5ac9..a73182d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,11 @@ Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
+### Added
+
+- Delete DB data by protocol name (column family) via the GraphQL API
+  (omitting the protocol removes all data).
+
 ### Changed
 
 - Replaced `lazy_static` with the new `std::sync::OnceLock`.
diff --git a/src/graphql.rs b/src/graphql.rs
index 031f4ad..e02004b 100644
--- a/src/graphql.rs
+++ b/src/graphql.rs
@@ -1,3 +1,4 @@
+mod delete;
 mod export;
 mod log;
 pub mod network;
@@ -52,6 +53,7 @@ pub struct Query(
     timeseries::TimeSeriesQuery,
     status::GigantoStatusQuery,
     source::SourceQuery,
+    delete::DeleteQuery,
 );
 
 #[derive(Default, MergedObject)]
diff --git a/src/graphql/delete.rs b/src/graphql/delete.rs
new file mode 100644
index 0000000..2bdd3c0
--- /dev/null
+++ b/src/graphql/delete.rs
@@ -0,0 +1,135 @@
+use async_graphql::{Context, Object, Result};
+
+use crate::storage::Database;
+
+#[derive(Default)]
+pub(super) struct DeleteQuery;
+
+#[Object]
+impl DeleteQuery {
+    #[allow(clippy::unused_async)]
+    async fn delete_all(&self, ctx: &Context<'_>, protocol: Option<Vec<String>>) -> Result<String> {
+        let mut cfs = String::from("Delete all data");
+        if let Some(names) = protocol.clone() {
+            cfs = format!("Delete {names:?} data");
+        }
+        let db = ctx.data::<Database>()?;
+        db.delete_all(protocol)?;
+
+        Ok(cfs)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::graphql::TestSchema;
+    use crate::storage::RawEventStore;
+    use chrono::{Duration, Utc};
+    use giganto_client::ingest::network::Conn;
+    use std::mem;
+    use std::net::IpAddr;
+
+    #[tokio::test]
+    async fn delete_conn_data() {
+        let schema = TestSchema::new();
+        let store = schema.db.conn_store().unwrap();
+
+        insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos());
+
+        let query = r#"
+        {
+            deleteAll(
+                protocol: ["conn"]
+            )
+        }"#;
+        let res = schema.execute(query).await;
+
+        assert_eq!(
+            res.data.to_string(),
+            "{deleteAll: \"Delete [\\\"conn\\\"] data\"}"
+        );
+
+        // check data
+        let query = r#"
+        {
+            connRawEvents(
+                filter: {
+                    source: "src 1"
+                }
+                first: 1
+            ) {
+                edges {
+                    node {
+                        origAddr,
+                        respAddr,
+                        origPort,
+                    }
+                }
+            }
+        }"#;
+        let res = schema.execute(query).await;
+        assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}");
+    }
+
+    #[tokio::test]
+    async fn delete_all_data() {
+        let schema = TestSchema::new();
+        let store = schema.db.conn_store().unwrap();
+
+        insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos());
+
+        let query = r#"
+        {
+            deleteAll
+        }"#;
+        let res = schema.execute(query).await;
+
+        assert_eq!(res.data.to_string(), "{deleteAll: \"Delete all data\"}");
+
+        // check data
+        let query = r#"
+        {
+            connRawEvents(
+                filter: {
+                    source: "src 1"
+                }
+                first: 1
+            ) {
+                edges {
+                    node {
+                        origAddr,
+                        respAddr,
+                        origPort,
+                    }
+                }
+            }
+        }"#;
+        let res = schema.execute(query).await;
+        assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}");
+    }
+
+    fn insert_conn_raw_event(store: &RawEventStore<Conn>, source: &str, timestamp: i64) {
+        let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::<i64>());
+        key.extend_from_slice(source.as_bytes());
+        key.push(0);
+        key.extend(timestamp.to_be_bytes());
+
+        let tmp_dur = Duration::nanoseconds(12345);
+        let conn_body = Conn {
+            orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
+            orig_port: 46378,
+            resp_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
+            resp_port: 80,
+            proto: 6,
+            duration: tmp_dur.num_nanoseconds().unwrap(),
+            service: "".to_string(),
+            orig_bytes: 77,
+            resp_bytes: 295,
+            orig_pkts: 397,
+            resp_pkts: 511,
+        };
+        let ser_conn_body = bincode::serialize(&conn_body).unwrap();
+
+        store.append(&key, &ser_conn_body).unwrap();
+    }
+}
diff --git a/src/storage.rs b/src/storage.rs
index 7c31140..90013f8 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -10,7 +10,7 @@ use crate::{
     },
     ingest::implement::EventFilter,
 };
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 use chrono::{DateTime, NaiveDateTime, Utc};
 use giganto_client::ingest::{
     log::{Log, Oplog},
@@ -29,7 +29,7 @@ use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBIteratorWithThreadMode, Op
 use serde::de::DeserializeOwned;
 use std::{cmp, marker::PhantomData, mem, path::Path, sync::Arc, time::Duration};
 use tokio::{select, sync::Notify, time};
-use tracing::error;
+use tracing::{error, info};
 
 const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 20] = [
     "conn",
@@ -55,6 +55,8 @@
 ];
 const META_DATA_COLUMN_FAMILY_NAMES: [&str; 1] = ["sources"];
 const TIMESTAMP_SIZE: usize = 8;
+const KEY_MIN: &[u8] = &[0];
+const KEY_MAX: &[u8] = &[255];
 
 #[cfg(debug_assertions)]
 pub struct CfProperties {
@@ -355,6 +357,68 @@
             .context("cannot access nfs column family")?;
         Ok(RawEventStore::new(&self.db, cf))
     }
+    /// Delete all data in the given column families (all data if none are given).
+    pub fn delete_all(&self, cfs: Option<Vec<String>>) -> Result<()> {
+        if let Some(names) = cfs {
+            for name in names.clone() {
+                let str = name.as_str();
+                if !(RAW_DATA_COLUMN_FAMILY_NAMES.contains(&str)
+                    || META_DATA_COLUMN_FAMILY_NAMES.contains(&str))
+                {
+                    bail!("invalid column family name {name}");
+                }
+            }
+
+            for store in names {
+                let cf = self
+                    .db
+                    .cf_handle(store.as_str())
+                    .with_context(|| format!("cannot access column family {store}"))?;
+
+                match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
+                    Ok(_) => {
+                        info!("{store} db deleted");
+                        self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
+                    }
+                    Err(e) => {
+                        error!("delete all error: {e}");
+                    }
+                }
+            }
+        } else {
+            for store in RAW_DATA_COLUMN_FAMILY_NAMES {
+                let cf = self
+                    .db
+                    .cf_handle(store)
+                    .context("cannot access column family")?;
+                match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
+                    Ok(_) => {
+                        info!("{store} db deleted");
+                        self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
+                    }
+                    Err(e) => {
+                        error!("delete all error: {e}");
+                    }
+                }
+            }
+            for store in META_DATA_COLUMN_FAMILY_NAMES {
+                let cf = self
+                    .db
+                    .cf_handle(store)
+                    .context("cannot access column family")?;
+                match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
+                    Ok(_) => {
+                        info!("{store} db deleted");
+                        self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
+                    }
+                    Err(e) => {
+                        error!("delete all error: {e}");
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
 }
 
 pub struct RawEventStore<'db, T> {
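
For reference, the new `deleteAll` field is exposed on the GraphQL query root and takes an optional list of column family names, exactly as the tests above exercise it. A minimal sketch of the two call shapes (the endpoint and any authentication are deployment-specific and not covered by this change):

```graphql
# Clear only the "conn" column family
{
  deleteAll(protocol: ["conn"])
}

# Clear every raw-event and metadata column family
{
  deleteAll
}
```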