Add GraphQL query to remove all data
- Add deleteAll query that takes protocol (column family) names
- Without a protocol argument, all data in the database is deleted

Close: #233
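
For reference, these are the two query shapes exercised by the tests in this commit, together with the result strings they return:

    { deleteAll(protocol: ["conn"]) }   # -> "Delete [\"conn\"] data"
    { deleteAll }                       # -> "Delete all data"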
BLYKIM committed Feb 6, 2023
1 parent e55ca74 commit 53ad3e3
Showing 4 changed files with 206 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,8 @@ Versioning](https://semver.org/spec/v2.0.0.html).
- Publish support Packet request/response through QUIC.
- Add Packet store.
- Add GraphQL API for Packet.
- Add GraphQL API to delete DB data by protocol name (column family)
  (without a protocol, all data is removed).

### Changed

2 changes: 2 additions & 0 deletions src/graphql.rs
@@ -1,3 +1,4 @@
mod delete;
mod export;
mod log;
pub mod network;
@@ -31,6 +32,7 @@ pub struct Query(
    export::ExportQuery,
    packet::PacketQuery,
    timeseries::TimeSeriesQuery,
    delete::DeleteQuery,
);

#[derive(InputObject, Serialize)]
135 changes: 135 additions & 0 deletions src/graphql/delete.rs
@@ -0,0 +1,135 @@
use async_graphql::{Context, Object, Result};

use crate::storage::Database;

#[derive(Default)]
pub(super) struct DeleteQuery;

#[Object]
impl DeleteQuery {
    #[allow(clippy::unused_async)]
    async fn delete_all(&self, ctx: &Context<'_>, protocol: Option<Vec<String>>) -> Result<String> {
        let mut cfs = String::from("Delete all data");
        if let Some(names) = protocol.clone() {
            cfs = format!("Delete {names:?} data");
        }
        let db = ctx.data::<Database>()?;
        db.delete_all(protocol)?;

        Ok(cfs)
    }
}

#[cfg(test)]
mod tests {
    use crate::graphql::TestSchema;
    use crate::storage::RawEventStore;
    use chrono::{Duration, Utc};
    use giganto_client::ingest::network::Conn;
    use std::mem;
    use std::net::IpAddr;

    #[tokio::test]
    async fn delete_conn_data() {
        let schema = TestSchema::new();
        let store = schema.db.conn_store().unwrap();

        insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos());

        let query = r#"
        {
            deleteAll(
                protocol: ["conn"]
            )
        }"#;
        let res = schema.execute(query).await;

        assert_eq!(
            res.data.to_string(),
            "{deleteAll: \"Delete [\\\"conn\\\"] data\"}"
        );

        // check data
        let query = r#"
        {
            connRawEvents(
                filter: {
                    source: "src 1"
                }
                first: 1
            ) {
                edges {
                    node {
                        origAddr,
                        respAddr,
                        origPort,
                    }
                }
            }
        }"#;
        let res = schema.execute(query).await;
        assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}");
    }

    #[tokio::test]
    async fn delete_all_data() {
        let schema = TestSchema::new();
        let store = schema.db.conn_store().unwrap();

        insert_conn_raw_event(&store, "src 1", Utc::now().timestamp_nanos());

        let query = r#"
        {
            deleteAll
        }"#;
        let res = schema.execute(query).await;

        assert_eq!(res.data.to_string(), "{deleteAll: \"Delete all data\"}");

        // check data
        let query = r#"
        {
            connRawEvents(
                filter: {
                    source: "src 1"
                }
                first: 1
            ) {
                edges {
                    node {
                        origAddr,
                        respAddr,
                        origPort,
                    }
                }
            }
        }"#;
        let res = schema.execute(query).await;
        assert_eq!(res.data.to_string(), "{connRawEvents: {edges: []}}");
    }

    fn insert_conn_raw_event(store: &RawEventStore<Conn>, source: &str, timestamp: i64) {
        let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::<i64>());
        key.extend_from_slice(source.as_bytes());
        key.push(0);
        key.extend(timestamp.to_be_bytes());

        let tmp_dur = Duration::nanoseconds(12345);
        let conn_body = Conn {
            orig_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
            orig_port: 46378,
            resp_addr: "192.168.4.76".parse::<IpAddr>().unwrap(),
            resp_port: 80,
            proto: 6,
            duration: tmp_dur.num_nanoseconds().unwrap(),
            service: "".to_string(),
            orig_bytes: 77,
            resp_bytes: 295,
            orig_pkts: 397,
            resp_pkts: 511,
        };
        let ser_conn_body = bincode::serialize(&conn_body).unwrap();

        store.append(&key, &ser_conn_body).unwrap();
    }
}
69 changes: 67 additions & 2 deletions src/storage.rs
@@ -3,7 +3,7 @@ use crate::{
    graphql::{network::NetworkFilter, RawEventFilter},
    ingest::implement::EventFilter,
};
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
use giganto_client::ingest::{
    log::{Log, Oplog},
@@ -17,7 +17,7 @@ use rocksdb::{ColumnFamily, DBIteratorWithThreadMode, Options, DB};
use serde::de::DeserializeOwned;
use std::{cmp, marker::PhantomData, mem, path::Path, sync::Arc, time::Duration};
use tokio::time;
use tracing::error;
use tracing::{error, info};

const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 14] = [
"conn",
@@ -37,6 +37,8 @@ const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 14] = [
];
const META_DATA_COLUMN_FAMILY_NAMES: [&str; 1] = ["sources"];
const TIMESTAMP_SIZE: usize = 8;
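// Key-range bounds used by `delete_all` below to wipe each column family.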
const KEY_MIN: &[u8] = &[0];
const KEY_MAX: &[u8] = &[255];

#[derive(Clone)]
pub struct Database {
@@ -210,6 +212,69 @@ impl Database {
.context("cannot access sources column family")?;
Ok(SourceStore { db: &self.db, cf })
}

    /// Delete all data in the given column families, or in every column family when `cfs` is `None`.
    pub fn delete_all(&self, cfs: Option<Vec<String>>) -> Result<()> {
        if let Some(names) = cfs {
            for name in names.clone() {
                let str = name.as_str();
                if !(RAW_DATA_COLUMN_FAMILY_NAMES.contains(&str)
                    || META_DATA_COLUMN_FAMILY_NAMES.contains(&str))
                {
                    bail!("invalid column family name {name}");
                }
            }

            for store in names {
                let cf = self
                    .db
                    .cf_handle(store.as_str())
                    .with_context(|| format!("cannot access column family {store}"))?;

                match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
                    Ok(_) => {
                        info!("{store} db deleted");
                        self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
                    }
                    Err(e) => {
                        error!("delete all error: {e}");
                    }
                }
            }
        } else {
            for store in RAW_DATA_COLUMN_FAMILY_NAMES {
                let cf = self
                    .db
                    .cf_handle(store)
                    .context("cannot access column family")?;
                match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
                    Ok(_) => {
                        info!("{store} db deleted");
                        self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
                    }
                    Err(e) => {
                        error!("delete all error: {e}");
                    }
                }
            }
            for store in META_DATA_COLUMN_FAMILY_NAMES {
                let cf = self
                    .db
                    .cf_handle(store)
                    .context("cannot access column family")?;
                match self.db.delete_range_cf(cf, KEY_MIN, KEY_MAX) {
                    Ok(_) => {
                        info!("{store} db deleted");
                        self.db.delete_file_in_range_cf(cf, KEY_MIN, KEY_MAX)?;
                    }
                    Err(e) => {
                        error!("delete all error: {e}");
                    }
                }
            }
        }
        Ok(())
    }
}

pub struct RawEventStore<'db, T> {

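As a usage note, here is a minimal sketch of calling the new storage helper directly from Rust, outside GraphQL; the wipe_stores name is illustrative and the sketch assumes a Database handle opened elsewhere in the crate:

    fn wipe_stores(db: &crate::storage::Database) -> anyhow::Result<()> {
        // Clear only the "conn" column family; unknown names fail via `bail!`.
        db.delete_all(Some(vec!["conn".to_string()]))?;
        // With no names given, every raw-event and metadata column family is cleared.
        db.delete_all(None)?;
        Ok(())
    }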