Skip to content

Commit

Permalink
more review points
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Aug 16, 2024
1 parent 04b181b commit f32a2b7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
5 changes: 4 additions & 1 deletion src/internal_events/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use metrics::counter;
use vector_lib::internal_event::{error_stage, error_type, InternalEvent};
use vrl::path::PathParseError;
use vrl::value::KeyString;

#[derive(Debug)]
pub struct ReduceStaleEventFlushed;
Expand All @@ -14,12 +15,14 @@ impl InternalEvent for ReduceStaleEventFlushed {
#[derive(Debug)]
pub struct ReduceAddEventError {
pub error: PathParseError,
pub path: KeyString,
}

impl InternalEvent for ReduceAddEventError {
fn emit(self) {
error!(
message = "Event could not be reduced.",
message = "Event field could not be reduced.",
path = ?self.path,
error = ?self.error,
error_type = error_type::CONDITION_FAILED,
stage = error_stage::PROCESSING,
Expand Down
20 changes: 13 additions & 7 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,22 @@ impl ReduceState {
}

if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
for (mut path, value) in fields_iter {
// TODO: Addressed in issue 21077.
if path.contains("\\.") {
path = quote_invalid_paths(&path).into();
}
for (path, value) in fields_iter {
// TODO: This can be removed once issue 21077 is resolved.
// Technically we need to quote any special characters (like `-` or `*` or ` `).
let parsable_path = if path.contains("\\.") {
quote_invalid_paths(&path).into()
} else {
path.clone()
};

// This should not return an error, unless there is a bug in the event fields iterator.
let parsed_path = match parse_target_path(&path) {
let parsed_path = match parse_target_path(&parsable_path) {
Ok(path) => path,
Err(error) => return emit!(ReduceAddEventError { error }),
Err(error) => {
emit!(ReduceAddEventError { error, path });
continue;
}
};
if is_covered_by_strategy(&parsed_path, strategies) {
continue;
Expand Down

0 comments on commit f32a2b7

Please sign in to comment.