From 3773b14d7ec880a9da1078a0aaa639ce43623f23 Mon Sep 17 00:00:00 2001 From: Ameer Ibrahim Date: Fri, 13 Sep 2024 10:27:50 +0530 Subject: [PATCH] feat(watcher): change for add poll watcher support --- src/app.rs | 35 ++++++++++++++--- src/cli.rs | 25 +++++++++++++ src/config/watcher.rs | 87 +++++++++++++++++++++++++++++++------------ 3 files changed, 118 insertions(+), 29 deletions(-) diff --git a/src/app.rs b/src/app.rs index e27d68e5a6cab1..16238dac49ff88 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,5 +1,10 @@ #![allow(missing_docs)] -use std::{num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration}; +use std::{ + num::{NonZeroU64, NonZeroUsize}, + path::PathBuf, + process::ExitStatus, + time::Duration, +}; use exitcode::ExitCode; use futures::StreamExt; @@ -12,7 +17,7 @@ use crate::extra_context::ExtraContext; #[cfg(feature = "api")] use crate::{api, internal_events::ApiStarted}; use crate::{ - cli::{handle_config_errors, LogFormat, Opts, RootOpts}, + cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod}, config::{self, Config, ConfigPath}, heartbeat, internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped}, @@ -59,9 +64,18 @@ impl ApplicationConfig { let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit) .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs))); + let watcher_conf = if opts.watch_config { + Some(watcher_config( + opts.watch_config_method, + opts.watch_config_poll_interval_seconds, + )) + } else { + None + }; + let config = load_configs( &config_paths, - opts.watch_config, + watcher_conf, opts.require_healthy, opts.allow_empty_config, graceful_shutdown_duration, @@ -466,7 +480,7 @@ pub fn build_runtime(threads: Option, thread_name: &str) -> Result, require_healthy: Option, allow_empty_config: bool, graceful_shutdown_duration: Option, @@ -474,9 +488,10 @@ pub async fn load_configs( ) -> Result { let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?; - if watch_config { + if watcher_conf.is_some() { // Start listening for config changes immediately. config::watcher::spawn_thread( + watcher_conf.unwrap(), signal_handler.clone_tx(), config_paths.iter().map(Into::into), None, @@ -526,3 +541,13 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) ); info!(message = "Log level is enabled.", level = ?level); } + +pub fn watcher_config( + method: WatchConfigMethod, + interval: NonZeroU64, +) -> config::watcher::WatcherConfig { + match method { + WatchConfigMethod::Inotify => config::watcher::WatcherConfig::RecommendedWatcher, + WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()), + } +} diff --git a/src/cli.rs b/src/cli.rs index e94911b1dfaedf..a239371e4c3540 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -152,6 +152,23 @@ pub struct RootOpts { #[arg(short, long, env = "VECTOR_WATCH_CONFIG")] pub watch_config: bool, + /// Method for watching config + /// + /// By default `vector` use `inotify` which is recommended for linux based system + /// `poll` watcher can be used where inotify not work. ie, attaching config using NFS + #[arg(long, default_value = "inotify", env = "VECTOR_WATCH_METHOD")] + pub watch_config_method: WatchConfigMethod, + + /// poll for changes in configuration file on given interval + /// + /// This config is only applicable if poll is enabled in `--watch-config-method` + #[arg( + long, + env = "VECTOR_WATCH_CONFIG_POLL_INTERVAL_SECONDS", + default_value = "30" + )] + pub watch_config_poll_interval_seconds: NonZeroU64, + /// Set the internal log rate limit #[arg( short, @@ -354,6 +371,14 @@ pub enum LogFormat { Json, } +#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)] +pub enum WatchConfigMethod { + /// recommended for linux based systems + Inotify, + /// works for EFS/NFS like network storage systems + Poll, +} + pub fn handle_config_errors(errors: Vec) -> exitcode::ExitCode { for error in errors { error!(message = "Configuration error.", %error); diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 584dcc3bd64fed..482d51f0a1a47d 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -4,7 +4,10 @@ use std::{ thread, }; -use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{ + recommended_watcher, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, + Watcher as NotifyWatcher, +}; use crate::Error; @@ -19,11 +22,40 @@ const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1 const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +pub enum WatcherConfig { + /// recommended watcher for os, usally inotify for linux based systems + RecommendedWatcher, + /// poll based watcher. for watching files from NFS. + PollWatcher(u64), +} + +enum Watcher { + /// recommended watcher for os, usally inotify for linux based systems + RecommendedWatcher(RecommendedWatcher), + /// poll based watcher. for watching files from NFS. + PollWatcher(PollWatcher), +} + +impl Watcher { + fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> { + let watcher: &mut dyn NotifyWatcher = match self { + &mut Watcher::RecommendedWatcher(ref mut w) => w, + &mut Watcher::PollWatcher(ref mut w) => w, + }; + + for path in config_paths { + watcher.watch(path, RecursiveMode::Recursive)?; + } + Ok(()) + } +} + /// Sends a ReloadFromDisk on config_path changes. /// Accumulates file changes until no change for given duration has occurred. /// Has best effort guarantee of detecting all file changes from the end of /// this function until the main thread stops. pub fn spawn_thread<'a>( + watcher_conf: WatcherConfig, signal_tx: crate::signal::SignalTx, config_paths: impl IntoIterator + 'a, delay: impl Into>, @@ -33,7 +65,7 @@ pub fn spawn_thread<'a>( // Create watcher now so not to miss any changes happening between // returning from this function and the thread starting. - let mut watcher = Some(create_watcher(&config_paths)?); + let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?); info!("Watching configuration files."); @@ -53,7 +85,7 @@ pub fn spawn_thread<'a>( // We need to read paths to resolve any inode changes that may have happened. // And we need to do it before raising sighup to avoid missing any change. - if let Err(error) = add_paths(&mut watcher, &config_paths) { + if let Err(error) = watcher.add_paths(&config_paths) { error!(message = "Failed to read files to watch.", %error); break; } @@ -72,7 +104,7 @@ pub fn spawn_thread<'a>( thread::sleep(RETRY_TIMEOUT); - watcher = create_watcher(&config_paths) + watcher = create_watcher(&watcher_conf, &config_paths) .map_err(|error| error!(message = "Failed to create file watcher.", %error)) .ok(); @@ -91,26 +123,28 @@ pub fn spawn_thread<'a>( } fn create_watcher( + watcher_conf: &WatcherConfig, config_paths: &[PathBuf], -) -> Result< - ( - RecommendedWatcher, - Receiver>, - ), - Error, -> { +) -> Result<(Watcher, Receiver>), Error> { info!("Creating configuration file watcher."); - let (sender, receiver) = channel(); - let mut watcher = recommended_watcher(sender)?; - add_paths(&mut watcher, config_paths)?; - Ok((watcher, receiver)) -} -fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> { - for path in config_paths { - watcher.watch(path, RecursiveMode::Recursive)?; + let (sender, receiver) = channel(); + match watcher_conf { + WatcherConfig::RecommendedWatcher => { + let recommended_watcher = recommended_watcher(sender)?; + let mut watcher = Watcher::RecommendedWatcher(recommended_watcher); + watcher.add_paths(config_paths)?; + Ok((watcher, receiver)) + } + WatcherConfig::PollWatcher(interval) => { + let config = + notify::Config::default().with_poll_interval(Duration::from_secs(*interval)); + let poll_watcher = PollWatcher::new(sender, config)?; + let mut watcher = Watcher::PollWatcher(poll_watcher); + watcher.add_paths(config_paths)?; + Ok((watcher, receiver)) + } } - Ok(()) } #[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000 @@ -140,12 +174,13 @@ mod tests { let delay = Duration::from_secs(3); let dir = temp_dir().to_path_buf(); let file_path = dir.join("vector.toml"); + let watcher_conf = WatcherConfig::RecommendedWatcher; std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(signal_tx, &[dir], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); @@ -159,9 +194,10 @@ mod tests { let delay = Duration::from_secs(3); let file_path = temp_file(); let mut file = File::create(&file_path).unwrap(); + let watcher_conf = WatcherConfig::RecommendedWatcher; let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(signal_tx, &[file_path], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); @@ -179,8 +215,10 @@ mod tests { let mut file = File::create(&file_path).unwrap(); std::os::unix::fs::symlink(&file_path, &sym_file).unwrap(); + let watcher_conf = WatcherConfig::RecommendedWatcher; + let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(signal_tx, &[sym_file], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); @@ -195,12 +233,13 @@ mod tests { let dir = temp_dir().to_path_buf(); let sub_dir = dir.join("sources"); let file_path = sub_dir.join("input.toml"); + let watcher_conf = WatcherConfig::RecommendedWatcher; std::fs::create_dir_all(&sub_dir).unwrap(); let mut file = File::create(&file_path).unwrap(); let (signal_tx, signal_rx) = broadcast::channel(128); - spawn_thread(signal_tx, &[sub_dir], delay).unwrap(); + spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap(); if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out");