Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(reduce transform): use the correct merge strategy for top level objects #21067

Merged
merged 11 commits into from
Aug 16, 2024
1 change: 1 addition & 0 deletions changelog.d/21067_reduce.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes a Vector v0.40.0 regression where the `reduce` transform would not group top level objects correctly.
29 changes: 28 additions & 1 deletion src/internal_events/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type, InternalEvent};
use vrl::path::PathParseError;
use vrl::value::KeyString;

#[derive(Debug)]
pub struct ReduceStaleEventFlushed;
Expand All @@ -9,3 +11,28 @@ impl InternalEvent for ReduceStaleEventFlushed {
counter!("stale_events_flushed_total").increment(1);
}
}

#[derive(Debug)]
pub struct ReduceAddEventError {
pub error: PathParseError,
pub path: KeyString,
}

impl InternalEvent for ReduceAddEventError {
fn emit(self) {
error!(
message = "Event field could not be reduced.",
path = ?self.path,
error = ?self.error,
error_type = error_type::CONDITION_FAILED,
stage = error_stage::PROCESSING,
pront marked this conversation as resolved.
Show resolved Hide resolved
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
"error_type" => error_type::PARSER_FAILED,
"stage" => error_stage::PROCESSING,
)
.increment(1);
}
}
85 changes: 68 additions & 17 deletions src/transforms/reduce/merge_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use vector_lib::configurable::configurable_component;
use vrl::path::OwnedTargetPath;

/// Strategies for merging events.
#[configurable_component]
Expand Down Expand Up @@ -67,7 +68,11 @@ impl ReduceValueMerger for DiscardMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, self.v);
Ok(())
}
Expand All @@ -93,7 +98,11 @@ impl ReduceValueMerger for RetainMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, self.v);
Ok(())
}
Expand Down Expand Up @@ -133,7 +142,11 @@ impl ReduceValueMerger for ConcatMerger {
}
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Bytes(self.v.into()));
Ok(())
}
Expand All @@ -160,7 +173,11 @@ impl ReduceValueMerger for ConcatArrayMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand All @@ -183,7 +200,11 @@ impl ReduceValueMerger for ArrayMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand Down Expand Up @@ -215,7 +236,11 @@ impl ReduceValueMerger for LongestArrayMerger {
}
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand Down Expand Up @@ -247,7 +272,11 @@ impl ReduceValueMerger for ShortestArrayMerger {
}
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v));
Ok(())
}
Expand Down Expand Up @@ -292,7 +321,11 @@ impl ReduceValueMerger for FlatUniqueMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(path, Value::Array(self.v.into_iter().collect()));
Ok(())
}
Expand Down Expand Up @@ -326,7 +359,11 @@ impl ReduceValueMerger for TimestampWindowMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
v.insert(
format!("{}_end", path).as_str(),
Value::Timestamp(self.latest),
Expand Down Expand Up @@ -390,7 +427,11 @@ impl ReduceValueMerger for AddNumbersMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
match self.v {
NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
Expand Down Expand Up @@ -449,7 +490,11 @@ impl ReduceValueMerger for MaxNumberMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
match self.v {
NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
Expand Down Expand Up @@ -508,7 +553,11 @@ impl ReduceValueMerger for MinNumberMerger {
Ok(())
}

fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String> {
fn insert_into(
self: Box<Self>,
path: &OwnedTargetPath,
v: &mut LogEvent,
) -> Result<(), String> {
match self.v {
NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
Expand All @@ -519,7 +568,8 @@ impl ReduceValueMerger for MinNumberMerger {

pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync {
fn add(&mut self, v: Value) -> Result<(), String>;
fn insert_into(self: Box<Self>, path: &str, v: &mut LogEvent) -> Result<(), String>;
fn insert_into(self: Box<Self>, path: &OwnedTargetPath, v: &mut LogEvent)
-> Result<(), String>;
}

impl From<Value> for Box<dyn ReduceValueMerger> {
Expand Down Expand Up @@ -612,10 +662,10 @@ pub(crate) fn get_value_merger(

#[cfg(test)]
mod test {
use serde_json::json;

use super::*;
use crate::event::LogEvent;
use serde_json::json;
use vrl::owned_event_path;

#[test]
fn initial_values() {
Expand Down Expand Up @@ -876,7 +926,8 @@ mod test {
let mut merger = get_value_merger(initial, strategy)?;
merger.add(additional)?;
let mut output = LogEvent::default();
merger.insert_into("out", &mut output)?;
Ok(output.remove("out").unwrap())
let out_path = owned_event_path!("out");
merger.insert_into(&out_path, &mut output)?;
Ok(output.remove(&out_path).unwrap())
}
}
Loading
Loading