From 12a727b76bec6c9259635957ed44df7be0e767cf Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 13 Aug 2024 16:05:03 -0400 Subject: [PATCH] revert config changes to preserve backwards compatibility --- lib/vector-lookup/src/lookup_v2/mod.rs | 3 --- src/transforms/reduce/config.rs | 28 +++++++++++++++----------- src/transforms/reduce/transform.rs | 18 ++++++++++------- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs b/lib/vector-lookup/src/lookup_v2/mod.rs index 227c35a78f6976..a6d5be15b37f6f 100644 --- a/lib/vector-lookup/src/lookup_v2/mod.rs +++ b/lib/vector-lookup/src/lookup_v2/mod.rs @@ -4,7 +4,6 @@ pub use optional_path::{OptionalTargetPath, OptionalValuePath}; use std::fmt; use vector_config_macros::configurable_component; -use vector_config::ConfigurableString; pub use vrl::path::{ parse_target_path, parse_value_path, BorrowedSegment, OwnedSegment, OwnedTargetPath, OwnedValuePath, PathConcat, PathParseError, PathPrefix, TargetPath, ValuePath, @@ -65,8 +64,6 @@ impl From<&str> for ConfigValuePath { #[serde(try_from = "String", into = "String")] pub struct ConfigTargetPath(pub OwnedTargetPath); -impl ConfigurableString for ConfigTargetPath {} - impl fmt::Display for ConfigTargetPath { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) diff --git a/src/transforms/reduce/config.rs b/src/transforms/reduce/config.rs index e0b0d96d7f7326..6c254e000925d9 100644 --- a/src/transforms/reduce/config.rs +++ b/src/transforms/reduce/config.rs @@ -4,8 +4,10 @@ use std::time::Duration; use indexmap::IndexMap; use serde_with::serde_as; -use vrl::path::{PathPrefix, TargetPath}; -use vrl::prelude::{Collection, Kind}; +use vrl::path::{parse_target_path, PathPrefix}; +use vrl::prelude::{Collection, KeyString, Kind}; + +use vector_lib::configurable::configurable_component; use crate::conditions::AnyCondition; use crate::config::{ @@ -15,8 +17,6 @@ use crate::config::{ use crate::schema::Definition; use crate::transforms::reduce::merge_strategy::MergeStrategy; use crate::transforms::{reduce::transform::Reduce, Transform}; -use vector_lib::configurable::configurable_component; -use vector_lib::lookup::lookup_v2::ConfigTargetPath; /// Configuration for the `reduce` transform. #[serde_as] @@ -85,7 +85,7 @@ pub struct ReduceConfig { #[configurable(metadata( docs::additional_props_description = "An individual merge strategy." ))] - pub merge_strategies: IndexMap, + pub merge_strategies: IndexMap, /// A condition used to distinguish the final event of a transaction. /// @@ -137,12 +137,16 @@ impl TransformConfig for ReduceConfig { let mut schema_definition = merged_definition; - for (path, merge_strategy) in self.merge_strategies.iter() { - let input_kind = match path.prefix() { - PathPrefix::Event => schema_definition.event_kind().at_path(path.value_path()), - PathPrefix::Metadata => { - schema_definition.metadata_kind().at_path(path.value_path()) - } + for (key, merge_strategy) in self.merge_strategies.iter() { + let key = if let Ok(key) = parse_target_path(key) { + key + } else { + continue; + }; + + let input_kind = match key.prefix { + PathPrefix::Event => schema_definition.event_kind().at_path(&key.path), + PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path), }; let new_kind = match merge_strategy { @@ -211,7 +215,7 @@ impl TransformConfig for ReduceConfig { new_kind }; - schema_definition = schema_definition.with_field(&path.0, new_kind, None); + schema_definition = schema_definition.with_field(&key, new_kind, None); } // the same schema definition is used for all inputs diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 8b5182225c5be3..cc47acd86927be 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -14,9 +14,9 @@ use crate::{ }; use futures::Stream; use indexmap::IndexMap; -use vector_lib::lookup::lookup_v2::ConfigTargetPath; use vector_lib::stream::expiration_map::{map_with_expiration, Emitter}; use vrl::path::{parse_target_path, OwnedTargetPath}; +use vrl::prelude::KeyString; #[derive(Debug)] struct ReduceState { @@ -143,12 +143,10 @@ pub struct Reduce { max_events: Option, } -fn validate_merge_strategies( - strategies: IndexMap, -) -> crate::Result<()> { +fn validate_merge_strategies(strategies: IndexMap) -> crate::Result<()> { for (path, _) in &strategies { - let contains_index = path - .0 + let contains_index = parse_target_path(path) + .map_err(|_| format!("Could not parse path: `{path}`"))? .path .segments .iter() @@ -196,7 +194,13 @@ impl Reduce { merge_strategies: config .merge_strategies .iter() - .map(|(path, strategy)| (path.0.clone(), strategy.clone())) + .filter_map(|(path, strategy)| { + // TODO Invalid paths are ignored to preserve backwards compatibility. + // Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`] + // which means an invalid path would result in an configuration error. + let parsed_path = parse_target_path(path).ok(); + parsed_path.map(|path| (path, strategy.clone())) + }) .collect(), reduce_merge_states: HashMap::new(), ends_when,