Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watcher): add support for poll watcher #21290

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#![allow(missing_docs)]
use std::{num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration};
use std::{
num::{NonZeroU64, NonZeroUsize},
path::PathBuf,
process::ExitStatus,
time::Duration,
};

use exitcode::ExitCode;
use futures::StreamExt;
Expand All @@ -12,7 +17,7 @@ use crate::extra_context::ExtraContext;
#[cfg(feature = "api")]
use crate::{api, internal_events::ApiStarted};
use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
config::{self, Config, ConfigPath},
heartbeat,
internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
Expand Down Expand Up @@ -59,9 +64,18 @@ impl ApplicationConfig {
let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
.then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));

let watcher_conf = if opts.watch_config {
Some(watcher_config(
opts.watch_config_method,
opts.watch_config_poll_interval_seconds,
))
} else {
None
};

let config = load_configs(
&config_paths,
opts.watch_config,
watcher_conf,
opts.require_healthy,
opts.allow_empty_config,
graceful_shutdown_duration,
Expand Down Expand Up @@ -466,17 +480,18 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtim

pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
watcher_conf: Option<config::watcher::WatcherConfig>,
require_healthy: Option<bool>,
allow_empty_config: bool,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;

if watch_config {
if let Some(watcher_conf) = watcher_conf {
// Start listening for config changes immediately.
config::watcher::spawn_thread(
watcher_conf,
signal_handler.clone_tx(),
config_paths.iter().map(Into::into),
None,
Expand Down Expand Up @@ -526,3 +541,13 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64)
);
info!(message = "Log level is enabled.", level = ?level);
}

pub fn watcher_config(
method: WatchConfigMethod,
interval: NonZeroU64,
) -> config::watcher::WatcherConfig {
match method {
WatchConfigMethod::Inotify => config::watcher::WatcherConfig::RecommendedWatcher,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, take a look into my comment about this in your closed PR

#21258 (comment)

Copy link
Author

@amribm amribm Sep 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this as Recommended or any thing that comes to ur mind? @jszwedko

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that would fit

WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
}
}
25 changes: 25 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,23 @@ pub struct RootOpts {
#[arg(short, long, env = "VECTOR_WATCH_CONFIG")]
pub watch_config: bool,

/// Method for watching config
///
/// By default `vector` use `inotify` which is recommended for linux based system
/// `poll` watcher can be used where inotify not work. ie, attaching config using NFS
#[arg(long, default_value = "inotify", env = "VECTOR_WATCH_METHOD")]
pub watch_config_method: WatchConfigMethod,

/// poll for changes in configuration file on given interval
///
/// This config is only applicable if poll is enabled in `--watch-config-method`
#[arg(
long,
env = "VECTOR_WATCH_CONFIG_POLL_INTERVAL_SECONDS",
default_value = "30"
)]
pub watch_config_poll_interval_seconds: NonZeroU64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should update this documentation to explain the polling watcher and when to use it

https://vector.dev/docs/administration/management/#automatic-reloading-on-configuration-change

But first lets see what maintainers think about documenting this


/// Set the internal log rate limit
#[arg(
short,
Expand Down Expand Up @@ -354,6 +371,14 @@ pub enum LogFormat {
Json,
}

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchConfigMethod {
/// recommended for linux based systems
Inotify,
/// works for EFS/NFS like network storage systems
Poll,
}

