Skip to content

Commit

Permalink
fix(elasticsearch): Encode bulk action parameters as JSON
Browse files Browse the repository at this point in the history
They are currently using string templating which, if there are special characters in the value, will
end up creating an invalid JSON payload; an issue that can be difficult to track down. This happened
in #21288.

Signed-off-by: Jesse Szwedko <[email protected]>
  • Loading branch information
jszwedko committed Sep 13, 2024
1 parent e71016c commit e874265
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 20 deletions.
2 changes: 2 additions & 0 deletions changelog.d/elasticsearch-encoding.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The `elasticsearch` sink now encodes parameters such as `index` that contain characters that need to
be escaped in JSON strings.
100 changes: 80 additions & 20 deletions src/sinks/elasticsearch/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{io, io::Write};

use serde::Serialize;
use serde_json::json;
use vector_lib::buffers::EventCount;
use vector_lib::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf};
use vector_lib::{
Expand Down Expand Up @@ -154,48 +155,78 @@ fn write_bulk_action(
(true, DocumentMetadata::Id(id)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_id":"{}"}}}}"#,
bulk_action, index, id
"{}",
json!({
bulk_action: {
"_index": index,
"_id": id,
}
}),
)
}
(false, DocumentMetadata::Id(id)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}"}}}}"#,
bulk_action, index, doc_type, id
"{}",
json!({
bulk_action: {
"_type": doc_type,
"_index": index,
"_id": id,
}
}),
)
}
(true, DocumentMetadata::WithoutId) => {
write!(writer, r#"{{"{}":{{"_index":"{}"}}}}"#, bulk_action, index)
write!(
writer,
"{}",
json!({
bulk_action: {
"_index": index,
}
}),
)
}
(false, DocumentMetadata::WithoutId) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_type":"{}"}}}}"#,
bulk_action, index, doc_type
"{}",
json!({
bulk_action: {
"_type": doc_type,
"_index": index,
}
}),
)
}
(true, DocumentMetadata::IdAndVersion(id, version)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#,
bulk_action,
index,
id,
version.kind.as_str(),
version.value
"{}",
json!({
bulk_action: {
"_id": id,
"_index": index,
"version_type": version.kind.as_str(),
"version": version.value,
}
}),
)
}
(false, DocumentMetadata::IdAndVersion(id, version)) => {
write!(
writer,
r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#,
bulk_action,
index,
doc_type,
id,
version.kind.as_str(),
version.value
"{}",
json!({
bulk_action: {
"_id": id,
"_type": doc_type,
"_index": index,
"version_type": version.kind.as_str(),
"version": version.value,
}
}),
)
}
},
Expand Down Expand Up @@ -317,4 +348,33 @@ mod tests {
assert!(nested.contains_key("_type"));
assert_eq!(nested.get("_type").unwrap().as_str(), Some("TYPE"));
}

#[test]
fn encodes_fields_with_newlines() {
let mut writer = Vec::new();

_ = write_bulk_action(
&mut writer,
"ACTION\n",
"INDEX\n",
"TYPE\n",
false,
&DocumentMetadata::Id("ID\n".to_string()),
);

let value: serde_json::Value = serde_json::from_slice(&writer).unwrap();
let value = value.as_object().unwrap();

assert!(value.contains_key("ACTION\n"));

let nested = value.get("ACTION\n").unwrap();
let nested = nested.as_object().unwrap();

assert!(nested.contains_key("_index"));
assert_eq!(nested.get("_index").unwrap().as_str(), Some("INDEX\n"));
assert!(nested.contains_key("_id"));
assert_eq!(nested.get("_id").unwrap().as_str(), Some("ID\n"));
assert!(nested.contains_key("_type"));
assert_eq!(nested.get("_type").unwrap().as_str(), Some("TYPE\n"));
}
}

0 comments on commit e874265

Please sign in to comment.