Skip to content

Commit

Permalink
feat(watcher): change for add poll watcher support
Browse files Browse the repository at this point in the history
  • Loading branch information
Ameer Ibrahim authored and Ameer Ibrahim committed Sep 13, 2024
1 parent e71016c commit 3773b14
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 29 deletions.
35 changes: 30 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#![allow(missing_docs)]
use std::{num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration};
use std::{
num::{NonZeroU64, NonZeroUsize},
path::PathBuf,
process::ExitStatus,
time::Duration,
};

use exitcode::ExitCode;
use futures::StreamExt;
Expand All @@ -12,7 +17,7 @@ use crate::extra_context::ExtraContext;
#[cfg(feature = "api")]
use crate::{api, internal_events::ApiStarted};
use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
config::{self, Config, ConfigPath},
heartbeat,
internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
Expand Down Expand Up @@ -59,9 +64,18 @@ impl ApplicationConfig {
let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
.then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));

let watcher_conf = if opts.watch_config {
Some(watcher_config(
opts.watch_config_method,
opts.watch_config_poll_interval_seconds,
))
} else {
None
};

let config = load_configs(
&config_paths,
opts.watch_config,
watcher_conf,
opts.require_healthy,
opts.allow_empty_config,
graceful_shutdown_duration,
Expand Down Expand Up @@ -466,17 +480,18 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtim

pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
watcher_conf: Option<config::watcher::WatcherConfig>,
require_healthy: Option<bool>,
allow_empty_config: bool,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;

if watch_config {
if watcher_conf.is_some() {
// Start listening for config changes immediately.
config::watcher::spawn_thread(
watcher_conf.unwrap(),
signal_handler.clone_tx(),
config_paths.iter().map(Into::into),
None,
Expand Down Expand Up @@ -526,3 +541,13 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64)
);
info!(message = "Log level is enabled.", level = ?level);
}

pub fn watcher_config(
method: WatchConfigMethod,
interval: NonZeroU64,
) -> config::watcher::WatcherConfig {
match method {
WatchConfigMethod::Inotify => config::watcher::WatcherConfig::RecommendedWatcher,
WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
}
}
25 changes: 25 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,23 @@ pub struct RootOpts {
#[arg(short, long, env = "VECTOR_WATCH_CONFIG")]
pub watch_config: bool,

/// Method for watching config
///
/// By default `vector` use `inotify` which is recommended for linux based system
/// `poll` watcher can be used where inotify not work. ie, attaching config using NFS
#[arg(long, default_value = "inotify", env = "VECTOR_WATCH_METHOD")]
pub watch_config_method: WatchConfigMethod,

/// poll for changes in configuration file on given interval
///
/// This config is only applicable if poll is enabled in `--watch-config-method`
#[arg(
long,
env = "VECTOR_WATCH_CONFIG_POLL_INTERVAL_SECONDS",
default_value = "30"
)]
pub watch_config_poll_interval_seconds: NonZeroU64,

/// Set the internal log rate limit
#[arg(
short,
Expand Down Expand Up @@ -354,6 +371,14 @@ pub enum LogFormat {
Json,
}

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchConfigMethod {
/// recommended for linux based systems
Inotify,
/// works for EFS/NFS like network storage systems
Poll,
}

pub fn handle_config_errors(errors: Vec<String>) -> exitcode::ExitCode {
for error in errors {
error!(message = "Configuration error.", %error);
Expand Down
87 changes: 63 additions & 24 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use std::{
thread,
};

use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{
recommended_watcher, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode,
Watcher as NotifyWatcher,
};

use crate::Error;

Expand All @@ -19,11 +22,40 @@ const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1

const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

pub enum WatcherConfig {
/// recommended watcher for os, usally inotify for linux based systems
RecommendedWatcher,
/// poll based watcher. for watching files from NFS.
PollWatcher(u64),
}

enum Watcher {
/// recommended watcher for os, usally inotify for linux based systems
RecommendedWatcher(RecommendedWatcher),
/// poll based watcher. for watching files from NFS.
PollWatcher(PollWatcher),
}

impl Watcher {
fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
let watcher: &mut dyn NotifyWatcher = match self {
&mut Watcher::RecommendedWatcher(ref mut w) => w,
&mut Watcher::PollWatcher(ref mut w) => w,
};

for path in config_paths {
watcher.watch(path, RecursiveMode::Recursive)?;
}
Ok(())
}
}

/// Sends a ReloadFromDisk on config_path changes.
/// Accumulates file changes until no change for given duration has occurred.
/// Has best effort guarantee of detecting all file changes from the end of
/// this function until the main thread stops.
pub fn spawn_thread<'a>(
watcher_conf: WatcherConfig,
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
delay: impl Into<Option<Duration>>,
Expand All @@ -33,7 +65,7 @@ pub fn spawn_thread<'a>(

// Create watcher now so not to miss any changes happening between
// returning from this function and the thread starting.
let mut watcher = Some(create_watcher(&config_paths)?);
let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);

info!("Watching configuration files.");

Expand All @@ -53,7 +85,7 @@ pub fn spawn_thread<'a>(

// We need to read paths to resolve any inode changes that may have happened.
// And we need to do it before raising sighup to avoid missing any change.
if let Err(error) = add_paths(&mut watcher, &config_paths) {
if let Err(error) = watcher.add_paths(&config_paths) {
error!(message = "Failed to read files to watch.", %error);
break;
}
Expand All @@ -72,7 +104,7 @@ pub fn spawn_thread<'a>(

thread::sleep(RETRY_TIMEOUT);

watcher = create_watcher(&config_paths)
watcher = create_watcher(&watcher_conf, &config_paths)
.map_err(|error| error!(message = "Failed to create file watcher.", %error))
.ok();

Expand All @@ -91,26 +123,28 @@ pub fn spawn_thread<'a>(
}

fn create_watcher(
watcher_conf: &WatcherConfig,
config_paths: &[PathBuf],
) -> Result<
(
RecommendedWatcher,
Receiver<Result<notify::Event, notify::Error>>,
),
Error,
> {
) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
info!("Creating configuration file watcher.");
let (sender, receiver) = channel();
let mut watcher = recommended_watcher(sender)?;
add_paths(&mut watcher, config_paths)?;
Ok((watcher, receiver))
}

fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
watcher.watch(path, RecursiveMode::Recursive)?;
let (sender, receiver) = channel();
match watcher_conf {
WatcherConfig::RecommendedWatcher => {
let recommended_watcher = recommended_watcher(sender)?;
let mut watcher = Watcher::RecommendedWatcher(recommended_watcher);
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}
WatcherConfig::PollWatcher(interval) => {
let config =
notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
let poll_watcher = PollWatcher::new(sender, config)?;
let mut watcher = Watcher::PollWatcher(poll_watcher);
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}
}
Ok(())
}

#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
Expand Down Expand Up @@ -140,12 +174,13 @@ mod tests {
let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let file_path = dir.join("vector.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -159,9 +194,10 @@ mod tests {
let delay = Duration::from_secs(3);
let file_path = temp_file();
let mut file = File::create(&file_path).unwrap();
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[file_path], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -179,8 +215,10 @@ mod tests {
let mut file = File::create(&file_path).unwrap();
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sym_file], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -195,12 +233,13 @@ mod tests {
let dir = temp_dir().to_path_buf();
let sub_dir = dir.join("sources");
let file_path = sub_dir.join("input.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir_all(&sub_dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sub_dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand Down

0 comments on commit 3773b14

Please sign in to comment.