Skip to content

Commit

Permalink
Implement select data lookup on metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jerryjvl committed Apr 14, 2024
1 parent 99afd7f commit 7d3aadd
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions src/sinks/kafka/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::Bytes;
use rdkafka::message::{Header, OwnedHeaders};
use vector_lib::lookup::OwnedTargetPath;
use vrl::path::{OwnedSegment, PathPrefix};

use crate::{
internal_events::KafkaHeaderExtractionError,
Expand Down Expand Up @@ -67,14 +68,35 @@ impl RequestBuilder<(String, Event)> for KafkaRequestBuilder {
fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option<Bytes> {
key_field.and_then(|key_field| match event {
Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()),
Event::Metric(metric) => metric
.tags()
.and_then(|tags| tags.get(key_field.to_string().as_str()))
.map(|value| value.to_owned().into()),
Event::Metric(metric) => metric_get(metric, key_field).map(|value| value.to_owned().into()),
_ => None,
})
}

// A version of this logic should be moved into "Metric" as "get" analogous to
// "LogEvent" when metrics can be interpreted as "Value"s.
pub fn metric_get<'a>(metric: &'a vector_lib::event::metric::Metric, key: &OwnedTargetPath) -> Option<&'a str> {
match key.prefix {
PathPrefix::Event =>
match key.path.segments.get(0) {
Some(OwnedSegment::Field(first_field)) =>
match first_field.as_ref() {
"name" => Some(metric.name()),
"tags" => match key.path.segments.len() {
2 => match key.path.segments.get(1) {
Some(OwnedSegment::Field(second_field)) => metric.tags().as_ref().and_then(|tags| tags.get(second_field.as_ref())),
_ => None,
}
_ => None,
}
_ => metric.tags().as_ref().and_then(|tags| tags.get(key.to_string().as_str())),
}
_ => None,
}
_ => None,
}
}

fn get_timestamp_millis(event: &Event) -> Option<i64> {
match &event {
Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
Expand Down

0 comments on commit 7d3aadd

Please sign in to comment.