Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Aug 16, 2024
1 parent a857bac commit 6bbab4d
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 33 deletions.
26 changes: 25 additions & 1 deletion src/internal_events/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type, InternalEvent};
use vrl::path::PathParseError;

/// Internal event whose `emit` implementation increments the
/// `stale_events_flushed_total` counter by one.
#[derive(Debug)]
pub struct ReduceStaleEventFlushed;
Expand All @@ -9,3 +10,26 @@ impl InternalEvent for ReduceStaleEventFlushed {
counter!("stale_events_flushed_total").increment(1);
}
}

/// Internal event emitted when an event could not be reduced because one of
/// its field paths failed to parse.
#[derive(Debug)]
pub struct ReduceAddEventError {
/// The path parse error; it is written to the error log when the event is emitted.
pub error: PathParseError,
}

impl InternalEvent for ReduceAddEventError {
    /// Logs the failure (rate limited) and increments `component_errors_total`.
    ///
    /// The log line and the counter must carry the same `error_type` and
    /// `stage` so the two signals can be correlated. The wrapped error is a
    /// path parse error, hence `PARSER_FAILED` on both.
    fn emit(self) {
        error!(
            message = "Event could not be reduced",
            error = ?self.error,
            // Was `CONDITION_FAILED`, which disagreed with the counter label below.
            error_type = error_type::PARSER_FAILED,
            stage = error_stage::PROCESSING,
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_type" => error_type::PARSER_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
137 changes: 105 additions & 32 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::{hash_map, HashMap};
use std::pin::Pin;
use std::time::{Duration, Instant};

use crate::internal_events::ReduceAddEventError;
use crate::transforms::reduce::merge_strategy::{
get_value_merger, MergeStrategy, ReduceValueMerger,
};
Expand Down Expand Up @@ -52,11 +53,7 @@ impl ReduceState {
}
}

fn add_event(
&mut self,
e: LogEvent,
strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
) -> crate::Result<()> {
fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
self.metadata.merge(e.metadata().clone());

for (path, strategy) in strategies {
Expand All @@ -80,11 +77,17 @@ impl ReduceState {
}

if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
for (path, value) in fields_iter {
// See issue 21077.
let path = path.replace("\\.", ".");
for (mut path, value) in fields_iter {
// TODO: Addressed in issue 21077.
if !path.contains("\\.") {
path = quote_invalid_paths(&path).into();
}

// This should not return an error, unless there is a bug in the event fields iterator.
let parsed_path = parse_target_path(&path)?;
let parsed_path = match parse_target_path(&path) {
Ok(path) => path,
Err(error) => return emit!(ReduceAddEventError { error }),
};
if is_covered_by_strategy(&parsed_path, strategies) {
continue;
}
Expand Down Expand Up @@ -117,7 +120,6 @@ impl ReduceState {

self.events += 1;
self.stale_since = Instant::now();
Ok(())
}

fn flush(mut self) -> LogEvent {
Expand Down Expand Up @@ -201,6 +203,9 @@ impl Reduce {
// Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`]
// which means an invalid path would result in a configuration error.
let parsed_path = parse_target_path(path).ok();
if parsed_path.is_none() {
warn!(message = "Ignoring strategy with invalid path.", %path);
}
parsed_path.map(|path| (path, strategy.clone()))
})
.collect(),
Expand Down Expand Up @@ -239,29 +244,20 @@ impl Reduce {
.for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
}

fn push_or_new_reduce_state(
&mut self,
event: LogEvent,
discriminant: Discriminant,
) -> crate::Result<()> {
fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
match self.reduce_merge_states.entry(discriminant) {
hash_map::Entry::Vacant(entry) => {
let mut state = ReduceState::new();
state.add_event(event, &self.merge_strategies)?;
state.add_event(event, &self.merge_strategies);
entry.insert(state);
}
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().add_event(event, &self.merge_strategies)?;
entry.get_mut().add_event(event, &self.merge_strategies);
}
};
Ok(())
}

pub(crate) fn transform_one(
&mut self,
emitter: &mut Emitter<Event>,
event: Event,
) -> crate::Result<()> {
pub(crate) fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
let (starts_here, event) = match &self.starts_when {
Some(condition) => condition.check(event),
None => (false, event),
Expand Down Expand Up @@ -295,16 +291,15 @@ impl Reduce {
} else if ends_here {
emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
Some(mut state) => {
state.add_event(event, &self.merge_strategies)?;
state.add_event(event, &self.merge_strategies);
state.flush().into()
}
None => {
let mut state = ReduceState::new();
state.add_event(event, &self.merge_strategies)?;
state.add_event(event, &self.merge_strategies);
state.flush().into()
}
});
Ok(())
} else {
self.push_or_new_reduce_state(event, discriminant)
}
Expand All @@ -327,9 +322,7 @@ impl TaskTransform<Event> for Reduce {
flush_period,
|me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
// called for each event
if let Err(error) = me.transform_one(emitter, event) {
error!(%error, rate_limit = 30)
}
me.transform_one(emitter, event);
},
|me: &mut Box<Reduce>, emitter: &mut Emitter<Event>| {
// called periodically to check for expired events
Expand All @@ -343,6 +336,53 @@ impl TaskTransform<Event> for Reduce {
}
}

// TODO delete after issue 21077 is resolved.
/// Rewrites a dot-separated path that uses `\.` to escape literal dots into a
/// path where each escaped run is wrapped in double quotes instead.
///
/// A component that ends with a backslash is joined with the component that
/// follows it; consecutive joined components share a single pair of quotes.
/// For example `one.two\.three.four` becomes `one."two.three".four`.
///
/// Trailing backslashes are stripped from every component. A backslash at the
/// very end of the path has nothing to join with, so it is dropped and no
/// quote is opened. Paths without any escaped dot (including single-component
/// and empty paths) are returned unchanged.
fn quote_invalid_paths(path: &str) -> String {
    // At most one pair of quotes is added per escaped run; two extra bytes
    // cover the common case without reallocating.
    let mut result = String::with_capacity(path.len() + 2);
    let mut in_quotes = false;

    let mut components = path.split('.').peekable();
    while let Some(component) = components.next() {
        let is_last = components.peek().is_none();
        // Only a non-final component can escape the dot that follows it.
        let joins_next = !is_last && component.ends_with('\\');

        if joins_next && !in_quotes {
            result.push('"');
            in_quotes = true;
        }

        result.push_str(component.trim_end_matches('\\'));

        if in_quotes && !joins_next {
            result.push('"');
            in_quotes = false;
        }

        // Separators go between components only, never after the last one.
        if !is_last {
            result.push('.');
        }
    }

    result
}

#[cfg(test)]
mod test {
use indoc::indoc;
Expand Down Expand Up @@ -852,7 +892,7 @@ merge_strategies.bar = "concat"
group_by = [ "id" ]
merge_strategies.id = "discard"
merge_strategies."message.a.b" = "array"
merge_strategies."message.three.msg" = "array"
[ends_when]
type = "vrl"
Expand Down Expand Up @@ -930,14 +970,14 @@ merge_strategies.bar = "concat"
group_by = [ "id" ]
merge_strategies.id = "discard"
merge_strategies."a.b[0]" = "array"
merge_strategies."nested.msg[0]" = "array"
"#,
))
.unwrap();
let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err();
assert_eq!(
error.to_string(),
"Merge strategies with indexes are currently not supported. Path: `a.b[0]`"
"Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
);
}

Expand Down Expand Up @@ -1000,4 +1040,37 @@ merge_strategies.bar = "concat"
})
.await
}

#[test]
fn quote_paths_tests() {
    // Each entry is (raw path using `\.` escapes, expected quoted path).
    let cases = [
        ("one.two.three.four", "one.two.three.four"),
        (r"one.two.three\.four", "one.two.\"three.four\""),
        (r"one.two.three\.four\.five", "one.two.\"three.four.five\""),
        (
            r"one.two.three\.four\.five.six",
            "one.two.\"three.four.five\".six",
        ),
        (
            r"one.two\.three.four\.five.six.seven\.eight",
            "one.\"two.three\".\"four.five\".six.\"seven.eight\"",
        ),
        (
            r"one.two\.three\.four\.five.six\.seven.eight",
            "one.\"two.three.four.five\".\"six.seven\".eight",
        ),
    ];

    for (input, expected) in cases {
        assert_eq!(quote_invalid_paths(input), expected);
    }
}
}

0 comments on commit 6bbab4d

Please sign in to comment.