Skip to content

Commit

Permalink
Update giganto_config to respond full configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
BLYKIM committed Aug 22, 2024
1 parent 6eaac91 commit d7c8453
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 112 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Versioning](https://semver.org/spec/v2.0.0.html).
- Changed to support command line interface.
- Removed `cert`, `key`, `root` fields from config file.
- Changed `set_giganto_config` to receive toml-string with full configuration.
- Updated `giganto_config` to respond full configuration.

## [0.20.0] - 2024-05-17

Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ processing and real-time analytics.
You can run giganto by invoking the following command:

```sh
giganto --cert <CERT_PATH> --key <KEY_PATH> --ca <CA_PATH> <CENTRAL_SERVER>
giganto --cert <CERT_PATH> --key <KEY_PATH> --root <ROOT_PATH>
```

If you want to run giganto with local configuration file,

```sh
giganto -c <CONFIG_PATH> --cert <CERT_PATH> --key <KEY_PATH> --ca <CA_PATH> <CENTRAL_SERVER>
giganto -c <CONFIG_PATH> --cert <CERT_PATH> --key <KEY_PATH> --root <ROOT_PATH>
```

In the config file, you can specify the following options:
Expand Down Expand Up @@ -77,7 +77,7 @@ certificate/key from the tests folder.)

```sh
cargo run -- -c tests/node1/config.toml --cert tests/node1/cert.pem \
--key tests/node1/key.pem --root tests/root.pem hostname@address
--key tests/node1/key.pem --root tests/root.pem
```

## License
Expand Down
93 changes: 18 additions & 75 deletions src/graphql/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use anyhow::{anyhow, Context as ct};
use async_graphql::{Context, InputObject, Object, Result, SimpleObject};
use serde::Deserialize;
use toml_edit::{value, DocumentMut, InlineTable};

use super::{PowerOffNotify, RebootNotify, ReloadNotify, TerminateNotify};
Expand All @@ -14,14 +15,8 @@ use crate::storage::Database;
use crate::AckTransmissionCount;

const GRAPHQL_REBOOT_DELAY: u64 = 100;
const CONFIG_INGEST_SRV_ADDR: &str = "ingest_srv_addr";
pub const CONFIG_PUBLISH_SRV_ADDR: &str = "publish_srv_addr";
pub const CONFIG_GRAPHQL_SRV_ADDR: &str = "graphql_srv_addr";
const CONFIG_RETENTION: &str = "retention";
const CONFIG_MAX_OPEN_FILES: &str = "max_open_files";
const CONFIG_MAX_MB_OF_LEVEL_BASE: &str = "max_mb_of_level_base";
const CONFIG_ADDR_TO_PEERS: &str = "addr_to_peers";
const CONFIG_PEER_LIST: &str = "peers";
const CONFIG_ACK_TRANSMISSION: &str = "ack_transmission";
pub const TEMP_TOML_POST_FIX: &str = ".temp.toml";

Expand Down Expand Up @@ -52,7 +47,7 @@ struct Properties {
stats: String,
}

#[derive(InputObject, SimpleObject, Debug)]
#[derive(InputObject, SimpleObject, Debug, Deserialize)]
#[graphql(input_name = "InputPeerList")]
pub struct PeerList {
pub addr: String,
Expand All @@ -68,17 +63,25 @@ impl TomlPeers for PeerList {
}
}

#[derive(SimpleObject, Debug)]
#[derive(SimpleObject, Debug, Deserialize)]
struct GigantoConfig {
ingest_srv_addr: String,
publish_srv_addr: String,
graphql_srv_addr: String,
retention: String,
data_dir: String,
log_dir: String,
export_dir: String,

max_open_files: i32,
max_mb_of_level_base: u64,
addr_to_peers: String,
peer_list: Vec<PeerList>,
ack_transmission_cnt: u16,
num_of_thread: i32,
max_sub_compactions: u32,

addr_to_peers: Option<String>,
peers: Option<Vec<PeerList>>,

ack_transmission: u16,
}

