diff --git a/src/transforms/dedupe/transform.rs b/src/transforms/dedupe/transform.rs index 9c1432d2abb12..17bdbe30c2478 100644 --- a/src/transforms/dedupe/transform.rs +++ b/src/transforms/dedupe/transform.rs @@ -137,3 +137,103 @@ impl TaskTransform for Dedupe { Box::pin(task.filter_map(move |v| ready(inner.transform_one(v)))) } } + +#[cfg(test)] +mod tests { + use crate::event::LogEvent; + use crate::test_util::components::assert_transform_compliance; + use crate::transforms::dedupe::common::{default_cache_config, FieldMatchConfig}; + use crate::transforms::dedupe::config::DedupeConfig; + use crate::transforms::test::create_topology; + use tokio::sync::mpsc; + use tokio_stream::wrappers::ReceiverStream; + use vector_lib::lookup::lookup_v2::ConfigTargetPath; + use vrl::value::Value; + + pub fn assert_eq_values(left: LogEvent, right: LogEvent) { + let inner_left = left.into_parts().0; + let inner_right = right.into_parts().0; + assert_eq!(inner_left, inner_right); + } + + #[tokio::test] + async fn default_match() { + let config = DedupeConfig { + cache: default_cache_config(), + fields: None, + }; + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let event1 = LogEvent::from(btreemap! { + "message" => "foo", + "host" => "bar", + "timestamp" => "t1", + }); + tx.send(event1.clone().into()).await.unwrap(); + let output = out.recv().await.unwrap().into_log(); + assert_eq_values(event1.clone(), output); + + let event2 = event1.clone(); + tx.send(event2.into()).await.unwrap(); + + let mut event3 = event1.clone(); + event3.insert("message", Value::from("another")); + tx.send(event3.clone().into()).await.unwrap(); + + let output = out.recv().await.unwrap().into_log(); + assert_eq_values(event3, output); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await + } + + #[tokio::test] + async fn custom_match() { + let config = DedupeConfig { + cache: default_cache_config(), + fields: Some(FieldMatchConfig::MatchFields(vec![ + ConfigTargetPath::from("a"), + ConfigTargetPath::from("b"), + ])), + }; + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let event1 = LogEvent::from(btreemap! { + "message" => "foo", + "a" => 1, + "b" => 2, + }); + tx.send(event1.clone().into()).await.unwrap(); + let output = out.recv().await.unwrap().into_log(); + assert_eq_values(event1.clone(), output); + + let event2 = event1.clone(); + tx.send(event2.into()).await.unwrap(); + + let event3 = LogEvent::from(btreemap! { + "message" => "bar", + "a" => 3, + "b" => 2, + }); + tx.send(event3.clone().into()).await.unwrap(); + let output = out.recv().await.unwrap().into_log(); + assert_eq_values(event3, output); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await + } +}