Skip to content

Commit

Permalink
revert config changes to preserve backwards compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Aug 13, 2024
1 parent 7158ca9 commit 12a727b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
3 changes: 0 additions & 3 deletions lib/vector-lookup/src/lookup_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub use optional_path::{OptionalTargetPath, OptionalValuePath};
use std::fmt;
use vector_config_macros::configurable_component;

use vector_config::ConfigurableString;
pub use vrl::path::{
parse_target_path, parse_value_path, BorrowedSegment, OwnedSegment, OwnedTargetPath,
OwnedValuePath, PathConcat, PathParseError, PathPrefix, TargetPath, ValuePath,
Expand Down Expand Up @@ -65,8 +64,6 @@ impl From<&str> for ConfigValuePath {
#[serde(try_from = "String", into = "String")]
pub struct ConfigTargetPath(pub OwnedTargetPath);

impl ConfigurableString for ConfigTargetPath {}

impl fmt::Display for ConfigTargetPath {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
Expand Down
28 changes: 16 additions & 12 deletions src/transforms/reduce/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use std::time::Duration;

use indexmap::IndexMap;
use serde_with::serde_as;
use vrl::path::{PathPrefix, TargetPath};
use vrl::prelude::{Collection, Kind};
use vrl::path::{parse_target_path, PathPrefix};
use vrl::prelude::{Collection, KeyString, Kind};

use vector_lib::configurable::configurable_component;

use crate::conditions::AnyCondition;
use crate::config::{
Expand All @@ -15,8 +17,6 @@ use crate::config::{
use crate::schema::Definition;
use crate::transforms::reduce::merge_strategy::MergeStrategy;
use crate::transforms::{reduce::transform::Reduce, Transform};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::ConfigTargetPath;

/// Configuration for the `reduce` transform.
#[serde_as]
Expand Down Expand Up @@ -85,7 +85,7 @@ pub struct ReduceConfig {
#[configurable(metadata(
docs::additional_props_description = "An individual merge strategy."
))]
pub merge_strategies: IndexMap<ConfigTargetPath, MergeStrategy>,
pub merge_strategies: IndexMap<KeyString, MergeStrategy>,

/// A condition used to distinguish the final event of a transaction.
///
Expand Down Expand Up @@ -137,12 +137,16 @@ impl TransformConfig for ReduceConfig {

let mut schema_definition = merged_definition;

for (path, merge_strategy) in self.merge_strategies.iter() {
let input_kind = match path.prefix() {
PathPrefix::Event => schema_definition.event_kind().at_path(path.value_path()),
PathPrefix::Metadata => {
schema_definition.metadata_kind().at_path(path.value_path())
}
for (key, merge_strategy) in self.merge_strategies.iter() {
let key = if let Ok(key) = parse_target_path(key) {
key
} else {
continue;
};

let input_kind = match key.prefix {
PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
};

let new_kind = match merge_strategy {
Expand Down Expand Up @@ -211,7 +215,7 @@ impl TransformConfig for ReduceConfig {
new_kind
};

schema_definition = schema_definition.with_field(&path.0, new_kind, None);
schema_definition = schema_definition.with_field(&key, new_kind, None);
}

// the same schema definition is used for all inputs
Expand Down
18 changes: 11 additions & 7 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use crate::{
};
use futures::Stream;
use indexmap::IndexMap;
use vector_lib::lookup::lookup_v2::ConfigTargetPath;
use vector_lib::stream::expiration_map::{map_with_expiration, Emitter};
use vrl::path::{parse_target_path, OwnedTargetPath};
use vrl::prelude::KeyString;

#[derive(Debug)]
struct ReduceState {
Expand Down Expand Up @@ -143,12 +143,10 @@ pub struct Reduce {
max_events: Option<usize>,
}

fn validate_merge_strategies(
strategies: IndexMap<ConfigTargetPath, MergeStrategy>,
) -> crate::Result<()> {
fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
for (path, _) in &strategies {
let contains_index = path
.0
let contains_index = parse_target_path(path)
.map_err(|_| format!("Could not parse path: `{path}`"))?
.path
.segments
.iter()
Expand Down Expand Up @@ -196,7 +194,13 @@ impl Reduce {
merge_strategies: config
.merge_strategies
.iter()
.map(|(path, strategy)| (path.0.clone(), strategy.clone()))
.filter_map(|(path, strategy)| {
// TODO Invalid paths are ignored to preserve backwards compatibility.
// Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`]
// which means an invalid path would result in an configuration error.
let parsed_path = parse_target_path(path).ok();
parsed_path.map(|path| (path, strategy.clone()))
})
.collect(),
reduce_merge_states: HashMap::new(),
ends_when,
Expand Down

0 comments on commit 12a727b

Please sign in to comment.