From d7c8453382510b6302af0043d5be375d9f880c00 Mon Sep 17 00:00:00 2001 From: Bly Kim Date: Fri, 9 Aug 2024 11:54:27 +0900 Subject: [PATCH] Update `giganto_config` to respond full configuration --- CHANGELOG.md | 1 + README.md | 6 +-- src/graphql/status.rs | 93 +++++++++---------------------------------- src/main.rs | 30 +++++++------- src/settings.rs | 47 +++++++++++++--------- 5 files changed, 65 insertions(+), 112 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f4e40bd..8ae9fcca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ Versioning](https://semver.org/spec/v2.0.0.html). - Changed to support command line interface. - Removed `cert`, `key`, `root` fields from config file. - Changed `set_giganto_config` to receive toml-string with full configuration. +- Updated `giganto_config` to respond full configuration. ## [0.20.0] - 2024-05-17 diff --git a/README.md b/README.md index 2b7e0551..4207262c 100644 --- a/README.md +++ b/README.md @@ -24,13 +24,13 @@ processing and real-time analytics. You can run giganto by invoking the following command: ```sh -giganto --cert --key --ca +giganto --cert --key --root ``` If you want to run giganto with local configuration file, ```sh -giganto -c --cert --key --ca +giganto -c --cert --key --root ``` In the config file, you can specify the following options: @@ -77,7 +77,7 @@ certificate/key from the tests folder.) ```sh cargo run -- -c tests/node1/config.toml --cert tests/node1/cert.pem \ ---key tests/node1/key.pem --root tests/root.pem hostname@address +--key tests/node1/key.pem --root tests/root.pem ``` ## License diff --git a/src/graphql/status.rs b/src/graphql/status.rs index 8426a2c6..1cf60ab8 100644 --- a/src/graphql/status.rs +++ b/src/graphql/status.rs @@ -6,6 +6,7 @@ use std::{ use anyhow::{anyhow, Context as ct}; use async_graphql::{Context, InputObject, Object, Result, SimpleObject}; +use serde::Deserialize; use toml_edit::{value, DocumentMut, InlineTable}; use super::{PowerOffNotify, RebootNotify, ReloadNotify, TerminateNotify}; @@ -14,14 +15,8 @@ use crate::storage::Database; use crate::AckTransmissionCount; const GRAPHQL_REBOOT_DELAY: u64 = 100; -const CONFIG_INGEST_SRV_ADDR: &str = "ingest_srv_addr"; pub const CONFIG_PUBLISH_SRV_ADDR: &str = "publish_srv_addr"; pub const CONFIG_GRAPHQL_SRV_ADDR: &str = "graphql_srv_addr"; -const CONFIG_RETENTION: &str = "retention"; -const CONFIG_MAX_OPEN_FILES: &str = "max_open_files"; -const CONFIG_MAX_MB_OF_LEVEL_BASE: &str = "max_mb_of_level_base"; -const CONFIG_ADDR_TO_PEERS: &str = "addr_to_peers"; -const CONFIG_PEER_LIST: &str = "peers"; const CONFIG_ACK_TRANSMISSION: &str = "ack_transmission"; pub const TEMP_TOML_POST_FIX: &str = ".temp.toml"; @@ -52,7 +47,7 @@ struct Properties { stats: String, } -#[derive(InputObject, SimpleObject, Debug)] +#[derive(InputObject, SimpleObject, Debug, Deserialize)] #[graphql(input_name = "InputPeerList")] pub struct PeerList { pub addr: String, @@ -68,17 +63,25 @@ impl TomlPeers for PeerList { } } -#[derive(SimpleObject, Debug)] +#[derive(SimpleObject, Debug, Deserialize)] struct GigantoConfig { ingest_srv_addr: String, publish_srv_addr: String, graphql_srv_addr: String, retention: String, + data_dir: String, + log_dir: String, + export_dir: String, + max_open_files: i32, max_mb_of_level_base: u64, - addr_to_peers: String, - peer_list: Vec, - ack_transmission_cnt: u16, + num_of_thread: i32, + max_sub_compactions: u32, + + addr_to_peers: Option, + peers: Option>, + + ack_transmission: u16, } #[derive(Default)] @@ -125,55 +128,11 @@ impl GigantoStatusQuery { #[allow(clippy::unused_async)] async fn giganto_config<'ctx>(&self, ctx: &Context<'ctx>) -> Result { let cfg_path = ctx.data::()?; - let doc = read_toml_file(cfg_path)?; - let ingest_srv_addr = parse_toml_element_to_string(CONFIG_INGEST_SRV_ADDR, &doc)?; - let publish_srv_addr = parse_toml_element_to_string(CONFIG_PUBLISH_SRV_ADDR, &doc)?; - let graphql_srv_addr = parse_toml_element_to_string(CONFIG_GRAPHQL_SRV_ADDR, &doc)?; - let retention = parse_toml_element_to_string(CONFIG_RETENTION, &doc)?; - let max_open_files = parse_toml_element_to_integer(CONFIG_MAX_OPEN_FILES, &doc)?; - let max_mb_of_level_base = - parse_toml_element_to_integer(CONFIG_MAX_MB_OF_LEVEL_BASE, &doc)?; - let ack_transmission_cnt = parse_toml_element_to_integer(CONFIG_ACK_TRANSMISSION, &doc)?; - let mut peer_list = Vec::new(); - let addr_to_peers = if doc.get(CONFIG_ADDR_TO_PEERS).is_some() { - let peers_value = doc - .get(CONFIG_PEER_LIST) - .context("peers not found")? - .as_array() - .context("invalid peers format")?; - for peer in peers_value { - if let Some(peer_data) = peer.as_inline_table() { - let (Some(addr_val), Some(hostname_val)) = - (peer_data.get("addr"), peer_data.get("hostname")) - else { - return Err(anyhow!("Invalid `addr`, `hostname` Value format").into()); - }; - let (Some(addr), Some(hostname)) = (addr_val.as_str(), hostname_val.as_str()) - else { - return Err(anyhow!("Invalid `addr`, `hostname` String format").into()); - }; - peer_list.push(PeerList { - addr: addr.to_string(), - hostname: hostname.to_string(), - }); - } - } - parse_toml_element_to_string(CONFIG_ADDR_TO_PEERS, &doc)? - } else { - String::new() - }; + let toml = fs::read_to_string(cfg_path).context("toml not found")?; - Ok(GigantoConfig { - ingest_srv_addr, - publish_srv_addr, - graphql_srv_addr, - retention, - max_open_files, - max_mb_of_level_base, - addr_to_peers, - peer_list, - ack_transmission_cnt, - }) + let config: GigantoConfig = toml::from_str(&toml)?; + + Ok(config) } #[allow(clippy::unused_async)] @@ -281,22 +240,6 @@ pub fn parse_toml_element_to_string(key: &str, doc: &DocumentMut) -> Result(key: &str, doc: &DocumentMut) -> Result -where - T: std::convert::TryFrom, -{ - let Some(item) = doc.get(key) else { - return Err(anyhow!("{} not found.", key).into()); - }; - let Some(value) = item.as_integer() else { - return Err(anyhow!("parse failed: {}'s item format is not available.", key).into()); - }; - let Ok(value) = T::try_from(value) else { - return Err(anyhow!("parse failed: {}'s value format is not available.", key).into()); - }; - Ok(value) -} - fn insert_toml_element(key: &str, doc: &mut DocumentMut, input: Option) where T: std::convert::Into, diff --git a/src/main.rs b/src/main.rs index e030ad80..19ed1db3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -82,14 +82,14 @@ async fn main() -> Result<()> { root: root_cert.clone(), }); - let _guard = init_tracing(&settings.log_dir)?; + let _guard = init_tracing(&settings.config.log_dir)?; - let db_path = settings.data_dir.join("db"); + let db_path = settings.config.data_dir.join("db"); let db_options = crate::storage::DbOptions::new( - settings.max_open_files, - settings.max_mb_of_level_base, - settings.num_of_thread, - settings.max_sub_compactions, + settings.config.max_open_files, + settings.config.max_mb_of_level_base, + settings.config.num_of_thread, + settings.config.max_sub_compactions, ); if args.repair { let start = Instant::now(); @@ -109,7 +109,7 @@ async fn main() -> Result<()> { let database = storage::Database::open(&db_path, &db_options)?; - if let Err(e) = migrate_data_dir(&settings.data_dir, &database) { + if let Err(e) = migrate_data_dir(&settings.config.data_dir, &database) { error!("migration failed: {e}"); return Ok(()); } @@ -131,13 +131,13 @@ async fn main() -> Result<()> { let ingest_sources = new_ingest_sources(&database); let runtime_ingest_sources = new_runtime_ingest_sources(); let stream_direct_channels = new_stream_direct_channels(); - let (peers, peer_idents) = new_peers_data(settings.peers.clone()); + let (peers, peer_idents) = new_peers_data(settings.config.peers.clone()); let notify_config_reload = Arc::new(Notify::new()); let notify_shutdown = Arc::new(Notify::new()); let notify_reboot = Arc::new(Notify::new()); let notify_power_off = Arc::new(Notify::new()); let mut notify_source_change = None; - let ack_transmission_cnt = new_ack_transmission_count(settings.ack_transmission); + let ack_transmission_cnt = new_ack_transmission_count(settings.config.ack_transmission); let schema = graphql::schema( NodeName(subject_from_cert(&cert)?.1), @@ -146,7 +146,7 @@ async fn main() -> Result<()> { ingest_sources.clone(), peers.clone(), request_client_pool.clone(), - settings.export_dir.clone(), + settings.config.export_dir.clone(), notify_config_reload.clone(), notify_reboot.clone(), notify_power_off.clone(), @@ -157,7 +157,7 @@ async fn main() -> Result<()> { task::spawn(web::serve( schema, - settings.graphql_srv_addr, + settings.config.graphql_srv_addr, cert_pem.clone(), key_pem.clone(), notify_shutdown.clone(), @@ -175,7 +175,7 @@ async fn main() -> Result<()> { .expect("Cannot create runtime for retain_periodically.") .block_on(storage::retain_periodically( time::Duration::from_secs(ONE_DAY), - settings.retention, + settings.config.retention, db, notify_shutdown_copy, running_flag, @@ -185,7 +185,7 @@ async fn main() -> Result<()> { }); }); - if let Some(addr_to_peers) = settings.addr_to_peers { + if let Some(addr_to_peers) = settings.config.addr_to_peers { let peer_server = peer::Peer::new(addr_to_peers, &certs.clone())?; let notify_source = Arc::new(Notify::new()); task::spawn(peer_server.run( @@ -199,7 +199,7 @@ async fn main() -> Result<()> { notify_source_change = Some(notify_source); } - let publish_server = publish::Server::new(settings.publish_srv_addr, &certs.clone()); + let publish_server = publish::Server::new(settings.config.publish_srv_addr, &certs.clone()); task::spawn(publish_server.run( database.clone(), pcap_sources.clone(), @@ -211,7 +211,7 @@ async fn main() -> Result<()> { notify_shutdown.clone(), )); - let ingest_server = ingest::Server::new(settings.ingest_srv_addr, &certs.clone()); + let ingest_server = ingest::Server::new(settings.config.ingest_srv_addr, &certs.clone()); task::spawn(ingest_server.run( database.clone(), pcap_sources, diff --git a/src/settings.rs b/src/settings.rs index 00753940..f0085536 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -16,24 +16,27 @@ const DEFAULT_RETENTION: &str = "100d"; const DEFAULT_MAX_OPEN_FILES: i32 = 8000; const DEFAULT_MAX_MB_OF_LEVEL_BASE: u64 = 512; const DEFAULT_NUM_OF_THREAD: i32 = 8; -const DEFAULT_MAX_SUBCOMPACTIONS: u32 = 2; +const DEFAULT_MAX_SUB_COMPACTIONS: u32 = 2; #[derive(Parser, Debug)] +#[command(version = env!("CARGO_PKG_VERSION"))] pub struct Args { /// Path to the local configuration TOML file #[arg(short, value_name = "CONFIG_PATH")] pub config: Option, + /// Path to the certificate file #[arg(long, value_name = "CERT_PATH")] pub cert: String, + /// Path to the key file #[arg(long, value_name = "KEY_PATH")] pub key: String, + /// Path to the root CA file #[arg(long, value_name = "ROOT_PATH")] pub root: String, - /// Central management server "hostname@address" - pub central_server: String, + /// Enable the repair mode #[arg(long)] pub repair: bool, @@ -41,7 +44,7 @@ pub struct Args { /// The application settings. #[derive(Clone, Debug, Deserialize)] -pub struct Settings { +pub struct GigantoConfig { #[serde(deserialize_with = "deserialize_socket_addr")] pub ingest_srv_addr: SocketAddr, // IP address & port to ingest data #[serde(deserialize_with = "deserialize_socket_addr")] @@ -51,8 +54,8 @@ pub struct Settings { pub retention: Duration, // Data retention period #[serde(deserialize_with = "deserialize_socket_addr")] pub graphql_srv_addr: SocketAddr, // IP address & port to graphql - pub log_dir: PathBuf, //giganto's syslog path - pub export_dir: PathBuf, //giganto's export file path + pub log_dir: PathBuf, // giganto's syslog path + pub export_dir: PathBuf, // giganto's export file path // db options pub max_open_files: i32, @@ -60,9 +63,6 @@ pub struct Settings { pub num_of_thread: i32, pub max_sub_compactions: u32, - // config file path - pub cfg_path: String, - // peers #[serde(deserialize_with = "deserialize_peer_addr")] pub addr_to_peers: Option, // IP address & port for peer connection @@ -72,6 +72,14 @@ pub struct Settings { pub ack_transmission: u16, } +#[derive(Clone, Debug, Deserialize)] +pub struct Settings { + pub config: GigantoConfig, + + // config file path + pub cfg_path: String, +} + impl Settings { /// Creates a new `Settings` instance, populated from the default /// configuration file if it exists. @@ -88,7 +96,10 @@ impl Settings { )) } } else { - default_config_builder().build()?.try_deserialize() + let config: GigantoConfig = default_config_builder().build()?.try_deserialize()?; + let cfg_path = config_path.to_str().expect("path to string").to_string(); + + Ok(Self { config, cfg_path }) } } @@ -98,15 +109,17 @@ impl Settings { let s = default_config_builder() .add_source(File::with_name(cfg_path)) .build()?; - let mut setting: Settings = s.try_deserialize()?; - setting.cfg_path = cfg_path.to_string(); - Ok(setting) + let config: GigantoConfig = s.try_deserialize()?; + + Ok(Self { + config, + cfg_path: cfg_path.to_string(), + }) } } /// Creates a new `ConfigBuilder` instance with the default configuration. fn default_config_builder() -> ConfigBuilder { - let dirs = directories::ProjectDirs::from("com", "einsis", "giganto").expect("unreachable"); let db_dir = directories::ProjectDirs::from_path(PathBuf::from("db")).expect("unreachable db dir"); let log_dir = directories::ProjectDirs::from_path(PathBuf::from("logs/apps")) @@ -119,8 +132,6 @@ fn default_config_builder() -> ConfigBuilder { .data_dir() .to_str() .expect("unreachable export path"); - let config_dir = dirs.config_dir(); - let config_path = config_dir.join("config.toml"); Config::builder() .set_default("ingest_srv_addr", DEFAULT_INGEST_SRV_ADDR) @@ -143,10 +154,8 @@ fn default_config_builder() -> ConfigBuilder { .expect("default max mb of level base") .set_default("num_of_thread", DEFAULT_NUM_OF_THREAD) .expect("default number of thread") - .set_default("max_sub_compactions", DEFAULT_MAX_SUBCOMPACTIONS) + .set_default("max_sub_compactions", DEFAULT_MAX_SUB_COMPACTIONS) .expect("default max subcompactions") - .set_default("cfg_path", config_path.to_str().expect("path to string")) - .expect("default config dir") .set_default("addr_to_peers", DEFAULT_INVALID_ADDR_TO_PEERS) .expect("default ack transmission") .set_default("ack_transmission", DEFAULT_ACK_TRANSMISSION)