Skip to content

Commit

Permalink
feat(sample transform): add sample_rate_key config option
Browse files Browse the repository at this point in the history
  • Loading branch information
dekelpilli committed Sep 12, 2024
1 parent 2e702b3 commit 32518ef
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
11 changes: 10 additions & 1 deletion src/transforms/sample/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vrl::owned_value_path;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
use vrl::value::Kind;

use crate::{
Expand Down Expand Up @@ -43,6 +43,9 @@ pub struct SampleConfig {
/// sampled together, but that overall `1/N` transactions are sampled.
#[configurable(metadata(docs::examples = "message"))]
pub key_field: Option<String>,
#[serde(default = "default_sample_rate_key")]
#[configurable(metadata(docs::examples = "sample_rate"))]
pub sample_rate_key: OptionalValuePath,

/// A logical condition used to exclude events from sampling.
pub exclude: Option<AnyCondition>,
Expand All @@ -54,6 +57,7 @@ impl GenerateConfig for SampleConfig {
rate: 10,
key_field: None,
exclude: None::<AnyCondition>,
sample_rate_key: default_sample_rate_key(),
})
.unwrap()
}
Expand Down Expand Up @@ -105,6 +109,10 @@ impl TransformConfig for SampleConfig {
}
}

pub fn default_sample_rate_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("sample_rate"))
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -127,6 +135,7 @@ mod tests {
rate: 1,
key_field: None,
exclude: None,
sample_rate_key: default_sample_rate_key(),
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
Expand Down
42 changes: 36 additions & 6 deletions src/transforms/sample/transform.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use vector_lib::config::LegacyKey;
use vrl::event_path;

use vector_lib::lookup::lookup_v2::OptionalValuePath;
use crate::{
conditions::Condition,
event::Event,
internal_events::SampleEventDiscarded,
transforms::{FunctionTransform, OutputBuffer},
transforms::{FunctionTransform, OutputBuffer, sample::config::default_sample_rate_key},
};

#[derive(Clone)]
Expand All @@ -15,6 +16,7 @@ pub struct Sample {
key_field: Option<String>,
exclude: Option<Condition>,
count: u64,
sample_rate_key: OptionalValuePath,
}

impl Sample {
Expand All @@ -26,13 +28,15 @@ impl Sample {
rate: u64,
key_field: Option<String>,
exclude: Option<Condition>,
sample_rate_key: OptionalValuePath,
) -> Self {
Self {
name,
rate,
key_field,
exclude,
count: 0,
sample_rate_key
}
}
}
Expand Down Expand Up @@ -83,13 +87,15 @@ impl FunctionTransform for Sample {
event.namespace().insert_source_metadata(
self.name.as_str(),
event,
Some(LegacyKey::Overwrite(vrl::path!("sample_rate"))),
vrl::path!("sample_rate"),
self.sample_rate_key.path.map(|path| LegacyKey::Overwrite(path)),
self.sample_rate_key.path,
self.rate.to_string(),
);
}
Event::Trace(ref mut event) => {
event.insert(event_path!("sample_rate"), self.rate.to_string());
if let Some(path) = self.sample_rate_key.path {
event.insert(path, self.rate.to_string());
}
}
Event::Metric(_) => panic!("component can never receive metric events"),
};
Expand Down Expand Up @@ -138,6 +144,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let total_passed = events
.into_iter()
Expand All @@ -160,6 +167,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);
let total_passed = events
.into_iter()
Expand All @@ -185,6 +193,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"na",
)),
default_sample_rate_key(),
);

let first_run = events
Expand Down Expand Up @@ -220,6 +229,7 @@ mod tests {
log_schema().message_key().unwrap().to_string().as_str(),
"important",
)),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand All @@ -243,6 +253,7 @@ mod tests {
0,
key_field.clone(),
Some(condition_contains("other_field", "foo")),
default_sample_rate_key(),
);
let iterations = 0..1000;
let total_passed = iterations
Expand All @@ -265,6 +276,7 @@ mod tests {
10,
key_field.clone(),
Some(condition_contains(&message_key, "na")),
default_sample_rate_key(),
);
let passing = events
.into_iter()
Expand All @@ -279,20 +291,38 @@ mod tests {
25,
key_field.clone(),
Some(condition_contains(&message_key, "na")),
OptionalValuePath::from(owned_value_path!("custom_sample_rate")),
);
let passing = events
.into_iter()
.filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na"))
.find_map(|event| transform_one(&mut sampler, event))
.unwrap();
assert_eq!(passing.as_log()["sample_rate"], "25".into());
assert_eq!(passing.as_log()["custom_sample_rate"], "25".into());
assert!(passing.as_log().get("sample_rate").is_none());

let events = random_events(10000);
let mut sampler = Sample::new(
"sample".to_string(),
50,
key_field.clone(),
Some(condition_contains(&message_key, "na")),
OptionalValuePath::from(""),
);
let passing = events
.into_iter()
.filter(|s| !s.as_log()[&message_key].to_string_lossy().contains("na"))
.find_map(|event| transform_one(&mut sampler, event))
.unwrap();
assert!(passing.as_log().get("sample_rate").is_none());

// If the event passed the regex check, don't include the sampling rate
let mut sampler = Sample::new(
"sample".to_string(),
25,
key_field.clone(),
Some(condition_contains(&message_key, "na")),
default_sample_rate_key(),
);
let event = Event::Log(LogEvent::from("nananana"));
let passing = transform_one(&mut sampler, event).unwrap();
Expand All @@ -304,7 +334,7 @@ mod tests {
fn handles_trace_event() {
let event: TraceEvent = LogEvent::from("trace").into();
let trace = Event::Trace(event);
let mut sampler = Sample::new("sample".to_string(), 2, None, None);
let mut sampler = Sample::new("sample".to_string(), 2, None, None, default_sample_rate_key());
let iterations = 0..2;
let total_passed = iterations
.filter_map(|_| transform_one(&mut sampler, trace.clone()))
Expand Down

0 comments on commit 32518ef

Please sign in to comment.