Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add support for poll watcher #21290

Merged
merged 23 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#![allow(missing_docs)]
use std::{num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration};
use std::{
num::{NonZeroU64, NonZeroUsize},
path::PathBuf,
process::ExitStatus,
time::Duration,
};

use exitcode::ExitCode;
use futures::StreamExt;
Expand All @@ -12,7 +17,7 @@ use crate::extra_context::ExtraContext;
#[cfg(feature = "api")]
use crate::{api, internal_events::ApiStarted};
use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
config::{self, Config, ConfigPath},
heartbeat,
internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
Expand Down Expand Up @@ -59,9 +64,18 @@ impl ApplicationConfig {
let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
.then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));

let watcher_conf = if opts.watch_config {
Some(watcher_config(
opts.watch_config_method,
opts.watch_config_poll_interval_seconds,
))
} else {
None
};

let config = load_configs(
&config_paths,
opts.watch_config,
watcher_conf,
opts.require_healthy,
opts.allow_empty_config,
graceful_shutdown_duration,
Expand Down Expand Up @@ -466,17 +480,18 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtim

pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
watcher_conf: Option<config::watcher::WatcherConfig>,
require_healthy: Option<bool>,
allow_empty_config: bool,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;

if watch_config {
if let Some(watcher_conf) = watcher_conf {
// Start listening for config changes immediately.
config::watcher::spawn_thread(
watcher_conf,
signal_handler.clone_tx(),
config_paths.iter().map(Into::into),
None,
Expand Down Expand Up @@ -526,3 +541,13 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64)
);
info!(message = "Log level is enabled.", level = ?level);
}

/// Translates the CLI watch options into a `config::watcher::WatcherConfig`.
///
/// * `method` - the watch method selected on the command line.
/// * `interval` - poll interval in seconds; only consulted for `WatchConfigMethod::Poll`.
///
/// `Inotify` maps to `WatcherConfig::RecommendedWatcher` (the interval is ignored), while
/// `Poll` maps to `WatcherConfig::PollWatcher` carrying the interval as a plain `u64`.
pub fn watcher_config(
method: WatchConfigMethod,
interval: NonZeroU64,
) -> config::watcher::WatcherConfig {
match method {
WatchConfigMethod::Inotify => config::watcher::WatcherConfig::RecommendedWatcher,
amribm marked this conversation as resolved.
Show resolved Hide resolved
WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
}
}
25 changes: 25 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,23 @@ pub struct RootOpts {
#[arg(short, long, env = "VECTOR_WATCH_CONFIG")]
pub watch_config: bool,

/// Method used to watch the configuration files for changes.
///
/// By default, `vector` uses `inotify`, which is recommended for Linux-based systems.
/// The `poll` watcher can be used where `inotify` does not work, e.g., when the config is attached over NFS.
#[arg(long, default_value = "inotify", env = "VECTOR_WATCH_METHOD")]
amribm marked this conversation as resolved.
Show resolved Hide resolved
pub watch_config_method: WatchConfigMethod,
amribm marked this conversation as resolved.
Show resolved Hide resolved

/// Interval, in seconds, at which the configuration files are polled for changes.
///
/// This option only applies when `--watch-config-method` is set to `poll`.
amribm marked this conversation as resolved.
Show resolved Hide resolved
#[arg(
long,
env = "VECTOR_WATCH_CONFIG_POLL_INTERVAL_SECONDS",
default_value = "30"
)]
pub watch_config_poll_interval_seconds: NonZeroU64,
amribm marked this conversation as resolved.
Show resolved Hide resolved

/// Set the internal log rate limit
#[arg(
short,
Expand Down Expand Up @@ -354,6 +371,14 @@ pub enum LogFormat {
Json,
}

/// Strategy used to detect configuration file changes, as chosen on the command line
/// through the `watch_config_method` option of `RootOpts`.
///
/// `watcher_config` in `app.rs` converts each variant into the matching
/// `config::watcher::WatcherConfig` value.
// NOTE(review): with `clap::ValueEnum` the variant doc comments below are presumably
// surfaced as per-value help text — keep them user-facing; confirm against clap docs.
#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchConfigMethod {
/// recommended for linux based systems
Inotify,
/// works for EFS/NFS like network storage systems
Poll,
}

