diff --git a/Cargo.lock b/Cargo.lock index f63913ed2..b0d96c05e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1092,6 +1092,7 @@ dependencies = [ "tokio-test", "tokio-util", "tracing", + "tracing-futures", "walkdir", "winapi-util", ] @@ -1941,7 +1942,7 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "logdna-agent" -version = "3.9.0-dev" +version = "3.9.0-dev-logging" dependencies = [ "anyhow", "api", @@ -3985,6 +3986,18 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "futures", + "futures-task", + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.1.4" diff --git a/Makefile b/Makefile index 45219cb3a..5a74f8759 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,8 @@ REPO := logdna-agent-v2 +# for testing +TESTS=foo + SHELLFLAGS := -ic .DEFAULT_GOAL := test diff --git a/bin/Cargo.toml b/bin/Cargo.toml index ef48176f4..f2ab2cbbb 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -10,7 +10,7 @@ [package] name = "logdna-agent" -version = "3.9.0-dev" +version = "3.9.0-dev-logging" authors = ["CJP10 "] edition = "2018" build = "build.rs" diff --git a/bin/tests/it/common.rs b/bin/tests/it/common.rs index 53231bfb1..e8cd4626e 100644 --- a/bin/tests/it/common.rs +++ b/bin/tests/it/common.rs @@ -304,7 +304,7 @@ where debug!("event info: {:?}", event_info); for _safeguard in 0..100_000 { assert!( - instant.elapsed() < delay.unwrap_or(std::time::Duration::from_secs(20)), + instant.elapsed() < delay.unwrap_or(std::time::Duration::from_secs(30)), "Timed out waiting for condition" ); diff --git a/bin/tests/it/metrics.rs b/bin/tests/it/metrics.rs index 8658aa2ad..d0c5a00c0 100644 --- a/bin/tests/it/metrics.rs +++ b/bin/tests/it/metrics.rs @@ -356,6 +356,9 @@ async fn test_metrics_endpoint() { tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; } + // Sleep some more, just for good measure + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + tokio::fs::remove_file(&included_file).await.unwrap(); common::wait_for_event("Delete Event", &mut stderr_reader); common::consume_output(stderr_reader.into_inner()); diff --git a/common/fs/Cargo.toml b/common/fs/Cargo.toml index 136c32009..e51f4955d 100644 --- a/common/fs/Cargo.toml +++ b/common/fs/Cargo.toml @@ -36,6 +36,7 @@ priority-queue = "1" #logging lazy_static = "1" tracing = "0.1" +tracing-futures = { version = "0.2", features = ["futures-03"] } #async async-trait = "0.1" diff --git a/common/fs/src/cache/mod.rs b/common/fs/src/cache/mod.rs index 874d72956..a73ca83bf 100755 --- a/common/fs/src/cache/mod.rs +++ b/common/fs/src/cache/mod.rs @@ -637,11 +637,22 @@ impl FileSystem { } /// Handles inotify events and may produce Event(s) that are returned upstream through sender - #[instrument(level = "debug", skip_all)] + #[instrument(level = "debug", skip_all, fields(file_path))] fn process(&mut self, watch_event: &WatchEvent) -> FsResult> { let _entries = self.entries.clone(); let mut _entries = _entries.borrow_mut(); + match watch_event { + // jakedipity: ignoring the second path in a rename isn't ideal + WatchEvent::Create(path) + | WatchEvent::Write(path) + | WatchEvent::Remove(path) + | WatchEvent::Rename(path, _) => { + tracing::Span::current().record("file_path", path.as_path().to_str().unwrap()); + } + _ => {} + }; + debug!("handling notify event {:#?}", watch_event); // TODO: Remove OsString names @@ -1520,7 +1531,7 @@ impl FileSystem { /// Determines whether the path is within the initial dir /// and either passes the master rules (e.g. "*.log") or it's a directory - #[instrument(level = "debug", skip_all)] + #[instrument(level = "debug", skip(self, path), fields(file_path = path.to_str().unwrap()))] pub(crate) fn is_initial_dir_target(&self, path: &Path) -> bool { // Must be within the initial dir if self.initial_dir_rules.passes(path) != Status::Ok { diff --git a/common/fs/src/cache/tailed_file.rs b/common/fs/src/cache/tailed_file.rs index 1946e5ed2..7f3f9f82f 100644 --- a/common/fs/src/cache/tailed_file.rs +++ b/common/fs/src/cache/tailed_file.rs @@ -32,7 +32,8 @@ use serde_json::Value; use time::OffsetDateTime; use tokio::io::{AsyncSeekExt, BufReader, SeekFrom}; use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, warn}; +use tracing_futures::Instrument; #[derive(Debug)] pub struct LazyLineSerializer { @@ -567,6 +568,7 @@ impl LazyLines { impl TailedFile { // tail a file for new line(s) + #[instrument(level = "debug", skip_all, fields(file_path = &paths[0].as_path().to_str().unwrap()))] pub(crate) async fn tail( &mut self, paths: &[PathBuf], @@ -602,6 +604,8 @@ impl TailedFile { let target_read = inner.initial_offsets.first().map(|offsets| offsets.start); + debug!("tailing from {} to {}", inner.offset, len); + // if we are at the end of the file there's no work to do if inner.offset == len { return None; @@ -780,7 +784,8 @@ impl TailedFile { // Discard errors line_res.ok() }) - .flatten(), + .flatten() + .in_current_span(), ) } } diff --git a/common/fs/src/tail.rs b/common/fs/src/tail.rs index ef57a9395..3d36910eb 100644 --- a/common/fs/src/tail.rs +++ b/common/fs/src/tail.rs @@ -10,7 +10,7 @@ use metrics::Metrics; use state::{FileId, SpanVec}; use std::collections::HashMap; use std::ops::{Deref, DerefMut}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use futures::{ready, Future, Stream, StreamExt}; @@ -42,10 +42,25 @@ fn get_file_for_path(fs: &FileSystem, next_path: &std::path::Path) -> Option Option> { + match event { + Event::Initialize(entry_ptr) + | Event::New(entry_ptr) + | Event::Write(entry_ptr) + | Event::Delete(entry_ptr) => { + let entries = fs.entries.borrow(); + let entry = entries.get(entry_ptr)?; + let paths = fs.resolve_valid_paths(entry, &entries); + if !paths.is_empty() { + tracing::Span::current().record("file_path", paths[0].as_path().to_str().unwrap()); + } + } + } + match event { Event::Initialize(entry_ptr) => { debug!("Initialize Event");