From e874265462b3120a8153f0d6842bef7246cdbea8 Mon Sep 17 00:00:00 2001 From: Jesse Szwedko Date: Fri, 13 Sep 2024 07:46:05 -0700 Subject: [PATCH] fix(elasticsearch): Encode bulk action parameters as JSON They are currently using string templating which, if there are special characters in the value, will end up creating an invalid JSON payload; an issue that can be difficult to track down. This happened in https://github.com/vectordotdev/vector/discussions/21288. Signed-off-by: Jesse Szwedko --- changelog.d/elasticsearch-encoding.fix.md | 2 + src/sinks/elasticsearch/encoder.rs | 100 +++++++++++++++++----- 2 files changed, 82 insertions(+), 20 deletions(-) create mode 100644 changelog.d/elasticsearch-encoding.fix.md diff --git a/changelog.d/elasticsearch-encoding.fix.md b/changelog.d/elasticsearch-encoding.fix.md new file mode 100644 index 0000000000000..1c0c12184089c --- /dev/null +++ b/changelog.d/elasticsearch-encoding.fix.md @@ -0,0 +1,2 @@ +The `elasticsearch` sink now encodes parameters such as `index` that contain characters that need to +be escaped in JSON strings. diff --git a/src/sinks/elasticsearch/encoder.rs b/src/sinks/elasticsearch/encoder.rs index 2ee4f893e6c2a..bc0b638b9ce5c 100644 --- a/src/sinks/elasticsearch/encoder.rs +++ b/src/sinks/elasticsearch/encoder.rs @@ -1,6 +1,7 @@ use std::{io, io::Write}; use serde::Serialize; +use serde_json::json; use vector_lib::buffers::EventCount; use vector_lib::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use vector_lib::{ @@ -154,48 +155,78 @@ fn write_bulk_action( (true, DocumentMetadata::Id(id)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_id":"{}"}}}}"#, - bulk_action, index, id + "{}", + json!({ + bulk_action: { + "_index": index, + "_id": id, + } + }), ) } (false, DocumentMetadata::Id(id)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}"}}}}"#, - bulk_action, index, doc_type, id + "{}", + json!({ + bulk_action: { + "_type": doc_type, + "_index": index, + "_id": id, + } + }), ) } (true, DocumentMetadata::WithoutId) => { - write!(writer, r#"{{"{}":{{"_index":"{}"}}}}"#, bulk_action, index) + write!( + writer, + "{}", + json!({ + bulk_action: { + "_index": index, + } + }), + ) } (false, DocumentMetadata::WithoutId) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_type":"{}"}}}}"#, - bulk_action, index, doc_type + "{}", + json!({ + bulk_action: { + "_type": doc_type, + "_index": index, + } + }), ) } (true, DocumentMetadata::IdAndVersion(id, version)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#, - bulk_action, - index, - id, - version.kind.as_str(), - version.value + "{}", + json!({ + bulk_action: { + "_id": id, + "_index": index, + "version_type": version.kind.as_str(), + "version": version.value, + } + }), ) } (false, DocumentMetadata::IdAndVersion(id, version)) => { write!( writer, - r#"{{"{}":{{"_index":"{}","_type":"{}","_id":"{}","version_type":"{}","version":{}}}}}"#, - bulk_action, - index, - doc_type, - id, - version.kind.as_str(), - version.value + "{}", + json!({ + bulk_action: { + "_id": id, + "_type": doc_type, + "_index": index, + "version_type": version.kind.as_str(), + "version": version.value, + } + }), ) } }, @@ -317,4 +348,33 @@ mod tests { assert!(nested.contains_key("_type")); assert_eq!(nested.get("_type").unwrap().as_str(), Some("TYPE")); } + + #[test] + fn encodes_fields_with_newlines() { + let mut writer = Vec::new(); + + _ = write_bulk_action( + &mut writer, + "ACTION\n", + "INDEX\n", + "TYPE\n", + false, + &DocumentMetadata::Id("ID\n".to_string()), + ); + + let value: serde_json::Value = serde_json::from_slice(&writer).unwrap(); + let value = value.as_object().unwrap(); + + assert!(value.contains_key("ACTION\n")); + + let nested = value.get("ACTION\n").unwrap(); + let nested = nested.as_object().unwrap(); + + assert!(nested.contains_key("_index")); + assert_eq!(nested.get("_index").unwrap().as_str(), Some("INDEX\n")); + assert!(nested.contains_key("_id")); + assert_eq!(nested.get("_id").unwrap().as_str(), Some("ID\n")); + assert!(nested.contains_key("_type")); + assert_eq!(nested.get("_type").unwrap().as_str(), Some("TYPE\n")); + } }