Skip to content

Commit

Permalink
refine config interface and support all sources (#51)
Browse files Browse the repository at this point in the history
Co-authored-by: Jingbo Chen <[email protected]>
  • Loading branch information
ProBrian and Jingbo Chen authored Dec 24, 2023
1 parent 2180897 commit ed75e7d
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ mod ffi {
fn stop(self: &mut TopologyController) -> bool;

fn get_generation_id(self: &mut TopologyController) -> u32;

fn handle_config_reload(self: &mut TopologyController, config: &str) -> Result<bool>;
}

extern "Rust" {
Expand Down
51 changes: 41 additions & 10 deletions src/topology_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Once;
use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use tracing::{debug, error, info, Level};
use vector::config::{ConfigBuilder, Config, ComponentKey, ConfigDiff};
use vector::config::{ConfigBuilder, Config, ComponentKey, ConfigDiff, load_from_str};
use vector::topology::RunningTopology;
use vector::{config, config::format, metrics, test_util::runtime};

Expand All @@ -25,7 +25,7 @@ static INIT: Once = Once::new();
pub fn setup_logging() {
let timer = tracing_subscriber::fmt::time::time();
let collector = tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.with_max_level(Level::INFO)
// disable color to make CLion happy
.with_ansi(false)
.with_thread_ids(true)
Expand Down Expand Up @@ -136,22 +136,48 @@ async fn reload_vector(
true
}

async fn reload_vector_from_str(config_str: &str, topology: &mut RunningTopology) -> Result<bool, String> {
match load_from_str(config_str, config::Format::Json) {
Ok(config) => {
info!("config str: {:?}", config_str);
match topology
.reload_config_and_respawn(config)
.await
{
Ok(true) => {
info!("config reloaded succeed");
},
Ok(false) => {
info!("reload and respawn failed, restore old config");
return Err("reload and respawn failed, restored old config".to_string());
},
Err(()) => {
return Err("reload and respawn failed, failed to restore old config".to_string());
}
}
}
Err(err) => {
return Err(format!("reload from str error: {:?}", err));
}
}
Ok(true)
}

fn advance_generation(result: bool, generation_id: &AtomicU32) -> bool {
if result {
generation_id.fetch_add(1, Ordering::Relaxed);
}
result
}

pub fn init_config(config_str: &str) -> Option<ConfigBuilder> {
pub fn init_config(config_str: &str) -> Result<ConfigBuilder, String> {
START.call_once(|| {
setup_logging();
});

let res = format::deserialize(config_str, config::Format::Json);
if res.is_err() {
error!("deserialize error {:?}", res.unwrap_err());
return None;
return Err(format!("configuration error: {:?}", res.unwrap_err().join(",")));
}
let config_builder: ConfigBuilder = res.unwrap();
debug!(
Expand All @@ -169,7 +195,7 @@ pub fn init_config(config_str: &str) -> Option<ConfigBuilder> {
});

info!("config constructed via config builder");
Some(config_builder)
Ok(config_builder)
}

impl TopologyController {
Expand All @@ -186,8 +212,8 @@ impl TopologyController {
pub fn start(&mut self, topology_config: &str) -> Result<bool, String> {

let builder = init_config(topology_config);
if builder.is_none() {
return Err("failed to init topology config".to_string());
if builder.is_err() {
return Err(builder.unwrap_err());
}
info!("start vector service");

Expand Down Expand Up @@ -278,6 +304,11 @@ impl TopologyController {
config_str,
}, self.config_builder.lock().unwrap().as_mut().unwrap(), self.topology.lock().unwrap().as_mut().unwrap()).await
}

pub fn handle_config_reload(&self, config_str: &str) -> Result<bool, String> {
let res = self.rt.block_on(reload_vector_from_str(config_str, self.topology.lock().unwrap().as_mut().unwrap()));
res
}
}

impl OneShotTopologyController {
Expand All @@ -290,8 +321,8 @@ impl OneShotTopologyController {
// run topology and return after finished, no need to maintain datas for long run
pub fn start(&mut self, config_str: &str) -> Result<bool, String> {
let config_builder = init_config(config_str);
if config_builder.is_none() {
return Err("failed to init topology config".to_string());
if config_builder.is_err() {
return Err(config_builder.unwrap_err());
}
info!("start one time vector topology");

Expand Down
24 changes: 24 additions & 0 deletions tests/data/file_to_file_invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"data_dir": "/tmp/vector/",
"sources": {
"source_file": {
"type": "file",
"read_from": "beginnings",
"include": [
"/tmp/vector_test_source.log"
]
}
},
"sinks": {
"sink_file": {
"type": "file",
"inputs": [
"source_*"
],
"encoding": {
"codec": "json"
},
"path": "/tmp/vector_test_sink.log"
}
}
}
146 changes: 146 additions & 0 deletions tests/topology_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <exception>
#include <regex>
#include <string>
#include <iostream>

using Catch::Matchers::ContainsSubstring;
using vectorcxx::test::run;
Expand Down Expand Up @@ -152,4 +153,149 @@ TEST_CASE("run vector service with one time topology") {
});
auto events = read_events_from_sink();
REQUIRE(events.size() == 6);
}

TEST_CASE("test one shot topology with invalid source config") {
try {
run_one_shot("file_to_file_invalid", [](rust::Box<OneShotTopologyController> &tc) {});
} catch (std::exception &e) {
std::cout << "error: " << e.what() << std::endl;
REQUIRE_THAT(e.what(), ContainsSubstring("configuration error: \"unknown variant `beginnings`"));
}
}

TEST_CASE("test reload vector from valid config string can proceed") {
auto config_string = R"j(
{
"data_dir": "/tmp/vector/",
"sources": {
"source_http": {
"type": "http_server",
"address": "0.0.0.0:9999",
"encoding": "text"
}
},
"sinks": {
"sink_file": {
"type": "file",
"inputs": [
"transform_*"
],
"encoding": {
"codec": "json"
},
"path": "/tmp/vector_test_sink.log"
}
},
"transforms": {
"transform_add_field": {
"type": "remap",
"inputs": ["source_*"],
"source": ".age = 99 \n .abc=1"
}
}
}
)j";
// start topology controller
run("http_to_file",
[&config_string](rust::Box<TopologyController> &tc) {
send_http_events({"hello"});
// reload with config str
auto config_str = rust::String(config_string);
REQUIRE(tc->handle_config_reload(config_str));
send_http_events({"hello", "world"});
});
auto events = read_events_from_sink();
REQUIRE(events.size() == 3);
REQUIRE_THAT(events[0], !ContainsSubstring(R"("age")"));
REQUIRE_THAT(events[1], ContainsSubstring(R"("age")"));
REQUIRE_THAT(events[2], ContainsSubstring(R"("age")"));
}

TEST_CASE("test reload vector from invalid config string should fallback to existing topology config") {
auto config_string = R"j({"data_dir": "/tmp/vector/","sources": {"source_http": {"type": "http_server_invalid","address": "0.0.0.0:9999","encoding": "text"}},
"sinks": {
"sink_file": {
"type": "file",
"inputs": [
"transform_*"
],
"encoding": {
"codec": "json"
},
"path": "/tmp/vector_test_sink.log"
}
},
"transforms": {
"transform_add_field": {
"type": "remap",
"inputs": ["source_*"],
"source": ".age = 42"
}
}
}
)j";
// start topology controller
run("http_to_file",
[&config_string](rust::Box<TopologyController> &tc) {
send_http_events({"hello"});
// reload with config str
auto config_str = rust::String(config_string);
REQUIRE_THROWS(tc->handle_config_reload(config_str));
send_http_events({"hello", "world"});
});
auto events = read_events_from_sink();
REQUIRE(events.size() == 3);
REQUIRE_THAT(events[0], !ContainsSubstring(R"("age")"));
REQUIRE_THAT(events[1], !ContainsSubstring(R"("age")"));
REQUIRE_THAT(events[2], !ContainsSubstring(R"("age")"));
}

TEST_CASE("test reload vector from config string with health check") {
// source includes an invalid port `99999`, which should fail the health check and reload.
auto config_string = R"j(
{
"data_dir": "/tmp/vector/",
"sources": {
"source_http": {
"type": "http_server",
"address": "0.0.0.0:99999",
"encoding": "text"
}
},
"sinks": {
"sink_file": {
"type": "file",
"inputs": [
"transform_*"
],
"encoding": {
"codec": "json"
},
"path": "/tmp/vector_test_sink.log"
}
},
"transforms": {
"transform_add_field": {
"type": "remap",
"inputs": ["source_*"],
"source": ".age = 42"
}
}
}
)j";
// start topology controller
run("http_to_file",
[&config_string](rust::Box<TopologyController> &tc) {
send_http_events({"hello"});
// reload with config str
auto config_str = rust::String(config_string);
REQUIRE_THROWS(tc->handle_config_reload(config_str));
send_http_events({"hello", "world"});
});
auto events = read_events_from_sink();
REQUIRE(events.size() == 3);
REQUIRE_THAT(events[0], !ContainsSubstring(R"("age")"));
REQUIRE_THAT(events[1], !ContainsSubstring(R"("age")"));
REQUIRE_THAT(events[2], !ContainsSubstring(R"("age")"));
}

0 comments on commit ed75e7d

Please sign in to comment.