#[derive(Default)]
Expand Down Expand Up @@ -125,55 +128,11 @@ impl GigantoStatusQuery {
#[allow(clippy::unused_async)]
async fn giganto_config<'ctx>(&self, ctx: &Context<'ctx>) -> Result<GigantoConfig> {
let cfg_path = ctx.data::<String>()?;
let doc = read_toml_file(cfg_path)?;
let ingest_srv_addr = parse_toml_element_to_string(CONFIG_INGEST_SRV_ADDR, &doc)?;
let publish_srv_addr = parse_toml_element_to_string(CONFIG_PUBLISH_SRV_ADDR, &doc)?;
let graphql_srv_addr = parse_toml_element_to_string(CONFIG_GRAPHQL_SRV_ADDR, &doc)?;
let retention = parse_toml_element_to_string(CONFIG_RETENTION, &doc)?;
let max_open_files = parse_toml_element_to_integer(CONFIG_MAX_OPEN_FILES, &doc)?;
let max_mb_of_level_base =
parse_toml_element_to_integer(CONFIG_MAX_MB_OF_LEVEL_BASE, &doc)?;
let ack_transmission_cnt = parse_toml_element_to_integer(CONFIG_ACK_TRANSMISSION, &doc)?;
let mut peer_list = Vec::new();
let addr_to_peers = if doc.get(CONFIG_ADDR_TO_PEERS).is_some() {
let peers_value = doc
.get(CONFIG_PEER_LIST)
.context("peers not found")?
.as_array()
.context("invalid peers format")?;
for peer in peers_value {
if let Some(peer_data) = peer.as_inline_table() {
let (Some(addr_val), Some(hostname_val)) =
(peer_data.get("addr"), peer_data.get("hostname"))
else {
return Err(anyhow!("Invalid `addr`, `hostname` Value format").into());
};
let (Some(addr), Some(hostname)) = (addr_val.as_str(), hostname_val.as_str())
else {
return Err(anyhow!("Invalid `addr`, `hostname` String format").into());
};
peer_list.push(PeerList {
addr: addr.to_string(),
hostname: hostname.to_string(),
});
}
}
parse_toml_element_to_string(CONFIG_ADDR_TO_PEERS, &doc)?
} else {
String::new()
};
let toml = fs::read_to_string(cfg_path).context("toml not found")?;

Ok(GigantoConfig {
ingest_srv_addr,
publish_srv_addr,
graphql_srv_addr,
retention,
max_open_files,
max_mb_of_level_base,
addr_to_peers,
peer_list,
ack_transmission_cnt,
})
let config: GigantoConfig = toml::from_str(&toml)?;

Ok(config)
}

#[allow(clippy::unused_async)]
Expand Down Expand Up @@ -281,22 +240,6 @@ pub fn parse_toml_element_to_string(key: &str, doc: &DocumentMut) -> Result<Stri
Ok(value.to_string())
}

fn parse_toml_element_to_integer<T>(key: &str, doc: &DocumentMut) -> Result<T>
where
T: std::convert::TryFrom<i64>,
{
let Some(item) = doc.get(key) else {
return Err(anyhow!("{} not found.", key).into());
};
let Some(value) = item.as_integer() else {
return Err(anyhow!("parse failed: {}'s item format is not available.", key).into());
};
let Ok(value) = T::try_from(value) else {
return Err(anyhow!("parse failed: {}'s value format is not available.", key).into());
};
Ok(value)
}

fn insert_toml_element<T>(key: &str, doc: &mut DocumentMut, input: Option<T>)
where
T: std::convert::Into<toml_edit::Value>,
Expand Down
30 changes: 15 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ async fn main() -> Result<()> {
root: root_cert.clone(),
});

let _guard = init_tracing(&settings.log_dir)?;
let _guard = init_tracing(&settings.config.log_dir)?;

