diff --git a/CHANGELOG.md b/CHANGELOG.md index 203c1a29..7cb64009 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Versioning](https://semver.org/spec/v2.0.0.html). ### Added +- Add export file to GraphQL API. (`csv`, `json` format support) - Add `Statistics` column family. Receive and save traffic statistics from Piglet. - Save Giganto's `syslog` to a path written to `log_dir` in configuration file. - Add `Operationlog` diff --git a/Cargo.lock b/Cargo.lock index 7ce71e3a..79ab9206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -718,6 +718,7 @@ dependencies = [ "rustls-pemfile 1.0.1", "semver", "serde", + "serde_json", "tempfile", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 27090268..e2397711 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ rustls = "0.20" rustls-pemfile = "1.0" semver = "1.0" serde = { version = "1", features = ["derive"] } +serde_json = "1" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tracing = "0.1" tracing-appender ="0.2" diff --git a/README.md b/README.md index 4d3b2aa2..8fe7fc2b 100644 --- a/README.md +++ b/README.md @@ -20,10 +20,12 @@ In the config file, you can specify the following options: key = "key.pem" # path to private key file cert = "cert.pem" # path to certificate file roots = ["ca1.pem", "ca2.pem", "ca3.pem"] # paths to CA certificate files -ingestion_address = "0.0.0.0:38370" # address to listen for QUIC connections +ingestion_address = "0.0.0.0:38370" # address to listen for ingestion QUIC +publish_address = "0.0.0.0:38371" # address to listen for publish QUIC data_dir = "tests/data" # path to directory to store data retention = "100d" # retention period for data log_dir = "tests/logs/apps" # path to giganto's syslog file +export_dir = "tests/export" # path to giganto's export file ``` By default, giganto reads the config file from the following directories: diff --git a/src/graphql.rs b/src/graphql.rs index 4392fb30..f61046cd 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -1,3 +1,4 @@ +mod export; mod log; pub mod network; mod packet; @@ -17,7 +18,7 @@ use async_graphql::{ }; use chrono::{DateTime, TimeZone, Utc}; use serde::{de::DeserializeOwned, Serialize}; -use std::net::IpAddr; +use std::{net::IpAddr, path::PathBuf}; use self::network::NetworkFilter; @@ -27,6 +28,7 @@ pub const TIMESTAMP_SIZE: usize = 8; pub struct Query( log::LogQuery, network::NetworkQuery, + export::ExportQuery, packet::PacketQuery, timeseries::TimeSeriesQuery, ); @@ -58,10 +60,11 @@ pub trait FromKeyValue: Sized { pub type Schema = async_graphql::Schema; type ConnArgs = (Vec<(Box<[u8]>, T)>, bool, bool); -pub fn schema(database: Database, packet_sources: PacketSources) -> Schema { +pub fn schema(database: Database, packet_sources: PacketSources, export_path: PathBuf) -> Schema { Schema::build(Query::default(), EmptyMutation, EmptySubscription) .data(database) .data(packet_sources) + .data(export_path) .finish() } @@ -339,7 +342,8 @@ impl TestSchema { let db_dir = tempfile::tempdir().unwrap(); let db = Database::open(db_dir.path()).unwrap(); let packet_sources = Arc::new(RwLock::new(HashMap::new())); - let schema = schema(db.clone(), packet_sources); + let export_dir = tempfile::tempdir().unwrap(); + let schema = schema(db.clone(), packet_sources, export_dir.path().to_path_buf()); Self { _dir: db_dir, db, diff --git a/src/graphql/export.rs b/src/graphql/export.rs new file mode 100644 index 00000000..a99fe42e --- /dev/null +++ b/src/graphql/export.rs @@ -0,0 +1,1862 @@ +use std::{ + borrow::Cow, + fmt::Display, + 
fs::{self, File}, + net::IpAddr, + path::{Path, PathBuf}, +}; + +use super::{ + network::{IpRange, PortRange}, + RawEventFilter, TimeRange, +}; +use crate::{ + ingestion::{ + Conn, DceRpc, Dns, EventFilter, Http, Kerberos, Log, Ntlm, PeriodicTimeSeries, Rdp, Smtp, + Ssh, + }, + publish::convert_time_format, + storage::{lower_closed_bound_key, upper_open_bound_key, Database, RawEventStore}, +}; +use anyhow::anyhow; +use async_graphql::{Context, InputObject, Object, Result}; +use chrono::{DateTime, Local, Utc}; +use rocksdb::Direction; +use serde::{de::DeserializeOwned, Serialize}; +use std::io::Write; +use tracing::{error, info}; + +#[derive(Default)] +pub(super) struct ExportQuery; + +#[derive(Serialize, Debug)] +struct ConnJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + proto: u8, + duration: i64, + orig_bytes: u64, + resp_bytes: u64, + orig_pkts: u64, + resp_pkts: u64, +} + +#[derive(Serialize, Debug)] +struct DnsJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + proto: u8, + query: String, + answer: Vec, +} + +#[derive(Serialize, Debug)] +struct HttpJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + method: String, + host: String, + uri: String, + referrer: String, + user_agent: String, + status_code: u16, +} + +#[derive(Serialize, Debug)] +struct RdpJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + cookie: String, +} + +#[derive(Serialize, Debug)] +struct SmtpJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + mailfrom: String, + date: String, + from: String, + to: String, + subject: String, + agent: String, +} + +#[derive(Serialize, Debug)] +struct NtlmJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + username: String, + hostname: String, + domainname: String, + server_nb_computer_name: String, + server_dns_computer_name: String, + server_tree_name: String, + success: String, +} + +#[derive(Serialize, Debug)] +struct KerberosJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + request_type: String, + client: String, + service: String, + success: String, + error_msg: String, + from: i64, + till: i64, + cipher: String, + forwardable: String, + renewable: String, + client_cert_subject: String, + server_cert_subject: String, +} + +#[derive(Serialize, Debug)] +struct SshJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + version: i64, + auth_success: String, + auth_attempts: i64, + direction: String, + client: String, + server: String, + cipher_alg: String, + mac_alg: String, + compression_alg: String, + kex_alg: String, + host_key_alg: String, + host_key: String, +} + +#[derive(Serialize, Debug)] +struct DceRpcJsonOutput { + timestamp: String, + source: String, + orig_addr: String, + resp_addr: String, + orig_port: u16, + resp_port: u16, + rtt: i64, + named_pipe: String, + endpoint: String, + operation: String, +} + +#[derive(Serialize, Debug)] +struct LogJsonOutput { + timestamp: String, + source: String, + kind: String, + log: String, +} + 
+#[derive(Serialize, Debug)]
+struct TimeSeriesJsonOutput {
+    start: String,
+    id: String,
+    data: Vec<f64>,
+}
+
+pub trait JsonOutput<T>: Sized {
+    fn convert_json_output(&self, timestamp: String, source: String) -> Result<T>;
+}
+
+macro_rules! convert_json_output {
+    ($to:ident, $from:ty, $($fields:ident),*) => {
+        impl JsonOutput<$to> for $from {
+            fn convert_json_output(&self, timestamp:String, source:String) -> Result<$to> {
+                Ok($to {
+                    timestamp,
+                    source,
+                    orig_addr: self.orig_addr.to_string(),
+                    resp_addr: self.resp_addr.to_string(),
+                    orig_port: self.orig_port,
+                    resp_port: self.resp_port,
+                    $(
+                        $fields: self.$fields.clone(),
+                    )*
+                })
+            }
+        }
+    };
+}
+
+convert_json_output!(
+    ConnJsonOutput,
+    Conn,
+    proto,
+    duration,
+    orig_bytes,
+    resp_bytes,
+    orig_pkts,
+    resp_pkts
+);
+
+convert_json_output!(
+    HttpJsonOutput,
+    Http,
+    method,
+    host,
+    uri,
+    referrer,
+    user_agent,
+    status_code
+);
+
+convert_json_output!(RdpJsonOutput, Rdp, cookie);
+
+convert_json_output!(DnsJsonOutput, Dns, proto, query, answer);
+
+convert_json_output!(
+    SmtpJsonOutput,
+    Smtp,
+    mailfrom,
+    date,
+    from,
+    to,
+    subject,
+    agent
+);
+
+convert_json_output!(
+    NtlmJsonOutput,
+    Ntlm,
+    username,
+    hostname,
+    domainname,
+    server_nb_computer_name,
+    server_dns_computer_name,
+    server_tree_name,
+    success
+);
+
+convert_json_output!(
+    KerberosJsonOutput,
+    Kerberos,
+    request_type,
+    client,
+    service,
+    success,
+    error_msg,
+    from,
+    till,
+    cipher,
+    forwardable,
+    renewable,
+    client_cert_subject,
+    server_cert_subject
+);
+
+convert_json_output!(
+    SshJsonOutput,
+    Ssh,
+    version,
+    auth_success,
+    auth_attempts,
+    direction,
+    client,
+    server,
+    cipher_alg,
+    mac_alg,
+    compression_alg,
+    kex_alg,
+    host_key_alg,
+    host_key
+);
+
+convert_json_output!(
+    DceRpcJsonOutput,
+    DceRpc,
+    rtt,
+    named_pipe,
+    endpoint,
+    operation
+);
+
+impl JsonOutput<LogJsonOutput> for Log {
+    fn convert_json_output(&self, timestamp: String, source: String) -> Result<LogJsonOutput> {
+        Ok(LogJsonOutput {
+            timestamp,
+            source,
+            kind: self.kind.clone(),
+            log: String::from_utf8_lossy(&self.log).to_string(),
+        })
+    }
+}
+
+impl JsonOutput<TimeSeriesJsonOutput> for PeriodicTimeSeries {
+    fn convert_json_output(
+        &self,
+        timestamp: String,
+        source: String,
+    ) -> Result<TimeSeriesJsonOutput> {
+        Ok(TimeSeriesJsonOutput {
+            start: timestamp,
+            id: source,
+            data: self.data.clone(),
+        })
+    }
+}
+
+#[allow(clippy::module_name_repetitions)]
+#[derive(InputObject, Serialize)]
+pub struct ExportFilter {
+    protocol: String,
+    source_id: String,
+    kind: Option<String>,
+    time: Option<TimeRange>,
+    orig_addr: Option<IpRange>,
+    resp_addr: Option<IpRange>,
+    orig_port: Option<PortRange>,
+    resp_port: Option<PortRange>,
+}
+
+impl RawEventFilter for ExportFilter {
+    fn time(&self) -> (Option<DateTime<Utc>>, Option<DateTime<Utc>>) {
+        if let Some(time) = &self.time {
+            (time.start, time.end)
+        } else {
+            (None, None)
+        }
+    }
+
+    fn check(
+        &self,
+        orig_addr: Option<IpAddr>,
+        resp_addr: Option<IpAddr>,
+        orig_port: Option<u16>,
+        resp_port: Option<u16>,
+        _log_level: Option<String>,
+        _log_contents: Option<String>,
+    ) -> Result<bool> {
+        if let Some(ip_range) = &self.orig_addr {
+            if let Some(orig_addr) = orig_addr {
+                let end = if let Some(end) = &ip_range.end {
+                    orig_addr >= end.parse::<IpAddr>()?
+                } else {
+                    false
+                };
+
+                let start = if let Some(start) = &ip_range.start {
+                    orig_addr < start.parse::<IpAddr>()?
+                } else {
+                    false
+                };
+                if end || start {
+                    return Ok(false);
+                };
+            }
+        }
+        if let Some(ip_range) = &self.resp_addr {
+            if let Some(resp_addr) = resp_addr {
+                let end = if let Some(end) = &ip_range.end {
+                    resp_addr >= end.parse::<IpAddr>()?
+                } else {
+                    false
+                };
+
+                let start = if let Some(start) = &ip_range.start {
+                    resp_addr < start.parse::<IpAddr>()?
+                } else {
+                    false
+                };
+                if end || start {
+                    return Ok(false);
+                };
+            }
+        }
+        if let Some(port_range) = &self.orig_port {
+            if let Some(orig_port) = orig_port {
+                let end = if let Some(end) = port_range.end {
+                    orig_port >= end
+                } else {
+                    false
+                };
+                let start = if let Some(start) = port_range.start {
+                    orig_port < start
+                } else {
+                    false
+                };
+                if end || start {
+                    return Ok(false);
+                };
+            }
+        }
+        if let Some(port_range) = &self.resp_port {
+            if let Some(resp_port) = resp_port {
+                let end = if let Some(end) = port_range.end {
+                    resp_port >= end
+                } else {
+                    false
+                };
+                let start = if let Some(start) = port_range.start {
+                    resp_port < start
+                } else {
+                    false
+                };
+                if end || start {
+                    return Ok(false);
+                };
+            }
+        }
+        Ok(true)
+    }
+}
+
+#[Object]
+impl ExportQuery {
+    #[allow(clippy::unused_async)]
+    async fn export(
+        &self,
+        ctx: &Context<'_>,
+        export_type: String,
+        filter: ExportFilter,
+    ) -> Result<String> {
+        if filter.protocol == "log" || filter.protocol == "periodic time series" {
+            // check log/time_series protocol filter format
+            if filter.orig_addr.is_some()
+                || filter.resp_addr.is_some()
+                || filter.orig_port.is_some()
+                || filter.resp_port.is_some()
+            {
+                return Err(anyhow!("Invalid id/port input").into());
+            }
+        } else {
+            // check network protocol filter format
+            if filter.kind.is_some() {
+                return Err(anyhow!("Invalid kind input").into());
+            }
+        }
+
+        // check export file type
+        if !(export_type.eq("csv") || export_type.eq("json")) {
+            return Err(anyhow!("Invalid export file format").into());
+        }
+
+        let db = ctx.data::<Database>()?;
+        let path = ctx.data::<PathBuf>()?;
+        let key_prefix = export_key_prefix(&filter.source_id, &filter.kind);
+
+        // set export file path
+        if !path.exists() {
+            fs::create_dir_all(path)?;
+        }
+        let filename = format!(
+            "{}_{}.dump",
+            &filter.protocol,
+            Local::now().format("%Y%m%d_%H%M%S"),
+        );
+        let export_path = path.join(filename.replace(' ', ""));
+        let download_path = export_path.display().to_string();
+
+        export_by_protocol(db.clone(), key_prefix, filter, export_type, export_path)?;
+
+        Ok(download_path)
+    }
+}
+
+#[allow(clippy::too_many_lines)]
+fn export_by_protocol(
+    db: Database,
+    key_prefix: Vec<u8>,
+    filter: ExportFilter,
+    export_type: String,
+    export_path: PathBuf,
+) -> Result<()> {
+    match filter.protocol.as_str() {
+        "conn" => tokio::spawn(async move {
+            if let Ok(store) = db.conn_store() {
+                match process_export(&store, &key_prefix, &filter, &export_type, &export_path) {
+                    Ok(result) => {
+                        info!("{}", result);
+                    }
+                    Err(e) => {
+                        error!("Failed to export file: {:?}", e);
+                    }
+                }
+            } else {
+                error!("Failed to open db store");
+            }
+        }),
+        "dns" => tokio::spawn(async move {
+            if let Ok(store) = db.dns_store() {
+                match process_export(&store, &key_prefix, &filter, &export_type, &export_path) {
+                    Ok(result) => {
+                        info!("{}", result);
+                    }
+                    Err(e) => {
+                        error!("Failed to export file: {:?}", e);
+                    }
+                }
+            } else {
+                error!("Failed to open db store");
+            }
+        }),
+        "http" => tokio::spawn(async move {
+            if let Ok(store) = db.http_store() {
+                match process_export(&store, &key_prefix, &filter, &export_type, &export_path) {
+                    Ok(result) => {
+                        info!("{}", result);
+                    }
+                    Err(e) => {
+                        error!("Failed to export file: {:?}", e);
+                    }
+                }
+            } else {
+                error!("Failed to open db store");
+            }
+        }),
+        "log" => tokio::spawn(async move {
+            if let Ok(store) = db.log_store() {
+                match process_export(&store, &key_prefix, &filter, &export_type, &export_path)
{ + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + "rdp" => tokio::spawn(async move { + if let Ok(store) = db.rdp_store() { + match process_export(&store, &key_prefix, &filter, &export_type, &export_path) { + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + "smtp" => tokio::spawn(async move { + if let Ok(store) = db.smtp_store() { + match process_export(&store, &key_prefix, &filter, &export_type, &export_path) { + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + "periodic time series" => tokio::spawn(async move { + if let Ok(store) = db.periodic_time_series_store() { + match process_export(&store, &key_prefix, &filter, &export_type, &export_path) { + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + "ntlm" => tokio::spawn(async move { + if let Ok(store) = db.ntlm_store() { + match process_export(&store, &key_prefix, &filter, &export_type, &export_path) { + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + "kerberos" => tokio::spawn(async move { + if let Ok(store) = db.kerberos_store() { + match process_export(&store, &key_prefix, &filter, &export_type, &export_path) { + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + "ssh" => tokio::spawn(async move { + if let Ok(store) = db.ssh_store() { + match process_export(&store, &key_prefix, &filter, &export_type, &export_path) { + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + "dce rpc" => tokio::spawn(async move { + if let Ok(store) = db.dce_rpc_store() { + match process_export(&store, &key_prefix, &filter, &export_type, &export_path) { + Ok(result) => { + info!("{}", result); + } + Err(e) => { + error!("Failed to export file: {:?}", e); + } + } + } else { + error!("Failed to open db store"); + } + }), + none => { + return Err(anyhow!("{}: Unknown protocol", none).into()); + } + }; + Ok(()) +} + +fn process_export<'c, T, N>( + store: &RawEventStore<'c, T>, + key_prefix: &[u8], + filter: &impl RawEventFilter, + export_type: &str, + export_path: &Path, +) -> Result +where + T: DeserializeOwned + Display + EventFilter + JsonOutput + Send + Serialize, + N: Serialize, +{ + let (start, end) = filter.time(); + let iter = store.boundary_iter( + &lower_closed_bound_key(key_prefix, start), + &upper_open_bound_key(key_prefix, end), + Direction::Forward, + ); + export_file(iter, filter, export_type, export_path) +} + +fn export_file( + iter: I, + filter: &impl RawEventFilter, + export_type: &str, + path: &Path, +) -> Result +where + I: Iterator, T)>> + Send, + T: Display + EventFilter + JsonOutput + Serialize, + N: Serialize, +{ + // export file open + let mut writer = File::create(path)?; + + // check filter condition & write file + for item in iter { + let (key, value) = item.map_err(|e| format!("Failed to read database: {e}"))?; 
+ match filter.check( + value.orig_addr(), + value.resp_addr(), + value.orig_port(), + value.resp_port(), + value.log_level(), + value.log_contents(), + ) { + Ok(true) => { + let (source, timestamp) = parse_key(&key)?; + match export_type { + "csv" => { + writeln!( + writer, + "{}", + format_args!( + "{}\t{}\t{}", + convert_time_format(timestamp), + source, + value + ) + )?; + } + "json" => { + let json_data = value.convert_json_output( + convert_time_format(timestamp), + source.to_string(), + )?; + let json_data = serde_json::to_string(&json_data)?; + writeln!(writer, "{json_data}")?; + } + _ => {} + } + } + Ok(false) | Err(_) => {} + } + } + Ok(format!("export file success: {path:?}")) +} + +fn export_key_prefix(source_id: &str, kind: &Option) -> Vec { + let mut prefix = Vec::new(); + prefix.extend_from_slice(source_id.as_bytes()); + prefix.push(0); + if let Some(kind_val) = kind { + prefix.extend_from_slice(kind_val.as_bytes()); + prefix.push(0); + } + prefix +} + +fn parse_key(key: &[u8]) -> anyhow::Result<(Cow, i64)> { + if let Some(pos) = key.iter().position(|x| *x == 0) { + if let Some(s) = key.get(..pos) { + let source = String::from_utf8_lossy(s); + if let Some(t) = key.get(key.len() - 8..) { + let timestamp = i64::from_be_bytes(t.try_into()?); + return Ok((source, timestamp)); + }; + } + } + Err(anyhow!("Invalid key")) +} + +#[cfg(test)] +mod tests { + use crate::graphql::TestSchema; + use crate::ingestion::{ + Conn, DceRpc, Dns, Http, Kerberos, Log, Ntlm, PeriodicTimeSeries, Rdp, Smtp, Ssh, + }; + use crate::storage::RawEventStore; + use chrono::{Duration, Utc}; + use std::mem; + use std::net::IpAddr; + + #[tokio::test] + async fn invalid_query() { + let schema = TestSchema::new(); + + // invalid filter combine1 (log + addr) + let query = r#" + { + export( + filter:{ + protocol: "log", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert_eq!(res.data.to_string(), "null"); + + // invalid filter combine2 (network proto + kind) + let query = r#" + { + export( + filter:{ + protocol: "conn", + sourceId: "src3", + kind: "log1" + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert_eq!(res.data.to_string(), "null"); + + // invalid export format + let query = r#" + { + export( + filter:{ + protocol: "conn", + sourceId: "src3", + } + ,exportType:"ppt") + }"#; + let res = schema.execute(query).await; + assert_eq!(res.data.to_string(), "null"); + + // invalid protocol format + let query = r#" + { + export( + filter:{ + protocol: "invalid_proto", + sourceId: "src3", + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert_eq!(res.data.to_string(), "null"); + } + + #[tokio::test] + async fn export_conn_empty() { + let schema = TestSchema::new(); + let store = schema.db.conn_store().unwrap(); + + insert_conn_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_conn_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "conn", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + 
assert!(res.data.to_string().contains("conn")); + } + + #[tokio::test] + async fn export_conn_with_data() { + let schema = TestSchema::new(); + let store = schema.db.conn_store().unwrap(); + + insert_conn_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_conn_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "conn", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("conn")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "conn", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("conn")); + } + + fn insert_conn_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let tmp_dur = Duration::nanoseconds(12345); + let conn_body = Conn { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + proto: 6, + duration: tmp_dur.num_nanoseconds().unwrap(), + orig_bytes: 77, + resp_bytes: 295, + orig_pkts: 397, + resp_pkts: 511, + }; + let ser_conn_body = bincode::serialize(&conn_body).unwrap(); + + store.append(&key, &ser_conn_body).unwrap(); + } + + #[tokio::test] + async fn export_dns_empty() { + let schema = TestSchema::new(); + let store = schema.db.dns_store().unwrap(); + + insert_dns_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_dns_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "dns", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("dns")); + } + + #[tokio::test] + async fn export_dns_with_data() { + let schema = TestSchema::new(); + let store = schema.db.dns_store().unwrap(); + + insert_dns_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_dns_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "dns", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "31.3.245.100", end: "31.3.245.245" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + 
assert!(res.data.to_string().contains("dns")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "dns", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "31.3.245.100", end: "31.3.245.245" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("dns")); + } + + fn insert_dns_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let dns_body = Dns { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "31.3.245.133".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + proto: 17, + query: "Hello Server Hello Server Hello Server".to_string(), + answer: vec!["1.1.1.1".to_string()], + }; + let ser_dns_body = bincode::serialize(&dns_body).unwrap(); + + store.append(&key, &ser_dns_body).unwrap(); + } + + #[tokio::test] + async fn export_http_empty() { + let schema = TestSchema::new(); + let store = schema.db.http_store().unwrap(); + + insert_http_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_http_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "http", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("http")); + } + + #[tokio::test] + async fn export_http_with_data() { + let schema = TestSchema::new(); + let store = schema.db.http_store().unwrap(); + + insert_http_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_http_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "http", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.75", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("http")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "http", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.75", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("http")); + } + + fn insert_http_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let http_body = Http { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 
46378, + resp_port: 80, + method: "POST".to_string(), + host: "einsis".to_string(), + uri: "/einsis.gif".to_string(), + referrer: "einsis.com".to_string(), + user_agent: "giganto".to_string(), + status_code: 200, + }; + let ser_http_body = bincode::serialize(&http_body).unwrap(); + + store.append(&key, &ser_http_body).unwrap(); + } + + #[tokio::test] + async fn export_rdp_empty() { + let schema = TestSchema::new(); + let store = schema.db.rdp_store().unwrap(); + + insert_rdp_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_rdp_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "rdp", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.75", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("rdp")); + } + + #[tokio::test] + async fn export_rdp_with_data() { + let schema = TestSchema::new(); + let store = schema.db.rdp_store().unwrap(); + + insert_rdp_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_rdp_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "rdp", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.75", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("rdp")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "rdp", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("rdp")); + } + + fn insert_rdp_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let rdp_body = Rdp { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + cookie: "rdp_test".to_string(), + }; + let ser_rdp_body = bincode::serialize(&rdp_body).unwrap(); + + store.append(&key, &ser_rdp_body).unwrap(); + } + + #[tokio::test] + async fn export_smtp_empty() { + let schema = TestSchema::new(); + let store = schema.db.smtp_store().unwrap(); + + insert_smtp_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_smtp_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "smtp", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "192.168.4.70", end: "192.168.4.78" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + 
,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("smtp")); + } + + #[tokio::test] + async fn export_smtp_with_data() { + let schema = TestSchema::new(); + let store = schema.db.smtp_store().unwrap(); + + insert_smtp_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_smtp_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "smtp", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "192.168.4.70", end: "192.168.4.78" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("smtp")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "smtp", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "192.168.4.70", end: "192.168.4.78" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("smtp")); + } + + fn insert_smtp_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let smtp_body = Smtp { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + mailfrom: "mailfrom".to_string(), + date: "date".to_string(), + from: "from".to_string(), + to: "to".to_string(), + subject: "subject".to_string(), + agent: "agent".to_string(), + }; + let ser_smtp_body = bincode::serialize(&smtp_body).unwrap(); + + store.append(&key, &ser_smtp_body).unwrap(); + } + + #[tokio::test] + async fn export_ntlm_empty() { + let schema = TestSchema::new(); + let store = schema.db.ntlm_store().unwrap(); + + insert_ntlm_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_ntlm_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "ntlm", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("ntlm")); + } + + #[tokio::test] + async fn export_ntlm_with_data() { + let schema = TestSchema::new(); + let store = schema.db.ntlm_store().unwrap(); + + insert_ntlm_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_ntlm_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "ntlm", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"csv") + }"#; 
+ let res = schema.execute(query).await; + assert!(res.data.to_string().contains("ntlm")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "ntlm", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("ntlm")); + } + fn insert_ntlm_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let ntlm_body = Ntlm { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + username: "bly".to_string(), + hostname: "host".to_string(), + domainname: "domain".to_string(), + server_nb_computer_name: "NB".to_string(), + server_dns_computer_name: "dns".to_string(), + server_tree_name: "tree".to_string(), + success: "tf".to_string(), + }; + let ser_ntlm_body = bincode::serialize(&ntlm_body).unwrap(); + + store.append(&key, &ser_ntlm_body).unwrap(); + } + + #[tokio::test] + async fn export_kerberos_empty() { + let schema = TestSchema::new(); + let store = schema.db.kerberos_store().unwrap(); + + insert_kerberos_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_kerberos_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "kerberos", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("kerberos")); + } + + #[tokio::test] + async fn export_kerberos_with_data() { + let schema = TestSchema::new(); + let store = schema.db.kerberos_store().unwrap(); + + insert_kerberos_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_kerberos_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "kerberos", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("kerberos")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "kerberos", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("kerberos")); + } + fn insert_kerberos_raw_event(store: &RawEventStore, source: &str, timestamp: 
i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let kerberos_body = Kerberos { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + request_type: "req_type".to_string(), + client: "client".to_string(), + service: "service".to_string(), + success: "tf".to_string(), + error_msg: "err_msg".to_string(), + from: 5454, + till: 2345, + cipher: "cipher".to_string(), + forwardable: "forwardable".to_string(), + renewable: "renewable".to_string(), + client_cert_subject: "client_cert".to_string(), + server_cert_subject: "server_cert".to_string(), + }; + let ser_kerberos_body = bincode::serialize(&kerberos_body).unwrap(); + + store.append(&key, &ser_kerberos_body).unwrap(); + } + + #[tokio::test] + async fn export_ssh_empty() { + let schema = TestSchema::new(); + let store = schema.db.ssh_store().unwrap(); + + insert_ssh_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_ssh_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "ssh", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("ssh")); + } + + #[tokio::test] + async fn export_ssh_with_data() { + let schema = TestSchema::new(); + let store = schema.db.ssh_store().unwrap(); + + insert_ssh_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_ssh_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "ssh", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("ssh")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "ssh", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("ssh")); + } + fn insert_ssh_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let ssh_body = Ssh { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + version: 01, + auth_success: "auth_success".to_string(), + auth_attempts: 3, + direction: "direction".to_string(), + client: "client".to_string(), + server: "server".to_string(), + cipher_alg: "cipher_alg".to_string(), + mac_alg: 
"mac_alg".to_string(), + compression_alg: "compression_alg".to_string(), + kex_alg: "kex_alg".to_string(), + host_key_alg: "host_key_alg".to_string(), + host_key: "host_key".to_string(), + }; + let ser_ssh_body = bincode::serialize(&ssh_body).unwrap(); + + store.append(&key, &ser_ssh_body).unwrap(); + } + + #[tokio::test] + async fn export_dce_rpc_empty() { + let schema = TestSchema::new(); + let store = schema.db.dce_rpc_store().unwrap(); + + insert_dce_rpc_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_dce_rpc_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + let query = r#" + { + export( + filter:{ + protocol: "dce rpc", + sourceId: "src3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.72", end: "192.168.4.79" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46378, end: 46379 } + respPort: { start: 50, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("dcerpc")); + } + + #[tokio::test] + async fn export_dce_rpc_with_data() { + let schema = TestSchema::new(); + let store = schema.db.dce_rpc_store().unwrap(); + + insert_dce_rpc_raw_event(&store, "src1", Utc::now().timestamp_nanos()); + insert_dce_rpc_raw_event(&store, "src2", Utc::now().timestamp_nanos()); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "dce rpc", + sourceId: "src1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("dcerpc")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "dce rpc", + sourceId: "src2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + origAddr: { start: "192.168.4.70", end: "192.168.4.78" } + respAddr: { start: "192.168.4.75", end: "192.168.4.79" } + origPort: { start: 46377, end: 46380 } + respPort: { start: 0, end: 200 } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("dcerpc")); + } + fn insert_dce_rpc_raw_event(store: &RawEventStore, source: &str, timestamp: i64) { + let mut key = Vec::with_capacity(source.len() + 1 + mem::size_of::()); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend(timestamp.to_be_bytes()); + + let dce_rpc_body = DceRpc { + orig_addr: "192.168.4.76".parse::().unwrap(), + resp_addr: "192.168.4.76".parse::().unwrap(), + orig_port: 46378, + resp_port: 80, + rtt: 3, + named_pipe: "named_pipe".to_string(), + endpoint: "endpoint".to_string(), + operation: "operation".to_string(), + }; + let ser_dce_rpc_body = bincode::serialize(&dce_rpc_body).unwrap(); + + store.append(&key, &ser_dce_rpc_body).unwrap(); + } + + #[tokio::test] + async fn export_log_empty() { + let schema = TestSchema::new(); + let store = schema.db.log_store().unwrap(); + + insert_log_raw_event( + &store, + "src1", + Utc::now().timestamp_nanos(), + "kind1", + b"log1", + ); + insert_log_raw_event( + &store, + "src1", + Utc::now().timestamp_nanos(), + "kind2", + b"log2", + ); + + let query = r#" + { + export( + filter:{ + protocol: "log", + sourceId: "src3", + kind: "kind2", + time: { start: "1992-06-05T00:00:00Z", end: 
"2023-09-22T00:00:00Z" } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("log")); + } + + #[tokio::test] + async fn export_log_with_data() { + let schema = TestSchema::new(); + let store = schema.db.log_store().unwrap(); + + insert_log_raw_event( + &store, + "src1", + Utc::now().timestamp_nanos(), + "kind1", + b"log1", + ); + insert_log_raw_event( + &store, + "src2", + Utc::now().timestamp_nanos(), + "kind2", + b"log2", + ); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "log", + sourceId: "src1", + kind: "kind1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("log")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "log", + sourceId: "src2", + kind: "kind2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("log")); + } + + fn insert_log_raw_event( + store: &RawEventStore, + source: &str, + timestamp: i64, + kind: &str, + body: &[u8], + ) { + let mut key: Vec = Vec::new(); + key.extend_from_slice(source.as_bytes()); + key.push(0); + key.extend_from_slice(kind.as_bytes()); + key.push(0); + key.extend_from_slice(×tamp.to_be_bytes()); + let log_body = Log { + kind: kind.to_string(), + log: body.to_vec(), + }; + let value = bincode::serialize(&log_body).unwrap(); + store.append(&key, &value).unwrap(); + } + + #[tokio::test] + async fn export_time_series_empty() { + let schema = TestSchema::new(); + let store = schema.db.periodic_time_series_store().unwrap(); + + insert_time_series(&store, "1", Utc::now().timestamp_nanos(), vec![0.0; 12]); + insert_time_series(&store, "2", Utc::now().timestamp_nanos(), vec![0.0; 12]); + + let query = r#" + { + export( + filter:{ + protocol: "periodic time series", + sourceId: "3", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("periodictimeseries")); + } + + #[tokio::test] + async fn export_time_series_with_data() { + let schema = TestSchema::new(); + let store = schema.db.periodic_time_series_store().unwrap(); + + insert_time_series(&store, "1", Utc::now().timestamp_nanos(), vec![0.0; 12]); + insert_time_series(&store, "2", Utc::now().timestamp_nanos(), vec![0.0; 12]); + + // export csv file + let query = r#" + { + export( + filter:{ + protocol: "periodic time series", + sourceId: "1", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + } + ,exportType:"csv") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("periodictimeseries")); + + // export json file + let query = r#" + { + export( + filter:{ + protocol: "periodic time series", + sourceId: "2", + time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" } + } + ,exportType:"json") + }"#; + let res = schema.execute(query).await; + assert!(res.data.to_string().contains("periodictimeseries")); + } + + fn insert_time_series( + store: &RawEventStore, + id: &str, + start: i64, + data: Vec, + ) { + let mut key: Vec = Vec::new(); + key.extend_from_slice(id.as_bytes()); + key.push(0); + key.extend_from_slice(&start.to_be_bytes()); + let time_series_data = PeriodicTimeSeries { + id: id.to_string(), + data, + }; + let 
value = bincode::serialize(&time_series_data).unwrap(); + store.append(&key, &value).unwrap(); + } +} diff --git a/src/graphql/network.rs b/src/graphql/network.rs index 593cafbe..e20cd7fa 100644 --- a/src/graphql/network.rs +++ b/src/graphql/network.rs @@ -31,14 +31,14 @@ pub struct NetworkFilter { #[derive(InputObject, Serialize)] pub struct IpRange { - start: Option, - end: Option, + pub start: Option, + pub end: Option, } #[derive(InputObject, Serialize)] pub struct PortRange { - start: Option, - end: Option, + pub start: Option, + pub end: Option, } impl RawEventFilter for NetworkFilter { diff --git a/src/ingestion.rs b/src/ingestion.rs index 0b5dc12f..ce695ad8 100644 --- a/src/ingestion.rs +++ b/src/ingestion.rs @@ -86,7 +86,7 @@ impl EventFilter for PeriodicTimeSeries { impl Display for PeriodicTimeSeries { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}: {:?}", self.id, self.data) + write!(f, "{:?}", self.data) } } diff --git a/src/ingestion/log.rs b/src/ingestion/log.rs index 853a0cac..e6cd4241 100644 --- a/src/ingestion/log.rs +++ b/src/ingestion/log.rs @@ -36,7 +36,7 @@ impl EventFilter for Log { impl Display for Log { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{}: {:?}", self.kind, self.log) + write!(f, "{}\t{}", self.kind, String::from_utf8_lossy(&self.log)) } } diff --git a/src/ingestion/network.rs b/src/ingestion/network.rs index e7a43923..52bb557e 100644 --- a/src/ingestion/network.rs +++ b/src/ingestion/network.rs @@ -1,5 +1,5 @@ -use crate::ingestion::EventFilter; use crate::publish::PubMessage; +use crate::{ingestion::EventFilter, publish::convert_time_format}; use anyhow::Result; use serde::{Deserialize, Serialize}; use std::{ @@ -52,7 +52,7 @@ impl Display for Conn { self.resp_addr, self.resp_port, self.proto, - Conn::convert_time_format(self.duration), + convert_time_format(self.duration), self.orig_bytes, self.resp_bytes, self.orig_pkts, @@ -63,12 +63,7 @@ impl Display for Conn { impl PubMessage for Conn { fn message(&self, timestamp: i64, source: &str) -> Result> { - let conn_csv = format!( - "{}\t{}\t{}", - Conn::convert_time_format(timestamp), - source, - self - ); + let conn_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some(( timestamp, @@ -137,12 +132,7 @@ impl Display for Dns { impl PubMessage for Dns { fn message(&self, timestamp: i64, source: &str) -> Result> { - let dns_csv = format!( - "{}\t{}\t{}", - Dns::convert_time_format(timestamp), - source, - self - ); + let dns_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some((timestamp, &dns_csv.as_bytes())))?) } @@ -220,12 +210,7 @@ impl Display for Http { impl PubMessage for Http { fn message(&self, timestamp: i64, source: &str) -> Result> { - let http_csv = format!( - "{}\t{}\t{}", - Http::convert_time_format(timestamp), - source, - self - ); + let http_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some(( timestamp, @@ -276,12 +261,7 @@ impl Display for Rdp { impl PubMessage for Rdp { fn message(&self, timestamp: i64, source: &str) -> Result> { - let rdp_csv = format!( - "{}\t{}\t{}", - Rdp::convert_time_format(timestamp), - source, - self - ); + let rdp_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some((timestamp, &rdp_csv.as_bytes())))?) 
} @@ -363,12 +343,7 @@ impl Display for Smtp { impl PubMessage for Smtp { fn message(&self, timestamp: i64, source: &str) -> Result> { - let smtp_csv = format!( - "{}\t{}\t{}", - Smtp::convert_time_format(timestamp), - source, - self - ); + let smtp_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some(( timestamp, @@ -463,12 +438,7 @@ impl Display for Ntlm { impl PubMessage for Ntlm { fn message(&self, timestamp: i64, source: &str) -> Result> { - let ntlm_csv = format!( - "{}\t{}\t{}", - Ntlm::convert_time_format(timestamp), - source, - self - ); + let ntlm_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some(( timestamp, @@ -585,12 +555,7 @@ impl Display for Kerberos { impl PubMessage for Kerberos { fn message(&self, timestamp: i64, source: &str) -> Result> { - let kerberos_csv = format!( - "{}\t{}\t{}", - Kerberos::convert_time_format(timestamp), - source, - self - ); + let kerberos_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some(( timestamp, @@ -707,12 +672,7 @@ impl Display for Ssh { impl PubMessage for Ssh { fn message(&self, timestamp: i64, source: &str) -> Result> { - let ssh_csv = format!( - "{}\t{}\t{}", - Ssh::convert_time_format(timestamp), - source, - self - ); + let ssh_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some((timestamp, &ssh_csv.as_bytes())))?) } @@ -782,12 +742,7 @@ impl Display for DceRpc { impl PubMessage for DceRpc { fn message(&self, timestamp: i64, source: &str) -> Result> { - let dce_rpc_csv = format!( - "{}\t{}\t{}", - DceRpc::convert_time_format(timestamp), - source, - self - ); + let dce_rpc_csv = format!("{}\t{source}\t{self}", convert_time_format(timestamp)); Ok(bincode::serialize(&Some(( timestamp, diff --git a/src/main.rs b/src/main.rs index f8aeeb30..5ca7eb48 100644 --- a/src/main.rs +++ b/src/main.rs @@ -68,7 +68,11 @@ async fn main() -> Result<()> { let packet_sources = Arc::new(RwLock::new(HashMap::new())); let sources = Arc::new(RwLock::new(HashMap::new())); - let schema = graphql::schema(database.clone(), packet_sources.clone()); + let schema = graphql::schema( + database.clone(), + packet_sources.clone(), + settings.export_dir.clone(), + ); task::spawn(web::serve( schema, settings.graphql_address, diff --git a/src/publish.rs b/src/publish.rs index b9474ad0..644a1e01 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -118,12 +118,6 @@ pub trait PubMessage { fn done() -> Result> { Ok(bincode::serialize::)>>(&None)?) 
} - fn convert_time_format(timestamp: i64) -> String { - const A_BILLION: i64 = 1_000_000_000; - let nsecs = u32::try_from(timestamp % A_BILLION).unwrap_or_default(); - NaiveDateTime::from_timestamp_opt(timestamp / A_BILLION, nsecs) - .map_or("-".to_string(), |s| s.format("%s%.6f").to_string()) - } } trait StreamMessage { @@ -910,3 +904,10 @@ async fn request_network_stream( } Ok(()) } + +pub fn convert_time_format(timestamp: i64) -> String { + const A_BILLION: i64 = 1_000_000_000; + let nsecs = u32::try_from(timestamp % A_BILLION).unwrap_or_default(); + NaiveDateTime::from_timestamp_opt(timestamp / A_BILLION, nsecs) + .map_or("-".to_string(), |s| s.format("%s%.6f").to_string()) +} diff --git a/src/settings.rs b/src/settings.rs index 1d7a10de..99660cb5 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -23,6 +23,7 @@ pub struct Settings { #[serde(deserialize_with = "deserialize_socket_addr")] pub graphql_address: SocketAddr, // IP address & port to graphql pub log_dir: PathBuf, //giganto's syslog path + pub export_dir: PathBuf, //giganto's export file path } impl Settings { @@ -63,8 +64,14 @@ fn default_config_builder() -> ConfigBuilder { directories::ProjectDirs::from_path(PathBuf::from("db")).expect("unreachable db dir"); let log_dir = directories::ProjectDirs::from_path(PathBuf::from("logs/apps")) .expect("unreachable logs dir"); + let export_dir = directories::ProjectDirs::from_path(PathBuf::from("export")) + .expect("unreachable export dir"); let db_path = db_dir.data_dir().to_str().expect("unreachable db path"); let log_path = log_dir.data_dir().to_str().expect("unreachable log path"); + let export_path = export_dir + .data_dir() + .to_str() + .expect("unreachable export path"); let config_dir = dirs.config_dir(); let cert_path = config_dir.join("cert.pem"); let key_path = config_dir.join("key.pem"); @@ -86,6 +93,8 @@ fn default_config_builder() -> ConfigBuilder { .expect("retention") .set_default("log_path", log_path) .expect("log dir") + .set_default("export_path", export_path) + .expect("export_dir") } /// Deserializes a socket address. diff --git a/tests/config.toml b/tests/config.toml index e9ce6656..fa0a2e31 100644 --- a/tests/config.toml +++ b/tests/config.toml @@ -6,3 +6,4 @@ data_dir = "tests/data" retention = "100d" log_dir = "tests/logs/apps" + export_dir = "tests/export"
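
For reference, a minimal example of calling the new `export` GraphQL field, with argument names taken from the test queries above (`conn` is one of the supported protocol values; `exportType` is `"csv"` or `"json"`). The export runs on a spawned background task, writes a dump file under `export_dir`, and the field returns the server-side path of that file:

    {
        export(
            filter: {
                protocol: "conn",
                sourceId: "src1",
                time: { start: "1992-06-05T00:00:00Z", end: "2023-09-22T00:00:00Z" }
            },
            exportType: "csv")
    }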