From c3cd2325e180907c1c9e1e8547547e0e9b240df8 Mon Sep 17 00:00:00 2001 From: Benjamin Podszun Date: Wed, 28 Aug 2024 22:41:33 +0800 Subject: [PATCH] enhancement(config): Remove the watcher's coupling to SIGHUP / prepare for automatic Windows config reload (#20989) * Remove the watcher's dependency on SIGHUP / unix Right now the watcher is broken/unimplemented for Windows and a large part of the reason for that might be how it works: * the watcher subscribes to filesystem events (crate: notify) * the watcher raises a unix signal (SIGHUP) on detecting changes * the signal handler (again, unix only) reacts to SIGHUP with an internal reload signal * (vector magically reloads/compares the configs) With these changes that unix signal dependency is removed * the watcher clones a SignalTx to emit its very own signals * the watcher sends an internal reload signal on detecting changes * (vector magically reloads/compares the configs) All of this _should_ also work on Windows, has nothing to do with unix signals anymore. I slightly modified the tests to not just wait for any Ok, but to state explicitly what signal we expect to receive. Cargo fmt * Make clippy happy * Add changelog entry * Update changelog.d/support-config-watch-for-windows.enhancement.md * Fix newly merged test case --------- Co-authored-by: Jesse Szwedko Co-authored-by: Jesse Szwedko --- ...rt-config-watch-for-windows.enhancement.md | 4 + src/app.rs | 15 ++-- src/config/watcher.rs | 76 ++++++++----------- 3 files changed, 45 insertions(+), 50 deletions(-) create mode 100644 changelog.d/support-config-watch-for-windows.enhancement.md diff --git a/changelog.d/support-config-watch-for-windows.enhancement.md b/changelog.d/support-config-watch-for-windows.enhancement.md new file mode 100644 index 0000000000000..f49cbb60c7315 --- /dev/null +++ b/changelog.d/support-config-watch-for-windows.enhancement.md @@ -0,0 +1,4 @@ +Windows now supports the -w command line parameter just like every other +platform and will reload the configuration files on any change + +authors: darklajid diff --git a/src/app.rs b/src/app.rs index b526f1fb0f686..e27d68e5a6cab 100644 --- a/src/app.rs +++ b/src/app.rs @@ -476,12 +476,15 @@ pub async fn load_configs( if watch_config { // Start listening for config changes immediately. - config::watcher::spawn_thread(config_paths.iter().map(Into::into), None).map_err( - |error| { - error!(message = "Unable to start config watcher.", %error); - exitcode::CONFIG - }, - )?; + config::watcher::spawn_thread( + signal_handler.clone_tx(), + config_paths.iter().map(Into::into), + None, + ) + .map_err(|error| { + error!(message = "Unable to start config watcher.", %error); + exitcode::CONFIG + })?; } info!( diff --git a/src/config/watcher.rs b/src/config/watcher.rs index 64dcbd0d99fc2..584dcc3bd64fe 100644 --- a/src/config/watcher.rs +++ b/src/config/watcher.rs @@ -1,11 +1,9 @@ use std::{path::PathBuf, time::Duration}; -#[cfg(unix)] use std::{ sync::mpsc::{channel, Receiver}, thread, }; -#[cfg(unix)] use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use crate::Error; @@ -17,18 +15,16 @@ use crate::Error; /// - Invalid config, caused either by user or by data race. /// - Frequent changes, caused by user/editor modifying/saving file in small chunks. /// so we can use smaller, more responsive delay. -#[cfg(unix)] const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1); -#[cfg(unix)] const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); -/// Triggers SIGHUP when file on config_path changes. +/// Sends a ReloadFromDisk on config_path changes. /// Accumulates file changes until no change for given duration has occurred. /// Has best effort guarantee of detecting all file changes from the end of /// this function until the main thread stops. -#[cfg(unix)] pub fn spawn_thread<'a>( + signal_tx: crate::signal::SignalTx, config_paths: impl IntoIterator + 'a, delay: impl Into>, ) -> Result<(), Error> { @@ -65,7 +61,9 @@ pub fn spawn_thread<'a>( debug!(message = "Reloaded paths."); info!("Configuration file changed."); - raise_sighup(); + _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| { + error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) + }); } else { debug!(message = "Ignoring event.", event = ?event) } @@ -83,31 +81,15 @@ pub fn spawn_thread<'a>( // so for a good measure raise SIGHUP and let reload logic // determine if anything changed. info!("Speculating that configuration files have changed."); - raise_sighup(); + _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| { + error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) + }); } }); Ok(()) } -#[cfg(windows)] -/// Errors on Windows. -pub fn spawn_thread<'a>( - _config_paths: impl IntoIterator + 'a, - _delay: impl Into>, -) -> Result<(), Error> { - Err("Reloading config on Windows isn't currently supported. Related issue https://github.com/vectordotdev/vector/issues/938 .".into()) -} - -#[cfg(unix)] -fn raise_sighup() { - use nix::sys::signal; - _ = signal::raise(signal::Signal::SIGHUP).map_err(|error| { - error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error) - }); -} - -#[cfg(unix)] fn create_watcher( config_paths: &[PathBuf], ) -> Result< @@ -124,7 +106,6 @@ fn create_watcher( Ok((watcher, receiver)) } -#[cfg(unix)] fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> { for path in config_paths { watcher.watch(path, RecursiveMode::Recursive)?; @@ -134,20 +115,22 @@ fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Resu #[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000 mod tests { - use std::{fs::File, io::Write, time::Duration}; - - use tokio::signal::unix::{signal, SignalKind}; - use super::*; - use crate::test_util::{temp_dir, temp_file, trace_init}; - - async fn test(file: &mut File, timeout: Duration) -> bool { - let mut signal = signal(SignalKind::hangup()).expect("Signal handlers should not panic."); + use crate::{ + signal::SignalRx, + test_util::{temp_dir, temp_file, trace_init}, + }; + use std::{fs::File, io::Write, time::Duration}; + use tokio::sync::broadcast; + async fn test(file: &mut File, timeout: Duration, mut receiver: SignalRx) -> bool { file.write_all(&[0]).unwrap(); file.sync_all().unwrap(); - tokio::time::timeout(timeout, signal.recv()).await.is_ok() + matches!( + tokio::time::timeout(timeout, receiver.recv()).await, + Ok(Ok(crate::signal::SignalTo::ReloadFromDisk)) + ) } #[tokio::test] @@ -161,9 +144,10 @@ mod tests { std::fs::create_dir(&dir).unwrap(); let mut file = File::create(&file_path).unwrap(); - spawn_thread(&[dir], delay).unwrap(); + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(signal_tx, &[dir], delay).unwrap(); - if !test(&mut file, delay * 5).await { + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } } @@ -176,14 +160,16 @@ mod tests { let file_path = temp_file(); let mut file = File::create(&file_path).unwrap(); - spawn_thread(&[file_path], delay).unwrap(); + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(signal_tx, &[file_path], delay).unwrap(); - if !test(&mut file, delay * 5).await { + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } } #[tokio::test] + #[cfg(unix)] async fn sym_file_update() { trace_init(); @@ -193,9 +179,10 @@ mod tests { let mut file = File::create(&file_path).unwrap(); std::os::unix::fs::symlink(&file_path, &sym_file).unwrap(); - spawn_thread(&[sym_file], delay).unwrap(); + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(signal_tx, &[sym_file], delay).unwrap(); - if !test(&mut file, delay * 5).await { + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } } @@ -212,9 +199,10 @@ mod tests { std::fs::create_dir_all(&sub_dir).unwrap(); let mut file = File::create(&file_path).unwrap(); - spawn_thread(&[sub_dir], delay).unwrap(); + let (signal_tx, signal_rx) = broadcast::channel(128); + spawn_thread(signal_tx, &[sub_dir], delay).unwrap(); - if !test(&mut file, delay * 5).await { + if !test(&mut file, delay * 5, signal_rx).await { panic!("Test timed out"); } }