Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sinks, codecs): Remove legacy EncodingConfiguration #13518

Merged
merged 14 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions benches/codecs/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,6 @@ fn encoder(c: &mut Criterion) {
"key3" => "value3"
}));

group.throughput(Throughput::Bytes(input.size_of() as u64));
group.bench_with_input("vector::sinks::util::encode_log", &(), |b, ()| {
b.iter_batched(
|| vector::sinks::util::Encoding::Json.into(),
|encoding| {
vector::sinks::util::encode_log(input.clone(), &encoding).unwrap();
},
BatchSize::SmallInput,
)
});

group.throughput(Throughput::Bytes(input.size_of() as u64));
group.bench_with_input("JsonLogVecSerializer::encode", &(), |b, ()| {
b.iter_batched(
Expand Down
6 changes: 2 additions & 4 deletions benches/files.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{convert::TryInto, path::PathBuf};

use bytes::Bytes;
use codecs::{encoding::FramingConfig, TextSerializerConfig};
use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput};
use futures::{stream, SinkExt, StreamExt};
use tempfile::tempdir;
Expand Down Expand Up @@ -54,10 +55,7 @@ fn benchmark_files_no_partitions(c: &mut Criterion) {
sinks::file::FileSinkConfig {
path: output.try_into().unwrap(),
idle_timeout_secs: None,
encoding: sinks::util::encoding::EncodingConfig::from(
sinks::file::Encoding::Text,
)
.into(),
encoding: (None::<FramingConfig>, TextSerializerConfig::new()).into(),
compression: sinks::file::Compression::None,
acknowledgements: Default::default(),
},
Expand Down
8 changes: 4 additions & 4 deletions benches/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::SocketAddr;

use codecs::{encoding::FramingConfig, TextSerializerConfig};
use criterion::{criterion_group, BatchSize, BenchmarkId, Criterion, SamplingMode, Throughput};
use futures::TryFutureExt;
use hyper::{
Expand All @@ -9,7 +10,7 @@ use hyper::{
use tokio::runtime::Runtime;
use vector::{
config, sinks,
sinks::util::{encoding::EncodingConfigWithFramingAdapter, BatchConfig, Compression},
sinks::util::{BatchConfig, Compression},
sources,
test_util::{next_addr, random_lines, runtime, send_lines, start_topology, wait_for_tcp},
Error,
Expand Down Expand Up @@ -53,9 +54,8 @@ fn benchmark_http(c: &mut Criterion) {
auth: Default::default(),
headers: Default::default(),
batch,
encoding: EncodingConfigWithFramingAdapter::legacy(
sinks::http::Encoding::Text.into(),
),
encoding: (None::<FramingConfig>, TextSerializerConfig::new())
.into(),
request: Default::default(),
tls: Default::default(),
acknowledgements: Default::default(),
Expand Down
17 changes: 9 additions & 8 deletions config/examples/docs_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ index = "vector-%Y-%m-%d" # daily indices

# Send structured data to a cost-effective long-term storage
[sinks.s3_archives]
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
encoding = "ndjson" # new line delimited JSON
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
framing.method = "newline_delimited" # new line delimited...
encoding.codec = "json" # ...JSON
[sinks.s3_archives.batch]
max_bytes = 10000000 # 10mb uncompressed
max_bytes = 10000000 # 10mb uncompressed
6 changes: 3 additions & 3 deletions config/examples/environment_variables.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ data_dir = "/var/lib/vector"
# Print the data to STDOUT for inspection
# Docs: https://vector.dev/docs/reference/sinks/console
[sinks.out]
inputs = ["add_host"]
type = "console"
encoding = "json"
inputs = ["add_host"]
type = "console"
encoding.codec = "json"
15 changes: 8 additions & 7 deletions config/examples/es_s3_hybrid.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ data_dir = "/var/lib/vector"

# Send structured data to S3, a durable long-term storage
[sinks.s3_archives]
inputs = ["apache_logs"] # don't sample
type = "aws_s3"
region = "us-east-1"
bucket = "my_log_archives"
encoding = "ndjson"
compression = "gzip"
inputs = ["apache_logs"] # don't sample
type = "aws_s3"
region = "us-east-1"
bucket = "my_log_archives"
framing.method = "newline_delimited"
encoding.codec = "json"
compression = "gzip"
[sinks.s3_archives.batch]
max_size = 10000000 # 10mb uncompressed
max_size = 10000000 # 10mb uncompressed
4 changes: 2 additions & 2 deletions config/examples/file_to_cloudwatch_metrics.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ tags = {method = "{{method}}", status = "{{status}}"}
[sinks.console_metrics]
inputs = ["log_to_metric"]
type = "console"
encoding = "json"
encoding.codec = "json"

[sinks.console_logs]
inputs = ["remap"]
type = "console"
encoding = "json"
encoding.codec = "json"

[sinks.cloudwatch]
inputs = ["log_to_metric"]
Expand Down
4 changes: 2 additions & 2 deletions config/examples/file_to_prometheus.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ name = "bytes_out_histogram"
[sinks.console_metrics]
inputs = ["log_to_metric"]
type = "console"
encoding = "json"
encoding.codec = "json"

[sinks.console_logs]
inputs = ["remap"]
type = "console"
encoding = "text"
encoding.codec = "text"

[sinks.prometheus]
inputs = ["log_to_metric"]
Expand Down
17 changes: 9 additions & 8 deletions config/examples/namespacing/sinks/s3_archives.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Send structured data to a cost-effective long-term storage
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
encoding = "ndjson" # new line delimited JSON
inputs = ["apache_parser"] # don't sample for S3
type = "aws_s3"
region = "us-east-1"
bucket = "my-log-archives"
key_prefix = "date=%Y-%m-%d" # daily partitions, hive friendly format
compression = "gzip" # compress final objects
framing.method = "newline_delimited" # new line delimited...
encoding.codec = "json" # ...JSON
[batch]
max_bytes = 10000000 # 10mb uncompressed
max_bytes = 10000000 # 10mb uncompressed
2 changes: 1 addition & 1 deletion config/examples/prometheus_to_console.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ scrape_interval_secs = 2
[sinks.console]
inputs = ["prometheus"]
type = "console"
encoding = "json"
encoding.codec = "json"
2 changes: 1 addition & 1 deletion config/examples/stdio.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
[sinks.out]
inputs = ["in"]
type = "console"
encoding = "text"
encoding.codec = "text"
6 changes: 3 additions & 3 deletions config/examples/wrapped_json.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ data_dir = "/var/lib/vector"
# Print the data to STDOUT for inspection
# Docs: https://vector.dev/docs/reference/sinks/console
[sinks.out]
inputs = ["parse_json"]
type = "console"
encoding = "json"
inputs = ["parse_json"]
type = "console"
encoding.codec = "json"
2 changes: 1 addition & 1 deletion docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type = "stdin"
[sinks.bar]
type = "console"
inputs = ["foo"]
encoding = "json"
encoding.codec = "json"
```

After the component construction phase, we'll be left with the tasks for each
Expand Down
86 changes: 82 additions & 4 deletions lib/codecs/src/encoding/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,100 @@ impl Encoder<Event> for JsonSerializer {
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use vector_common::btreemap;
use vector_core::event::{LogEvent, Value};
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};

use super::*;

#[test]
fn serialize_json() {
fn serialize_json_log() {
let event = Event::Log(LogEvent::from(btreemap! {
"foo" => Value::from("bar")
"x" => Value::from("23"),
"z" => Value::from(25),
"a" => Value::from("0"),
}));
let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), r#"{"foo":"bar"}"#);
assert_eq!(bytes.freeze(), r#"{"a":"0","x":"23","z":25}"#);
}

#[test]
fn serialize_json_metric_counter() {
let event = Event::Metric(
Metric::new(
"foos",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_namespace(Some("vector"))
.with_tags(Some(
vec![
("key2".to_owned(), "value2".to_owned()),
("key1".to_owned(), "value1".to_owned()),
("Key3".to_owned(), "Value3".to_owned()),
]
.into_iter()
.collect(),
))
.with_timestamp(Some(Utc.ymd(2018, 11, 14).and_hms_nano(8, 9, 10, 11))),
);

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
);
}

#[test]
fn serialize_json_metric_set() {
let event = Event::Metric(Metric::new(
"users",
MetricKind::Incremental,
MetricValue::Set {
values: vec!["bob".into()].into_iter().collect(),
},
));

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
);
}

#[test]
fn serialize_json_metric_histogram_without_timestamp() {
let event = Event::Metric(Metric::new(
"glork",
MetricKind::Incremental,
MetricValue::Distribution {
samples: vector_core::samples![10.0 => 1],
statistic: StatisticKind::Histogram,
},
));

let mut serializer = JsonSerializer::new();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion lib/k8s-e2e-tests/tests/vector-agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const CUSTOM_RESOURCE_VECTOR_CONFIG: &str = indoc! {r#"
[sinks.stdout]
type = "console"
inputs = ["kubernetes_logs"]
encoding = "json"
encoding.codec = "json"
"#};

/// This test validates that vector picks up logs at the simplest case
Expand Down
2 changes: 1 addition & 1 deletion rfcs/2020-03-06-1999-api-extensions-for-lua-transform.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ Here `event` is an encoded event to be produced by the transform, and `lane` is
> [sinks.example_console]
> type = "console"
> inputs = ["example_transform.example_lane"] # would output the event from `example_lane`
> encoding = "text"
> encoding.codec = "text"
> ```
>
> Other components connected to the same transform, but with different lanes names or without lane names at all would not receive any event.
Expand Down
2 changes: 1 addition & 1 deletion rfcs/2020-04-15-2341-wasm-plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ module = "target/wasm32-wasi/release/echo.wasm"
healthcheck = true
inputs = ["demo"]
type = "console"
encoding = "json"
encoding.codec = "json"
buffer.type = "memory"
buffer.max_events = 500
buffer.when_full = "block"
Expand Down
2 changes: 1 addition & 1 deletion rfcs/2021-07-20-8288-csv-enrichment.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ To represent the CSV file we have a new top level configuration option.
```toml
[enrichment_tables.csv_file]
type = "file"
encoding = "csv"
encoding.codec = "csv"
path = "\path_to_csv"
delimiter = ","
```
Expand Down
2 changes: 1 addition & 1 deletion skaffold/manifests/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ data:
type = "console"
inputs = ["kubernetes_logs", "internal_metrics"]
target = "stdout"
encoding = "json"
encoding.codec = "json"
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ stream_name = "soak_fluent_remap_firehose"
endpoint = "http://localhost:8080"
healthcheck.enabled = true
compression = "none"
encoding = "json"
encoding.codec = "json"
region = "us-east-2"
auth.access_key_id = "totallyanaccesskeyid"
auth.secret_access_key = "alsoasecretaccesskey"
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "ndjson"
framing.method = "newline_delimited"
decoding.codec = "json"

##
## Transforms
Expand Down
2 changes: 1 addition & 1 deletion soaks/tests/http_pipelines_blackhole/vector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "text"
decoding.codec = "bytes"

##
## Transforms
Expand Down
2 changes: 1 addition & 1 deletion soaks/tests/http_pipelines_blackhole_acks/vector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "text"
decoding.codec = "bytes"
acknowledgements = true

##
Expand Down
2 changes: 1 addition & 1 deletion soaks/tests/http_pipelines_no_grok_blackhole/vector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type = "internal_metrics"
[sources.logs]
type = "http"
address = "0.0.0.0:8282"
encoding = "text"
decoding.codec = "bytes"

##
## Transforms
Expand Down
Loading