diff --git a/CHANGELOG.md b/CHANGELOG.md index 908e38a..8d208e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,9 @@ Versioning](https://semver.org/spec/v2.0.0.html). - Added `request_from_peer: Option` argument to GraphQL endpoints: `netflow5_raw_events`, `netflow9_raw_events`, `secu_log_raw_events`, `statistics`. +- Supported `log-broker` to send/receive operation log with redis server. + - Set the redis server with `redis_log_address`, `redis_log_agent_id` and + `redis_log_fetch_interval` in configuration options. ### Changed @@ -40,6 +43,7 @@ Versioning](https://semver.org/spec/v2.0.0.html). - Changed `export` GraphQL query's response value format from `{export_path}` to `{export_path}@{giganto_node_name}` - Changed `PEER_VERSION_REQ` to ">=0.16.0-alpha.1,<0.17.0" +- Changed logging from `tracing` to `log-broker`. ### Fixed diff --git a/Cargo.lock b/Cargo.lock index f7064f5..7bae634 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -277,6 +277,30 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "bb8" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98b4b0f25f18bcdc3ac72bdb486ed0acf7e185221fd4dc985bc15db5800b0ba2" +dependencies = [ + "async-trait", + "futures-channel", + "futures-util", + "parking_lot", + "tokio", +] + +[[package]] +name = "bb8-redis" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4094bc17b933090cfded54315a86db01d67ec999583d4bab894c520f8c097d1f" +dependencies = [ + "async-trait", + "bb8", + "redis", +] + [[package]] name = "bincode" version = "1.3.3" @@ -434,6 +458,20 @@ dependencies = [ "unreachable", ] +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "config" version = "0.13.4" @@ -920,6 +958,7 @@ dependencies = [ "humantime", "humantime-serde", "libc", + "log-broker", "mockito", "num-traits", "num_enum", @@ -927,6 +966,7 @@ dependencies = [ "proc-macro2", "quinn", "quote", + "redis", "regex", "reqwest", "rocksdb", @@ -940,7 +980,6 @@ dependencies = [ "tempfile", "tokio", "toml_edit 0.21.0", - "tracing", "url", "warp", "x509-parser", @@ -994,7 +1033,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ebc8013b4426d5b81a4364c419a95ed0b404af2b82e2457de52d9348f0e474" dependencies = [ - "combine", + "combine 3.8.1", "thiserror", ] @@ -1202,7 +1241,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -1449,6 +1488,21 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "log-broker" +version = "0.1.0" +source = "git+https://github.com/aicers/log-broker.git?tag=0.1.0#f9d4f5109d967d5cc8aff4509a329c4b3b1c5b00" +dependencies = [ + "anyhow", + "bb8-redis", + "chrono", + "lazy_static", + "tokio", + "tracing", + "tracing-appender", + "tracing-subscriber", +] + [[package]] name = "lz4-sys" version = "1.9.4" @@ -2087,7 +2141,7 @@ checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" 
dependencies = [ "bytes", "libc", - "socket2", + "socket2 0.5.5", "tracing", "windows-sys 0.48.0", ] @@ -2151,6 +2205,27 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "async-trait", + "bytes", + "combine 4.6.6", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -2570,6 +2645,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.10.8" @@ -2626,6 +2707,16 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.5" @@ -2864,7 +2955,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -3047,6 +3138,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -3057,12 +3158,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ec5b417..75d5ff4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,12 +28,14 @@ graphql_client = "0.13" humantime = "2.1" humantime-serde = "1" libc = "0.2" +log-broker = { git = "https://github.com/aicers/log-broker.git", tag = "0.1.0" } num_enum = "0.7" num-traits = "0.2" pcap = "1" proc-macro2 = "1.0" quinn = "0.10" quote = "1.0" +redis = { version = "0.24", features = ["tokio-comp"]} reqwest = { version = "0.11", features = ["rustls-tls", "json"] } rocksdb = "0.21" roxy = { git = "https://github.com/aicers/roxy.git", tag = "0.2.1" } @@ -46,7 +48,6 @@ syn = "2.0" tempfile = "3" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } toml_edit = "0.21" -tracing = "0.1" warp = { version = "0.3", features = ["tls"] } x509-parser = "0.15" diff --git a/README.md b/README.md index 9f89ffe..853b923 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,9 @@ roots = ["ca1.pem", "ca2.pem", "ca3.pem"] # paths to CA certificate files ingest_address = "0.0.0.0:38370" # address to listen for ingest QUIC publish_address = "0.0.0.0:38371" # address to listen for publish QUIC graphql_address = "127.0.0.1:8443" # giganto's graphql address +redis_log_address = "127.0.0.1:6379" # address to redis server +redis_log_agent_id = "giganto@localhost" # agent id to send log to redis +redis_log_fetch_interval = "10m" # log fetch interval 
from redis data_dir = "tests/data" # path to directory to store data retention = "100d" # retention period for data log_dir = "/data/logs/apps" # path to giganto's syslog file diff --git a/src/graphql.rs b/src/graphql.rs index 9c707e9..7971ce9 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -29,6 +29,7 @@ use chrono::{DateTime, TimeZone, Utc}; use giganto_client::ingest::Packet as pk; use graphql_client::Response as GraphQlResponse; use libc::timeval; +use log_broker::{error, LogLocation}; use num_traits::AsPrimitive; use pcap::{Capture, Linktype, Packet, PacketHeader}; use serde::Deserialize; @@ -48,7 +49,6 @@ use std::{ }; use tempfile::tempfile; use tokio::sync::Notify; -use tracing::error; pub const TIMESTAMP_SIZE: usize = 8; @@ -412,6 +412,7 @@ where if records.len() == size { if invalid_data_cnt > 1 { error!( + LogLocation::Both, "failed to read database or invalid data of {data_type} #{invalid_data_cnt}" ); } diff --git a/src/graphql/export.rs b/src/graphql/export.rs index eb6beb9..05039ed 100644 --- a/src/graphql/export.rs +++ b/src/graphql/export.rs @@ -37,6 +37,7 @@ use giganto_client::{ RawEventKind, }; use graphql_client::GraphQLQuery; +use log_broker::{error, info, LogLocation}; use serde::{de::DeserializeOwned, Serialize}; use std::{ borrow::Cow, @@ -47,7 +48,6 @@ use std::{ net::IpAddr, path::{Path, PathBuf}, }; -use tracing::{error, info}; const NON_NETWORK: [&str; 20] = [ "log", @@ -1582,504 +1582,504 @@ fn export_by_protocol( if let Ok(store) = db.conn_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "dns" => tokio::spawn(async move { if let Ok(store) = db.dns_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "http" => tokio::spawn(async move { if let Ok(store) = db.http_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "log" => tokio::spawn(async move { if let Ok(store) = db.log_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "rdp" => tokio::spawn(async move { if let Ok(store) = db.rdp_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + 
error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "smtp" => tokio::spawn(async move { if let Ok(store) = db.smtp_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "periodic time series" => tokio::spawn(async move { if let Ok(store) = db.periodic_time_series_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "ntlm" => tokio::spawn(async move { if let Ok(store) = db.ntlm_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "kerberos" => tokio::spawn(async move { if let Ok(store) = db.kerberos_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "ssh" => tokio::spawn(async move { if let Ok(store) = db.ssh_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "dce rpc" => tokio::spawn(async move { if let Ok(store) = db.dce_rpc_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "op_log" => tokio::spawn(async move { if let Ok(store) = db.op_log_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "ftp" => tokio::spawn(async move { if let Ok(store) = db.ftp_store() { match process_export(&store, &filter, &export_type, &export_path) { 
Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "mqtt" => tokio::spawn(async move { if let Ok(store) = db.mqtt_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "ldap" => tokio::spawn(async move { if let Ok(store) = db.ldap_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "tls" => tokio::spawn(async move { if let Ok(store) = db.tls_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "smb" => tokio::spawn(async move { if let Ok(store) = db.smb_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "nfs" => tokio::spawn(async move { if let Ok(store) = db.nfs_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "statistics" => tokio::spawn(async move { if let Ok(store) = db.statistics_store() { match process_statistics_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "process_create" => tokio::spawn(async move { if let Ok(store) = db.process_create_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "file_create_time" 
=> tokio::spawn(async move { if let Ok(store) = db.file_create_time_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "network_connect" => tokio::spawn(async move { if let Ok(store) = db.network_connect_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "process_terminate" => tokio::spawn(async move { if let Ok(store) = db.process_terminate_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "image_load" => tokio::spawn(async move { if let Ok(store) = db.image_load_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "file_create" => tokio::spawn(async move { if let Ok(store) = db.file_create_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "registry_value_set" => tokio::spawn(async move { if let Ok(store) = db.registry_value_set_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "registry_key_rename" => tokio::spawn(async move { if let Ok(store) = db.registry_key_rename_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "file_create_stream_hash" => tokio::spawn(async move { if let Ok(store) = db.file_create_stream_hash_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + 
info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "pipe_event" => tokio::spawn(async move { if let Ok(store) = db.pipe_event_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "dns_query" => tokio::spawn(async move { if let Ok(store) = db.dns_query_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "file_delete" => tokio::spawn(async move { if let Ok(store) = db.file_delete_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "process_tamper" => tokio::spawn(async move { if let Ok(store) = db.process_tamper_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "file_delete_detected" => tokio::spawn(async move { if let Ok(store) = db.file_delete_detected_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "netflow5" => tokio::spawn(async move { if let Ok(store) = db.netflow5_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), "netflow9" => tokio::spawn(async move { if let Ok(store) = db.netflow9_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db 
store"); } }), "secu_log" => tokio::spawn(async move { if let Ok(store) = db.secu_log_store() { match process_export(&store, &filter, &export_type, &export_path) { Ok(result) => { - info!("{}", result); + info!(LogLocation::Both, "{}", result); } Err(e) => { - error!("Failed to export file: {:?}", e); + error!(LogLocation::Both, "Failed to export file: {:?}", e); } } } else { - error!("Failed to open db store"); + error!(LogLocation::Both, "Failed to open db store"); } }), none => { @@ -2168,7 +2168,10 @@ where write_filtered_data_to_file(filter, export_type, &key, &value, &mut writer)?; } if invalid_data_cnt > 1 { - error!("failed to read database or invalid data #{invalid_data_cnt}"); + error!( + LogLocation::Both, + "failed to read database or invalid data #{invalid_data_cnt}" + ); } Ok(format!("export file success: {path:?}")) } @@ -2249,7 +2252,10 @@ fn export_statistic_file( } if invalid_data_cnt > 1 { - error!("failed to read database or invalid data #{invalid_data_cnt}"); + error!( + LogLocation::Both, + "failed to read database or invalid data #{invalid_data_cnt}" + ); } Ok(format!("export file success: {path:?}")) } diff --git a/src/graphql/statistics.rs b/src/graphql/statistics.rs index d8ac127..2ffee21 100644 --- a/src/graphql/statistics.rs +++ b/src/graphql/statistics.rs @@ -13,6 +13,7 @@ use async_graphql::{Context, Error, Object, Result, SimpleObject}; use giganto_client::{ingest::statistics::Statistics, RawEventKind}; use giganto_proc_macro::ConvertGraphQLEdgesNode; use graphql_client::GraphQLQuery; +use log_broker::{error, LogLocation}; use num_traits::NumCast; use rocksdb::Direction; use serde::de::DeserializeOwned; @@ -21,7 +22,6 @@ use std::{ iter::Peekable, str::FromStr, }; -use tracing::error; pub const MAX_CORE_SIZE: u32 = 16; // Number of queues on the collect device's NIC const BYTE_TO_BIT: u64 = 8; @@ -282,7 +282,7 @@ fn calculate_ps(period: u16, len: u64) -> f64 { return result; } } - error!("Failed to convert period/len to f64"); + error!(LogLocation::Both, "Failed to convert period/len to f64"); 0.0 } diff --git a/src/ingest.rs b/src/ingest.rs index 73e797c..9a98931 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -25,6 +25,7 @@ use giganto_client::{ }, RawEventKind, }; +use log_broker::{error, info, LogLocation}; use quinn::{Endpoint, RecvStream, SendStream, ServerConfig}; use rustls::{Certificate, PrivateKey}; use std::sync::atomic::AtomicU16; @@ -45,7 +46,6 @@ use tokio::{ task, time, time::sleep, }; -use tracing::{error, info}; use x509_parser::nom::AsBytes; const ACK_INTERVAL_TIME: u64 = 60; @@ -95,6 +95,7 @@ impl Server { ) { let endpoint = Endpoint::server(self.server_config, self.server_address).expect("endpoint"); info!( + LogLocation::Both, "listening on {}", endpoint.local_addr().expect("for local addr display") ); @@ -125,7 +126,7 @@ impl Server { if let Err(e) = handle_connection(conn, db, pcap_sources, sender, stream_direct_channels,notify_shutdown,shutdown_sig,ack_trans_cnt).await { - error!("connection failed: {}", e); + error!(LogLocation::Both, "connection failed: {}", e); } }); }, @@ -133,7 +134,7 @@ impl Server { shutdown_signal.store(true,Ordering::SeqCst); // Setting signal to handle termination on each channel. sleep(Duration::from_millis(SERVER_ENDPOINT_DELAY)).await; // Wait time for channels,connection to be ready for shutdown. 
endpoint.close(0_u32.into(), &[]); - info!("Shutting down ingest"); + info!(LogLocation::Both, "Shutting down ingest"); notify_shutdown.notify_one(); break; }, @@ -156,11 +157,11 @@ async fn handle_connection( let connection = conn.await?; match server_handshake(&connection, INGEST_VERSION_REQ).await { Ok((mut send, _)) => { - info!("Compatible version"); + info!(LogLocation::Both, "Compatible version"); send.finish().await?; } Err(e) => { - info!("Incompatible version"); + info!(LogLocation::Both, "Incompatible version"); connection.close(quinn::VarInt::from_u32(0), e.to_string().as_bytes()); bail!("{e}") } @@ -180,7 +181,7 @@ async fn handle_connection( .send((source.clone(), Utc::now(), ConnState::Connected, rep)) .await { - error!("Failed to send channel data : {}", error); + error!(LogLocation::Both, "Failed to send channel data : {}", error); } loop { select! { @@ -191,11 +192,11 @@ async fn handle_connection( .send((source, Utc::now(), ConnState::Disconnected, rep)) .await { - error!("Failed to send internal channel data : {}", error); + error!(LogLocation::Both, "Failed to send internal channel data : {}", error); } match conn_err { quinn::ConnectionError::ApplicationClosed(_) => { - info!("application closed"); + info!(LogLocation::Both, "application closed"); return Ok(()); } _ => return Err(conn_err.into()), @@ -210,7 +211,7 @@ async fn handle_connection( let ack_trans_cnt = ack_trans_cnt.clone(); tokio::spawn(async move { if let Err(e) = handle_request(source, stream, db, stream_direct_channels,shutdown_signal,ack_trans_cnt).await { - error!("failed: {}", e); + error!(LogLocation::Both, "failed: {}", e); } }); }, @@ -757,7 +758,10 @@ async fn handle_request( .await?; } _ => { - error!("The record type message could not be processed."); + error!( + LogLocation::Both, + "The record type message could not be processed." 
+ ); } }; Ok(()) @@ -978,7 +982,7 @@ async fn handle_data( size += raw_event.len(); } if start.elapsed().as_secs() > 3600 { - info!( + info!(LogLocation::Both, "Ingest: source = {source} type = {raw_event_kind:?} count = {count} size = {size}, duration = {}", start.elapsed().as_secs() ); @@ -1045,7 +1049,7 @@ async fn check_sources_conn( for source_key in keys { let timestamp = Utc::now(); if source_store.insert(&source_key, timestamp).is_err(){ - error!("Failed to append source store"); + error!(LogLocation::Both, "Failed to append source store"); } sources.insert(source_key, timestamp); } @@ -1055,7 +1059,7 @@ async fn check_sources_conn( match conn_state { ConnState::Connected => { if source_store.insert(&source_key, timestamp_val).is_err() { - error!("Failed to append source store"); + error!(LogLocation::Both, "Failed to append source store"); } ingest_sources.write().await.insert(source_key, timestamp_val); if let Some(ref notify) = notify_source { @@ -1064,7 +1068,7 @@ async fn check_sources_conn( } ConnState::Disconnected => { if source_store.insert(&source_key, timestamp_val).is_err() { - error!("Failed to append source store"); + error!(LogLocation::Both, "Failed to append source store"); } if !rep { ingest_sources.write().await.remove(&source_key); diff --git a/src/main.rs b/src/main.rs index f4748e6..51bcc06 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod graphql; mod ingest; mod peer; mod publish; +mod redis; mod server; mod settings; mod storage; @@ -9,12 +10,13 @@ mod web; use crate::{ graphql::NodeName, + redis::fetch_and_store_op_logs, server::{certificate_info, SERVER_REBOOT_DELAY}, storage::migrate_data_dir, }; use anyhow::{anyhow, Context, Result}; use chrono::{DateTime, Utc}; -use giganto_client::init_tracing; +use log_broker::{error, info, init_redis_connection, init_tracing, warn, LogLocation}; use peer::{PeerInfo, Peers}; use quinn::Connection; use rocksdb::DB; @@ -33,7 +35,6 @@ use tokio::{ task, time::{self, sleep}, }; -use tracing::{error, info, warn}; const ONE_DAY: u64 = 60 * 60 * 24; const USAGE: &str = "\ @@ -79,19 +80,27 @@ async fn main() -> Result<()> { let key = to_private_key(&key_pem).context("cannot read private key")?; let _guard = init_tracing(&settings.log_dir, env!("CARGO_PKG_NAME"))?; + + init_redis_connection( + settings.redis_log_address.ip(), + settings.redis_log_address.port(), + settings.redis_log_agent_id, + ) + .await?; + let db_path = settings.data_dir.join("db"); let db_options = crate::storage::DbOptions::new(settings.max_open_files, settings.max_mb_of_level_base); if repair { let start = Instant::now(); let (db_opts, _) = storage::rocksdb_options(&db_options); - info!("repair db start."); + info!(LogLocation::Both, "repair db start."); match DB::repair(&db_opts, db_path) { - Ok(()) => info!("repair ok"), - Err(e) => error!("repair error: {e}"), + Ok(()) => info!(LogLocation::Both, "repair ok"), + Err(e) => error!(LogLocation::Both, "repair error: {e}"), } let dur = start.elapsed(); - info!("{}", to_hms(dur)); + info!(LogLocation::Both, "{}", to_hms(dur)); exit(0); } let database = storage::Database::open(&db_path, &db_options)?; @@ -103,7 +112,7 @@ async fn main() -> Result<()> { } if let Err(e) = migrate_data_dir(&settings.data_dir, &database) { - error!("migration failed: {e}"); + error!(LogLocation::Both, "migration failed: {e}"); return Ok(()); } @@ -201,6 +210,13 @@ async fn main() -> Result<()> { ack_transmission_cnt, )); + task::spawn(fetch_and_store_op_logs( + database.clone(), + settings.redis_log_address, + 
settings.redis_log_fetch_interval, + notify_shutdown.clone(), + )); + loop { select! { () = notify_config_reload.notified() =>{ @@ -212,14 +228,14 @@ async fn main() -> Result<()> { break; } Err(e) => { - error!("Failed to load the new configuration: {:#}", e); - warn!("Run giganto with the previous config"); + error!(LogLocation::Both, "Failed to load the new configuration: {:#}", e); + warn!(LogLocation::Both, "Run giganto with the previous config"); continue; } } }, () = notify_ctrlc.notified() =>{ - info!("Termination signal: giganto daemon exit"); + info!(LogLocation::Both, "Termination signal: giganto daemon exit"); notify_shutdown.notify_waiters(); sleep(Duration::from_millis(SERVER_REBOOT_DELAY)).await; return Ok(()) diff --git a/src/peer.rs b/src/peer.rs index a1a9274..e7b231a 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -14,6 +14,7 @@ use giganto_client::{ connection::{client_handshake, server_handshake}, frame::{self, recv_bytes, recv_raw, send_bytes}, }; +use log_broker::{error, info, warn, LogLocation}; use num_enum::{IntoPrimitive, TryFromPrimitive}; use quinn::{ ClientConfig, Connection, ConnectionError, Endpoint, RecvStream, SendStream, ServerConfig, @@ -36,7 +37,6 @@ use tokio::{ time::sleep, }; use toml_edit::Document; -use tracing::{error, info, warn}; const PEER_VERSION_REQ: &str = ">=0.16.0-alpha.1,<0.17.0"; const PEER_RETRY_INTERVAL: u64 = 5; @@ -73,6 +73,7 @@ impl TomlPeers for PeerIdentity { fn get_host_name(&self) -> String { self.host_name.clone() } + fn get_address(&self) -> String { self.address.to_string() } @@ -140,6 +141,7 @@ impl Peer { let server_endpoint = Endpoint::server(self.server_config, self.local_address).expect("endpoint"); info!( + LogLocation::Both, "listening on {}", server_endpoint .local_addr() @@ -192,7 +194,7 @@ impl Peer { ) .await { - error!("connection failed: {}", e); + error!(LogLocation::Both, "connection failed: {}", e); } }); }, @@ -208,7 +210,7 @@ impl Peer { () = notify_shutdown.notified() => { sleep(Duration::from_millis(SERVER_ENDPOINT_DELAY)).await; // Wait time for connection to be ready for shutdown. 
server_endpoint.close(0_u32.into(), &[]); - info!("Shutting down peer"); + info!(LogLocation::Both, "Shutting down peer"); return Ok(()) } @@ -283,7 +285,10 @@ async fn client_connection( .await { Ok((addr, name)) => { - info!("Connection established to {}/{} (client role)", addr, name); + info!( + LogLocation::Both, + "Connection established to {}/{} (client role)", addr, name + ); (addr, name) } Err(_) => { @@ -366,7 +371,7 @@ async fn client_connection( peer_conn_info.peer_conns.write().await.remove(&remote_host_name); peer_conn_info.peers.write().await.remove(&remote_addr); if let quinn::ConnectionError::ApplicationClosed(_) = e { - info!("giganto peer({}/{}) closed",remote_host_name, remote_addr); + info!(LogLocation::Both, "giganto peer({}/{}) closed",remote_host_name, remote_addr); return Ok(()); } continue 'connection; @@ -382,7 +387,7 @@ async fn client_connection( let path= peer_conn_info.config_path.clone(); tokio::spawn(async move { if let Err(e) = handle_request(stream, peer_conn_info.local_address, remote_addr, peer_list, peers, sender, doc, path).await { - error!("failed: {}", e); + error!(LogLocation::Both, "failed: {}", e); } }); }, @@ -417,8 +422,10 @@ async fn client_connection( | ConnectionError::Reset | ConnectionError::TimedOut => { warn!( + LogLocation::Both, "Retry connection to {} after {} seconds.", - peer_info.address, PEER_RETRY_INTERVAL, + peer_info.address, + PEER_RETRY_INTERVAL, ); sleep(Duration::from_secs(PEER_RETRY_INTERVAL)).await; continue 'connection; @@ -454,7 +461,10 @@ async fn server_connection( match check_for_duplicate_connections(&connection, peer_conn_info.peer_conns.clone()).await { Ok((addr, name)) => { - info!("Connection established to {}/{} (server role)", addr, name); + info!( + LogLocation::Both, + "Connection established to {}/{} (server role)", addr, name + ); (addr, name) } Err(_) => { @@ -531,7 +541,7 @@ async fn server_connection( peer_conn_info.peer_conns.write().await.remove(&remote_host_name); peer_conn_info.peers.write().await.remove(&remote_addr); if let quinn::ConnectionError::ApplicationClosed(_) = e { - info!("giganto peer({}/{}) closed",remote_host_name, remote_addr); + info!(LogLocation::Both, "giganto peer({}/{}) closed",remote_host_name, remote_addr); return Ok(()); } return Err(e.into()); @@ -547,7 +557,7 @@ async fn server_connection( let path= peer_conn_info.config_path.clone(); tokio::spawn(async move { if let Err(e) = handle_request(stream, peer_conn_info.local_address, remote_addr, peer_list, peers, sender, doc, path).await { - error!("failed: {}", e); + error!(LogLocation::Both, "failed: {}", e); } }); }, @@ -718,10 +728,10 @@ async fn update_to_new_peer_list( if is_change { let data: Vec = peer_list.read().await.iter().cloned().collect(); if let Err(e) = insert_toml_peers(&mut doc, Some(data)) { - error!("{e:?}"); + error!(LogLocation::Both, "{e:?}"); } if let Err(e) = write_toml_file(&doc, path) { - error!("{e:?}"); + error!(LogLocation::Both, "{e:?}"); } } @@ -800,10 +810,10 @@ pub mod tests { Ok(x) => x, Err(_) => { panic!( - "failed to read (cert, key) file, {}, {} read file error. Cert or key doesn't exist in default test folder", - CERT_PATH, - KEY_PATH, - ); + "failed to read (cert, key) file, {}, {} read file error. Cert or key doesn't exist in default test folder", + CERT_PATH, + KEY_PATH, + ); } }; @@ -824,8 +834,8 @@ pub mod tests { Some(x) => rustls::PrivateKey(x), None => { panic!( - "no private keys found. Private key doesn't exist in default test folder" - ); + "no private keys found. 
Private key doesn't exist in default test folder" + ); } } } diff --git a/src/publish.rs b/src/publish.rs index dd143a5..c0e1589 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -26,6 +26,7 @@ use giganto_client::{ }, RawEventKind, }; +use log_broker::{debug, error, info, warn, LogLocation}; use quinn::{Connection, Endpoint, RecvStream, SendStream, ServerConfig}; use rustls::{Certificate, PrivateKey}; use serde::{de::DeserializeOwned, Serialize}; @@ -36,7 +37,6 @@ use tokio::{ sync::{mpsc::unbounded_channel, Notify}, time::sleep, }; -use tracing::{debug, error, info, warn}; const PUBLISH_VERSION_REQ: &str = ">=0.15.0,<0.17.0"; @@ -69,6 +69,7 @@ impl Server { ) { let endpoint = Endpoint::server(self.server_config, self.server_address).expect("endpoint"); info!( + LogLocation::Both, "listening on {}", endpoint.local_addr().expect("for local addr display") ); @@ -90,14 +91,14 @@ impl Server { ) .await { - error!("connection failed: {}", e); + error!(LogLocation::Both, "connection failed: {}", e); } }); }, () = notify_shutdown.notified() => { sleep(Duration::from_millis(SERVER_ENDPOINT_DELAY)).await; // Wait time for channels,connection to be ready for shutdown. endpoint.close(0_u32.into(), &[]); - info!("Shutting down publish"); + info!(LogLocation::Both, "Shutting down publish"); break; }, } @@ -116,11 +117,11 @@ async fn handle_connection( let (send, recv) = match server_handshake(&connection, PUBLISH_VERSION_REQ).await { Ok((send, recv)) => { - info!("Compatible version"); + info!(LogLocation::Both, "Compatible version"); (send, recv) } Err(e) => { - info!("Incompatible version"); + info!(LogLocation::Both, "Incompatible version"); connection.close(quinn::VarInt::from_u32(0), e.to_string().as_bytes()); bail!("{e}") } @@ -153,7 +154,7 @@ async fn handle_connection( let pcap_sources = pcap_sources.clone(); tokio::spawn(async move { if let Err(e) = handle_request(stream, db, pcap_sources).await { - error!("failed: {}", e); + error!(LogLocation::Both, "failed: {}", e); } }); }, @@ -203,11 +204,14 @@ async fn request_stream( ) .await { - error!("{}", e); + error!(LogLocation::Both, "{}", e); } } Err(_) => { - error!("Failed to deserialize hog message"); + error!( + LogLocation::Both, + "Failed to deserialize hog message" + ); } } } @@ -226,11 +230,14 @@ async fn request_stream( ) .await { - error!("{}", e); + error!(LogLocation::Both, "{}", e); } } Err(_) => { - error!("Failed to deserialize crusher message"); + error!( + LogLocation::Both, + "Failed to deserialize crusher message" + ); } } } @@ -239,7 +246,7 @@ async fn request_stream( } } Err(e) => { - error!("{}", e); + error!(LogLocation::Both, "{}", e); break; } } @@ -274,10 +281,13 @@ async fn process_pcap_extract( // send/receive extract request from piglet match pcap_extract_request(source_conn, &filter).await { Ok(()) => (), - Err(e) => debug!("failed to relay pcap request, {e}"), + Err(e) => debug!(LogLocation::Local, "failed to relay pcap request, {e}"), } } else { - error!("Failed to get {}'s connection", filter.source); + error!( + LogLocation::Both, + "Failed to get {}'s connection", filter.source + ); } } }); @@ -313,10 +323,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open conn store"); + error!(LogLocation::Both, "Failed to open conn store"); } } RequestStreamRecord::Dns => { @@ -333,10 +343,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, 
"Failed to send network stream : {}", e); } } else { - error!("Failed to open dns store"); + error!(LogLocation::Both, "Failed to open dns store"); } } RequestStreamRecord::Rdp => { @@ -353,10 +363,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open rdp store"); + error!(LogLocation::Both, "Failed to open rdp store"); } } RequestStreamRecord::Http => { @@ -373,10 +383,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open http store"); + error!(LogLocation::Both, "Failed to open http store"); } } RequestStreamRecord::Log => { @@ -393,10 +403,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open log store"); + error!(LogLocation::Both, "Failed to open log store"); } } RequestStreamRecord::Smtp => { @@ -413,10 +423,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open smtp store"); + error!(LogLocation::Both, "Failed to open smtp store"); } } RequestStreamRecord::Ntlm => { @@ -433,10 +443,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open ntlm store"); + error!(LogLocation::Both, "Failed to open ntlm store"); } } RequestStreamRecord::Kerberos => { @@ -453,10 +463,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open kerberos store"); + error!(LogLocation::Both, "Failed to open kerberos store"); } } RequestStreamRecord::Ssh => { @@ -473,10 +483,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open ssh store"); + error!(LogLocation::Both, "Failed to open ssh store"); } } RequestStreamRecord::DceRpc => { @@ -493,10 +503,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open dce rpc store"); + error!(LogLocation::Both, "Failed to open dce rpc store"); } } RequestStreamRecord::Ftp => { @@ -513,10 +523,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open ftp store"); + error!(LogLocation::Both, "Failed to open ftp store"); } } RequestStreamRecord::Mqtt => { @@ -533,10 +543,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open mqtt store"); + error!(LogLocation::Both, "Failed to open mqtt store"); } } RequestStreamRecord::Ldap => { @@ -553,10 +563,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open ldap store"); + error!(LogLocation::Both, "Failed to open ldap store"); } } RequestStreamRecord::Tls => { @@ -573,10 +583,10 @@ where ) 
.await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open tls store"); + error!(LogLocation::Both, "Failed to open tls store"); } } RequestStreamRecord::Smb => { @@ -593,10 +603,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open smb store"); + error!(LogLocation::Both, "Failed to open smb store"); } } RequestStreamRecord::Nfs => { @@ -613,10 +623,10 @@ where ) .await { - error!("Failed to send network stream : {}", e); + error!(LogLocation::Both, "Failed to send network stream : {}", e); } } else { - error!("Failed to open nfs store"); + error!(LogLocation::Both, "Failed to open nfs store"); } } RequestStreamRecord::FileCreate => { @@ -633,10 +643,10 @@ where ) .await { - error!("Failed to send sysmon stream : {}", e); + error!(LogLocation::Both, "Failed to send sysmon stream : {}", e); } } else { - error!("Failed to open file_create store"); + error!(LogLocation::Both, "Failed to open file_create store"); } } RequestStreamRecord::FileDelete => { @@ -653,10 +663,10 @@ where ) .await { - error!("Failed to send sysmon stream : {}", e); + error!(LogLocation::Both, "Failed to send sysmon stream : {}", e); } } else { - error!("Failed to open file_delete store"); + error!(LogLocation::Both, "Failed to open file_delete store"); } } RequestStreamRecord::Pcap => {} @@ -727,7 +737,10 @@ where send_hog_stream_start_message(&mut sender, record_type) .await .map_err(|e| anyhow!("Failed to write hog start message: {}", e))?; - info!("start hog's publish stream : {:?}", record_type); + info!( + LogLocation::Both, + "start hog's publish stream : {:?}", record_type + ); } NodeType::Crusher => { // crusher's policy Id always exists. @@ -735,7 +748,10 @@ where send_crusher_stream_start_message(&mut sender, id) .await .map_err(|e| anyhow!("Failed to write crusher start message: {}", e))?; - info!("start crusher's publish stream : {:?}", record_type); + info!( + LogLocation::Both, + "start crusher's publish stream : {:?}", record_type + ); let key_builder = StorageKey::builder() .start_key(&msg.source()?) @@ -1142,7 +1158,7 @@ async fn handle_request( } _ => { // do nothing - warn!("Not expected to reach here"); + warn!(LogLocation::Both, "Not expected to reach here"); } } } @@ -1260,7 +1276,7 @@ async fn handle_request( } _ => { // do nothing - warn!("Not expected to reach here"); + warn!(LogLocation::Both, "Not expected to reach here"); } } } diff --git a/src/redis.rs b/src/redis.rs new file mode 100644 index 0000000..cce5390 --- /dev/null +++ b/src/redis.rs @@ -0,0 +1,141 @@ +use crate::storage::{Database, StorageKey}; +use anyhow::{anyhow, Result}; +use chrono::{DateTime, Utc}; +use giganto_client::ingest::log::{OpLog, OpLogLevel}; +use redis::AsyncCommands; +use std::{net::SocketAddr, num::NonZeroUsize, str::FromStr, sync::Arc, time::Duration}; +use tokio::{select, sync::Notify, time}; + +const REDIS_LOG_FETCH_BULK_COUNT: Option<NonZeroUsize> = NonZeroUsize::new(100); + +/// Fetch and store operation log from redis every `fetch_interval`. +pub async fn fetch_and_store_op_logs( + db: Database, + server_addr: SocketAddr, + fetch_interval: Duration, + notify_shutdown: Arc<Notify>, +) -> Result<()> { + let mut itv = time::interval(fetch_interval); + let store = db.op_log_store()?; + + loop { + select!
{ + _ = itv.tick() => { + let logs = fetch_from_redis(server_addr).await?; + for (agent, log) in logs { + let log: Vec<&str> = log.splitn(3, '\t').collect(); + let Ok(log_level) = filter_by_log_level(log[1]) else { + continue; + }; + let timestamp = DateTime::<Utc>::from_str(log[0])? + .timestamp_nanos_opt() + .unwrap_or_default(); + + let oplog = OpLog { + agent_name: agent.split('@').collect::<Vec<&str>>()[0].to_string(), + log_level, + contents: log[2].to_string(), + }; + let storage_key = StorageKey::builder() + .start_key(&agent) + .end_key(timestamp) + .build(); + + store.append(&storage_key.key(), &bincode::serialize(&oplog)?)?; + } + } + () = notify_shutdown.notified() => { + return Ok(()); + }, + } + } +} + +async fn fetch_from_redis(server_addr: SocketAddr) -> Result<Vec<(String, String)>> { + let client = redis::Client::open(format!("redis://{server_addr}"))?; + let mut con = client.get_async_connection().await?; + + // get all keys from redis. + let keys: Vec<String> = con.keys("*").await?; + + let mut data = Vec::new(); + for key in keys { + loop { + let values: Vec<String> = con.lpop(&key, REDIS_LOG_FETCH_BULK_COUNT).await?; + if values.is_empty() { + break; + } + for value in values { + data.push((key.clone(), value)); + } + } + } + + Ok(data) +} + +fn filter_by_log_level(level: &str) -> Result<OpLogLevel> { + match level { + "INFO" => Ok(OpLogLevel::Info), + "WARN" => Ok(OpLogLevel::Warn), + "ERROR" => Ok(OpLogLevel::Error), + _ => Err(anyhow!("invalid log level")), + } +} + +#[cfg(test)] +mod tests { + use crate::{ + redis::fetch_and_store_op_logs, + storage::{Database, DbOptions}, + }; + use giganto_client::ingest::log::OpLog; + use log_broker::{info, init_redis_connection, LogLocation}; + use std::{net::SocketAddr, sync::Arc, time::Duration}; + use tokio::{sync::Notify, time::sleep}; + + #[tokio::test] + #[ignore = "it requires connection to redis"] + async fn test_fetch_and_store_op_logs() { + const TEST_ID: &str = "test@localhost"; + const SERVER_ADDR: &str = "127.0.0.1:6379"; + + // Open temporary database + let db_dir = tempfile::tempdir().unwrap(); + let db = Database::open(db_dir.path(), &DbOptions::default()).unwrap(); + + let server_addr: SocketAddr = SERVER_ADDR.parse().unwrap(); + + init_redis_connection(server_addr.ip(), server_addr.port(), TEST_ID.to_string()) + .await + .unwrap(); + + let fetch_interval = Duration::from_secs(1); + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_2 = notify_shutdown.clone(); + + // Notify shutdown after 5 seconds.
+ tokio::spawn(async move { + sleep(Duration::from_secs(5)).await; + notify_shutdown_2.notify_one(); + }); + + info!(LogLocation::Both, "Test log"); + + let res = + fetch_and_store_op_logs(db.clone(), server_addr, fetch_interval, notify_shutdown).await; + + assert!(res.is_ok()); + + let store = db.op_log_store().unwrap(); + + if let Some(Ok((_, value))) = store.iter_forward().next() { + let value = bincode::deserialize::<OpLog>(&value).unwrap(); + + assert_eq!(value.agent_name, "test".to_string()); + assert_eq!(value.contents, "Test log".to_string()); + } else { + panic!("there must be an item in op_log_store"); + }; + } +} diff --git a/src/server.rs b/src/server.rs index 079ecfb..0f89489 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,8 @@ use anyhow::{bail, Context, Result}; +use log_broker::{info, LogLocation}; use quinn::{ClientConfig, Connection, ServerConfig, TransportConfig}; use rustls::{Certificate, PrivateKey}; use std::{sync::Arc, time::Duration}; -use tracing::info; use x509_parser::nom::Parser; pub const SERVER_REBOOT_DELAY: u64 = 3000; @@ -74,7 +74,7 @@ pub fn certificate_info(cert_info: &[Certificate]) -> Result<(String, String)> { .and_then(|cn| cn.as_str().ok()) .context("the subject of the certificate is not valid")?; if subject.contains('@') { - info!("Connected client name : {}", subject); + info!(LogLocation::Both, "Connected client name : {}", subject); let parsed = subject.split('@').collect::<Vec<&str>>(); Ok((String::from(parsed[0]), String::from(parsed[1]))) diff --git a/src/settings.rs b/src/settings.rs index 22b3641..7943978 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -8,7 +8,12 @@ const DEFAULT_INGEST_ADDRESS: &str = "[::]:38370"; const DEFAULT_PUBLISH_ADDRESS: &str = "[::]:38371"; const DEFAULT_GRAPHQL_ADDRESS: &str = "[::]:8443"; const DEFAULT_INVALID_PEER_ADDRESS: &str = "254.254.254.254:38383"; +const DEFAULT_REDIS_ADDRESS: &str = "127.0.0.1:6379"; const DEFAULT_ACK_TRANSMISSION: u16 = 1024; +const DEFAULT_RETENTION: &str = "100d"; +const DEFAULT_REDIS_FETCH_INTERVAL: &str = "10m"; +const DEFAULT_MAX_OPEN_FILES: i32 = 8000; +const DEFAULT_MAX_MB_OF_LEVEL_BASE: u64 = 512; /// The application settings.
#[derive(Clone, Debug, Deserialize)] @@ -32,16 +37,23 @@ pub struct Settings { pub max_open_files: i32, pub max_mb_of_level_base: u64, - //config file path + // config file path pub cfg_path: String, - //peers + // peers #[serde(deserialize_with = "deserialize_peer_addr")] pub peer_address: Option, // IP address & port for peer connection pub peers: Option>, - //ack transmission interval + // ack transmission interval pub ack_transmission: u16, + + // redis log + #[serde(deserialize_with = "deserialize_socket_addr")] + pub redis_log_address: SocketAddr, // IP address & port to redis + pub redis_log_agent_id: String, // redis client ID to logging + #[serde(with = "humantime_serde")] + pub redis_log_fetch_interval: Duration, // redis fetch interval } impl Settings { @@ -107,17 +119,21 @@ fn default_config_builder() -> ConfigBuilder { .expect("valid address") .set_default("graphql_address", DEFAULT_GRAPHQL_ADDRESS) .expect("local address") + .set_default("redis_log_address", DEFAULT_REDIS_ADDRESS) + .expect("local address") + .set_default("redis_log_fetch_interval", DEFAULT_REDIS_FETCH_INTERVAL) + .expect("retention") .set_default("data_dir", db_path) .expect("data dir") - .set_default("retention", "100d") + .set_default("retention", DEFAULT_RETENTION) .expect("retention") .set_default("log_path", log_path) .expect("log dir") .set_default("export_path", export_path) .expect("export_dir") - .set_default("max_open_files", 8000) + .set_default("max_open_files", DEFAULT_MAX_OPEN_FILES) .expect("default max open files") - .set_default("max_mb_of_level_base", 512) + .set_default("max_mb_of_level_base", DEFAULT_MAX_MB_OF_LEVEL_BASE) .expect("default max mb of level base") .set_default("cfg_path", config_path.to_str().expect("path to string")) .expect("default config dir") diff --git a/src/storage.rs b/src/storage.rs index c4c3fe1..ac9848a 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -23,6 +23,7 @@ use giganto_client::ingest::{ timeseries::PeriodicTimeSeries, Packet, }; +use log_broker::{debug, error, info, warn, LogLocation}; pub use migration::migrate_data_dir; #[cfg(debug_assertions)] use rocksdb::properties; @@ -33,7 +34,6 @@ use rocksdb::{ use serde::de::DeserializeOwned; use std::{marker::PhantomData, ops::Deref, path::Path, sync::Arc, time::Duration}; use tokio::{select, sync::Notify, time}; -use tracing::{debug, error, info, warn}; const RAW_DATA_COLUMN_FAMILY_NAMES: [&str; 37] = [ "conn", @@ -836,7 +836,7 @@ pub async fn retain_periodically( let mut usage_flag = false; if check_db_usage().await.0 { - info!("Disk usage is over {USAGE_THRESHOLD}%."); + info!(LogLocation::Both, "Disk usage is over {USAGE_THRESHOLD}%."); retention_timestamp += ONE_DAY_TIMESTAMP_NANOS; usage_flag = true; } @@ -867,7 +867,7 @@ pub async fn retain_periodically( store.db.compact_range_cf(store.cf, Some(&from), Some(&to)); } } else { - error!("Failed to delete file in range"); + error!(LogLocation::Both, "Failed to delete file in range"); } } @@ -883,7 +883,7 @@ pub async fn retain_periodically( if retention_timestamp > data_timestamp { if store.delete(&key).is_err() { - error!("Failed to delete data"); + error!(LogLocation::Both, "Failed to delete data"); } } else { break; @@ -895,11 +895,11 @@ pub async fn retain_periodically( if check_db_usage().await.1 && usage_flag { retention_timestamp += ONE_DAY_TIMESTAMP_NANOS; if retention_timestamp > now.timestamp_nanos_opt().unwrap_or(0) { - warn!("cannot delete data to usage under {USAGE_LOW}"); + warn!(LogLocation::Both, "cannot delete data to usage under 
{USAGE_LOW}"); break; } } else if usage_flag { - info!("Disk usage is under {USAGE_LOW}%"); + info!(LogLocation::Both, "Disk usage is under {USAGE_LOW}%"); break; } else { break; @@ -917,7 +917,7 @@ pub async fn retain_periodically( async fn check_db_usage() -> (bool, bool) { let resource_usage = roxy::resource_usage().await; let usage = (resource_usage.used_disk_space * 100) / resource_usage.total_disk_space; - debug!("Disk usage: {usage}%"); + debug!(LogLocation::Local, "Disk usage: {usage}%"); (usage > USAGE_THRESHOLD, usage > USAGE_LOW) } diff --git a/src/storage/migration.rs b/src/storage/migration.rs index fcb01aa..202d243 100644 --- a/src/storage/migration.rs +++ b/src/storage/migration.rs @@ -1,6 +1,7 @@ //! Routines to check the database format version and migrate it if necessary. use super::Database; use anyhow::{anyhow, Context, Result}; +use log_broker::{info, LogLocation}; use semver::{Version, VersionReq}; use serde::{Deserialize, Serialize}; use std::{ @@ -9,7 +10,6 @@ use std::{ net::IpAddr, path::Path, }; -use tracing::info; const COMPATIBLE_VERSION_REQ: &str = ">0.13.0-alpha,<0.16.0-alpha.2"; @@ -43,7 +43,7 @@ pub fn migrate_data_dir(data_dir: &Path, db: &Database) -> Result<()> { .iter() .find(|(req, _to, _m)| req.matches(&version)) { - info!("Migrating database to {to}"); + info!(LogLocation::Both, "Migrating database to {to}"); m(db)?; version = to.clone(); if compatible.matches(&version) { diff --git a/src/web.rs b/src/web.rs index 8a04d6e..14b929d 100644 --- a/src/web.rs +++ b/src/web.rs @@ -1,8 +1,8 @@ use crate::graphql::Schema; use async_graphql::http::{playground_source, GraphQLPlaygroundConfig}; +use log_broker::{info, LogLocation}; use std::{convert::Infallible, net::SocketAddr, sync::Arc}; use tokio::{sync::Notify, task}; -use tracing::info; use warp::{http::Response as HttpResponse, Filter}; /// Runs the GraphQL server. @@ -42,6 +42,6 @@ pub async fn serve( .bind_with_graceful_shutdown(addr, async move { notify_shutdown.notified().await }); // start Graphql Server - info!("listening on https://{addr:?}"); + info!(LogLocation::Both, "listening on https://{addr:?}"); task::spawn(server); } diff --git a/tests/config.toml b/tests/config.toml index 5b4c11e..c157384 100644 --- a/tests/config.toml +++ b/tests/config.toml @@ -4,6 +4,9 @@ roots = ["ca1.pem", "ca2.pem", "ca3.pem"] ingest_address = "0.0.0.0:38370" publish_address = "0.0.0.0:38371" graphql_address = "127.0.0.1:8443" +redis_log_address = "127.0.0.1:6379" +redis_log_agent_id = "giganto@localhost" +redis_log_fetch_interval = "10m" data_dir = "tests/data" retention = "100d" log_dir = "/data/logs/apps"