chore(splunk hec sink)!: json encoding should get message first and then parse #20297

Closed · wants to merge 2 commits
2 changes: 2 additions & 0 deletions changelog.d/splunk_hec_sink_encode_message_only.breaking.md
@@ -0,0 +1,2 @@
The Splunk HEC sink now uses the `message` semantic meaning to retrieve the relevant value and encodes only that value. Note that if the retrieved value is `None`, an event with an empty message will be published.
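The described behavior can be sketched in isolation. This is a simplified model with hypothetical types (`LogEvent`, `get_message`, `encode_hec_event` are stand-ins invented for illustration); the real sink operates on Vector's `Event` and codec serializers in `src/sinks/splunk_hec/logs/encoder.rs`:

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a log event: a flat map of fields.
type LogEvent = BTreeMap<String, String>;

/// Resolve the `message` semantic meaning (modeled here as a plain field lookup).
fn get_message(event: &LogEvent) -> Option<&String> {
    event.get("message")
}

/// Encode only the resolved message; a `None` message yields an empty event.
fn encode_hec_event(event: &LogEvent) -> String {
    get_message(event).cloned().unwrap_or_default()
}

fn main() {
    let mut event = LogEvent::new();
    event.insert("message".into(), "hello world".into());
    event.insert("key".into(), "value".into());

    // Only the message is encoded; other fields do not end up in `event`.
    assert_eq!(encode_hec_event(&event), "hello world");

    // A missing message still produces an (empty) event.
    assert_eq!(encode_hec_event(&LogEvent::new()), "");
}
```

This mirrors the changelog note: the lookup happens first, and an absent message degrades to an empty payload rather than an error.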
Member commented:
A few questions:

  • Do we know how Splunk reacts if you send it an event with an empty event field?
  • Is the semantic field required? I.e. will Vector fail to boot if there is no semantic message defined?
  • Also, is this only true for the /event endpoint and not for /raw?

24 changes: 15 additions & 9 deletions src/sinks/splunk_hec/logs/encoder.rs
@@ -22,6 +22,7 @@ pub enum HecEvent<'a> {
Text(Cow<'a, str>),
}

/// See https://docs.splunk.com/Documentation/Splunk/9.2.1/Data/FormateventsforHTTPEventCollector#Event_data.
#[derive(Serialize, Debug)]
pub struct HecData<'a> {
#[serde(flatten)]
@@ -86,16 +87,21 @@ impl Encoder<Vec<HecProcessedEvent>> for HecLogsEncoder {
}
EndpointTarget::Event => {
let serializer = encoder.serializer();
let hec_event = if serializer.supports_json() {
HecEvent::Json(
serializer
.to_json_value(event)
.map_err(|error| emit!(SplunkEventEncodeError { error }))
.ok()?,
)
let hec_event = if let Some(message) = event.as_log().get_message() {
@pront (Contributor Author) commented on Apr 15, 2024:
TODO: This is expected to break all integration tests using JsonSerializerConfig. For example, look at splunk_custom_fields, looking up by message is no longer correct. The code needs to be updated to directly access the value from the event.

Also, all the tests rely on the legacy namespace. I might go ahead and use the Vector namespace.

@pront (Contributor Author) commented:

cc @vladimir-dd and @jszwedko for confirming before I go ahead and edit all those.

Contributor commented:

I think we should move all other fields from LogEvent to HecData.fields, otherwise we could lose some fields.

@vladimir-dd (Contributor) commented on Apr 16, 2024:

Sorry, I didn't notice that we pass fields through metadata.fields. But since we do not expose indexed_fields for the sink, do we always pass an empty fields map? Is this expected behaviour? 🤔

Member commented:

I feel like I'm getting a bit turned around by this now 😵

I think I'd talked to Pavlos about only encoding the message field, but, thinking about it more now, the field in the HEC payload is actually called event which makes me think maybe it should be encoding the entire event and not just the message key. Their definition of event:

Event data can be assigned to the "event" key within the JSON object in the HTTP request, or it can be raw text. The "event" key is at the same level within the JSON event packet as the metadata keys. Within the "event" key-value curly brackets, the data can be in whatever format you want: a string, a number, another JSON object, and so on.

You can batch multiple events in one event packet by combining them within the request. By batching the events, you're specifying that any event metadata within the request is to apply to all of the events contained in the request. Batching events can significantly speed performance when you need to index large quantities of data.

https://docs.splunk.com/Documentation/Splunk/9.2.1/Data/FormateventsforHTTPEventCollector#Event_data:~:text=from%20the%20request.-,Event%20data,-Event%20data%20can

fields is defined as:

The fields key isn't applicable to raw data. This key specifies a JSON object that contains a flat (not nested) list of explicit custom fields to be defined at index time. Requests containing the "fields" property must be sent to the /collector/event endpoint, or else they aren't indexed. For more information, see Indexed field extractions.

I agree with @vladimir-dd that if we only encode message as event that maybe the rest of the fields should go into fields; however, now I'm not so sure that the entire event shouldn't just go in event.

This might be a case where we need to play around with it a bit and see how it looks in the Splunk interface under various conditions (like Splunk HEC -> Splunk and Syslog -> Splunk).

Apologies for what feels like going in circles a bit here. I'm happy to chat about this if it helps resolve it more quickly.
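For concreteness, the two payload shapes being weighed in this thread look roughly like this. This is a sketch with illustrative field values borrowed from the tests below; neither string is actual sink output:

```rust
fn main() {
    // Option discussed above: encode only the semantic message as `event`
    // and move the remaining fields into `fields`.
    let message_only =
        r#"{"time": 1638366107.111, "event": "hello world", "fields": {"key": "value"}}"#;

    // Alternative: serialize the whole log event into `event`, which Splunk
    // permits since `event` may be any JSON value (string, number, object, ...).
    let whole_event =
        r#"{"time": 1638366107.111, "event": {"message": "hello world", "key": "value"}}"#;

    println!("{message_only}");
    println!("{whole_event}");
}
```

Per the quoted Splunk documentation, only the first shape requires the `/collector/event` endpoint for `fields` to be indexed; the second keeps all data inside `event` and relies on search-time extraction.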

@pront (Contributor Author) commented:

Marking it as a draft for now. It's not urgent to get to the bottom of this given we have other priorities.

if serializer.supports_json() {
let message_event = Event::from(LogEvent::from(message.clone()));
HecEvent::Json(
serializer
.to_json_value(message_event)
.map_err(|error| emit!(SplunkEventEncodeError { error }))
.ok()?,
)
} else {
encoder.encode(event, &mut bytes).ok()?;
@pront (Contributor Author) commented on Apr 14, 2024:

🗒️ Encoders call get_message() internally.

HecEvent::Text(String::from_utf8_lossy(&bytes))
}
} else {
encoder.encode(event, &mut bytes).ok()?;
HecEvent::Text(String::from_utf8_lossy(&bytes))
HecEvent::Text("".into())
@vladimir-dd (Contributor) commented on Apr 16, 2024:

I think we should use the same logic as above here to avoid losing information, but instead of encoding the message, encode the whole event (either as JSON or as text)?

};

let mut hec_data = HecData::new(
5 changes: 1 addition & 4 deletions src/sinks/splunk_hec/logs/integration_tests.rs
@@ -525,10 +525,7 @@ async fn splunk_auto_extracted_timestamp() {
// Splunk will determine the timestamp from the *message* field in the event.
@pront (Contributor Author) commented:
General comment for these tests: they all use the Legacy namespace.

// Thus, we expect the `timestamp` field to still be present.
assert_eq!(
@pront (Contributor Author) commented:
🗒️ `cargo vdev -v int start -a splunk` works on Ubuntu. However, on Ubuntu, `cargo vdev -v int test --retries 2 -a splunk` fails to run tests.

format!(
"{{\"message\":\"{}\",\"timestamp\":\"2020-03-05T00:00:00Z\"}}",
message
),
"this message is on 2018-10-22 23:43:42",
entry["_raw"].as_str().unwrap()
);
assert_eq!(
112 changes: 82 additions & 30 deletions src/sinks/splunk_hec/logs/tests.rs
@@ -14,7 +14,7 @@ use vector_lib::{
};
use vrl::path::OwnedTargetPath;
use vrl::value::Kind;
use vrl::{event_path, metadata_path, owned_value_path};
use vrl::{metadata_path, owned_value_path};

use super::sink::{HecLogsProcessedEventMetadata, HecProcessedEvent};
use crate::sinks::util::processed_event::ProcessedEvent;
@@ -45,7 +45,7 @@ struct HecEventJson {

#[derive(Deserialize, Debug)]
struct HecEventText {
time: f64,
time: Option<f64>,
event: String,
fields: BTreeMap<String, String>,
source: Option<String>,
@@ -71,17 +71,25 @@ fn get_processed_event_timestamp(
timestamp_key: Option<OptionalTargetPath>,
auto_extract_timestamp: bool,
) -> HecProcessedEvent {
let mut event = Event::Log(LogEvent::from("hello world"));
// JSON object as event data
let mut event = Event::Log(LogEvent::from(btreemap! {
"msg" => "hello world",
"event_sourcetype" => "test_sourcetype",
"event_source" => "test_source",
"event_index" => "test_index",
"host_key" => "test_host",
"event_field1" => "test_value1",
"event_field2" => "test_value2",
"key" => "value",
"int_val" => 123,
}));
// This must exist because it is checked at runtime
event.as_mut_log().insert(metadata_path!("vector"), "");
let definition = Definition::new_with_default_metadata(Kind::any(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE);
event
.as_mut_log()
.insert("event_sourcetype", "test_sourcetype");
event.as_mut_log().insert("event_source", "test_source");
event.as_mut_log().insert("event_index", "test_index");
event.as_mut_log().insert("host_key", "test_host");
event.as_mut_log().insert("event_field1", "test_value1");
event.as_mut_log().insert("event_field2", "test_value2");
event.as_mut_log().insert("key", "value");
event.as_mut_log().insert("int_val", 123);
.metadata_mut()
.set_schema_definition(&Arc::new(definition));

if let Some(OptionalTargetPath {
path: Some(ts_path),
@@ -172,17 +180,21 @@ fn splunk_encode_log_event_json() {
let processed_event = get_processed_event();
let hec_data =
get_encoded_event::<HecEventJson>(JsonSerializerConfig::default().into(), processed_event);
let event = hec_data.event;
let hec_event = hec_data.event;

assert_eq!(event.get("key").unwrap(), &serde_json::Value::from("value"));
assert_eq!(event.get("int_val").unwrap(), &serde_json::Value::from(123));
assert_eq!(
event
.get(&log_schema().message_key().unwrap().to_string())
.unwrap(),
hec_event.get("key").unwrap(),
&serde_json::Value::from("value")
);
assert_eq!(
hec_event.get("int_val").unwrap(),
&serde_json::Value::from(123)
);
assert_eq!(
hec_event.get("msg").unwrap(),
&serde_json::Value::from("hello world")
);
assert!(event
assert!(hec_event
.get(log_schema().timestamp_key().unwrap().to_string().as_str())
.is_none());

@@ -195,7 +207,7 @@ fn splunk_encode_log_event_json() {

assert_eq!(hec_data.time, Some(1638366107.111));
assert_eq!(
event.get("ts_nanos_key").unwrap(),
hec_event.get("ts_nanos_key").unwrap(),
&serde_json::Value::from(456123)
);
}
@@ -206,7 +218,19 @@
let hec_data =
get_encoded_event::<HecEventText>(TextSerializerConfig::default().into(), processed_event);

assert_eq!(hec_data.event.as_str(), "hello world");
assert_eq!(
hec_data.event.as_str(),
"{\"event_field1\":\"test_value1\",\
\"event_field2\":\"test_value2\",\
\"event_index\":\"test_index\",\
\"event_source\":\"test_source\",\
\"event_sourcetype\":\"test_sourcetype\",\
\"host_key\":\"test_host\",\
\"int_val\":123,\
\"key\":\"value\",\
\"msg\":\"hello world\",\
\"ts_nanos_key\":456123}"
);

assert_eq!(hec_data.source, Some("test_source".to_string()));
assert_eq!(hec_data.sourcetype, Some("test_sourcetype".to_string()));

assert_eq!(hec_data.fields.get("event_field1").unwrap(), "test_value1");

assert_eq!(hec_data.time, 1638366107.111);
assert_eq!(hec_data.time, Some(1638366107.111));
}

#[test]
fn splunk_encode_log_event_message_none() {
let metadata = EventMetadata::default().with_schema_definition(&Arc::new(
Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
));

let processed_event = process_log(
Event::Log(LogEvent::new_with_metadata(metadata)),
&super::sink::HecLogData {
sourcetype: None,
source: None,
index: None,
host_key: None,
indexed_fields: &[],
timestamp_nanos_key: None,
timestamp_key: None,
endpoint_target: EndpointTarget::Event,
auto_extract_timestamp: false,
},
);

let hec_data =
get_encoded_event::<HecEventText>(TextSerializerConfig::default().into(), processed_event);
assert_eq!(hec_data.event.as_str(), "");
}

#[tokio::test]
@@ -344,14 +395,20 @@ fn splunk_encode_log_event_semantic_meanings() {
&owned_value_path!("timestamp"),
Kind::timestamp(),
Some(meaning::TIMESTAMP),
),
)
.with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
));

let mut log = LogEvent::new_with_metadata(metadata);
log.insert(event_path!("message"), "the_message");

log.insert(
&OwnedTargetPath::event_root(),
Value::from(btreemap! {"foo" => "bar"}),
);

// insert an arbitrary metadata field such that the log becomes Vector namespaced
log.insert(metadata_path!("vector", "foo"), "bar");
assert!(log.namespace() == LogNamespace::Vector);

let og_time = Utc::now();

Expand All @@ -364,12 +421,8 @@ fn splunk_encode_log_event_semantic_meanings() {
Value::Timestamp(og_time),
);

assert!(log.namespace() == LogNamespace::Vector);

let event = Event::Log(log);

let processed_event = process_log(
event,
Event::Log(log),
&super::sink::HecLogData {
sourcetype: None,
source: None,

let hec_data =
get_encoded_event::<HecEventJson>(JsonSerializerConfig::default().into(), processed_event);

assert_eq!(hec_data.time, Some(expected_time));

assert_eq!(hec_data.host, Some("roast".to_string()));