pub fn handle_config_errors(errors: Vec<String>) -> exitcode::ExitCode {
for error in errors {
error!(message = "Configuration error.", %error);
Expand Down
107 changes: 82 additions & 25 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::{path::PathBuf, time::Duration};
use std::{
path::{Path, PathBuf},
time::Duration,
};
use std::{
sync::mpsc::{channel, Receiver},
thread,
};

use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{recommended_watcher, EventKind, RecursiveMode};

use crate::Error;

Expand All @@ -19,11 +22,60 @@ const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1

const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

/// Selects which kind of file watcher `create_watcher` builds.
pub enum WatcherConfig {
/// Watcher recommended by `notify` for the host OS (usually inotify on Linux-based systems).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can refactor create_watcher to use the WatchConfigMethod enum directly. AFAIK, due to clap parsing, the WatchConfigMethod needs to have plain enum variants.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since i want that enum to have values about the interval and i don't want to pass the interval in the function parameters.

One thing we can do is we can eliminate is watch config method enum. we can make it as string and add the possible values for clap arguments. using that we can create the WatcherConfig enum

RecommendedWatcher,
/// Poll-based watcher, e.g. for watching files on NFS. The payload is the poll
/// interval in seconds.
PollWatcher(u64),
}

amribm marked this conversation as resolved.
Show resolved Hide resolved
/// Wrapper over the concrete `notify` watcher types so the rest of this module can
/// hold either one behind a single type.
enum Watcher {
/// Watcher recommended by `notify` for the host OS (usually inotify on Linux-based systems).
RecommendedWatcher(notify::RecommendedWatcher),
/// Poll-based watcher, e.g. for watching files on NFS.
amribm marked this conversation as resolved.
Show resolved Hide resolved
PollWatcher(notify::PollWatcher),
}
amribm marked this conversation as resolved.
Show resolved Hide resolved

impl Watcher {
/// Registers every path in `config_paths` with the wrapped watcher, recursively.
///
/// Stops at the first path that fails to register and returns that error.
fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
self.watch(path, RecursiveMode::Recursive)?;
}
Ok(())
}

/// Forwards a single `watch` request to whichever `notify` watcher this value wraps.
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
// The `notify::Watcher` trait must be in scope to call `.watch(..)` on the inner
// watchers; it is imported locally under an alias because its name collides with
// this module's own `Watcher` enum.
use notify::Watcher as NotifyWatcher;
match self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any reason to use this here vs at the top like the other imports?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we already have name watcher as enum. it was colliding with watcher from notify.
Renaming the import at the top level user might not able to understand what is NotifyWathcher and we need the methods for notify::Watcher on that specific scope. that's why i used Watcher on the specific scope

Watcher::RecommendedWatcher(ref mut watcher) => {
watcher.watch(path, recursive_mode)?;
amribm marked this conversation as resolved.
Show resolved Hide resolved
}
Watcher::PollWatcher(ref mut watcher) => {
watcher.watch(path, recursive_mode)?;
amribm marked this conversation as resolved.
Show resolved Hide resolved
}
}
Ok(())
}
}

// impl From<notify::RecommendedWatcher> for Watcher {
// fn from(recommended_watcher: notify::RecommendedWatcher) -> Self {
amribm marked this conversation as resolved.
Show resolved Hide resolved
// Watcher::RecommendedWatcher(recommended_watcher)
// }
// }

// impl From<notify::PollWatcher> for Watcher {
// fn from(poll_watcher: notify::PollWatcher) -> Self {
// Watcher::PollWatcher(poll_watcher)
// }
// }

/// Sends a ReloadFromDisk on config_path changes.
/// Accumulates file changes until no change for given duration has occurred.
/// Has best effort guarantee of detecting all file changes from the end of
/// this function until the main thread stops.
pub fn spawn_thread<'a>(
watcher_conf: WatcherConfig,
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
delay: impl Into<Option<Duration>>,
Expand All @@ -33,7 +85,7 @@ pub fn spawn_thread<'a>(

// Create watcher now so not to miss any changes happening between
// returning from this function and the thread starting.
let mut watcher = Some(create_watcher(&config_paths)?);
let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);

info!("Watching configuration files.");

Expand All @@ -53,7 +105,7 @@ pub fn spawn_thread<'a>(

// We need to read paths to resolve any inode changes that may have happened.
// And we need to do it before raising sighup to avoid missing any change.
if let Err(error) = add_paths(&mut watcher, &config_paths) {
if let Err(error) = watcher.add_paths(&config_paths) {
error!(message = "Failed to read files to watch.", %error);
break;
}
Expand All @@ -72,7 +124,7 @@ pub fn spawn_thread<'a>(

thread::sleep(RETRY_TIMEOUT);

watcher = create_watcher(&config_paths)
watcher = create_watcher(&watcher_conf, &config_paths)
.map_err(|error| error!(message = "Failed to create file watcher.", %error))
.ok();

Expand All @@ -91,28 +143,28 @@ pub fn spawn_thread<'a>(
}

/// Builds the file watcher selected by `watcher_conf` and registers `config_paths` with it.
///
/// Returns the watcher together with the receiving end of the channel on which the
/// watcher delivers `notify` events (or errors). Fails if the watcher cannot be
/// created or if any path cannot be registered.
fn create_watcher(
watcher_conf: &WatcherConfig,
config_paths: &[PathBuf],
) -> Result<
(
RecommendedWatcher,
Receiver<Result<notify::Event, notify::Error>>,
),
Error,
> {
) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
info!("Creating configuration file watcher.");

let (sender, receiver) = channel();
let mut watcher = recommended_watcher(sender)?;
add_paths(&mut watcher, config_paths)?;
let mut watcher = match watcher_conf {
WatcherConfig::RecommendedWatcher => {
let recommended_watcher = recommended_watcher(sender)?;
Watcher::RecommendedWatcher(recommended_watcher)
}
WatcherConfig::PollWatcher(interval) => {
// `interval` is a number of seconds; it becomes the poll period of the watcher.
let config =
notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
let poll_watcher = notify::PollWatcher::new(sender, config)?;
amribm marked this conversation as resolved.
Show resolved Hide resolved
Watcher::PollWatcher(poll_watcher)
}
};
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}

/// Registers each entry of `config_paths` with `watcher` in recursive mode.
///
/// Registration happens in slice order and stops at the first failure, whose error
/// is converted into the crate-level `Error` and returned.
fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> {
    config_paths
        .iter()
        .try_for_each(|config_path| watcher.watch(config_path, RecursiveMode::Recursive))?;
    Ok(())
}

#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
mod tests {
use super::*;
Expand Down Expand Up @@ -140,12 +192,13 @@ mod tests {
let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let file_path = dir.join("vector.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -159,9 +212,10 @@ mod tests {
let delay = Duration::from_secs(3);
let file_path = temp_file();
let mut file = File::create(&file_path).unwrap();
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[file_path], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -179,8 +233,10 @@ mod tests {
let mut file = File::create(&file_path).unwrap();
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sym_file], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -195,12 +251,13 @@ mod tests {
let dir = temp_dir().to_path_buf();
let sub_dir = dir.join("sources");
let file_path = sub_dir.join("input.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir_all(&sub_dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sub_dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand Down
Loading