Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: utilze tracing spans for more powerful log filtering #583

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
REPO := logdna-agent-v2

# for testing
TESTS=foo

SHELLFLAGS := -ic
.DEFAULT_GOAL := test

Expand Down
2 changes: 1 addition & 1 deletion bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

[package]
name = "logdna-agent"
version = "3.9.0-dev"
version = "3.9.0-dev-logging"
authors = ["CJP10 <[email protected]>"]
edition = "2018"
build = "build.rs"
Expand Down
2 changes: 1 addition & 1 deletion bin/tests/it/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ where
debug!("event info: {:?}", event_info);
for _safeguard in 0..100_000 {
assert!(
instant.elapsed() < delay.unwrap_or(std::time::Duration::from_secs(20)),
instant.elapsed() < delay.unwrap_or(std::time::Duration::from_secs(30)),
"Timed out waiting for condition"
);

Expand Down
3 changes: 3 additions & 0 deletions bin/tests/it/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ async fn test_metrics_endpoint() {
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}

// Sleep some more, just for good measure
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

tokio::fs::remove_file(&included_file).await.unwrap();
common::wait_for_event("Delete Event", &mut stderr_reader);
common::consume_output(stderr_reader.into_inner());
Expand Down
1 change: 1 addition & 0 deletions common/fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ priority-queue = "1"
#logging
lazy_static = "1"
tracing = "0.1"
tracing-futures = { version = "0.2", features = ["futures-03"] }

#async
async-trait = "0.1"
Expand Down
15 changes: 13 additions & 2 deletions common/fs/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,11 +637,22 @@ impl FileSystem {
}

/// Handles inotify events and may produce Event(s) that are returned upstream through sender
#[instrument(level = "debug", skip_all)]
#[instrument(level = "debug", skip_all, fields(file_path))]
fn process(&mut self, watch_event: &WatchEvent) -> FsResult<SmallVec<[Event; 2]>> {
let _entries = self.entries.clone();
let mut _entries = _entries.borrow_mut();

match watch_event {
// jakedipity: ignoring the second path in a rename isn't ideal
WatchEvent::Create(path)
| WatchEvent::Write(path)
| WatchEvent::Remove(path)
| WatchEvent::Rename(path, _) => {
tracing::Span::current().record("file_path", path.as_path().to_str().unwrap());
}
_ => {}
};

debug!("handling notify event {:#?}", watch_event);

// TODO: Remove OsString names
Expand Down Expand Up @@ -1520,7 +1531,7 @@ impl FileSystem {

/// Determines whether the path is within the initial dir
/// and either passes the master rules (e.g. "*.log") or it's a directory
#[instrument(level = "debug", skip_all)]
#[instrument(level = "debug", skip(self, path), fields(file_path = path.to_str().unwrap()))]
pub(crate) fn is_initial_dir_target(&self, path: &Path) -> bool {
// Must be within the initial dir
if self.initial_dir_rules.passes(path) != Status::Ok {
Expand Down
9 changes: 7 additions & 2 deletions common/fs/src/cache/tailed_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use serde_json::Value;
use time::OffsetDateTime;
use tokio::io::{AsyncSeekExt, BufReader, SeekFrom};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, instrument, warn};
use tracing_futures::Instrument;

#[derive(Debug)]
pub struct LazyLineSerializer {
Expand Down Expand Up @@ -567,6 +568,7 @@ impl LazyLines {

impl TailedFile<LazyLineSerializer> {
// tail a file for new line(s)
#[instrument(level = "debug", skip_all, fields(file_path = &paths[0].as_path().to_str().unwrap()))]
pub(crate) async fn tail(
&mut self,
paths: &[PathBuf],
Expand Down Expand Up @@ -602,6 +604,8 @@ impl TailedFile<LazyLineSerializer> {

let target_read = inner.initial_offsets.first().map(|offsets| offsets.start);

debug!("tailing from {} to {}", inner.offset, len);

// if we are at the end of the file there's no work to do
if inner.offset == len {
return None;
Expand Down Expand Up @@ -780,7 +784,8 @@ impl TailedFile<LazyLineSerializer> {
// Discard errors
line_res.ok()
})
.flatten(),
.flatten()
.in_current_span(),
)
}
}
Expand Down
17 changes: 16 additions & 1 deletion common/fs/src/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use metrics::Metrics;
use state::{FileId, SpanVec};
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use tracing::{debug, info, warn};
use tracing::{debug, info, instrument, warn};

use futures::{ready, Future, Stream, StreamExt};

Expand Down Expand Up @@ -42,10 +42,25 @@ fn get_file_for_path(fs: &FileSystem, next_path: &std::path::Path) -> Option<Ent
}

#[allow(clippy::await_holding_refcell_ref)]
#[instrument(level = "debug", skip_all, fields(file_path))]
async fn handle_event(
event: Event,
fs: &FileSystem,
) -> Option<impl Stream<Item = LazyLineSerializer>> {
match event {
Event::Initialize(entry_ptr)
| Event::New(entry_ptr)
| Event::Write(entry_ptr)
| Event::Delete(entry_ptr) => {
let entries = fs.entries.borrow();
let entry = entries.get(entry_ptr)?;
let paths = fs.resolve_valid_paths(entry, &entries);
if !paths.is_empty() {
tracing::Span::current().record("file_path", paths[0].as_path().to_str().unwrap());
}
}
}

match event {
Event::Initialize(entry_ptr) => {
debug!("Initialize Event");
Expand Down