From ed75e7da4cb462cd1e6c9ea1288b5da6f5cc16b9 Mon Sep 17 00:00:00 2001 From: ProBrian Date: Sun, 24 Dec 2023 21:21:57 +0800 Subject: [PATCH] refine config interface and support all sources (#51) Co-authored-by: Jingbo Chen --- src/lib.rs | 2 + src/topology_controller.rs | 51 ++++++++-- tests/data/file_to_file_invalid.json | 24 +++++ tests/topology_test.cpp | 146 +++++++++++++++++++++++++++ 4 files changed, 213 insertions(+), 10 deletions(-) create mode 100644 tests/data/file_to_file_invalid.json diff --git a/src/lib.rs b/src/lib.rs index 1668139..7699ce7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,8 @@ mod ffi { fn stop(self: &mut TopologyController) -> bool; fn get_generation_id(self: &mut TopologyController) -> u32; + + fn handle_config_reload(self: &mut TopologyController, config: &str) -> Result; } extern "Rust" { diff --git a/src/topology_controller.rs b/src/topology_controller.rs index 6d1f88a..c249984 100644 --- a/src/topology_controller.rs +++ b/src/topology_controller.rs @@ -4,7 +4,7 @@ use std::sync::Once; use std::sync::{Arc, Mutex}; use std::collections::HashMap; use tracing::{debug, error, info, Level}; -use vector::config::{ConfigBuilder, Config, ComponentKey, ConfigDiff}; +use vector::config::{ConfigBuilder, Config, ComponentKey, ConfigDiff, load_from_str}; use vector::topology::RunningTopology; use vector::{config, config::format, metrics, test_util::runtime}; @@ -25,7 +25,7 @@ static INIT: Once = Once::new(); pub fn setup_logging() { let timer = tracing_subscriber::fmt::time::time(); let collector = tracing_subscriber::fmt() - .with_max_level(Level::DEBUG) + .with_max_level(Level::INFO) // disable color to make CLion happy .with_ansi(false) .with_thread_ids(true) @@ -136,6 +136,33 @@ async fn reload_vector( true } +async fn reload_vector_from_str(config_str: &str, topology: &mut RunningTopology) -> Result { + match load_from_str(config_str, config::Format::Json) { + Ok(config) => { + info!("config str: {:?}", config_str); + match topology + .reload_config_and_respawn(config) + .await + { + Ok(true) => { + info!("config reloaded succeed"); + }, + Ok(false) => { + info!("reload and respawn failed, restore old config"); + return Err("reload and respawn failed, restored old config".to_string()); + }, + Err(()) => { + return Err("reload and respawn failed, failed to restore old config".to_string()); + } + } + } + Err(err) => { + return Err(format!("reload from str error: {:?}", err)); + } + } + Ok(true) +} + fn advance_generation(result: bool, generation_id: &AtomicU32) -> bool { if result { generation_id.fetch_add(1, Ordering::Relaxed); @@ -143,15 +170,14 @@ fn advance_generation(result: bool, generation_id: &AtomicU32) -> bool { result } -pub fn init_config(config_str: &str) -> Option { +pub fn init_config(config_str: &str) -> Result { START.call_once(|| { setup_logging(); }); let res = format::deserialize(config_str, config::Format::Json); if res.is_err() { - error!("deserialize error {:?}", res.unwrap_err()); - return None; + return Err(format!("configuration error: {:?}", res.unwrap_err().join(","))); } let config_builder: ConfigBuilder = res.unwrap(); debug!( @@ -169,7 +195,7 @@ pub fn init_config(config_str: &str) -> Option { }); info!("config constructed via config builder"); - Some(config_builder) + Ok(config_builder) } impl TopologyController { @@ -186,8 +212,8 @@ impl TopologyController { pub fn start(&mut self, topology_config: &str) -> Result { let builder = init_config(topology_config); - if builder.is_none() { - return Err("failed to init topology config".to_string()); + if builder.is_err() { + return Err(builder.unwrap_err()); } info!("start vector service"); @@ -278,6 +304,11 @@ impl TopologyController { config_str, }, self.config_builder.lock().unwrap().as_mut().unwrap(), self.topology.lock().unwrap().as_mut().unwrap()).await } + + pub fn handle_config_reload(&self, config_str: &str) -> Result { + let res = self.rt.block_on(reload_vector_from_str(config_str, self.topology.lock().unwrap().as_mut().unwrap())); + res + } } impl OneShotTopologyController { @@ -290,8 +321,8 @@ impl OneShotTopologyController { // run topology and return after finished, no need to maintain datas for long run pub fn start(&mut self, config_str: &str) -> Result { let config_builder = init_config(config_str); - if config_builder.is_none() { - return Err("failed to init topology config".to_string()); + if config_builder.is_err() { + return Err(config_builder.unwrap_err()); } info!("start one time vector topology"); diff --git a/tests/data/file_to_file_invalid.json b/tests/data/file_to_file_invalid.json new file mode 100644 index 0000000..2f376de --- /dev/null +++ b/tests/data/file_to_file_invalid.json @@ -0,0 +1,24 @@ +{ + "data_dir": "/tmp/vector/", + "sources": { + "source_file": { + "type": "file", + "read_from": "beginnings", + "include": [ + "/tmp/vector_test_source.log" + ] + } + }, + "sinks": { + "sink_file": { + "type": "file", + "inputs": [ + "source_*" + ], + "encoding": { + "codec": "json" + }, + "path": "/tmp/vector_test_sink.log" + } + } +} \ No newline at end of file diff --git a/tests/topology_test.cpp b/tests/topology_test.cpp index cfab1dd..0e0cdd6 100644 --- a/tests/topology_test.cpp +++ b/tests/topology_test.cpp @@ -7,6 +7,7 @@ #include #include #include +#include using Catch::Matchers::ContainsSubstring; using vectorcxx::test::run; @@ -152,4 +153,149 @@ TEST_CASE("run vector service with one time topology") { }); auto events = read_events_from_sink(); REQUIRE(events.size() == 6); +} + +TEST_CASE("test one shot topology with invalid source config") { + try { + run_one_shot("file_to_file_invalid", [](rust::Box &tc) {}); + } catch (std::exception &e) { + std::cout << "error: " << e.what() << std::endl; + REQUIRE_THAT(e.what(), ContainsSubstring("configuration error: \"unknown variant `beginnings`")); + } +} + +TEST_CASE("test reload vector from valid config string can proceed") { + auto config_string = R"j( + { + "data_dir": "/tmp/vector/", + "sources": { + "source_http": { + "type": "http_server", + "address": "0.0.0.0:9999", + "encoding": "text" + } + }, + "sinks": { + "sink_file": { + "type": "file", + "inputs": [ + "transform_*" + ], + "encoding": { + "codec": "json" + }, + "path": "/tmp/vector_test_sink.log" + } + }, + "transforms": { + "transform_add_field": { + "type": "remap", + "inputs": ["source_*"], + "source": ".age = 99 \n .abc=1" + } + } + } + )j"; + // start topology controller + run("http_to_file", + [&config_string](rust::Box &tc) { + send_http_events({"hello"}); + // reload with config str + auto config_str = rust::String(config_string); + REQUIRE(tc->handle_config_reload(config_str)); + send_http_events({"hello", "world"}); + }); + auto events = read_events_from_sink(); + REQUIRE(events.size() == 3); + REQUIRE_THAT(events[0], !ContainsSubstring(R"("age")")); + REQUIRE_THAT(events[1], ContainsSubstring(R"("age")")); + REQUIRE_THAT(events[2], ContainsSubstring(R"("age")")); +} + +TEST_CASE("test reload vector from invalid config string should fallback to existing topology config") { + auto config_string = R"j({"data_dir": "/tmp/vector/","sources": {"source_http": {"type": "http_server_invalid","address": "0.0.0.0:9999","encoding": "text"}}, + "sinks": { + "sink_file": { + "type": "file", + "inputs": [ + "transform_*" + ], + "encoding": { + "codec": "json" + }, + "path": "/tmp/vector_test_sink.log" + } + }, + "transforms": { + "transform_add_field": { + "type": "remap", + "inputs": ["source_*"], + "source": ".age = 42" + } + } + } + )j"; + // start topology controller + run("http_to_file", + [&config_string](rust::Box &tc) { + send_http_events({"hello"}); + // reload with config str + auto config_str = rust::String(config_string); + REQUIRE_THROWS(tc->handle_config_reload(config_str)); + send_http_events({"hello", "world"}); + }); + auto events = read_events_from_sink(); + REQUIRE(events.size() == 3); + REQUIRE_THAT(events[0], !ContainsSubstring(R"("age")")); + REQUIRE_THAT(events[1], !ContainsSubstring(R"("age")")); + REQUIRE_THAT(events[2], !ContainsSubstring(R"("age")")); +} + +TEST_CASE("test reload vector from config string with health check") { + // source includes an invalid port `99999`, which should fail the health check and reload. + auto config_string = R"j( + { + "data_dir": "/tmp/vector/", + "sources": { + "source_http": { + "type": "http_server", + "address": "0.0.0.0:99999", + "encoding": "text" + } + }, + "sinks": { + "sink_file": { + "type": "file", + "inputs": [ + "transform_*" + ], + "encoding": { + "codec": "json" + }, + "path": "/tmp/vector_test_sink.log" + } + }, + "transforms": { + "transform_add_field": { + "type": "remap", + "inputs": ["source_*"], + "source": ".age = 42" + } + } + } + )j"; + // start topology controller + run("http_to_file", + [&config_string](rust::Box &tc) { + send_http_events({"hello"}); + // reload with config str + auto config_str = rust::String(config_string); + REQUIRE_THROWS(tc->handle_config_reload(config_str)); + send_http_events({"hello", "world"}); + }); + auto events = read_events_from_sink(); + REQUIRE(events.size() == 3); + REQUIRE_THAT(events[0], !ContainsSubstring(R"("age")")); + REQUIRE_THAT(events[1], !ContainsSubstring(R"("age")")); + REQUIRE_THAT(events[2], !ContainsSubstring(R"("age")")); } \ No newline at end of file