diff --git a/src/internal_events/reduce.rs b/src/internal_events/reduce.rs index a773d0a7ac5026..51dc74815d085c 100644 --- a/src/internal_events/reduce.rs +++ b/src/internal_events/reduce.rs @@ -1,5 +1,6 @@ use metrics::counter; -use vector_lib::internal_event::InternalEvent; +use vector_lib::internal_event::{error_stage, error_type, InternalEvent}; +use vrl::path::PathParseError; #[derive(Debug)] pub struct ReduceStaleEventFlushed; @@ -9,3 +10,26 @@ impl InternalEvent for ReduceStaleEventFlushed { counter!("stale_events_flushed_total").increment(1); } } + +#[derive(Debug)] +pub struct ReduceAddEventError { + pub error: PathParseError, +} + +impl InternalEvent for ReduceAddEventError { + fn emit(self) { + error!( + message = "Event could not be reduced", + error = ?self.error, + error_type = error_type::CONDITION_FAILED, + stage = error_stage::PROCESSING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_type" => error_type::PARSER_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + } +} diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index d6b7bdb512b239..e68560763afdd0 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -3,6 +3,7 @@ use std::collections::{hash_map, HashMap}; use std::pin::Pin; use std::time::{Duration, Instant}; +use crate::internal_events::ReduceAddEventError; use crate::transforms::reduce::merge_strategy::{ get_value_merger, MergeStrategy, ReduceValueMerger, }; @@ -52,11 +53,7 @@ impl ReduceState { } } - fn add_event( - &mut self, - e: LogEvent, - strategies: &IndexMap, - ) -> crate::Result<()> { + fn add_event(&mut self, e: LogEvent, strategies: &IndexMap) { self.metadata.merge(e.metadata().clone()); for (path, strategy) in strategies { @@ -80,11 +77,17 @@ impl ReduceState { } if let Some(fields_iter) = e.all_event_fields_skip_array_elements() { - for (path, value) in fields_iter { - // See issue 21077. - let path = path.replace("\\.", "."); + for (mut path, value) in fields_iter { + // TODO: Addressed in issue 21077. + if !path.contains("\\.") { + path = quote_invalid_paths(&path).into(); + } + // This should not return an error, unless there is a bug in the event fields iterator. - let parsed_path = parse_target_path(&path)?; + let parsed_path = match parse_target_path(&path) { + Ok(path) => path, + Err(error) => return emit!(ReduceAddEventError { error }), + }; if is_covered_by_strategy(&parsed_path, strategies) { continue; } @@ -117,7 +120,6 @@ impl ReduceState { self.events += 1; self.stale_since = Instant::now(); - Ok(()) } fn flush(mut self) -> LogEvent { @@ -201,6 +203,9 @@ impl Reduce { // Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`] // which means an invalid path would result in an configuration error. let parsed_path = parse_target_path(path).ok(); + if parsed_path.is_none() { + warn!(message = "Ignoring strategy with invalid path.", %path); + } parsed_path.map(|path| (path, strategy.clone())) }) .collect(), @@ -239,29 +244,20 @@ impl Reduce { .for_each(|(_, s)| emitter.emit(Event::from(s.flush()))); } - fn push_or_new_reduce_state( - &mut self, - event: LogEvent, - discriminant: Discriminant, - ) -> crate::Result<()> { + fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) { match self.reduce_merge_states.entry(discriminant) { hash_map::Entry::Vacant(entry) => { let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies)?; + state.add_event(event, &self.merge_strategies); entry.insert(state); } hash_map::Entry::Occupied(mut entry) => { - entry.get_mut().add_event(event, &self.merge_strategies)?; + entry.get_mut().add_event(event, &self.merge_strategies); } }; - Ok(()) } - pub(crate) fn transform_one( - &mut self, - emitter: &mut Emitter, - event: Event, - ) -> crate::Result<()> { + pub(crate) fn transform_one(&mut self, emitter: &mut Emitter, event: Event) { let (starts_here, event) = match &self.starts_when { Some(condition) => condition.check(event), None => (false, event), @@ -295,16 +291,15 @@ impl Reduce { } else if ends_here { emitter.emit(match self.reduce_merge_states.remove(&discriminant) { Some(mut state) => { - state.add_event(event, &self.merge_strategies)?; + state.add_event(event, &self.merge_strategies); state.flush().into() } None => { let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies)?; + state.add_event(event, &self.merge_strategies); state.flush().into() } }); - Ok(()) } else { self.push_or_new_reduce_state(event, discriminant) } @@ -327,9 +322,7 @@ impl TaskTransform for Reduce { flush_period, |me: &mut Box, event, emitter: &mut Emitter| { // called for each event - if let Err(error) = me.transform_one(emitter, event) { - error!(%error, rate_limit = 30) - } + me.transform_one(emitter, event); }, |me: &mut Box, emitter: &mut Emitter| { // called periodically to check for expired events @@ -343,6 +336,53 @@ impl TaskTransform for Reduce { } } +// TODO delete after issue 21077 is resolved. +fn quote_invalid_paths(path: &str) -> String { + let components: Vec<&str> = path.split('.').collect(); + + let index: Vec = components + .iter() + .map(|component| component.ends_with('\\')) + .collect(); + + let mut escaping = false; + let mut result = String::new(); + index.iter().enumerate().for_each(|(i, _)| { + let current = components[i].trim_end_matches('\\'); + if i == 0 { + if index[0] { + escaping = true; + result.push('"'); + } + result.push_str(current); + result.push('.'); + } else if i == index.len() - 1 { + result.push_str(current); + if escaping { + escaping = false; + result.push('"'); + }; + } else { + if !index[i - 1] && index[i] { + escaping = true; + result.push('"'); + result.push_str(current); + } else if index[i - 1] && index[i] { + escaping = true; + result.push_str(current); + } else if index[i - 1] && !index[i] { + result.push_str(current); + escaping = false; + result.push('"'); + } else { + result.push_str(current); + } + result.push('.'); + } + }); + result +} + #[cfg(test)] mod test { use indoc::indoc; @@ -852,7 +892,7 @@ merge_strategies.bar = "concat" group_by = [ "id" ] merge_strategies.id = "discard" - merge_strategies."message.a.b" = "array" + merge_strategies."message.three.msg" = "array" [ends_when] type = "vrl" @@ -930,14 +970,14 @@ merge_strategies.bar = "concat" group_by = [ "id" ] merge_strategies.id = "discard" - merge_strategies."a.b[0]" = "array" + merge_strategies."nested.msg[0]" = "array" "#, )) .unwrap(); let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err(); assert_eq!( error.to_string(), - "Merge strategies with indexes are currently not supported. Path: `a.b[0]`" + "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`" ); } @@ -1000,4 +1040,37 @@ merge_strategies.bar = "concat" }) .await } + + #[test] + fn quote_paths_tests() { + let input = "one.two.three.four"; + let expected = "one.two.three.four"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four"; + let expected = "one.two.\"three.four\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four\.five"; + let expected = "one.two.\"three.four.five\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two.three\.four\.five.six"; + let expected = "one.two.\"three.four.five\".six"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two\.three.four\.five.six.seven\.eight"; + let expected = "one.\"two.three\".\"four.five\".six.\"seven.eight\""; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + + let input = r"one.two\.three\.four\.five.six\.seven.eight"; + let expected = "one.\"two.three.four.five\".\"six.seven\".eight"; + let result = quote_invalid_paths(input); + assert_eq!(result, expected); + } }