diff --git a/CHANGELOG.md b/CHANGELOG.md index 00f2722..ba0197b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,9 +68,11 @@ Versioning](https://semver.org/spec/v2.0.0.html). integers beyond `i32`. - Changed the `from_key_value` macro to additionally receive `str_num_field` for `StringNumber` conversion. -- Changed to support command line interface. +- Changed command line interface. - Removed `cert`, `key`, `root` fields from config file. -- Changed `set_giganto_config` to receive toml-string with full configuration. + - Added cli options `-c`, `--cert`, `--key` and `--ca-certs`. +- Changed `setGigantoConfig` to receive toml-string with full configuration. +- Updated `gigantoConfig` to respond full configuration. ## [0.20.0] - 2024-05-17 diff --git a/Cargo.lock b/Cargo.lock index acd50f7..ccb198d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,6 +1041,7 @@ dependencies = [ "futures-util", "giganto-client", "graphql_client", + "humantime", "humantime-serde", "libc", "mockito", diff --git a/Cargo.toml b/Cargo.toml index 2ca5d90..c234a35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ directories = "5.0" futures-util = "0.3" giganto-client = { git = "https://github.com/aicers/giganto-client.git", rev = "cc83f71" } graphql_client = "0.14" +humantime = "2.1" humantime-serde = "1" libc = "0.2" num_enum = "0.7" diff --git a/README.md b/README.md index 2b7e055..823353e 100644 --- a/README.md +++ b/README.md @@ -24,13 +24,15 @@ processing and real-time analytics. You can run giganto by invoking the following command: ```sh -giganto --cert --key --ca +giganto --cert --key --ca-certs \ +--ca-certs ``` If you want to run giganto with local configuration file, ```sh -giganto -c --cert --key --ca +giganto -c --cert --key --ca-certs \ + --ca-certs ``` In the config file, you can specify the following options: @@ -76,8 +78,8 @@ Run giganto with the prepared configuration file. (Settings to use the certificate/key from the tests folder.) ```sh -cargo run -- -c tests/node1/config.toml --cert tests/node1/cert.pem \ ---key tests/node1/key.pem --root tests/root.pem hostname@address +cargo run -- -c tests/config.toml --cert tests/certs/node1/cert.pem \ +--key tests/certs/node1/key.pem --ca-certs tests/certs/ca_cert.pem ``` ## License diff --git a/src/graphql.rs b/src/graphql.rs index 39af139..7adb02f 100644 --- a/src/graphql.rs +++ b/src/graphql.rs @@ -1543,7 +1543,11 @@ mod tests { } impl TestSchema { - fn setup(ingest_sources: IngestSources, peers: Peers) -> Self { + fn setup( + ingest_sources: IngestSources, + peers: Peers, + config_file_path: Option, + ) -> Self { let db_dir = tempfile::tempdir().unwrap(); let db = Database::open(db_dir.path(), &DbOptions::default()).unwrap(); let pcap_sources = new_pcap_sources(); @@ -1565,7 +1569,7 @@ mod tests { notify_reboot, notify_power_off, notify_terminate, - "file_path".to_string(), + config_file_path.unwrap_or("file_path".to_string()), Arc::new(RwLock::new(1024)), ); @@ -1585,7 +1589,7 @@ mod tests { )); let peers = Arc::new(tokio::sync::RwLock::new(HashMap::new())); - Self::setup(ingest_sources, peers) + Self::setup(ingest_sources, peers, None) } pub fn new_with_graphql_peer(port: u16) -> Self { @@ -1608,7 +1612,19 @@ mod tests { }, )]))); - Self::setup(ingest_sources, peers) + Self::setup(ingest_sources, peers, None) + } + + pub fn new_with_config_file_path(path: String) -> Self { + let ingest_sources = Arc::new(tokio::sync::RwLock::new( + CURRENT_GIGANTO_INGEST_SOURCES + .into_iter() + .map(|source| source.to_string()) + .collect::>(), + )); + + let peers = Arc::new(tokio::sync::RwLock::new(HashMap::new())); + Self::setup(ingest_sources, peers, Some(path)) } pub async fn execute(&self, query: &str) -> async_graphql::Response { diff --git a/src/graphql/status.rs b/src/graphql/status.rs index 8426a2c..5399c7b 100644 --- a/src/graphql/status.rs +++ b/src/graphql/status.rs @@ -7,21 +7,18 @@ use std::{ use anyhow::{anyhow, Context as ct}; use async_graphql::{Context, InputObject, Object, Result, SimpleObject}; use toml_edit::{value, DocumentMut, InlineTable}; +use tracing::info; use super::{PowerOffNotify, RebootNotify, ReloadNotify, TerminateNotify}; +use crate::peer::PeerIdentity; +use crate::settings::GigantoConfig; #[cfg(debug_assertions)] use crate::storage::Database; use crate::AckTransmissionCount; const GRAPHQL_REBOOT_DELAY: u64 = 100; -const CONFIG_INGEST_SRV_ADDR: &str = "ingest_srv_addr"; pub const CONFIG_PUBLISH_SRV_ADDR: &str = "publish_srv_addr"; pub const CONFIG_GRAPHQL_SRV_ADDR: &str = "graphql_srv_addr"; -const CONFIG_RETENTION: &str = "retention"; -const CONFIG_MAX_OPEN_FILES: &str = "max_open_files"; -const CONFIG_MAX_MB_OF_LEVEL_BASE: &str = "max_mb_of_level_base"; -const CONFIG_ADDR_TO_PEERS: &str = "addr_to_peers"; -const CONFIG_PEER_LIST: &str = "peers"; const CONFIG_ACK_TRANSMISSION: &str = "ack_transmission"; pub const TEMP_TOML_POST_FIX: &str = ".temp.toml"; @@ -52,33 +49,74 @@ struct Properties { stats: String, } -#[derive(InputObject, SimpleObject, Debug)] -#[graphql(input_name = "InputPeerList")] -pub struct PeerList { - pub addr: String, - pub hostname: String, -} +#[Object] +impl GigantoConfig { + async fn ingest_srv_addr(&self) -> String { + self.ingest_srv_addr.to_string() + } -impl TomlPeers for PeerList { - fn get_hostname(&self) -> String { - self.hostname.clone() + async fn publish_srv_addr(&self) -> String { + self.publish_srv_addr.to_string() + } + + async fn graphql_srv_addr(&self) -> String { + self.graphql_srv_addr.to_string() + } + + async fn retention(&self) -> String { + humantime::format_duration(self.retention).to_string() + } + + async fn data_dir(&self) -> String { + self.data_dir.to_string_lossy().to_string() } - fn get_addr(&self) -> String { - self.addr.clone() + + async fn log_dir(&self) -> String { + self.log_dir.to_string_lossy().to_string() + } + + async fn export_dir(&self) -> String { + self.export_dir.to_string_lossy().to_string() + } + + async fn max_open_files(&self) -> i32 { + self.max_open_files + } + + async fn max_mb_of_level_base(&self) -> u64 { + self.max_mb_of_level_base + } + + async fn num_of_thread(&self) -> i32 { + self.num_of_thread + } + + async fn max_sub_compactions(&self) -> u32 { + self.max_sub_compactions + } + + async fn addr_to_peers(&self) -> Option { + self.addr_to_peers.map(|addr| addr.to_string()) + } + + async fn peers(&self) -> Option> { + self.peers.clone().map(|peers| peers.into_iter().collect()) + } + + async fn ack_transmission(&self) -> u16 { + self.ack_transmission } } -#[derive(SimpleObject, Debug)] -struct GigantoConfig { - ingest_srv_addr: String, - publish_srv_addr: String, - graphql_srv_addr: String, - retention: String, - max_open_files: i32, - max_mb_of_level_base: u64, - addr_to_peers: String, - peer_list: Vec, - ack_transmission_cnt: u16, +#[Object] +impl PeerIdentity { + async fn addr(&self) -> String { + self.addr.to_string() + } + + async fn hostname(&self) -> String { + self.hostname.clone() + } } #[derive(Default)] @@ -125,55 +163,11 @@ impl GigantoStatusQuery { #[allow(clippy::unused_async)] async fn giganto_config<'ctx>(&self, ctx: &Context<'ctx>) -> Result { let cfg_path = ctx.data::()?; - let doc = read_toml_file(cfg_path)?; - let ingest_srv_addr = parse_toml_element_to_string(CONFIG_INGEST_SRV_ADDR, &doc)?; - let publish_srv_addr = parse_toml_element_to_string(CONFIG_PUBLISH_SRV_ADDR, &doc)?; - let graphql_srv_addr = parse_toml_element_to_string(CONFIG_GRAPHQL_SRV_ADDR, &doc)?; - let retention = parse_toml_element_to_string(CONFIG_RETENTION, &doc)?; - let max_open_files = parse_toml_element_to_integer(CONFIG_MAX_OPEN_FILES, &doc)?; - let max_mb_of_level_base = - parse_toml_element_to_integer(CONFIG_MAX_MB_OF_LEVEL_BASE, &doc)?; - let ack_transmission_cnt = parse_toml_element_to_integer(CONFIG_ACK_TRANSMISSION, &doc)?; - let mut peer_list = Vec::new(); - let addr_to_peers = if doc.get(CONFIG_ADDR_TO_PEERS).is_some() { - let peers_value = doc - .get(CONFIG_PEER_LIST) - .context("peers not found")? - .as_array() - .context("invalid peers format")?; - for peer in peers_value { - if let Some(peer_data) = peer.as_inline_table() { - let (Some(addr_val), Some(hostname_val)) = - (peer_data.get("addr"), peer_data.get("hostname")) - else { - return Err(anyhow!("Invalid `addr`, `hostname` Value format").into()); - }; - let (Some(addr), Some(hostname)) = (addr_val.as_str(), hostname_val.as_str()) - else { - return Err(anyhow!("Invalid `addr`, `hostname` String format").into()); - }; - peer_list.push(PeerList { - addr: addr.to_string(), - hostname: hostname.to_string(), - }); - } - } - parse_toml_element_to_string(CONFIG_ADDR_TO_PEERS, &doc)? - } else { - String::new() - }; + let toml = fs::read_to_string(cfg_path).context("toml not found")?; - Ok(GigantoConfig { - ingest_srv_addr, - publish_srv_addr, - graphql_srv_addr, - retention, - max_open_files, - max_mb_of_level_base, - addr_to_peers, - peer_list, - ack_transmission_cnt, - }) + let config: GigantoConfig = toml::from_str(&toml)?; + + Ok(config) } #[allow(clippy::unused_async)] @@ -185,15 +179,22 @@ impl GigantoStatusQuery { #[Object] impl GigantoConfigMutation { #[allow(clippy::unused_async)] - async fn set_giganto_config<'ctx>( - &self, - ctx: &Context<'ctx>, - config: String, - ) -> Result { + async fn set_giganto_config<'ctx>(&self, ctx: &Context<'ctx>, draft: String) -> Result { + let config_draft: GigantoConfig = toml::from_str(&draft)?; + let cfg_path = ctx.data::()?; + + let config_toml = fs::read_to_string(cfg_path).context("toml not found")?; + let config: GigantoConfig = toml::from_str(&config_toml)?; + + if config == config_draft { + info!("No changes. config: {config:?}, draft: {config_draft:?}"); + return Err("No changes".to_string().into()); + } + let new_path = copy_toml_file(cfg_path)?; - fs::write(new_path, config)?; + fs::write(new_path, draft)?; let reload_notify = ctx.data::()?; let config_reload = reload_notify.0.clone(); @@ -203,6 +204,7 @@ impl GigantoConfigMutation { tokio::time::sleep(Duration::from_millis(GRAPHQL_REBOOT_DELAY)).await; config_reload.notify_one(); }); + info!("Draft applied. config: {config:?}, draft: {config_draft:?}"); Ok("Done".to_string()) } @@ -281,22 +283,6 @@ pub fn parse_toml_element_to_string(key: &str, doc: &DocumentMut) -> Result(key: &str, doc: &DocumentMut) -> Result -where - T: std::convert::TryFrom, -{ - let Some(item) = doc.get(key) else { - return Err(anyhow!("{} not found.", key).into()); - }; - let Some(value) = item.as_integer() else { - return Err(anyhow!("parse failed: {}'s item format is not available.", key).into()); - }; - let Ok(value) = T::try_from(value) else { - return Err(anyhow!("parse failed: {}'s value format is not available.", key).into()); - }; - Ok(value) -} - fn insert_toml_element(key: &str, doc: &mut DocumentMut, input: Option) where T: std::convert::Into, @@ -348,4 +334,149 @@ mod tests { assert_eq!(res.data.to_string(), "{ping: true}"); } + + #[tokio::test] + async fn test_status() { + let schema = TestSchema::new(); + + let query = r#" + { + gigantoStatus { + name + cpuUsage + totalMemory + usedMemory + totalDiskSpace + usedDiskSpace + } + } + "#; + + let res = schema.execute(query).await; + assert!(res.errors.is_empty()); + } + + #[tokio::test] + async fn test_giganto_config() { + let dir = tempfile::tempdir().unwrap(); + let config_path = dir.path().join("config.toml"); + + let toml_content = test_toml_content(); + + std::fs::write(&config_path, toml_content).unwrap(); + + let schema = + TestSchema::new_with_config_file_path(config_path.to_string_lossy().to_string()); + + let query = r#" + { + gigantoConfig { + ingestSrvAddr + publishSrvAddr + graphqlSrvAddr + dataDir + retention + logDir + exportDir + ackTransmission + maxOpenFiles + maxMbOfLevelBase + numOfThread + maxSubCompactions + addrToPeers + peers { + addr + hostname + } + } + } + "#; + + let res = schema.execute(query).await; + + let data = res.data.to_string(); + assert_eq!( + data, + "{gigantoConfig: {ingestSrvAddr: \"0.0.0.0:38370\", publishSrvAddr: \"0.0.0.0:38371\", graphqlSrvAddr: \"127.0.0.1:8442\", dataDir: \"tests/data\", retention: \"3months 8days 16h 19m 12s\", logDir: \"/data/logs/apps\", exportDir: \"tests/export\", ackTransmission: 1024, maxOpenFiles: 8000, maxMbOfLevelBase: 512, numOfThread: 8, maxSubCompactions: 2, addrToPeers: \"127.0.0.1:48383\", peers: [{addr: \"127.0.0.1:60192\", hostname: \"node2\"}]}}".to_string() + ); + } + + #[tokio::test] + async fn test_set_giganto_config() { + let dir = tempfile::tempdir().unwrap(); + let config_path = dir.path().join("config.toml"); + + let toml_content = test_toml_content(); + + std::fs::write(&config_path, &toml_content).unwrap(); + + let schema = + TestSchema::new_with_config_file_path(config_path.to_string_lossy().to_string()); + + // No changes + let query = format!( + r#" + mutation {{ + setGigantoConfig(draft: {toml_content:?}) + }} + "# + ); + + let res = schema.execute(&query).await; + + assert_eq!( + res.errors.first().unwrap().message, + "No changes".to_string() + ); + + // Change publish port + let new_draft = r#" + ingest_srv_addr = "0.0.0.0:38370" + publish_srv_addr = "0.0.0.0:38372" + graphql_srv_addr = "127.0.0.1:8442" + data_dir = "tests/data" + retention = "100d" + log_dir = "/data/logs/apps" + export_dir = "tests/export" + ack_transmission = 1024 + max_open_files = 8000 + max_mb_of_level_base = 512 + num_of_thread = 8 + max_sub_compactions = 2 + addr_to_peers = "127.0.0.1:48383" + peers = [{ addr = "127.0.0.1:60192", hostname = "node2" }] + "#; + + let query = format!( + r#" + mutation {{ + setGigantoConfig(draft: {new_draft:?}) + }} + "# + ); + + let res = schema.execute(&query).await; + + assert_eq!(res.data.to_string(), "{setGigantoConfig: \"Done\"}"); + } + + fn test_toml_content() -> String { + r#" + ingest_srv_addr = "0.0.0.0:38370" + publish_srv_addr = "0.0.0.0:38371" + graphql_srv_addr = "127.0.0.1:8442" + data_dir = "tests/data" + retention = "100d" + log_dir = "/data/logs/apps" + export_dir = "tests/export" + ack_transmission = 1024 + max_open_files = 8000 + max_mb_of_level_base = 512 + num_of_thread = 8 + max_sub_compactions = 2 + addr_to_peers = "127.0.0.1:48383" + peers = [{ addr = "127.0.0.1:60192", hostname = "node2" }] + "# + .to_string() + } } diff --git a/src/ingest/tests.rs b/src/ingest/tests.rs index 65aaef0..0328ba8 100644 --- a/src/ingest/tests.rs +++ b/src/ingest/tests.rs @@ -47,7 +47,7 @@ fn get_token() -> &'static Mutex { const CERT_PATH: &str = "tests/certs/node1/cert.pem"; const KEY_PATH: &str = "tests/certs/node1/key.pem"; -const ROOT_PATH: &str = "tests/certs/root.pem"; +const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; const HOST: &str = "node1"; const TEST_PORT: u16 = 60190; const PROTOCOL_VERSION: &str = "0.21.0-alpha.2"; @@ -80,8 +80,8 @@ fn server() -> Server { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -130,8 +130,8 @@ fn init_client() -> Endpoint { .collect::>() .expect("invalid PEM-encoded certificate") }; - let root = fs::read(ROOT_PATH).expect("Failed to read file"); - let server_root = to_root_cert(&root).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let server_root = to_root_cert(&ca_cert_path).unwrap(); let client_crypto = rustls::ClientConfig::builder() .with_root_certificates(server_root) diff --git a/src/main.rs b/src/main.rs index e030ad8..abb8290 100644 --- a/src/main.rs +++ b/src/main.rs @@ -73,23 +73,21 @@ async fn main() -> Result<()> { let key_pem = fs::read(&args.key) .with_context(|| format!("failed to read private key file: {}", args.key))?; let key = to_private_key(&key_pem).context("cannot read private key")?; - let root_pem = fs::read(&args.root) - .with_context(|| format!("failed to read root certificate file: {}", args.root))?; - let root_cert = to_root_cert(&root_pem)?; + let root_cert = to_root_cert(&args.ca_certs)?; let certs = Arc::new(Certs { certs: cert.clone(), key: key.clone_key(), root: root_cert.clone(), }); - let _guard = init_tracing(&settings.log_dir)?; + let _guard = init_tracing(&settings.config.log_dir)?; - let db_path = settings.data_dir.join("db"); + let db_path = settings.config.data_dir.join("db"); let db_options = crate::storage::DbOptions::new( - settings.max_open_files, - settings.max_mb_of_level_base, - settings.num_of_thread, - settings.max_sub_compactions, + settings.config.max_open_files, + settings.config.max_mb_of_level_base, + settings.config.num_of_thread, + settings.config.max_sub_compactions, ); if args.repair { let start = Instant::now(); @@ -109,7 +107,7 @@ async fn main() -> Result<()> { let database = storage::Database::open(&db_path, &db_options)?; - if let Err(e) = migrate_data_dir(&settings.data_dir, &database) { + if let Err(e) = migrate_data_dir(&settings.config.data_dir, &database) { error!("migration failed: {e}"); return Ok(()); } @@ -131,13 +129,13 @@ async fn main() -> Result<()> { let ingest_sources = new_ingest_sources(&database); let runtime_ingest_sources = new_runtime_ingest_sources(); let stream_direct_channels = new_stream_direct_channels(); - let (peers, peer_idents) = new_peers_data(settings.peers.clone()); + let (peers, peer_idents) = new_peers_data(settings.config.peers.clone()); let notify_config_reload = Arc::new(Notify::new()); let notify_shutdown = Arc::new(Notify::new()); let notify_reboot = Arc::new(Notify::new()); let notify_power_off = Arc::new(Notify::new()); let mut notify_source_change = None; - let ack_transmission_cnt = new_ack_transmission_count(settings.ack_transmission); + let ack_transmission_cnt = new_ack_transmission_count(settings.config.ack_transmission); let schema = graphql::schema( NodeName(subject_from_cert(&cert)?.1), @@ -146,7 +144,7 @@ async fn main() -> Result<()> { ingest_sources.clone(), peers.clone(), request_client_pool.clone(), - settings.export_dir.clone(), + settings.config.export_dir.clone(), notify_config_reload.clone(), notify_reboot.clone(), notify_power_off.clone(), @@ -157,7 +155,7 @@ async fn main() -> Result<()> { task::spawn(web::serve( schema, - settings.graphql_srv_addr, + settings.config.graphql_srv_addr, cert_pem.clone(), key_pem.clone(), notify_shutdown.clone(), @@ -175,7 +173,7 @@ async fn main() -> Result<()> { .expect("Cannot create runtime for retain_periodically.") .block_on(storage::retain_periodically( time::Duration::from_secs(ONE_DAY), - settings.retention, + settings.config.retention, db, notify_shutdown_copy, running_flag, @@ -185,7 +183,7 @@ async fn main() -> Result<()> { }); }); - if let Some(addr_to_peers) = settings.addr_to_peers { + if let Some(addr_to_peers) = settings.config.addr_to_peers { let peer_server = peer::Peer::new(addr_to_peers, &certs.clone())?; let notify_source = Arc::new(Notify::new()); task::spawn(peer_server.run( @@ -199,7 +197,7 @@ async fn main() -> Result<()> { notify_source_change = Some(notify_source); } - let publish_server = publish::Server::new(settings.publish_srv_addr, &certs.clone()); + let publish_server = publish::Server::new(settings.config.publish_srv_addr, &certs.clone()); task::spawn(publish_server.run( database.clone(), pcap_sources.clone(), @@ -211,7 +209,7 @@ async fn main() -> Result<()> { notify_shutdown.clone(), )); - let ingest_server = ingest::Server::new(settings.ingest_srv_addr, &certs.clone()); + let ingest_server = ingest::Server::new(settings.config.ingest_srv_addr, &certs.clone()); task::spawn(ingest_server.run( database.clone(), pcap_sources, @@ -314,16 +312,27 @@ fn to_private_key(pem: &[u8]) -> Result> { } } -fn to_root_cert(pem: &[u8]) -> Result { +fn to_root_cert(ca_certs_paths: &[String]) -> Result { + let mut ca_certs_files = Vec::new(); + + for ca_cert in ca_certs_paths { + let file = fs::read(ca_cert) + .with_context(|| format!("failed to read root certificate file: {ca_cert}"))?; + + ca_certs_files.push(file); + } let mut root_cert = rustls::RootCertStore::empty(); - let root_certs: Vec = rustls_pemfile::certs(&mut &*pem) - .collect::>() - .context("invalid PEM-encoded certificate")?; - if let Some(cert) = root_certs.first() { - root_cert - .add(cert.to_owned()) - .context("failed to add root cert")?; + for file in ca_certs_files { + let root_certs: Vec = rustls_pemfile::certs(&mut &*file) + .collect::>() + .context("invalid PEM-encoded certificate")?; + if let Some(cert) = root_certs.first() { + root_cert + .add(cert.to_owned()) + .context("failed to add root cert")?; + } } + Ok(root_cert) } diff --git a/src/peer.rs b/src/peer.rs index 4155aca..63bf9dd 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -756,7 +756,7 @@ pub mod tests { const CERT_PATH: &str = "tests/certs/node1/cert.pem"; const KEY_PATH: &str = "tests/certs/node1/key.pem"; - const ROOT_PATH: &str = "tests/certs/root.pem"; + const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; const HOST: &str = "node1"; const TEST_PORT: u16 = 60191; const PROTOCOL_VERSION: &str = "0.21.0-alpha.2"; @@ -819,8 +819,8 @@ pub mod tests { .expect("invalid PEM-encoded certificate") }; - let root = fs::read(ROOT_PATH).expect("Failed to read file"); - let server_root = to_root_cert(&root).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let server_root = to_root_cert(&ca_cert_path).unwrap(); let client_crypto = rustls::ClientConfig::builder() .with_root_certificates(server_root) @@ -842,8 +842,8 @@ pub mod tests { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, diff --git a/src/publish/tests.rs b/src/publish/tests.rs index a9129b4..aa91af0 100644 --- a/src/publish/tests.rs +++ b/src/publish/tests.rs @@ -47,7 +47,7 @@ fn get_token() -> &'static Mutex { TOKEN.get_or_init(|| Mutex::new(0)) } -const ROOT_PATH: &str = "tests/certs/root.pem"; +const CA_CERT_PATH: &str = "tests/certs/ca_cert.pem"; const PROTOCOL_VERSION: &str = "0.21.0-alpha.2"; const NODE1_CERT_PATH: &str = "tests/certs/node1/cert.pem"; @@ -96,8 +96,8 @@ fn server() -> Server { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -149,8 +149,8 @@ fn init_client() -> Endpoint { .collect::>() .expect("invalid PEM-encoded certificate") }; - let root = fs::read(ROOT_PATH).expect("Failed to read file"); - let server_root = to_root_cert(&root).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let server_root = to_root_cert(&ca_cert_path).unwrap(); let client_crypto = rustls::ClientConfig::builder() .with_root_certificates(server_root) @@ -783,8 +783,8 @@ async fn request_range_data_with_protocol() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -1901,8 +1901,8 @@ async fn request_range_data_with_log() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -2008,8 +2008,8 @@ async fn request_range_data_with_period_time_series() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -2156,8 +2156,8 @@ async fn request_network_event_stream() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -4046,8 +4046,8 @@ async fn request_raw_events() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -4132,8 +4132,8 @@ async fn request_range_data_with_protocol_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE2_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, key, @@ -4229,8 +4229,8 @@ async fn request_range_data_with_protocol_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -4339,8 +4339,8 @@ async fn request_range_data_with_log_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE2_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, key, @@ -4436,8 +4436,8 @@ async fn request_range_data_with_log_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -4544,8 +4544,8 @@ async fn request_range_data_with_period_time_series_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE2_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, key, @@ -4646,8 +4646,8 @@ async fn request_range_data_with_period_time_series_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, @@ -4754,8 +4754,8 @@ async fn request_raw_events_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE2_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, key, @@ -4848,8 +4848,8 @@ async fn request_raw_events_giganto_cluster() { let cert = to_cert_chain(&cert_pem).unwrap(); let key_pem = fs::read(NODE1_KEY_PATH).unwrap(); let key = to_private_key(&key_pem).unwrap(); - let root_pem = fs::read(ROOT_PATH).unwrap(); - let root = to_root_cert(&root_pem).unwrap(); + let ca_cert_path = vec![CA_CERT_PATH.to_string()]; + let root = to_root_cert(&ca_cert_path).unwrap(); let certs = Arc::new(Certs { certs: cert, diff --git a/src/settings.rs b/src/settings.rs index 0075394..f21a97f 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,7 +1,7 @@ //! Configurations for the application. use std::{collections::HashSet, net::SocketAddr, path::PathBuf, time::Duration}; -use clap::Parser; +use clap::{ArgAction, Parser}; use config::{builder::DefaultState, Config, ConfigBuilder, ConfigError, File}; use serde::{de::Error, Deserialize, Deserializer}; @@ -16,32 +16,35 @@ const DEFAULT_RETENTION: &str = "100d"; const DEFAULT_MAX_OPEN_FILES: i32 = 8000; const DEFAULT_MAX_MB_OF_LEVEL_BASE: u64 = 512; const DEFAULT_NUM_OF_THREAD: i32 = 8; -const DEFAULT_MAX_SUBCOMPACTIONS: u32 = 2; +const DEFAULT_MAX_SUB_COMPACTIONS: u32 = 2; #[derive(Parser, Debug)] +#[command(version)] pub struct Args { - /// Path to the local configuration TOML file + /// Path to the local configuration TOML file. #[arg(short, value_name = "CONFIG_PATH")] pub config: Option, - /// Path to the certificate file + + /// Path to the certificate file. #[arg(long, value_name = "CERT_PATH")] pub cert: String, - /// Path to the key file + + /// Path to the key file. #[arg(long, value_name = "KEY_PATH")] pub key: String, - /// Path to the root CA file - #[arg(long, value_name = "ROOT_PATH")] - pub root: String, - /// Central management server "hostname@address" - pub central_server: String, - /// Enable the repair mode + + /// Paths to the CA certificate files. + #[arg(long, value_name = "CA_CERTS_PATHS", action = ArgAction::Append)] + pub ca_certs: Vec, + + /// Enable the repair mode. #[arg(long)] pub repair: bool, } /// The application settings. -#[derive(Clone, Debug, Deserialize)] -pub struct Settings { +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +pub struct GigantoConfig { #[serde(deserialize_with = "deserialize_socket_addr")] pub ingest_srv_addr: SocketAddr, // IP address & port to ingest data #[serde(deserialize_with = "deserialize_socket_addr")] @@ -51,8 +54,8 @@ pub struct Settings { pub retention: Duration, // Data retention period #[serde(deserialize_with = "deserialize_socket_addr")] pub graphql_srv_addr: SocketAddr, // IP address & port to graphql - pub log_dir: PathBuf, //giganto's syslog path - pub export_dir: PathBuf, //giganto's export file path + pub log_dir: PathBuf, // giganto's syslog path + pub export_dir: PathBuf, // giganto's export file path // db options pub max_open_files: i32, @@ -60,11 +63,8 @@ pub struct Settings { pub num_of_thread: i32, pub max_sub_compactions: u32, - // config file path - pub cfg_path: String, - // peers - #[serde(deserialize_with = "deserialize_peer_addr")] + #[serde(default, deserialize_with = "deserialize_peer_addr")] pub addr_to_peers: Option, // IP address & port for peer connection pub peers: Option>, @@ -72,6 +72,14 @@ pub struct Settings { pub ack_transmission: u16, } +#[derive(Clone, Debug, Deserialize)] +pub struct Settings { + pub config: GigantoConfig, + + // config file path + pub cfg_path: String, +} + impl Settings { /// Creates a new `Settings` instance, populated from the default /// configuration file if it exists. @@ -88,7 +96,10 @@ impl Settings { )) } } else { - default_config_builder().build()?.try_deserialize() + let config: GigantoConfig = default_config_builder().build()?.try_deserialize()?; + let cfg_path = config_path.to_str().expect("path to string").to_string(); + + Ok(Self { config, cfg_path }) } } @@ -98,15 +109,17 @@ impl Settings { let s = default_config_builder() .add_source(File::with_name(cfg_path)) .build()?; - let mut setting: Settings = s.try_deserialize()?; - setting.cfg_path = cfg_path.to_string(); - Ok(setting) + let config: GigantoConfig = s.try_deserialize()?; + + Ok(Self { + config, + cfg_path: cfg_path.to_string(), + }) } } /// Creates a new `ConfigBuilder` instance with the default configuration. fn default_config_builder() -> ConfigBuilder { - let dirs = directories::ProjectDirs::from("com", "einsis", "giganto").expect("unreachable"); let db_dir = directories::ProjectDirs::from_path(PathBuf::from("db")).expect("unreachable db dir"); let log_dir = directories::ProjectDirs::from_path(PathBuf::from("logs/apps")) @@ -119,8 +132,6 @@ fn default_config_builder() -> ConfigBuilder { .data_dir() .to_str() .expect("unreachable export path"); - let config_dir = dirs.config_dir(); - let config_path = config_dir.join("config.toml"); Config::builder() .set_default("ingest_srv_addr", DEFAULT_INGEST_SRV_ADDR) @@ -143,10 +154,8 @@ fn default_config_builder() -> ConfigBuilder { .expect("default max mb of level base") .set_default("num_of_thread", DEFAULT_NUM_OF_THREAD) .expect("default number of thread") - .set_default("max_sub_compactions", DEFAULT_MAX_SUBCOMPACTIONS) + .set_default("max_sub_compactions", DEFAULT_MAX_SUB_COMPACTIONS) .expect("default max subcompactions") - .set_default("cfg_path", config_path.to_str().expect("path to string")) - .expect("default config dir") .set_default("addr_to_peers", DEFAULT_INVALID_ADDR_TO_PEERS) .expect("default ack transmission") .set_default("ack_transmission", DEFAULT_ACK_TRANSMISSION) diff --git a/tests/certs/root.pem b/tests/certs/ca_cert.pem similarity index 100% rename from tests/certs/root.pem rename to tests/certs/ca_cert.pem diff --git a/tests/certs/node1/config.toml b/tests/certs/node1/config.toml deleted file mode 100644 index e6fcda2..0000000 --- a/tests/certs/node1/config.toml +++ /dev/null @@ -1,14 +0,0 @@ -ingest_srv_addr = "0.0.0.0:38370" -publish_srv_addr = "0.0.0.0:38371" -graphql_srv_addr = "127.0.0.1:8442" -data_dir = "tests/data" -retention = "100d" -log_dir = "/data/logs/apps" -export_dir = "tests/export" -ack_transmission = 1024 -max_open_files = 8000 -max_mb_of_level_base = 512 -num_of_thread = 8 -max_sub_compactions = 2 -addr_to_peers = "127.0.0.1:48383" -peers = [{ addr = "127.0.0.1:60192", hostname = "node2" }]