Skip to content

Commit

Permalink
fix(reduce transform): surround invalid path segments with quotes (#2…
Browse files Browse the repository at this point in the history
…1201)

* fix(FieldsIter): surround invalid path segments with quotes

* changelog

* fix changelog typos

* Update changelog.d/21201_fields_iter.fix.md

Co-authored-by: Jesse Szwedko <[email protected]>

* update test

* fix changelog message, I believe we didn't panic before

---------

Co-authored-by: Jesse Szwedko <[email protected]>
  • Loading branch information
pront and jszwedko committed Sep 6, 2024
1 parent a9392b0 commit 360db35
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 107 deletions.
1 change: 1 addition & 0 deletions changelog.d/21201_fields_iter.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The `reduce` transform can now reduce fields that contain special characters.
5 changes: 4 additions & 1 deletion lib/vector-core/src/event/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ fn event_iteration() {
assert_eq!(
all,
vec![
("Ke$ha".into(), "It's going down, I'm yelling timber".into()),
(
"Pitbull".into(),
"The bigger they are, the harder they fall".into()
),
(
"\"Ke$ha\"".into(),
"It's going down, I'm yelling timber".into()
),
]
.into_iter()
.collect::<HashSet<_>>()
Expand Down
45 changes: 38 additions & 7 deletions lib/vector-core/src/event/util/log/all_fields.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::{collections::btree_map, fmt::Write as _, iter, slice};

use once_cell::sync::Lazy;
use regex::Regex;
use serde::{Serialize, Serializer};
use std::{collections::btree_map, fmt::Write as _, iter, slice};
use vrl::path::PathPrefix;

use crate::event::{KeyString, ObjectMap, Value};

static IS_VALID_PATH_SEGMENT: Lazy<Regex> = Lazy::new(|| Regex::new(r"^[a-zA-Z0-9_]+$").unwrap());

/// Iterates over all paths in form `a.b[0].c[1]` in alphabetical order
/// and their corresponding values.
pub fn all_fields(fields: &ObjectMap) -> FieldsIter {
Expand Down Expand Up @@ -35,7 +38,7 @@ enum LeafIter<'a> {
Array(iter::Enumerate<slice::Iter<'a, Value>>),
}

#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug)]
enum PathComponent<'a> {
Key(&'a KeyString),
Index(usize),
Expand Down Expand Up @@ -134,10 +137,10 @@ impl<'a> FieldsIter<'a> {
match path_iter.next() {
None => break res.into(),
Some(PathComponent::Key(key)) => {
if key.contains('.') {
res.push_str(&key.replace('.', "\\."));
} else {
if IS_VALID_PATH_SEGMENT.is_match(key) {
res.push_str(key);
} else {
res.push_str(&format!("\"{key}\""));
}
}
Some(PathComponent::Index(index)) => {
Expand Down Expand Up @@ -223,6 +226,34 @@ mod test {
assert_eq!(collected, expected);
}

#[test]
fn keys_special() {
let fields = fields_from_json(json!({
"a-b": 1,
"a*b": 2,
"a b": 3,
".a .b*": 4,
"\"a\"": 5,
}));
let mut collected: Vec<_> = all_fields(&fields).collect();
collected.sort_by(|(a, _), (b, _)| a.cmp(b));

let mut expected: Vec<(KeyString, &Value)> = vec![
("\"a-b\"", &Value::Integer(1)),
("\"a*b\"", &Value::Integer(2)),
("\"a b\"", &Value::Integer(3)),
("\".a .b*\"", &Value::Integer(4)),
("\"\"a\"\"", &Value::Integer(5)),
]
.into_iter()
.map(|(k, v)| (k.into(), v))
.collect();
// Compare without the leading `"` char so that the order is the same as the collected fields.
expected.sort_by(|(a, _), (b, _)| a[1..].cmp(&b[1..]));

assert_eq!(collected, expected);
}

#[test]
fn metadata_keys_simple() {
let fields = fields_from_json(json!({
Expand Down Expand Up @@ -266,7 +297,7 @@ mod test {
("a.array[2].x", Value::Integer(1)),
("a.array[3][0]", Value::Integer(2)),
("a.b.c", Value::Integer(5)),
("a\\.b\\.c", Value::Integer(6)),
("\"a.b.c\"", Value::Integer(6)),
("d", Value::Object(ObjectMap::new())),
("e", Value::Array(Vec::new())),
]
Expand Down
104 changes: 5 additions & 99 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,8 @@ impl ReduceState {

if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
for (path, value) in fields_iter {
// TODO: This can be removed once issue 21077 is resolved.
// Technically we need to quote any special characters (like `-` or `*` or ` `).
let parsable_path = if path.contains("\\.") {
quote_invalid_paths(&path).into()
} else {
path.clone()
};

// This should not return an error, unless there is a bug in the event fields iterator.
let parsed_path = match parse_target_path(&parsable_path) {
let parsed_path = match parse_target_path(&path) {
Ok(path) => path,
Err(error) => {
emit!(ReduceAddEventError { error, path });
Expand Down Expand Up @@ -342,53 +334,6 @@ impl TaskTransform<Event> for Reduce {
}
}

// TODO delete after issue 21077 is resolved.
fn quote_invalid_paths(path: &str) -> String {
let components: Vec<&str> = path.split('.').collect();

let index: Vec<bool> = components
.iter()
.map(|component| component.ends_with('\\'))
.collect();

let mut escaping = false;
let mut result = String::new();
index.iter().enumerate().for_each(|(i, _)| {
let current = components[i].trim_end_matches('\\');
if i == 0 {
if index[0] {
escaping = true;
result.push('"');
}
result.push_str(current);
} else if i == index.len() - 1 {
result.push_str(current);
if escaping {
escaping = false;
result.push('"');
};
} else if !index[i - 1] && index[i] {
escaping = true;
result.push('"');
result.push_str(current);
} else if index[i - 1] && index[i] {
escaping = true;
result.push_str(current);
} else if index[i - 1] && !index[i] {
result.push_str(current);
escaping = false;
result.push('"');
} else {
result.push_str(current);
}

if i < components.len() - 1 {
result.push('.');
}
});
result
}

#[cfg(test)]
mod test {
use indoc::indoc;
Expand Down Expand Up @@ -993,6 +938,7 @@ merge_strategies.bar = "concat"
r#"
group_by = [ "id" ]
merge_strategies.events = "array"
merge_strategies."\"a-b\"" = "retain"
merge_strategies.another = "discard"
[ends_when]
Expand All @@ -1017,6 +963,7 @@ merge_strategies.bar = "concat"
btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
));
e_1.insert("events", v_1.clone());
e_1.insert("\"a-b\"", 2);
tx.send(e_1.into()).await.unwrap();

let v_2 = Value::from(btreemap! {
Expand All @@ -1029,13 +976,15 @@ merge_strategies.bar = "concat"
btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
));
e_2.insert("events", v_2.clone());
e_2.insert("\"a-b\"", 2);
tx.send(e_2.into()).await.unwrap();

let output = out.recv().await.unwrap().into_log();
let expected_value = Value::from(btreemap! {
"id" => 1554,
"events" => vec![v_1, v_2],
"another" => btreemap!{ "a" => 1},
"a-b" => 2,
"test_end" => "done"
});
assert_eq!(*output.value(), expected_value);
Expand All @@ -1046,47 +995,4 @@ merge_strategies.bar = "concat"
})
.await
}

#[test]
fn quote_paths_tests() {
let input = "one";
let expected = "one";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);

let input = ".one";
let expected = ".one";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);

let input = "one.two.three.four";
let expected = "one.two.three.four";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);

let input = r"one.two.three\.four";
let expected = "one.two.\"three.four\"";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);

let input = r"one.two.three\.four\.five";
let expected = "one.two.\"three.four.five\"";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);

let input = r"one.two.three\.four\.five.six";
let expected = "one.two.\"three.four.five\".six";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);

let input = r"one.two\.three.four\.five.six.seven\.eight";
let expected = "one.\"two.three\".\"four.five\".six.\"seven.eight\"";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);

let input = r"one.two\.three\.four\.five.six\.seven.eight";
let expected = "one.\"two.three.four.five\".\"six.seven\".eight";
let result = quote_invalid_paths(input);
assert_eq!(result, expected);
}
}

0 comments on commit 360db35

Please sign in to comment.