diff --git a/CHANGELOG.md b/CHANGELOG.md index f5026c0f..7fcf235d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- Delete DB by protocol name(column family) to graphQL API (without protocol will remove all data) + ## [0.7.0] - 2023-01-04 ### Added @@ -105,6 +111,7 @@ Versioning](https://semver.org/spec/v2.0.0.html). - Initial release. +[Unreleased]: https://github.com/aicers/giganto/compare/0.7.0...main [0.7.0]: https://github.com/aicers/giganto/compare/0.6.0...0.7.0 [0.6.0]: https://github.com/aicers/giganto/compare/0.5.0...0.6.0 [0.5.0]: https://github.com/aicers/giganto/compare/0.4.0...0.5.0 diff --git a/src/graphql.rs b/src/graphql.rs index f61046cd..7c72ced4 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -1,3 +1,4 @@ +mod delete; mod export; mod log; pub mod network; @@ -31,6 +32,7 @@ pub struct Query( export::ExportQuery, packet::PacketQuery, timeseries::TimeSeriesQuery, + delete::DeleteQuery, ); #[derive(InputObject, Serialize)] diff --git a/src/graphql/delete.rs b/src/graphql/delete.rs new file mode 100644 index 00000000..06370f7e --- /dev/null +++ b/src/graphql/delete.rs @@ -0,0 +1,134 @@ +use async_graphql::{Context, Object, Result}; + +use crate::storage::Database; + +#[derive(Default)] +pub(super) struct DeleteQuery; + +#[Object] +impl DeleteQuery { + #[allow(clippy::unused_async)] + async fn delete_all(&self, ctx: &Context<'_>, protocol: Option>) -> Result { + let mut cfs = String::from("Delete all data"); + if let Some(names) = protocol.clone() { + cfs = format!("Delete {names:?} data"); + } + let db = ctx.data::()?; + db.delete_all(protocol)?; + + Ok(cfs) + } +} + +#[cfg(test)] +mod tests { + use crate::graphql::TestSchema; + use crate::ingestion::Conn; + use crate::storage::RawEventStore; + use chrono::{Duration, Utc}; + use std::mem; + use std::net::IpAddr; + + #[tokio::test] + async fn delete_conn_data() { + let schema = TestSchema::new(); + let store = schema.db.conn_store().unwrap(); + + insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + + let query = r#" + { + deleteAll( + protocol: ["conn"] + ) + }"#; + let res = schema.execute(query).await; + + assert_eq!( + res.data.to_string(), + "{deleteAll: \"Delete [\\\"conn\\\"] data\"}" + ); + + // check data + let query = r#" + { + connRawEvents( + filter: { + source: "src 1" + } + first: 1 + ) { + edges { + node { + origAddr, + respAddr, + origPort, + } + } + } + }"#; + let res = schema.execute(query).await; + assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}"); + } + + #[tokio::test] + async fn delete_all_data() { + let schema = TestSchema::new(); + let store = schema.db.conn_store().unwrap(); + + insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos()); + + let query = r#" + { + deleteAll + }"#; + let res = schema.execute(query).await; + + assert_eq!(res.data.to_string(), "{deleteAll: \"Delete all data\"}"); + + // check data + let query = r#" + { + connRawEvents( + filter: { + source: "src 1" + } + first: 1 + ) { + edges { + node { + origAddr, + respAddr, + origPort, + } + } + } + }"#; + let res = schema.execute(query).await; + assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}"); + } + + fn insert_conn_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let tmp_dur = Duration::nanoseconds(12345); + let conn_body = Conn { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + proto: 6, + duration: tmp_dur.num_nanoseconds().unwrap(), + orig_bytes: 77, + resp_bytes: 295, + orig_pkts: 397, + resp_pkts: 511, + }; + let ser_conn_body = bincode::serialize(&conn_body).unwrap(); + + store.append(&key, &ser_conn_body).unwrap(); + } +} diff --git a/src/storage.rs b/src/storage.rs index 3f5f75a3..79e1cdb4 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -3,14 +3,14 @@ use crate::{ graphql::{network::NetworkFilter, RawEventFilter}, ingestion::{self, EventFilter}, }; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use chrono::{DateTime, NaiveDateTime, Utc}; pub use rocksdb::Direction; use rocksdb::{ColumnFamily, DBIteratorWithThreadMode, Options, DB}; use serde::de::DeserializeOwned; use std::{cmp, marker::PhantomData, mem, path::Path, sync::Arc, time::Duration}; use tokio::time; -use tracing::error; +use tracing::{error, info}; const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 13] = [ "conn", @@ -29,6 +29,8 @@ const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 13] = [ ]; const META_DATA_COLUMN_FAMILY_NAMES: [&str; 1] = ["sources"]; const TIMESTAMP_SIZE: usize = 8; +const KEY_MIN: &[u8] = &[0]; +const KEY_MAX: &[u8] = &[255]; #[derive(Clone)] pub struct Database { @@ -194,6 +196,70 @@ impl Database { .context("cannot access operation log column family")?; Ok(RawEventStore::new(&self.db, cf)) } + + /// Delete all data in given column family names. + pub fn delete_all(&self, cfs: Option>) -> Result<()> { + if let Some(names) = cfs { + for name in names.clone() { + let str = name.as_str(); + if !(RAW_DATA_COLUMN_FAMILY_NAMES.contains(&str) + || META_DATA_COLUMN_FAMILY_NAMES.contains(&str)) + { + bail!("invalid column family name {name}"); + } + } + + for store in names { + let cf = self + .db + .cf_handle(store.as_str()) + .context("cannot access column family {store}")?; + + match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) { + Ok(_) => { + info!("{store} db deleted"); + self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?; + } + Err(e) => { + error!("delete all error: {e}"); + } + } + } + } else { + for store in RAW_DATA_COLUMN_FAMILY_NAMES { + let cf = self + .db + .cf_handle(store) + .context("cannot access column family")?; + match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) { + Ok(_) => { + info!("{store} db deleted"); + self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?; + } + Err(e) => { + error!("delete all error: {e}"); + } + } + } + for store in META_DATA_COLUMN_FAMILY_NAMES { + let cf = self + .db + .cf_handle(store) + .context("cannot access column family")?; + match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) { + Ok(_) => { + info!("{store} db deleted"); + self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?; + } + Err(e) => { + error!("delete all error: {e}"); + } + } + } + } + + Ok(()) + } } pub struct RawEventStore<'db, T> {