From 7546afc79272433a7932020a5478991572e4f009 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 13 Aug 2024 15:20:33 -0400 Subject: [PATCH 01/11] fix(issue 21065): use the correct merge strategy for top level objects --- lib/vector-lookup/src/lookup_v2/mod.rs | 3 + src/transforms/reduce/config.rs | 34 ++--- src/transforms/reduce/merge_strategy.rs | 85 +++++++++--- src/transforms/reduce/transform.rs | 171 ++++++++++++++++++++---- 4 files changed, 233 insertions(+), 60 deletions(-) diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs b/lib/vector-lookup/src/lookup_v2/mod.rs index a6d5be15b37f6..227c35a78f697 100644 --- a/lib/vector-lookup/src/lookup_v2/mod.rs +++ b/lib/vector-lookup/src/lookup_v2/mod.rs @@ -4,6 +4,7 @@ pub use optional_path::{OptionalTargetPath, OptionalValuePath}; use std::fmt; use vector_config_macros::configurable_component; +use vector_config::ConfigurableString; pub use vrl::path::{ parse_target_path, parse_value_path, BorrowedSegment, OwnedSegment, OwnedTargetPath, OwnedValuePath, PathConcat, PathParseError, PathPrefix, TargetPath, ValuePath, @@ -64,6 +65,8 @@ impl From<&str> for ConfigValuePath { #[serde(try_from = "String", into = "String")] pub struct ConfigTargetPath(pub OwnedTargetPath); +impl ConfigurableString for ConfigTargetPath {} + impl fmt::Display for ConfigTargetPath { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) diff --git a/src/transforms/reduce/config.rs b/src/transforms/reduce/config.rs index 6c254e000925d..9f0222f451f34 100644 --- a/src/transforms/reduce/config.rs +++ b/src/transforms/reduce/config.rs @@ -4,10 +4,8 @@ use std::time::Duration; use indexmap::IndexMap; use serde_with::serde_as; -use vrl::path::{parse_target_path, PathPrefix}; -use vrl::prelude::{Collection, KeyString, Kind}; - -use vector_lib::configurable::configurable_component; +use vrl::path::{PathPrefix, TargetPath}; +use vrl::prelude::{Collection, Kind}; use crate::conditions::AnyCondition; use crate::config::{ 
@@ -17,6 +15,8 @@ use crate::config::{ use crate::schema::Definition; use crate::transforms::reduce::merge_strategy::MergeStrategy; use crate::transforms::{reduce::transform::Reduce, Transform}; +use vector_lib::configurable::configurable_component; +use vector_lib::lookup::lookup_v2::ConfigTargetPath; /// Configuration for the `reduce` transform. #[serde_as] @@ -85,7 +85,7 @@ pub struct ReduceConfig { #[configurable(metadata( docs::additional_props_description = "An individual merge strategy." ))] - pub merge_strategies: IndexMap, + pub merge_strategies: IndexMap, /// A condition used to distinguish the final event of a transaction. /// @@ -137,16 +137,18 @@ impl TransformConfig for ReduceConfig { let mut schema_definition = merged_definition; - for (key, merge_strategy) in self.merge_strategies.iter() { - let key = if let Ok(key) = parse_target_path(key) { - key - } else { - continue; - }; - - let input_kind = match key.prefix { - PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), - PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), + for (path, merge_strategy) in self.merge_strategies.iter() { + // let key = if let Ok(key) = parse_target_path(key) { + // key + // } else { + // continue; + // }; + + let input_kind = match path.prefix() { + PathPrefix::Event => schema_definition.event_kind().at_path(path.value_path()), + PathPrefix::Metadata => { + schema_definition.metadata_kind().at_path(path.value_path()) + } }; let new_kind = match merge_strategy { @@ -215,7 +217,7 @@ impl TransformConfig for ReduceConfig { new_kind }; - schema_definition = schema_definition.with_field(&key, new_kind, None); + schema_definition = schema_definition.with_field(&path.0, new_kind, None); } // the same schema definition is used for all inputs diff --git a/src/transforms/reduce/merge_strategy.rs b/src/transforms/reduce/merge_strategy.rs index 1dd076af4f09f..b58d5db94bc98 100644 --- a/src/transforms/reduce/merge_strategy.rs +++ 
b/src/transforms/reduce/merge_strategy.rs @@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use ordered_float::NotNan; use vector_lib::configurable::configurable_component; +use vrl::path::OwnedTargetPath; /// Strategies for merging events. #[configurable_component] @@ -67,7 +68,11 @@ impl ReduceValueMerger for DiscardMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, self.v); Ok(()) } @@ -93,7 +98,11 @@ impl ReduceValueMerger for RetainMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, self.v); Ok(()) } @@ -133,7 +142,11 @@ impl ReduceValueMerger for ConcatMerger { } } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Bytes(self.v.into())); Ok(()) } @@ -160,7 +173,11 @@ impl ReduceValueMerger for ConcatArrayMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -183,7 +200,11 @@ impl ReduceValueMerger for ArrayMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -215,7 +236,11 @@ impl ReduceValueMerger for LongestArrayMerger { } } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: 
&OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -247,7 +272,11 @@ impl ReduceValueMerger for ShortestArrayMerger { } } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v)); Ok(()) } @@ -292,7 +321,11 @@ impl ReduceValueMerger for FlatUniqueMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert(path, Value::Array(self.v.into_iter().collect())); Ok(()) } @@ -326,7 +359,11 @@ impl ReduceValueMerger for TimestampWindowMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { v.insert( format!("{}_end", path).as_str(), Value::Timestamp(self.latest), @@ -390,7 +427,11 @@ impl ReduceValueMerger for AddNumbersMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)), NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)), @@ -449,7 +490,11 @@ impl ReduceValueMerger for MaxNumberMerger { Ok(()) } - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)), NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)), @@ -508,7 +553,11 @@ impl ReduceValueMerger for MinNumberMerger { Ok(()) } - fn 
insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String> { + fn insert_into( + self: Box, + path: &OwnedTargetPath, + v: &mut LogEvent, + ) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)), NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)), @@ -519,7 +568,8 @@ impl ReduceValueMerger for MinNumberMerger { pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync { fn add(&mut self, v: Value) -> Result<(), String>; - fn insert_into(self: Box, path: &str, v: &mut LogEvent) -> Result<(), String>; + fn insert_into(self: Box, path: &OwnedTargetPath, v: &mut LogEvent) + -> Result<(), String>; } impl From for Box { @@ -612,10 +662,10 @@ pub(crate) fn get_value_merger( #[cfg(test)] mod test { - use serde_json::json; - use super::*; use crate::event::LogEvent; + use serde_json::json; + use vrl::owned_event_path; #[test] fn initial_values() { @@ -876,7 +926,8 @@ mod test { let mut merger = get_value_merger(initial, strategy)?; merger.add(additional)?; let mut output = LogEvent::default(); - merger.insert_into("out", &mut output)?; - Ok(output.remove("out").unwrap()) + let out_path = owned_event_path!("out"); + merger.insert_into(&out_path, &mut output)?; + Ok(output.remove(&out_path).unwrap()) } } diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 07f117b91aef8..8b5182225c5be 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -3,12 +3,6 @@ use std::collections::{hash_map, HashMap}; use std::pin::Pin; use std::time::{Duration, Instant}; -use futures::Stream; -use indexmap::IndexMap; -use vector_lib::stream::expiration_map::{map_with_expiration, Emitter}; -use vrl::path::parse_target_path; -use vrl::prelude::KeyString; - use crate::transforms::reduce::merge_strategy::{ get_value_merger, MergeStrategy, ReduceValueMerger, }; @@ -18,16 +12,35 @@ use crate::{ internal_events::ReduceStaleEventFlushed, 
transforms::{reduce::config::ReduceConfig, TaskTransform}, }; +use futures::Stream; +use indexmap::IndexMap; +use vector_lib::lookup::lookup_v2::ConfigTargetPath; +use vector_lib::stream::expiration_map::{map_with_expiration, Emitter}; +use vrl::path::{parse_target_path, OwnedTargetPath}; #[derive(Debug)] struct ReduceState { events: usize, - fields: HashMap>, + fields: HashMap>, stale_since: Instant, creation: Instant, metadata: EventMetadata, } +fn is_covered_by_strategy( + path: &OwnedTargetPath, + strategies: &IndexMap, +) -> bool { + let mut current = OwnedTargetPath::event_root(); + for component in &path.path.segments { + current = current.with_field_appended(&component.to_string()); + if strategies.contains_key(&current) { + return true; + } + } + false +} + impl ReduceState { fn new() -> Self { Self { @@ -39,13 +52,43 @@ impl ReduceState { } } - fn add_event(&mut self, e: LogEvent, strategies: &IndexMap) { + fn add_event( + &mut self, + e: LogEvent, + strategies: &IndexMap, + ) -> crate::Result<()> { self.metadata.merge(e.metadata().clone()); + for (path, strategy) in strategies { + if let Some(value) = e.get(path) { + match self.fields.entry(path.clone()) { + Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) { + Ok(m) => { + entry.insert(m); + } + Err(error) => { + warn!(message = "Failed to create value merger.", %error, %path); + } + }, + Entry::Occupied(mut entry) => { + if let Err(error) = entry.get_mut().add(value.clone()) { + warn!(message = "Failed to merge value.", %error); + } + } + } + } + } + if let Some(fields_iter) = e.all_event_fields_skip_array_elements() { for (path, value) in fields_iter { - let maybe_strategy = strategies.get(&path); - match self.fields.entry(path) { + // This should not return an error, unless there is a bug in the event fields iterator. 
+ let parsed_path = parse_target_path(&path)?; + if is_covered_by_strategy(&parsed_path, strategies) { + continue; + } + + let maybe_strategy = strategies.get(&parsed_path); + match self.fields.entry(parsed_path) { Entry::Vacant(entry) => { if let Some(strategy) = maybe_strategy { match get_value_merger(value.clone(), strategy) { @@ -72,12 +115,13 @@ impl ReduceState { self.events += 1; self.stale_since = Instant::now(); + Ok(()) } fn flush(mut self) -> LogEvent { let mut event = LogEvent::new_with_metadata(self.metadata); - for (k, v) in self.fields.drain() { - if let Err(error) = v.insert_into(k.as_str(), &mut event) { + for (path, v) in self.fields.drain() { + if let Err(error) = v.insert_into(&path, &mut event) { warn!(message = "Failed to merge values for field.", %error); } } @@ -92,17 +136,19 @@ pub struct Reduce { flush_period: Duration, end_every_period: Option, group_by: Vec, - merge_strategies: IndexMap, + merge_strategies: IndexMap, reduce_merge_states: HashMap, ends_when: Option, starts_when: Option, max_events: Option, } -fn validate_merge_strategies(strategies: IndexMap) -> crate::Result<()> { +fn validate_merge_strategies( + strategies: IndexMap, +) -> crate::Result<()> { for (path, _) in &strategies { - let contains_index = parse_target_path(path) - .map_err(|_| format!("Could not parse path: `{path}`"))? 
+ let contains_index = path + .0 .path .segments .iter() @@ -147,7 +193,11 @@ impl Reduce { flush_period: config.flush_period_ms, end_every_period: config.end_every_period_ms, group_by, - merge_strategies: config.merge_strategies.clone(), + merge_strategies: config + .merge_strategies + .iter() + .map(|(path, strategy)| (path.0.clone(), strategy.clone())) + .collect(), reduce_merge_states: HashMap::new(), ends_when, starts_when, @@ -183,20 +233,29 @@ impl Reduce { .for_each(|(_, s)| emitter.emit(Event::from(s.flush()))); } - fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) { + fn push_or_new_reduce_state( + &mut self, + event: LogEvent, + discriminant: Discriminant, + ) -> crate::Result<()> { match self.reduce_merge_states.entry(discriminant) { hash_map::Entry::Vacant(entry) => { let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies); + state.add_event(event, &self.merge_strategies)?; entry.insert(state); } hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().add_event(event, &self.merge_strategies); + entry.get_mut().add_event(event, &self.merge_strategies)?; } - } + }; + Ok(()) } - pub(crate) fn transform_one(&mut self, emitter: &mut Emitter, event: Event) { + pub(crate) fn transform_one( + &mut self, + emitter: &mut Emitter, + event: Event, + ) -> crate::Result<()> { let (starts_here, event) = match &self.starts_when { Some(condition) => condition.check(event), None => (false, event), @@ -230,15 +289,16 @@ impl Reduce { } else if ends_here { emitter.emit(match self.reduce_merge_states.remove(&discriminant) { Some(mut state) => { - state.add_event(event, &self.merge_strategies); + state.add_event(event, &self.merge_strategies)?; state.flush().into() } None => { let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies); + state.add_event(event, &self.merge_strategies)?; state.flush().into() } - }) + }); + Ok(()) } else { self.push_or_new_reduce_state(event, 
discriminant) } @@ -261,7 +321,9 @@ impl TaskTransform for Reduce { flush_period, |me: &mut Box, event, emitter: &mut Emitter| { // called for each event - me.transform_one(emitter, event); + if let Err(error) = me.transform_one(emitter, event) { + error!(%error, rate_limit = 30) + } }, |me: &mut Box, emitter: &mut Emitter| { // called periodically to check for expired events @@ -869,7 +931,62 @@ merge_strategies.bar = "concat" let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err(); assert_eq!( error.to_string(), - "Merge strategies with indexes are currently not supported. Path: `a.b[0]`" + "Merge strategies with indexes are currently not supported. Path: `.a.b[0]`" ); } + + #[tokio::test] + async fn merge_objects_in_array() { + let config = toml::from_str::(indoc!( + r#" + group_by = [ "id" ] + merge_strategies."events" = "array" + + [ends_when] + type = "vrl" + source = "exists(.test_end)" + "#, + )) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + + let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await; + + let v_1 = Value::from(btreemap! { + "attrs" => btreemap! { + "msg" => "foo", + }, + "sev" => 2, + }); + let mut e_1 = LogEvent::from(Value::from(btreemap! {"id" => 777})); + e_1.insert("events", v_1.clone()); + tx.send(e_1.into()).await.unwrap(); + + let v_2 = Value::from(btreemap! { + "attrs" => btreemap! { + "msg" => "bar", + }, + "sev" => 3, + }); + let mut e_2 = + LogEvent::from(Value::from(btreemap! {"id" => 777, "test_end" => "done"})); + e_2.insert("events", v_2.clone()); + tx.send(e_2.into()).await.unwrap(); + + let output = out.recv().await.unwrap().into_log(); + let expected_value = Value::from(btreemap! 
{ + "id" => 1554, + "events" => vec![v_1, v_2], + "test_end" => "done" + }); + assert_eq!(*output.value(), expected_value); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await + } } From 7158ca9681e23f0d12b3c2984faf85f35ceb64fe Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 13 Aug 2024 15:38:13 -0400 Subject: [PATCH 02/11] changelog --- changelog.d/21067_reduce.fix.md | 1 + src/transforms/reduce/config.rs | 6 ------ 2 files changed, 1 insertion(+), 6 deletions(-) create mode 100644 changelog.d/21067_reduce.fix.md diff --git a/changelog.d/21067_reduce.fix.md b/changelog.d/21067_reduce.fix.md new file mode 100644 index 0000000000000..6a1148bd6d190 --- /dev/null +++ b/changelog.d/21067_reduce.fix.md @@ -0,0 +1 @@ +Fixes a Vector v0.40 regression where the `reduce` transform would not group top level objects correctly. diff --git a/src/transforms/reduce/config.rs b/src/transforms/reduce/config.rs index 9f0222f451f34..e0b0d96d7f732 100644 --- a/src/transforms/reduce/config.rs +++ b/src/transforms/reduce/config.rs @@ -138,12 +138,6 @@ impl TransformConfig for ReduceConfig { let mut schema_definition = merged_definition; for (path, merge_strategy) in self.merge_strategies.iter() { - // let key = if let Ok(key) = parse_target_path(key) { - // key - // } else { - // continue; - // }; - let input_kind = match path.prefix() { PathPrefix::Event => schema_definition.event_kind().at_path(path.value_path()), PathPrefix::Metadata => { From 8e8c32b9f90faedfd988a8d42dded8bac525771b Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 13 Aug 2024 16:05:03 -0400 Subject: [PATCH 03/11] revert config changes to preserve backwards compatibility --- lib/vector-lookup/src/lookup_v2/mod.rs | 3 --- src/transforms/reduce/config.rs | 28 +++++++++++++++----------- src/transforms/reduce/transform.rs | 20 ++++++++++-------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs 
b/lib/vector-lookup/src/lookup_v2/mod.rs index 227c35a78f697..a6d5be15b37f6 100644 --- a/lib/vector-lookup/src/lookup_v2/mod.rs +++ b/lib/vector-lookup/src/lookup_v2/mod.rs @@ -4,7 +4,6 @@ pub use optional_path::{OptionalTargetPath, OptionalValuePath}; use std::fmt; use vector_config_macros::configurable_component; -use vector_config::ConfigurableString; pub use vrl::path::{ parse_target_path, parse_value_path, BorrowedSegment, OwnedSegment, OwnedTargetPath, OwnedValuePath, PathConcat, PathParseError, PathPrefix, TargetPath, ValuePath, @@ -65,8 +64,6 @@ impl From<&str> for ConfigValuePath { #[serde(try_from = "String", into = "String")] pub struct ConfigTargetPath(pub OwnedTargetPath); -impl ConfigurableString for ConfigTargetPath {} - impl fmt::Display for ConfigTargetPath { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) diff --git a/src/transforms/reduce/config.rs b/src/transforms/reduce/config.rs index e0b0d96d7f732..6c254e000925d 100644 --- a/src/transforms/reduce/config.rs +++ b/src/transforms/reduce/config.rs @@ -4,8 +4,10 @@ use std::time::Duration; use indexmap::IndexMap; use serde_with::serde_as; -use vrl::path::{PathPrefix, TargetPath}; -use vrl::prelude::{Collection, Kind}; +use vrl::path::{parse_target_path, PathPrefix}; +use vrl::prelude::{Collection, KeyString, Kind}; + +use vector_lib::configurable::configurable_component; use crate::conditions::AnyCondition; use crate::config::{ @@ -15,8 +17,6 @@ use crate::config::{ use crate::schema::Definition; use crate::transforms::reduce::merge_strategy::MergeStrategy; use crate::transforms::{reduce::transform::Reduce, Transform}; -use vector_lib::configurable::configurable_component; -use vector_lib::lookup::lookup_v2::ConfigTargetPath; /// Configuration for the `reduce` transform. #[serde_as] @@ -85,7 +85,7 @@ pub struct ReduceConfig { #[configurable(metadata( docs::additional_props_description = "An individual merge strategy." 
))] - pub merge_strategies: IndexMap, + pub merge_strategies: IndexMap, /// A condition used to distinguish the final event of a transaction. /// @@ -137,12 +137,16 @@ impl TransformConfig for ReduceConfig { let mut schema_definition = merged_definition; - for (path, merge_strategy) in self.merge_strategies.iter() { - let input_kind = match path.prefix() { - PathPrefix::Event => schema_definition.event_kind().at_path(path.value_path()), - PathPrefix::Metadata => { - schema_definition.metadata_kind().at_path(path.value_path()) - } + for (key, merge_strategy) in self.merge_strategies.iter() { + let key = if let Ok(key) = parse_target_path(key) { + key + } else { + continue; + }; + + let input_kind = match key.prefix { + PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), + PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), }; let new_kind = match merge_strategy { @@ -211,7 +215,7 @@ impl TransformConfig for ReduceConfig { new_kind }; - schema_definition = schema_definition.with_field(&path.0, new_kind, None); + schema_definition = schema_definition.with_field(&key, new_kind, None); } // the same schema definition is used for all inputs diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 8b5182225c5be..2fb7cff37ed58 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -14,9 +14,9 @@ use crate::{ }; use futures::Stream; use indexmap::IndexMap; -use vector_lib::lookup::lookup_v2::ConfigTargetPath; use vector_lib::stream::expiration_map::{map_with_expiration, Emitter}; use vrl::path::{parse_target_path, OwnedTargetPath}; +use vrl::prelude::KeyString; #[derive(Debug)] struct ReduceState { @@ -143,12 +143,10 @@ pub struct Reduce { max_events: Option, } -fn validate_merge_strategies( - strategies: IndexMap, -) -> crate::Result<()> { +fn validate_merge_strategies(strategies: IndexMap) -> crate::Result<()> { for (path, _) in &strategies { - let 
contains_index = path - .0 + let contains_index = parse_target_path(path) + .map_err(|_| format!("Could not parse path: `{path}`"))? .path .segments .iter() @@ -196,7 +194,13 @@ impl Reduce { merge_strategies: config .merge_strategies .iter() - .map(|(path, strategy)| (path.0.clone(), strategy.clone())) + .filter_map(|(path, strategy)| { + // TODO Invalid paths are ignored to preserve backwards compatibility. + // Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`] + // which means an invalid path would result in an configuration error. + let parsed_path = parse_target_path(path).ok(); + parsed_path.map(|path| (path, strategy.clone())) + }) .collect(), reduce_merge_states: HashMap::new(), ends_when, @@ -931,7 +935,7 @@ merge_strategies.bar = "concat" let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err(); assert_eq!( error.to_string(), - "Merge strategies with indexes are currently not supported. Path: `.a.b[0]`" + "Merge strategies with indexes are currently not supported. 
Path: `a.b[0]`" ); } From 50367609d8d9f2989ff700c10e4836e1d0b47949 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 13 Aug 2024 16:15:24 -0400 Subject: [PATCH 04/11] enhance test case --- src/transforms/reduce/transform.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 2fb7cff37ed58..016769677baf0 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -944,7 +944,8 @@ merge_strategies.bar = "concat" let config = toml::from_str::(indoc!( r#" group_by = [ "id" ] - merge_strategies."events" = "array" + merge_strategies.events = "array" + merge_strategies."events.sev" = "discard" [ends_when] type = "vrl" From 7eefa1ac87aecf7b2d4b9618aa76b661ccdceeae Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 13 Aug 2024 16:26:08 -0400 Subject: [PATCH 05/11] update test with another merge strategy, note we cannot handle strategies for both 'a' and 'a.b' --- src/transforms/reduce/transform.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 016769677baf0..2a4244f53cce2 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -945,7 +945,7 @@ merge_strategies.bar = "concat" r#" group_by = [ "id" ] merge_strategies.events = "array" - merge_strategies."events.sev" = "discard" + merge_strategies.another = "discard" [ends_when] type = "vrl" @@ -965,7 +965,9 @@ merge_strategies.bar = "concat" }, "sev" => 2, }); - let mut e_1 = LogEvent::from(Value::from(btreemap! {"id" => 777})); + let mut e_1 = LogEvent::from(Value::from( + btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}}, + )); e_1.insert("events", v_1.clone()); tx.send(e_1.into()).await.unwrap(); @@ -975,8 +977,9 @@ merge_strategies.bar = "concat" }, "sev" => 3, }); - let mut e_2 = - LogEvent::from(Value::from(btreemap! 
{"id" => 777, "test_end" => "done"})); + let mut e_2 = LogEvent::from(Value::from( + btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}}, + )); e_2.insert("events", v_2.clone()); tx.send(e_2.into()).await.unwrap(); @@ -984,6 +987,7 @@ merge_strategies.bar = "concat" let expected_value = Value::from(btreemap! { "id" => 1554, "events" => vec![v_1, v_2], + "another" => btreemap!{ "a" => 1}, "test_end" => "done" }); assert_eq!(*output.value(), expected_value); From 63e5cce7fe31359d37eb602ba0a1218991f7a7c7 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 14 Aug 2024 16:58:00 -0400 Subject: [PATCH 06/11] tweak changelog text --- changelog.d/21067_reduce.fix.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/21067_reduce.fix.md b/changelog.d/21067_reduce.fix.md index 6a1148bd6d190..67af1e4e16374 100644 --- a/changelog.d/21067_reduce.fix.md +++ b/changelog.d/21067_reduce.fix.md @@ -1 +1 @@ -Fixes a Vector v0.40 regression where the `reduce` transform would not group top level objects correctly. +Fixes a Vector v0.40.0 regression where the `reduce` transform would not group top level objects correctly. From d17b7a5a07c523f11ce7b7f68a03c9c3a05326af Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 14 Aug 2024 17:14:53 -0400 Subject: [PATCH 07/11] workaround for issue 21077 --- src/transforms/reduce/transform.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 2a4244f53cce2..a250554e06137 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -81,6 +81,8 @@ impl ReduceState { if let Some(fields_iter) = e.all_event_fields_skip_array_elements() { for (path, value) in fields_iter { + // See issue 21077. + let path = path.replace("\\.", "."); // This should not return an error, unless there is a bug in the event fields iterator. 
let parsed_path = parse_target_path(&path)?; if is_covered_by_strategy(&parsed_path, strategies) { From a857baced5bfbe4eabcffa7214c2838dbefcbb4b Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Wed, 14 Aug 2024 17:26:37 -0400 Subject: [PATCH 08/11] add nested path to the test case --- src/transforms/reduce/transform.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index a250554e06137..d6b7bdb512b23 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -963,7 +963,7 @@ merge_strategies.bar = "concat" let v_1 = Value::from(btreemap! { "attrs" => btreemap! { - "msg" => "foo", + "nested.msg" => "foo", }, "sev" => 2, }); @@ -975,7 +975,7 @@ merge_strategies.bar = "concat" let v_2 = Value::from(btreemap! { "attrs" => btreemap! { - "msg" => "bar", + "nested.msg" => "bar", }, "sev" => 3, }); From c7c07ae1bfd5584a3e4e7d7724a3417f8c62bb77 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 15 Aug 2024 16:26:28 -0400 Subject: [PATCH 09/11] Address review points --- src/internal_events/reduce.rs | 26 +++++- src/transforms/reduce/transform.rs | 145 +++++++++++++++++++++++------ 2 files changed, 139 insertions(+), 32 deletions(-) diff --git a/src/internal_events/reduce.rs b/src/internal_events/reduce.rs index a773d0a7ac502..51dc74815d085 100644 --- a/src/internal_events/reduce.rs +++ b/src/internal_events/reduce.rs @@ -1,5 +1,6 @@ use metrics::counter; -use vector_lib::internal_event::InternalEvent; +use vector_lib::internal_event::{error_stage, error_type, InternalEvent}; +use vrl::path::PathParseError; #[derive(Debug)] pub struct ReduceStaleEventFlushed; @@ -9,3 +10,26 @@ impl InternalEvent for ReduceStaleEventFlushed { counter!("stale_events_flushed_total").increment(1); } } + +#[derive(Debug)] +pub struct ReduceAddEventError { + pub error: PathParseError, +} + +impl InternalEvent for ReduceAddEventError { + fn emit(self) { + error!( 
+ message = "Event could not be reduced", + error = ?self.error, + error_type = error_type::CONDITION_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_type" => error_type::PARSER_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + } +} diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index d6b7bdb512b23..d630611ae53dd 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -3,6 +3,7 @@ use std::collections::{hash_map, HashMap}; use std::pin::Pin; use std::time::{Duration, Instant}; +use crate::internal_events::ReduceAddEventError; use crate::transforms::reduce::merge_strategy::{ get_value_merger, MergeStrategy, ReduceValueMerger, }; @@ -52,11 +53,7 @@ impl ReduceState { } } - fn add_event( - &mut self, - e: LogEvent, - strategies: &IndexMap, - ) -> crate::Result<()> { + fn add_event(&mut self, e: LogEvent, strategies: &IndexMap) { self.metadata.merge(e.metadata().clone()); for (path, strategy) in strategies { @@ -80,11 +77,17 @@ impl ReduceState { } if let Some(fields_iter) = e.all_event_fields_skip_array_elements() { - for (path, value) in fields_iter { - // See issue 21077. - let path = path.replace("\\.", "."); + for (mut path, value) in fields_iter { + // TODO: Addressed in issue 21077. + if path.contains("\\.") { + path = quote_invalid_paths(&path).into(); + } + // This should not return an error, unless there is a bug in the event fields iterator. 
- let parsed_path = parse_target_path(&path)?; + let parsed_path = match parse_target_path(&path) { + Ok(path) => path, + Err(error) => return emit!(ReduceAddEventError { error }), + }; if is_covered_by_strategy(&parsed_path, strategies) { continue; } @@ -117,7 +120,6 @@ impl ReduceState { self.events += 1; self.stale_since = Instant::now(); - Ok(()) } fn flush(mut self) -> LogEvent { @@ -201,6 +203,9 @@ impl Reduce { // Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`] // which means an invalid path would result in an configuration error. let parsed_path = parse_target_path(path).ok(); + if parsed_path.is_none() { + warn!(message = "Ignoring strategy with invalid path.", %path); + } parsed_path.map(|path| (path, strategy.clone())) }) .collect(), @@ -239,29 +244,20 @@ impl Reduce { .for_each(|(_, s)| emitter.emit(Event::from(s.flush()))); } - fn push_or_new_reduce_state( - &mut self, - event: LogEvent, - discriminant: Discriminant, - ) -> crate::Result<()> { + fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) { match self.reduce_merge_states.entry(discriminant) { hash_map::Entry::Vacant(entry) => { let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies)?; + state.add_event(event, &self.merge_strategies); entry.insert(state); } hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().add_event(event, &self.merge_strategies)?; + entry.get_mut().add_event(event, &self.merge_strategies); } }; - Ok(()) } - pub(crate) fn transform_one( - &mut self, - emitter: &mut Emitter, - event: Event, - ) -> crate::Result<()> { + pub(crate) fn transform_one(&mut self, emitter: &mut Emitter, event: Event) { let (starts_here, event) = match &self.starts_when { Some(condition) => condition.check(event), None => (false, event), @@ -295,16 +291,15 @@ impl Reduce { } else if ends_here { emitter.emit(match self.reduce_merge_states.remove(&discriminant) { Some(mut state) => { - state.add_event(event, 
&self.merge_strategies)?; + state.add_event(event, &self.merge_strategies); state.flush().into() } None => { let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies)?; + state.add_event(event, &self.merge_strategies); state.flush().into() } }); - Ok(()) } else { self.push_or_new_reduce_state(event, discriminant) } @@ -327,9 +322,7 @@ impl TaskTransform for Reduce { flush_period, |me: &mut Box, event, emitter: &mut Emitter| { // called for each event - if let Err(error) = me.transform_one(emitter, event) { - error!(%error, rate_limit = 30) - } + me.transform_one(emitter, event); }, |me: &mut Box, emitter: &mut Emitter| { // called periodically to check for expired events @@ -343,6 +336,53 @@ impl TaskTransform for Reduce { } } +// TODO delete after issue 21077 is resolved. +fn quote_invalid_paths(path: &str) -> String { + let components: Vec<&str> = path.split('.').collect(); + + let index: Vec = components + .iter() + .map(|component| component.ends_with('\\')) + .collect(); + + let mut escaping = false; + let mut result = String::new(); + index.iter().enumerate().for_each(|(i, _)| { + let current = components[i].trim_end_matches('\\'); + if i == 0 { + if index[0] { + escaping = true; + result.push('"'); + } + result.push_str(current); + } else if i == index.len() - 1 { + result.push_str(current); + if escaping { + escaping = false; + result.push('"'); + }; + } else if !index[i - 1] && index[i] { + escaping = true; + result.push('"'); + result.push_str(current); + } else if index[i - 1] && index[i] { + escaping = true; + result.push_str(current); + } else if index[i - 1] && !index[i] { + result.push_str(current); + escaping = false; + result.push('"'); + } else { + result.push_str(current); + } + + if i < components.len() - 1 { + result.push('.'); + } + }); + result +} + #[cfg(test)] mod test { use indoc::indoc; @@ -930,14 +970,14 @@ merge_strategies.bar = "concat" group_by = [ "id" ] merge_strategies.id = "discard" - 
merge_strategies."a.b[0]" = "array" + merge_strategies."nested.msg[0]" = "array" "#, )) .unwrap(); let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err(); assert_eq!( error.to_string(), - "Merge strategies with indexes are currently not supported. Path: `a.b[0]`" + "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`" ); } @@ -1000,4 +1040,47 @@ merge_strategies.bar = "concat" }) .await } + + #[test] + fn quote_paths_tests() { + let input = "one"; + let expected = "one"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = ".one"; + let expected = ".one"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = "one.two.three.four"; + let expected = "one.two.three.four"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four"; + let expected = "one.two.\"three.four\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four\.five"; + let expected = "one.two.\"three.four.five\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four\.five.six"; + let expected = "one.two.\"three.four.five\".six"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two\.three.four\.five.six.seven\.eight"; + let expected = "one.\"two.three\".\"four.five\".six.\"seven.eight\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two\.three\.four\.five.six\.seven.eight"; + let expected = "one.\"two.three.four.five\".\"six.seven\".eight"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + } } From 04b181b5720ad5b163b9c1ca20fe3b15f0ce3f74 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 16 Aug 2024 12:00:58 -0400 Subject: [PATCH 10/11] add dot to message --- 
src/internal_events/reduce.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/internal_events/reduce.rs b/src/internal_events/reduce.rs index 51dc74815d085..74aace0925698 100644 --- a/src/internal_events/reduce.rs +++ b/src/internal_events/reduce.rs @@ -19,7 +19,7 @@ pub struct ReduceAddEventError { impl InternalEvent for ReduceAddEventError { fn emit(self) { error!( - message = "Event could not be reduced", + message = "Event could not be reduced.", error = ?self.error, error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, From f32a2b76acf811e11018b3b9feea9518442d9a43 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 16 Aug 2024 14:32:01 -0400 Subject: [PATCH 11/11] more review points --- src/internal_events/reduce.rs | 5 ++++- src/transforms/reduce/transform.rs | 20 +++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/internal_events/reduce.rs b/src/internal_events/reduce.rs index 74aace0925698..b9887aaf54bb7 100644 --- a/src/internal_events/reduce.rs +++ b/src/internal_events/reduce.rs @@ -1,6 +1,7 @@ use metrics::counter; use vector_lib::internal_event::{error_stage, error_type, InternalEvent}; use vrl::path::PathParseError; +use vrl::value::KeyString; #[derive(Debug)] pub struct ReduceStaleEventFlushed; @@ -14,12 +15,14 @@ impl InternalEvent for ReduceStaleEventFlushed { #[derive(Debug)] pub struct ReduceAddEventError { pub error: PathParseError, + pub path: KeyString, } impl InternalEvent for ReduceAddEventError { fn emit(self) { error!( - message = "Event could not be reduced.", + message = "Event field could not be reduced.", + path = ?self.path, error = ?self.error, error_type = error_type::CONDITION_FAILED, stage = error_stage::PROCESSING, diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index d630611ae53dd..f3e4e8050054f 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -77,16 +77,22 @@ impl 
ReduceState { } if let Some(fields_iter) = e.all_event_fields_skip_array_elements() { - for (mut path, value) in fields_iter { - // TODO: Addressed in issue 21077. - if path.contains("\\.") { - path = quote_invalid_paths(&path).into(); - } + for (path, value) in fields_iter { + // TODO: This can be removed once issue 21077 is resolved. + // Technically we need to quote any special characters (like `-` or `*` or ` `). + let parsable_path = if path.contains("\\.") { + quote_invalid_paths(&path).into() + } else { + path.clone() + }; // This should not return an error, unless there is a bug in the event fields iterator. - let parsed_path = match parse_target_path(&path) { + let parsed_path = match parse_target_path(&parsable_path) { Ok(path) => path, - Err(error) => return emit!(ReduceAddEventError { error }), + Err(error) => { + emit!(ReduceAddEventError { error, path }); + continue; + } }; if is_covered_by_strategy(&parsed_path, strategies) { continue;