Skip to content

Commit

Permalink
feat(codec,sources): influxdb line protcol decoder (#19637)
Browse files Browse the repository at this point in the history
* feat(codec,sources): influxdb line protocol decoder

Signed-off-by: Michael Hoffmann <[email protected]>
Co-authored-by: Sebin Sunny <[email protected]>

* Changelog

Signed-off-by: Sebin Sunny <[email protected]>

* chore(docs): update doc about InfluxDB Decoder

Signed-off-by: Sebin Sunny <[email protected]>

* chore(docs): fix typo about InfluxDB Decoder

Signed-off-by: Sebin Sunny <[email protected]>

---------

Signed-off-by: Sebin Sunny <[email protected]>
Co-authored-by: Sebin Sunny <[email protected]>
  • Loading branch information
MichaHoffmann and sebinsunny authored Jul 24, 2024
1 parent 56bd0dd commit 3fbb66c
Show file tree
Hide file tree
Showing 27 changed files with 655 additions and 2 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ indexmap,https://github.com/bluss/indexmap,Apache-2.0 OR MIT,The indexmap Author
indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap Authors
indoc,https://github.com/dtolnay/indoc,MIT OR Apache-2.0,David Tolnay <[email protected]>
infer,https://github.com/bojand/infer,MIT,Bojan <[email protected]>
influxdb-line-protocol,https://github.com/influxdata/influxdb_iox/tree/main/influxdb_line_protocol,MIT OR Apache-2.0,InfluxDB IOx Project Developers
inotify,https://github.com/hannobraun/inotify,ISC,"Hanno Braun <[email protected]>, Félix Saparelli <[email protected]>, Cristian Kubis <[email protected]>, Frank Denis <[email protected]>"
inotify-sys,https://github.com/hannobraun/inotify-sys,ISC,Hanno Braun <[email protected]>
inout,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Introduced support for decoding InfluxDB line protocol messages, allowing these messages to be deserialized into the Vector metric format.

authors: MichaHoffmann sebinsunny
1 change: 1 addition & 0 deletions lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ chrono.workspace = true
csv-core = { version = "0.1.10", default-features = false }
derivative = { version = "2", default-features = false }
dyn-clone = { version = "1", default-features = false }
influxdb-line-protocol = { version = "2", default-features = false }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] }
memchr = { version = "2", default-features = false }
once_cell = { version = "1.19", default-features = false }
Expand Down
211 changes: 211 additions & 0 deletions lib/codecs/src/decoding/format/influxdb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
use std::borrow::Cow;

use bytes::Bytes;
use chrono::DateTime;
use derivative::Derivative;
use influxdb_line_protocol::{FieldValue, ParsedLine};
use smallvec::SmallVec;
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::event::{Event, Metric, MetricKind, MetricTags, MetricValue};
use vector_core::{config::DataType, schema};
use vrl::value::kind::Collection;
use vrl::value::Kind;

use crate::decoding::format::default_lossy;

use super::Deserializer;

/// Config used to build a `InfluxdbDeserializer`.
/// - [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_tutorial/):
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct InfluxdbDeserializerConfig {
/// Influxdb-specific decoding options.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub influxdb: InfluxdbDeserializerOptions,
}

impl InfluxdbDeserializerConfig {
/// new constructs a new InfluxdbDeserializerConfig
pub fn new(options: InfluxdbDeserializerOptions) -> Self {
Self { influxdb: options }
}

/// build constructs a new InfluxdbDeserializer
pub fn build(&self) -> InfluxdbDeserializer {
Into::<InfluxdbDeserializer>::into(self)
}

/// The output type produced by the deserializer.
pub fn output_type(&self) -> DataType {
DataType::Metric
}

/// The schema produced by the deserializer.
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
schema::Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[log_namespace],
)
}
}

/// Influxdb-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializerOptions {
/// Determines whether or not to replace invalid UTF-8 sequences instead of failing.
///
/// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
///
/// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::is_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}

/// Deserializer for the influxdb line protocol
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}

impl InfluxdbDeserializer {
/// new constructs a new InfluxdbDeserializer
pub fn new(lossy: bool) -> Self {
Self { lossy }
}
}

impl Deserializer for InfluxdbDeserializer {
fn parse(
&self,
bytes: Bytes,
_log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let line: Cow<str> = match self.lossy {
true => String::from_utf8_lossy(&bytes),
false => Cow::from(std::str::from_utf8(&bytes)?),
};
let parsed_line = influxdb_line_protocol::parse_lines(&line);

let res = parsed_line
.collect::<Result<Vec<_>, _>>()?
.iter()
.flat_map(|line| {
let ParsedLine {
series,
field_set,
timestamp,
} = line;

field_set
.iter()
.filter_map(|f| {
let measurement = series.measurement.clone();
let tags = series.tag_set.as_ref();
let val = match f.1 {
FieldValue::I64(v) => v as f64,
FieldValue::U64(v) => v as f64,
FieldValue::F64(v) => v,
FieldValue::Boolean(v) => {
if v {
1.0
} else {
0.0
}
}
FieldValue::String(_) => return None, // String values cannot be modelled in our schema
};
Some(Event::Metric(
Metric::new(
format!("{0}_{1}", measurement, f.0),
MetricKind::Absolute,
MetricValue::Gauge { value: val },
)
.with_tags(tags.map(|ts| {
MetricTags::from_iter(
ts.iter().map(|t| (t.0.to_string(), t.1.to_string())),
)
}))
.with_timestamp(timestamp.and_then(DateTime::from_timestamp_micros)),
))
})
.collect::<Vec<_>>()
})
.collect();

Ok(res)
}
}

