Skip to content

Commit

Permalink
fix(file sources): exclude pattern with multi slashes can not match s…
Browse files Browse the repository at this point in the history
…ome files (#21082)

* fix: file source exclude pattern with multi slashes

* add changelog
  • Loading branch information
suikammd authored Aug 20, 2024
1 parent c86bdcc commit e95f098
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `file` source now properly handle exclude patterns with multiple slashes when matching files.

authors: suikammd
53 changes: 52 additions & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,11 @@ pub fn file_source(
return Box::pin(future::ready(Err(())));
}

let exclude_patterns = config
.exclude
.iter()
.map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
.collect::<Vec<PathBuf>>();
let ignore_before = calculate_ignore_before(config.ignore_older_secs);
let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
let (ignore_checkpoints, read_from) = reconcile_position_options(
Expand All @@ -525,7 +530,7 @@ pub fn file_source(

let paths_provider = Glob::new(
&config.include,
&config.exclude,
&exclude_patterns,
MatchOptions::default(),
emitter.clone(),
)
Expand Down Expand Up @@ -1407,6 +1412,52 @@ mod tests {
assert_eq!(is, [n as usize; 3]);
}

#[tokio::test]
async fn file_exclude_paths() {
let n = 5;

let dir = tempdir().unwrap();
let config = file::FileConfig {
include: vec![dir.path().join("a//b/*.log.*")],
exclude: vec![dir.path().join("a//b/test.log.*")],
..test_default_file_config(&dir)
};

let path1 = dir.path().join("a//b/a.log.1");
let path2 = dir.path().join("a//b/test.log.1");
let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
let mut file1 = File::create(&path1).unwrap();
let mut file2 = File::create(&path2).unwrap();

sleep_500_millis().await; // The files must be observed at their original lengths before writing to them

for i in 0..n {
writeln!(&mut file1, "1 {}", i).unwrap();
writeln!(&mut file2, "2 {}", i).unwrap();
}

sleep_500_millis().await;
})
.await;

let mut is = [0; 1];

for event in received {
let line =
event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
let mut split = line.split(' ');
let file = split.next().unwrap().parse::<usize>().unwrap();
assert_ne!(file, 4);
let i = split.next().unwrap().parse::<usize>().unwrap();

assert_eq!(is[file - 1], i);
is[file - 1] += 1;
}

assert_eq!(is, [n as usize; 1]);
}

#[tokio::test]
async fn file_key_acknowledged() {
file_key(Acks).await
Expand Down

0 comments on commit e95f098

Please sign in to comment.