let db_path = settings.data_dir.join("db");
let db_path = settings.config.data_dir.join("db");
let db_options = crate::storage::DbOptions::new(
settings.max_open_files,
settings.max_mb_of_level_base,
settings.num_of_thread,
settings.max_sub_compactions,
settings.config.max_open_files,
settings.config.max_mb_of_level_base,
settings.config.num_of_thread,
settings.config.max_sub_compactions,
);
if args.repair {
let start = Instant::now();
Expand All @@ -109,7 +109,7 @@ async fn main() -> Result<()> {

let database = storage::Database::open(&db_path, &db_options)?;

if let Err(e) = migrate_data_dir(&settings.data_dir, &database) {
if let Err(e) = migrate_data_dir(&settings.config.data_dir, &database) {
error!("migration failed: {e}");
return Ok(());
}
Expand All @@ -131,13 +131,13 @@ async fn main() -> Result<()> {
let ingest_sources = new_ingest_sources(&database);
let runtime_ingest_sources = new_runtime_ingest_sources();
let stream_direct_channels = new_stream_direct_channels();
let (peers, peer_idents) = new_peers_data(settings.peers.clone());
let (peers, peer_idents) = new_peers_data(settings.config.peers.clone());
let notify_config_reload = Arc::new(Notify::new());
let notify_shutdown = Arc::new(Notify::new());
let notify_reboot = Arc::new(Notify::new());
let notify_power_off = Arc::new(Notify::new());
let mut notify_source_change = None;
let ack_transmission_cnt = new_ack_transmission_count(settings.ack_transmission);
let ack_transmission_cnt = new_ack_transmission_count(settings.config.ack_transmission);

let schema = graphql::schema(
NodeName(subject_from_cert(&cert)?.1),
Expand All @@ -146,7 +146,7 @@ async fn main() -> Result<()> {
ingest_sources.clone(),
peers.clone(),
request_client_pool.clone(),
settings.export_dir.clone(),
settings.config.export_dir.clone(),
notify_config_reload.clone(),
notify_reboot.clone(),
notify_power_off.clone(),
Expand All @@ -157,7 +157,7 @@ async fn main() -> Result<()> {

task::spawn(web::serve(
schema,
settings.graphql_srv_addr,
settings.config.graphql_srv_addr,
cert_pem.clone(),
key_pem.clone(),
notify_shutdown.clone(),
Expand All @@ -175,7 +175,7 @@ async fn main() -> Result<()> {
.expect("Cannot create runtime for retain_periodically.")
.block_on(storage::retain_periodically(
time::Duration::from_secs(ONE_DAY),
settings.retention,
settings.config.retention,
db,
notify_shutdown_copy,
running_flag,
Expand All @@ -185,7 +185,7 @@ async fn main() -> Result<()> {
});
});

if let Some(addr_to_peers) = settings.addr_to_peers {
if let Some(addr_to_peers) = settings.config.addr_to_peers {
let peer_server = peer::Peer::new(addr_to_peers, &certs.clone())?;
let notify_source = Arc::new(Notify::new());
task::spawn(peer_server.run(
Expand All @@ -199,7 +199,7 @@ async fn main() -> Result<()> {
notify_source_change = Some(notify_source);
}

let publish_server = publish::Server::new(settings.publish_srv_addr, &certs.clone());
let publish_server = publish::Server::new(settings.config.publish_srv_addr, &certs.clone());
task::spawn(publish_server.run(
database.clone(),
pcap_sources.clone(),
Expand All @@ -211,7 +211,7 @@ async fn main() -> Result<()> {
notify_shutdown.clone(),
));

let ingest_server = ingest::Server::new(settings.ingest_srv_addr, &certs.clone());
let ingest_server = ingest::Server::new(settings.config.ingest_srv_addr, &certs.clone());
task::spawn(ingest_server.run(
database.clone(),
pcap_sources,
Expand Down
Loading

0 comments on commit d7c8453

Please sign in to comment.