impl From<&InfluxdbDeserializerConfig> for InfluxdbDeserializer {
fn from(config: &InfluxdbDeserializerConfig) -> Self {
Self {
lossy: config.influxdb.lossy,
}
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use chrono::DateTime;
use vector_core::{
config::LogNamespace,
event::{Metric, MetricKind, MetricTags, MetricValue},
};

use crate::decoding::format::{Deserializer, InfluxdbDeserializer};

#[test]
fn deserialize_success() {
let deser = InfluxdbDeserializer::new(true);
let buffer = Bytes::from(
"cpu,host=A,region=west usage_system=64i,usage_user=10i 1590488773254420000",
);
let events = deser.parse(buffer, LogNamespace::default()).unwrap();
assert_eq!(events.len(), 2);

assert_eq!(
events[0].as_metric(),
&Metric::new(
"cpu_usage_system",
MetricKind::Absolute,
MetricValue::Gauge { value: 64. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(DateTime::from_timestamp_micros(1590488773254420000))
);
assert_eq!(
events[1].as_metric(),
&Metric::new(
"cpu_usage_user",
MetricKind::Absolute,
MetricValue::Gauge { value: 10. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(DateTime::from_timestamp_micros(1590488773254420000))
);
}

#[test]
fn deserialize_error() {
let deser = InfluxdbDeserializer::new(true);
let buffer = Bytes::from("some invalid string");
assert!(deser.parse(buffer, LogNamespace::default()).is_err());
}
}
2 changes: 2 additions & 0 deletions lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
mod avro;
mod bytes;
mod gelf;
mod influxdb;
mod json;
mod native;
mod native_json;
Expand All @@ -18,6 +19,7 @@ use ::bytes::Bytes;
pub use avro::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
use dyn_clone::DynClone;
pub use gelf::{GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions};
pub use influxdb::{InfluxdbDeserializer, InfluxdbDeserializerConfig};
pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions};
pub use native::{NativeDeserializer, NativeDeserializerConfig};
pub use native_json::{
Expand Down
24 changes: 22 additions & 2 deletions lib/codecs/src/decoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use bytes::{Bytes, BytesMut};
pub use error::StreamDecodingError;
pub use format::{
BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
GelfDeserializerConfig, GelfDeserializerOptions, JsonDeserializer, JsonDeserializerConfig,
JsonDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
ProtobufDeserializerConfig, ProtobufDeserializerOptions,
};
Expand Down Expand Up @@ -252,6 +253,11 @@ pub enum DeserializerConfig {
/// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
Gelf(GelfDeserializerConfig),

/// Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message.
///
/// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
Influxdb(InfluxdbDeserializerConfig),

/// Decodes the raw bytes as as an [Apache Avro][apache_avro] message.
///
/// [apache_avro]: https://avro.apache.org/
Expand Down Expand Up @@ -303,6 +309,12 @@ impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
}
}

impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
fn from(config: InfluxdbDeserializerConfig) -> Self {
Self::Influxdb(config)
}
}

impl DeserializerConfig {
/// Build the `Deserializer` from this configuration.
pub fn build(&self) -> vector_common::Result<Deserializer> {
Expand All @@ -323,6 +335,7 @@ impl DeserializerConfig {
}
DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
}
}
Expand All @@ -334,6 +347,7 @@ impl DeserializerConfig {
DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
DeserializerConfig::Bytes
| DeserializerConfig::Json(_)
| DeserializerConfig::Influxdb(_)
| DeserializerConfig::NativeJson(_) => {
FramingConfig::NewlineDelimited(Default::default())
}
Expand Down Expand Up @@ -363,6 +377,7 @@ impl DeserializerConfig {
DeserializerConfig::NativeJson(config) => config.output_type(),
DeserializerConfig::Gelf(config) => config.output_type(),
DeserializerConfig::Vrl(config) => config.output_type(),
DeserializerConfig::Influxdb(config) => config.output_type(),
}
}

Expand All @@ -381,6 +396,7 @@ impl DeserializerConfig {
DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
}
}
Expand Down Expand Up @@ -413,6 +429,7 @@ impl DeserializerConfig {
| DeserializerConfig::NativeJson(_)
| DeserializerConfig::Bytes
| DeserializerConfig::Gelf(_)
| DeserializerConfig::Influxdb(_)
| DeserializerConfig::Vrl(_),
_,
) => "text/plain",
Expand Down Expand Up @@ -444,6 +461,8 @@ pub enum Deserializer {
Boxed(BoxedDeserializer),
/// Uses a `GelfDeserializer` for deserialization.
Gelf(GelfDeserializer),
/// Uses a `InfluxdbDeserializer` for deserialization.
Influxdb(InfluxdbDeserializer),
/// Uses a `VrlDeserializer` for deserialization.
Vrl(VrlDeserializer),
}
Expand All @@ -465,6 +484,7 @@ impl format::Deserializer for Deserializer {
Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S
DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf,
DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
// TODO: Influxdb has no serializer yet
DeserializerConfig::Influxdb { .. } => todo!(),
DeserializerConfig::Vrl { .. } => unimplemented!(),
};

Expand Down
Loading

0 comments on commit 3fbb66c

Please sign in to comment.