diff --git a/Cargo.lock b/Cargo.lock index 5d6cf0809c7e75..68b950b9a13033 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1047,7 +1047,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.6.1", + "bytes 1.7.1", "http 0.2.9", "regex", "tracing 0.1.40", @@ -1069,7 +1069,7 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "bytes 1.6.1", + "bytes 1.7.1", "http 0.2.9", "regex", "tracing 0.1.40", diff --git a/src/sources/file.rs b/src/sources/file.rs index 3bf8aa826e5409..362cae59cc07ba 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -511,6 +511,11 @@ pub fn file_source( return Box::pin(future::ready(Err(()))); } + let exclude_patterns = config + .exclude + .iter() + .map(|path_buf| path_buf.iter().collect::()) + .collect::>(); let ignore_before = calculate_ignore_before(config.ignore_older_secs); let glob_minimum_cooldown = config.glob_minimum_cooldown_ms; let (ignore_checkpoints, read_from) = reconcile_position_options( @@ -525,7 +530,7 @@ pub fn file_source( let paths_provider = Glob::new( &config.include, - &config.exclude, + &exclude_patterns, MatchOptions::default(), emitter.clone(), ) @@ -1407,6 +1412,52 @@ mod tests { assert_eq!(is, [n as usize; 3]); } + #[tokio::test] + async fn file_exclude_paths() { + let n = 5; + + let dir = tempdir().unwrap(); + let config = file::FileConfig { + include: vec![dir.path().join("a//b/*.log.*")], + exclude: vec![dir.path().join("a//b/test.log.*")], + ..test_default_file_config(&dir) + }; + + let path1 = dir.path().join("a//b/a.log.1"); + let path2 = dir.path().join("a//b/test.log.1"); + let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async { + std::fs::create_dir_all(dir.path().join("a/b")).unwrap(); + let mut file1 = File::create(&path1).unwrap(); + let mut file2 = File::create(&path2).unwrap(); + + sleep_500_millis().await; // The files must be observed at their original lengths before writing to them + + for i in 0..n { + writeln!(&mut file1, "1 {}", i).unwrap(); + writeln!(&mut file2, "2 {}", i).unwrap(); + } + + sleep_500_millis().await; + }) + .await; + + let mut is = [0; 1]; + + for event in received { + let line = + event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy(); + let mut split = line.split(' '); + let file = split.next().unwrap().parse::().unwrap(); + assert_ne!(file, 4); + let i = split.next().unwrap().parse::().unwrap(); + + assert_eq!(is[file - 1], i); + is[file - 1] += 1; + } + + assert_eq!(is, [n as usize; 1]); + } + #[tokio::test] async fn file_key_acknowledged() { file_key(Acks).await