Skip to content

Commit

Permalink
Add remove all data with graphQL
Browse files Browse the repository at this point in the history
- Add deleteAll query with protocol(cf name)
- without protocol, delete all data in database

Close: #233
  • Loading branch information
BLYKIM committed Jul 18, 2023
1 parent 03f2dc6 commit 45ca0d9
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Delete DB by protocol name(column family) to graphQL API
(without protocol will remove all data)

### Changed

- Replaced `lazy_static` with the new `std::sync::OnceLock`.
Expand Down
2 changes: 2 additions & 0 deletions src/graphql.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod delete;
mod export;
mod log;
pub mod network;
Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct Query(
timeseries::TimeSeriesQuery,
status::GigantoStatusQuery,
source::SourceQuery,
delete::DeleteQuery,
);

#[derive(Default, MergedObject)]
Expand Down
135 changes: 135 additions & 0 deletions src/graphql/delete.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use async_graphql::{Context, Object, Result};

use crate::storage::Database;

#[derive(Default)]
pub(super) struct DeleteQuery;

#[Object]
impl DeleteQuery {
#[allow(clippy::unused_async)]
async fn delete_all(&self, ctx: &Context<'_>, protocol: Option<Vec<String>>) -> Result<String> {
let mut cfs = String::from("Delete all data");
if let Some(names) = protocol.clone() {
cfs = format!("Delete {names:?} data");
}
let db = ctx.data::<Database>()?;
db.delete_all(protocol)?;

Ok(cfs)
}
}

#[cfg(test)]
mod tests {
use crate::graphql::TestSchema;
use crate::storage::RawEventStore;
use chrono::{Duration, Utc};
use giganto_client::ingest::network::Conn;
use std::mem;
use std::net::IpAddr;

#[tokio::test]
async fn delete_conn_data() {
let schema = TestSchema::new();
let store = schema.db.conn_store().unwrap();

insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos());

let query = r#"
{
deleteAll(
protocol: ["conn"]
)
}"#;
let res = schema.execute(query).await;

assert_eq!(
res.data.to_string(),
"{deleteAll: \"Delete [\\\"conn\\\"] data\"}"
);

// check data
let query = r#"
{
connRawEvents(
filter: {
source: "src 1"
}
first: 1
) {
edges {
node {
origAddr,
respAddr,
origPort,
}
}
}
}"#;
let res = schema.execute(query).await;
assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}");
}

#[tokio::test]
async fn delete_all_data() {
let schema = TestSchema::new();
let store = schema.db.conn_store().unwrap();

insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos());

let query = r#"
{
deleteAll
}"#;
let res = schema.execute(query).await;

assert_eq!(res.data.to_string(), "{deleteAll: \"Delete all data\"}");

// check data
let query = r#"
{
connRawEvents(
filter: {
source: "src 1"
}
first: 1
) {
edges {
node {
origAddr,
respAddr,
origPort,
}
}
}
}"#;
let res = schema.execute(query).await;
assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}");
}

fn insert_conn_raw_event(store: &RawEventStore<Conn>, source: &str, timestamp: i64) {
let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::<i64>());
key.extend_from_slice(source.as_bytes());
key.push(0);
key.extend(timestamp.to_be_bytes());

let tmp_dur = Duration::nanoseconds(12345);
let conn_body = Conn {
orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
orig_port: 46378,
resp_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
resp_port: 80,
proto: 6,
duration: tmp_dur.num_nanoseconds().unwrap(),
service: "".to_string(),
orig_bytes: 77,
resp_bytes: 295,
orig_pkts: 397,
resp_pkts: 511,
};
let ser_conn_body = bincode::serialize(&conn_body).unwrap();

store.append(&key, &ser_conn_body).unwrap();
}
}
68 changes: 66 additions & 2 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
},
ingest::implement::EventFilter,
};
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
use giganto_client::ingest::{
log::{Log, Oplog},
Expand All @@ -29,7 +29,7 @@ use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBIteratorWithThreadMode, Op
use serde::de::DeserializeOwned;
use std::{cmp, marker::PhantomData, mem, path::Path, sync::Arc, time::Duration};
use tokio::{select, sync::Notify, time};
use tracing::error;
use tracing::{error, info};

const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 20] = [
"conn",
Expand All @@ -55,6 +55,8 @@ const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 20] = [
];
const META_DATA_COLUMN_FAMILY_NAMES: [&str; 1] = ["sources"];
const TIMESTAMP_SIZE: usize = 8;
const KEY_MIN: &[u8] = &[0];
const KEY_MAX: &[u8] = &[255];

#[cfg(debug_assertions)]
pub struct CfProperties {
Expand Down Expand Up @@ -355,6 +357,68 @@ impl Database {
.context("cannot access nfs column family")?;
Ok(RawEventStore::new(&self.db, cf))
}
/// Delete all data in given column family names.
pub fn delete_all(&self, cfs: Option<Vec<String>>) -> Result<()> {
if let Some(names) = cfs {
for name in names.clone() {
let str = name.as_str();
if !(RAW_DATA_COLUMN_FAMILY_NAMES.contains(&str)
|| META_DATA_COLUMN_FAMILY_NAMES.contains(&str))
{
bail!("invalid column family name {name}");
}
}

for store in names {
let cf = self
.db
.cf_handle(store.as_str())
.context("cannot access column family {store}")?;

match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
Ok(_) => {
info!("{store} db deleted");
self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
}
Err(e) => {
error!("delete all error: {e}");
}
}
}
} else {
for store in RAW_DATA_COLUMN_FAMILY_NAMES {
let cf = self
.db
.cf_handle(store)
.context("cannot access column family")?;
match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
Ok(_) => {
info!("{store} db deleted");
self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
}
Err(e) => {
error!("delete all error: {e}");
}
}
}
for store in META_DATA_COLUMN_FAMILY_NAMES {
let cf = self
.db
.cf_handle(store)
.context("cannot access column family")?;
match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
Ok(_) => {
info!("{store} db deleted");
self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
}
Err(e) => {
error!("delete all error: {e}");
}
}
}
}
Ok(())
}
}

pub struct RawEventStore<'db, T> {
Expand Down

0 comments on commit 45ca0d9

Please sign in to comment.