pub fn handle_config_errors(errors: Vec<String>) -> exitcode::ExitCode {
for error in errors {
error!(message = "Configuration error.", %error);
Expand Down
105 changes: 80 additions & 25 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::{path::PathBuf, time::Duration};
use std::{
path::{Path, PathBuf},
time::Duration,
};
use std::{
sync::mpsc::{channel, Receiver},
thread,
};

use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{recommended_watcher, EventKind, RecursiveMode};

use crate::Error;

Expand All @@ -19,11 +22,56 @@ const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1

const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

pub enum WatcherConfig {
/// recommended watcher for os, usually inotify for linux based systems
RecommendedWatcher,
/// poll based watcher. for watching files from NFS.
PollWatcher(u64),
}

enum Watcher {
/// recommended watcher for os, usually inotify for linux based systems
RecommendedWatcher(notify::RecommendedWatcher),
/// poll based watcher. for watching files from NFS.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more idiomatic to use only RecommendedWatcher, adding use notify::RecommendedWatcher and the top of the file (as you has before)

Suggested change
RecommendedWatcher(notify::RecommendedWatcher),
RecommendedWatcher(RecommendedWatcher),

My previous suggestion about this was only about the notify::Watcher trait, and because it conflicted with your Watcher enum

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the Watcher is necessary for use watch method on PollWatcher and RecommendWatcher . removing that makes watch method is not defined error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use notify::RecommendedWatcher and use notify::PollingWatcher, that have nothing related to what you're saying about the Watcher trait. The previous code imported those two structs

and indeed, maybe you need to use notify::Watcher to use the watch method

PollWatcher(notify::PollWatcher),
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PollWatcher(notify::PollWatcher),
PollWatcher(PollWatcher),


impl Watcher {
fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
self.watch(path, RecursiveMode::Recursive)?;
}
Ok(())
}

fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
match self {
&mut Watcher::RecommendedWatcher(ref mut watcher) => {
watcher.watch(path, recursive_mode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can get rid of this (In the other match arm too)

This behaviour was improved a few years ago

Read about it here: https://github.com/rust-lang/rfcs/blob/master/text/2005-match-ergonomics.md

Suggested change
&mut Watcher::RecommendedWatcher(ref mut watcher) => {
Watcher::RecommendedWatcher(watcher) => {

}
&mut Watcher::PollWatcher(ref mut watcher) => watcher.watch(path, recursive_mode),
}
}
}

// impl From<notify::RecommendedWatcher> for Watcher {
// fn from(recommended_watcher: notify::RecommendedWatcher) -> Self {
// Watcher::RecommendedWatcher(recommended_watcher)
// }
// }

// impl From<notify::PollWatcher> for Watcher {
// fn from(poll_watcher: notify::PollWatcher) -> Self {
// Watcher::PollWatcher(poll_watcher)
// }
// }

/// Sends a ReloadFromDisk on config_path changes.
/// Accumulates file changes until no change for given duration has occurred.
/// Has best effort guarantee of detecting all file changes from the end of
/// this function until the main thread stops.
pub fn spawn_thread<'a>(
watcher_conf: WatcherConfig,
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
delay: impl Into<Option<Duration>>,
Expand All @@ -33,7 +81,7 @@ pub fn spawn_thread<'a>(

// Create watcher now so not to miss any changes happening between
// returning from this function and the thread starting.
let mut watcher = Some(create_watcher(&config_paths)?);
let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);

info!("Watching configuration files.");

Expand All @@ -53,7 +101,7 @@ pub fn spawn_thread<'a>(

// We need to read paths to resolve any inode changes that may have happened.
// And we need to do it before raising sighup to avoid missing any change.
if let Err(error) = add_paths(&mut watcher, &config_paths) {
if let Err(error) = watcher.add_paths(&config_paths) {
error!(message = "Failed to read files to watch.", %error);
break;
}
Expand All @@ -72,7 +120,7 @@ pub fn spawn_thread<'a>(

thread::sleep(RETRY_TIMEOUT);

watcher = create_watcher(&config_paths)
watcher = create_watcher(&watcher_conf, &config_paths)
.map_err(|error| error!(message = "Failed to create file watcher.", %error))
.ok();

Expand All @@ -91,26 +139,28 @@ pub fn spawn_thread<'a>(
}

fn create_watcher(
watcher_conf: &WatcherConfig,
config_paths: &[PathBuf],
) -> Result<
(
RecommendedWatcher,
Receiver<Result<notify::Event, notify::Error>>,
),
Error,
> {
) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
info!("Creating configuration file watcher.");
let (sender, receiver) = channel();
let mut watcher = recommended_watcher(sender)?;
add_paths(&mut watcher, config_paths)?;
Ok((watcher, receiver))
}

fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
watcher.watch(path, RecursiveMode::Recursive)?;
let (sender, receiver) = channel();
match watcher_conf {
WatcherConfig::RecommendedWatcher => {
let recommended_watcher = recommended_watcher(sender)?;
let mut watcher = Watcher::RecommendedWatcher(recommended_watcher);
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}
WatcherConfig::PollWatcher(interval) => {
let config =
notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
let poll_watcher = notify::PollWatcher::new(sender, config)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow the user to specify compare_contents? https://docs.rs/notify/latest/notify/struct.Config.html#method.with_compare_contents
maybe another argument? Im afraid we are adding too much arguments for this though

@jszwedko

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd vote against it being user controllable and hope for it to just have a sane default. I can't think of a scenario where I'd wanna toggle it!

Copy link
Contributor

@jorgehermo9 jorgehermo9 Sep 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We found issues in openshift environments where the inotify watcher was constantly triggered and the file contents did not change at all. In this case, we would like to hash the contents to check if it really changed before reloading vector config. I think having sane defaults is good (for example, this would default to false), but there are situations where this is useful

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be a downside to always hashing the changes?

Copy link
Contributor

@jorgehermo9 jorgehermo9 Sep 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as the docs I pointed states:

By enabling this feature, performance will be significantly impacted as all files will need to be read and hashed at each poll_interval.

I prefer that downside to be opt-in

let mut watcher = Watcher::PollWatcher(poll_watcher);
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}
}
Comment on lines +148 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can take

    watcher.add_paths(config_paths)?;
            Ok((watcher, receiver))

out of the match branches

Suggested change
match watcher_conf {
WatcherConfig::RecommendedWatcher => {
let recommended_watcher = recommended_watcher(sender)?;
let mut watcher = Watcher::RecommendedWatcher(recommended_watcher);
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}
WatcherConfig::PollWatcher(interval) => {
let config =
notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
let poll_watcher = notify::PollWatcher::new(sender, config)?;
let mut watcher = Watcher::PollWatcher(poll_watcher);
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}
let watcher = match watcher_conf {
WatcherConfig::RecommendedWatcher => {
let recommended_watcher = recommended_watcher(sender)?;
Watcher::RecommendedWatcher(recommended_watcher)
}
WatcherConfig::PollWatcher(interval) => {
let config =
notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
let poll_watcher = notify::PollWatcher::new(sender, config)?;
Watcher::PollWatcher(poll_watcher)
};
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this seems elegant. i'll do that

Ok(())
}

#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
Expand Down Expand Up @@ -140,12 +190,13 @@ mod tests {
let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let file_path = dir.join("vector.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -159,9 +210,10 @@ mod tests {
let delay = Duration::from_secs(3);
let file_path = temp_file();
let mut file = File::create(&file_path).unwrap();
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[file_path], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -179,8 +231,10 @@ mod tests {
let mut file = File::create(&file_path).unwrap();
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sym_file], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -195,12 +249,13 @@ mod tests {
let dir = temp_dir().to_path_buf();
let sub_dir = dir.join("sources");
let file_path = sub_dir.join("input.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir_all(&sub_dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sub_dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand Down
Loading