Skip to content

Commit

Permalink
chore(splunk_hec_logs sink): support log namespaced host and timestam…
Browse files Browse the repository at this point in the history
…p attributes (#20211)

- Currently the splunk hec logs sink has default paths for the timestamp_key and host_key configurable settings which statically point to the global log namespace location for these keys.
- In order to support log namespacing for these settings, the logic for determining the default case is moved from the configuration serialization down into the sink's encoding of the event. That is necessary because we must determine at runtime if each event received is namespaced or not.
- Note that this also necessitated a small change to the Humio sinks: because those sinks are wrappers over the Splunk HEC sink, they now pass their keys explicitly in order to preserve existing behavior.
  • Loading branch information
neuronull committed Apr 4, 2024
1 parent 1eb18e2 commit 7b85728
Show file tree
Hide file tree
Showing 11 changed files with 231 additions and 113 deletions.
11 changes: 7 additions & 4 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ pub struct HumioLogsConfig {

/// Overrides the name of the log field used to retrieve the hostname to send to Humio.
///
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
/// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
/// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
///
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
#[serde(default = "config_host_key_target_path")]
Expand Down Expand Up @@ -128,8 +129,10 @@ pub struct HumioLogsConfig {
pub acknowledgements: AcknowledgementsConfig,

/// Overrides the name of the log field used to retrieve the timestamp to send to Humio.
/// When set to `""`, a timestamp is not set in the events sent to Humio.
///
/// By default, the [global `log_schema.timestamp_key` option][global_timestamp_key] is used.
/// By default, either the [global `log_schema.timestamp_key` option][global_timestamp_key] is used
/// if log events are Legacy namespaced, or the semantic meaning of "timestamp" is used, if defined.
///
/// [global_timestamp_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.timestamp_key
#[serde(default = "config_timestamp_key_target_path")]
Expand Down Expand Up @@ -188,7 +191,7 @@ impl HumioLogsConfig {
HecLogsSinkConfig {
default_token: self.token.clone(),
endpoint: self.endpoint.clone(),
host_key: self.host_key.clone(),
host_key: Some(self.host_key.clone()),
indexed_fields: self.indexed_fields.clone(),
index: self.index.clone(),
sourcetype: self.event_type.clone(),
Expand All @@ -203,7 +206,7 @@ impl HumioLogsConfig {
indexer_acknowledgements_enabled: false,
..Default::default()
},
timestamp_key: config_timestamp_key_target_path(),
timestamp_key: Some(config_timestamp_key_target_path()),
endpoint_target: EndpointTarget::Event,
auto_extract_timestamp: None,
}
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/humio/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ pub struct HumioMetricsConfig {

/// Overrides the name of the log field used to retrieve the hostname to send to Humio.
///
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
/// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
/// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
///
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
#[serde(default = "config_host_key")]
Expand Down
12 changes: 0 additions & 12 deletions src/sinks/splunk_hec/common/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,6 @@ pub fn build_uri(
uri.parse::<Uri>()
}

pub fn config_host_key_target_path() -> OptionalTargetPath {
OptionalTargetPath {
path: crate::config::log_schema().host_key_target_path().cloned(),
}
}

pub fn config_host_key() -> OptionalValuePath {
OptionalValuePath {
path: crate::config::log_schema().host_key().cloned(),
Expand All @@ -155,12 +149,6 @@ pub fn config_timestamp_key_target_path() -> OptionalTargetPath {
}
}

pub fn config_timestamp_key() -> OptionalValuePath {
OptionalValuePath {
path: crate::config::log_schema().timestamp_key().cloned(),
}
}

pub fn render_template_string<'a>(
template: &Template,
event: impl Into<EventRef<'a>>,
Expand Down
33 changes: 19 additions & 14 deletions src/sinks/splunk_hec/logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::{
prelude::*,
splunk_hec::common::{
acknowledgements::HecClientAcknowledgementsConfig,
build_healthcheck, build_http_batch_service, config_host_key_target_path,
config_timestamp_key_target_path, create_client,
build_healthcheck, build_http_batch_service, create_client,
service::{HecService, HttpRequestBuilder},
EndpointTarget, SplunkHecDefaultBatchSettings,
},
Expand Down Expand Up @@ -53,12 +52,15 @@ pub struct HecLogsSinkConfig {

/// Overrides the name of the log field used to retrieve the hostname to send to Splunk HEC.
///
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
/// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
/// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
///
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
// NOTE: The `OptionalTargetPath` is wrapped in an `Option` in order to distinguish between an
// unset value (`None`) and an empty string. This is necessary because `OptionalTargetPath`
// deserializes an empty string to a `None` path internally.
#[configurable(metadata(docs::advanced))]
#[serde(default = "config_host_key_target_path")]
pub host_key: OptionalTargetPath,
pub host_key: Option<OptionalTargetPath>,

/// Fields to be [added to Splunk index][splunk_field_index_docs].
///
Expand Down Expand Up @@ -124,13 +126,16 @@ pub struct HecLogsSinkConfig {
/// Overrides the name of the log field used to retrieve the timestamp to send to Splunk HEC.
/// When set to `""`, a timestamp is not set in the events sent to Splunk HEC.
///
/// By default, the [global `log_schema.timestamp_key` option][global_timestamp_key] is used.
/// By default, either the [global `log_schema.timestamp_key` option][global_timestamp_key] is used
/// if log events are Legacy namespaced, or the semantic meaning of "timestamp" is used, if defined.
///
/// [global_timestamp_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.timestamp_key
#[configurable(metadata(docs::advanced))]
#[serde(default = "crate::sinks::splunk_hec::common::config_timestamp_key_target_path")]
#[configurable(metadata(docs::examples = "timestamp", docs::examples = ""))]
pub timestamp_key: OptionalTargetPath,
// NOTE: The `OptionalTargetPath` is wrapped in an `Option` in order to distinguish between an
// unset value (`None`) and an empty string. This is necessary because `OptionalTargetPath`
// deserializes an empty string to a `None` path internally.
pub timestamp_key: Option<OptionalTargetPath>,

/// Passes the `auto_extract_timestamp` option to Splunk.
///
Expand Down Expand Up @@ -158,7 +163,7 @@ impl GenerateConfig for HecLogsSinkConfig {
toml::Value::try_from(Self {
default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
endpoint: "endpoint".to_owned(),
host_key: config_host_key_target_path(),
host_key: None,
indexed_fields: vec![],
index: None,
sourcetype: None,
Expand All @@ -170,7 +175,7 @@ impl GenerateConfig for HecLogsSinkConfig {
tls: None,
acknowledgements: Default::default(),
timestamp_nanos_key: None,
timestamp_key: config_timestamp_key_target_path(),
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: EndpointTarget::Event,
})
Expand Down Expand Up @@ -270,9 +275,9 @@ impl HecLogsSinkConfig {
.iter()
.map(|config_path| config_path.0.clone())
.collect(),
host_key: self.host_key.path.clone(),
host_key: self.host_key.clone(),
timestamp_nanos_key: self.timestamp_nanos_key.clone(),
timestamp_key: self.timestamp_key.path.clone(),
timestamp_key: self.timestamp_key.clone(),
endpoint_target: self.endpoint_target,
auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
};
Expand Down Expand Up @@ -305,7 +310,7 @@ mod tests {
let config = Self {
endpoint: endpoint.clone(),
default_token: "i_am_an_island".to_string().into(),
host_key: config_host_key_target_path(),
host_key: None,
indexed_fields: vec![],
index: None,
sourcetype: None,
Expand All @@ -327,7 +332,7 @@ mod tests {
..Default::default()
},
timestamp_nanos_key: None,
timestamp_key: config_timestamp_key_target_path(),
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: EndpointTarget::Raw,
};
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/splunk_hec/logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn config(
HecLogsSinkConfig {
default_token: get_token().await.into(),
endpoint: splunk_hec_address(),
host_key: OptionalTargetPath::event("host"),
host_key: Some(OptionalTargetPath::event("host")),
indexed_fields,
index: None,
sourcetype: None,
Expand All @@ -130,7 +130,7 @@ async fn config(
tls: None,
acknowledgements: Default::default(),
timestamp_nanos_key: None,
timestamp_key: Default::default(),
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: EndpointTarget::Event,
}
Expand Down Expand Up @@ -409,7 +409,7 @@ async fn splunk_configure_hostname() {
let cx = SinkContext::default();

let config = HecLogsSinkConfig {
host_key: OptionalTargetPath::event("roast"),
host_key: Some(OptionalTargetPath::event("roast")),
..config(JsonSerializerConfig::default().into(), vec!["asdf".into()]).await
};

Expand Down Expand Up @@ -488,11 +488,11 @@ async fn splunk_auto_extracted_timestamp() {

let config = HecLogsSinkConfig {
auto_extract_timestamp: Some(true),
timestamp_key: OptionalTargetPath {
timestamp_key: Some(OptionalTargetPath {
path: Some(OwnedTargetPath::event(lookup::owned_value_path!(
"timestamp"
))),
},
}),
..config(JsonSerializerConfig::default().into(), vec![]).await
};

Expand Down Expand Up @@ -551,11 +551,11 @@ async fn splunk_non_auto_extracted_timestamp() {

let config = HecLogsSinkConfig {
auto_extract_timestamp: Some(false),
timestamp_key: OptionalTargetPath {
timestamp_key: Some(OptionalTargetPath {
path: Some(OwnedTargetPath::event(lookup::owned_value_path!(
"timestamp"
))),
},
}),
..config(JsonSerializerConfig::default().into(), vec![]).await
};

Expand Down
75 changes: 60 additions & 15 deletions src/sinks/splunk_hec/logs/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@ use crate::{
util::processed_event::ProcessedEvent,
},
};
use vector_lib::lookup::{event_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use vector_lib::{
config::{log_schema, LogNamespace},
lookup::{event_path, lookup_v2::OptionalTargetPath, OwnedValuePath, PathPrefix},
schema::meaning,
};
use vrl::path::OwnedTargetPath;

// NOTE: The `OptionalTargetPath`s are wrapped in an `Option` in order to distinguish between an
// unset value (`None`) and an empty string. This is necessary because `OptionalTargetPath`
// deserializes an empty string to a `None` path internally.
pub struct HecLogsSink<S> {
pub context: SinkContext,
pub service: S,
Expand All @@ -24,9 +32,9 @@ pub struct HecLogsSink<S> {
pub source: Option<Template>,
pub index: Option<Template>,
pub indexed_fields: Vec<OwnedValuePath>,
pub host_key: Option<OwnedTargetPath>,
pub host_key: Option<OptionalTargetPath>,
pub timestamp_nanos_key: Option<String>,
pub timestamp_key: Option<OwnedTargetPath>,
pub timestamp_key: Option<OptionalTargetPath>,
pub endpoint_target: EndpointTarget,
pub auto_extract_timestamp: bool,
}
Expand All @@ -36,9 +44,9 @@ pub struct HecLogData<'a> {
pub source: Option<&'a Template>,
pub index: Option<&'a Template>,
pub indexed_fields: &'a [OwnedValuePath],
pub host_key: Option<OwnedTargetPath>,
pub host_key: Option<OptionalTargetPath>,
pub timestamp_nanos_key: Option<&'a String>,
pub timestamp_key: Option<OwnedTargetPath>,
pub timestamp_key: Option<OptionalTargetPath>,
pub endpoint_target: EndpointTarget,
pub auto_extract_timestamp: bool,
}
Expand Down Expand Up @@ -127,15 +135,15 @@ struct EventPartitioner {
pub sourcetype: Option<Template>,
pub source: Option<Template>,
pub index: Option<Template>,
pub host_key: Option<OwnedTargetPath>,
pub host_key: Option<OptionalTargetPath>,
}

impl EventPartitioner {
const fn new(
sourcetype: Option<Template>,
source: Option<Template>,
index: Option<Template>,
host_key: Option<OwnedTargetPath>,
host_key: Option<OptionalTargetPath>,
) -> Self {
Self {
sourcetype,
Expand Down Expand Up @@ -180,11 +188,14 @@ impl Partitioner for EventPartitioner {
.ok()
});

let host = self
.host_key
.as_ref()
.and_then(|host_key| item.event.get(host_key))
.and_then(|value| value.as_str().map(|s| s.to_string()));
let host = user_or_namespaced_path(
&item.event,
self.host_key.as_ref(),
meaning::HOST,
log_schema().host_key_target_path(),
)
.and_then(|path| item.event.get(&path))
.and_then(|value| value.as_str().map(|s| s.to_string()));

Some(Partitioned {
token: item.event.metadata().splunk_hec_token(),
Expand Down Expand Up @@ -219,6 +230,27 @@ impl ByteSizeOf for HecLogsProcessedEventMetadata {

pub type HecProcessedEvent = ProcessedEvent<LogEvent, HecLogsProcessedEventMetadata>;

/// Resolves the path at which a field should be looked up in `log`.
///
/// Resolution order:
/// 1. If the user supplied a setting (`user_key` is `Some`), its inner path is returned
///    as-is. An empty string in the config yields an inner `None` path, so `None` is
///    returned and no default is applied.
/// 2. Otherwise the default depends on the event's log namespace:
///    - `Legacy`: `legacy_path` (the path provided from the global log schema) is used.
///    - `Vector`: the path registered for the `semantic` meaning is used, if any.
fn user_or_namespaced_path(
    log: &LogEvent,
    user_key: Option<&OptionalTargetPath>,
    semantic: &str,
    legacy_path: Option<&OwnedTargetPath>,
) -> Option<OwnedTargetPath> {
    // An explicit user setting always wins, even when it resolves to no path at all.
    if let Some(configured) = user_key {
        return configured.path.clone();
    }

    // No user setting: pick the namespace-appropriate default and clone it once.
    let default_path = match log.namespace() {
        LogNamespace::Vector => log.find_key_by_meaning(semantic),
        LogNamespace::Legacy => legacy_path,
    };
    default_path.cloned()
}

pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent {
let mut log = event.into_log();

Expand All @@ -234,15 +266,28 @@ pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent {
.index
.and_then(|index| render_template_string(index, &log, INDEX_FIELD));

let host = data.host_key.as_ref().and_then(|key| log.get(key)).cloned();
let host = user_or_namespaced_path(
&log,
data.host_key.as_ref(),
meaning::HOST,
log_schema().host_key_target_path(),
)
.and_then(|path| log.get(&path))
.cloned();

// only extract the timestamp if this is the Event endpoint, and if the setting
// `auto_extract_timestamp` is false (when it is true, we leave the timestamp in
// the event as-is and let Splunk do the extraction).
let timestamp = if EndpointTarget::Event == data.endpoint_target && !data.auto_extract_timestamp
{
data.timestamp_key.as_ref().and_then(|timestamp_key| {
match log.remove(timestamp_key) {
user_or_namespaced_path(
&log,
data.timestamp_key.as_ref(),
meaning::TIMESTAMP,
log_schema().timestamp_key_target_path(),
)
.and_then(|timestamp_path| {
match log.remove(&timestamp_path) {
Some(Value::Timestamp(ts)) => {
// set nanos in log if valid timestamp in event and timestamp_nanos_key is configured
if let Some(key) = data.timestamp_nanos_key {
Expand Down
Loading

0 comments on commit 7b85728

Please sign in to comment.