diff --git a/changelog.d/21314-new-relic-metric-timestamp.enhancement.md b/changelog.d/21314-new-relic-metric-timestamp.enhancement.md new file mode 100644 index 0000000000000..b78372c33defc --- /dev/null +++ b/changelog.d/21314-new-relic-metric-timestamp.enhancement.md @@ -0,0 +1 @@ +Timestamps on metrics are now sent to the New Relic metrics API with millisecond resolution. diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index 7874ada5da9ba..0efc352449f2d 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -2,10 +2,9 @@ use std::{ collections::{BTreeMap, HashMap}, convert::TryFrom, fmt::Debug, - time::SystemTime, }; -use chrono::{DateTime, Utc}; +use chrono::Utc; use ordered_float::NotNan; use serde::Serialize; use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL}; @@ -30,11 +29,23 @@ pub(super) struct MetricsApiModel(pub [MetricDataStore; 1]); #[derive(Debug, Serialize)] pub(super) struct MetricDataStore { - pub metrics: Vec, + pub metrics: Vec, +} + +#[derive(Debug, Serialize)] +pub(super) struct MetricData { + #[serde(rename = "interval.ms", skip_serializing_if = "Option::is_none")] + pub interval_ms: Option, + pub name: String, + pub r#type: &'static str, + pub value: f64, + pub timestamp: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub attributes: Option>, } impl MetricsApiModel { - pub(super) fn new(metrics: Vec) -> Self { + pub(super) fn new(metrics: Vec) -> Self { Self([MetricDataStore { metrics }]) } } @@ -59,23 +70,19 @@ impl TryFrom> for MetricsApiModel { // Generate Value::Object() from BTreeMap let (series, data, _) = metric.into_parts(); - let mut metric_data = ObjectMap::new(); - // We only handle gauge and counter metrics // Extract value & type and set type-related attributes - let (value, metric_type) = match (data.value, &data.kind) { + let (value, metric_type, interval_ms) = match (data.value, &data.kind) { (MetricValue::Counter { value }, MetricKind::Incremental) => { let Some(interval_ms) = data.time.interval_ms else { // Incremental counter without an interval is worthless, skip this metric num_missing_interval += 1; return None; }; - metric_data - .insert("interval.ms".into(), Value::from(interval_ms.get() as i64)); - (value, "count") + (value, "count", Some(interval_ms.get() as i64)) } - (MetricValue::Counter { value }, MetricKind::Absolute) => (value, "gauge"), - (MetricValue::Gauge { value }, _) => (value, "gauge"), + (MetricValue::Counter { value }, MetricKind::Absolute) + | (MetricValue::Gauge { value }, _) => (value, "gauge", None), _ => { // Unsupported metric type num_unsupported_metric_type += 1; @@ -84,34 +91,20 @@ impl TryFrom> for MetricsApiModel { }; // Set name, type, value, timestamp, and attributes - metric_data.insert("name".into(), Value::from(series.name.name)); - metric_data.insert("type".into(), Value::from(metric_type)); - let Some(value) = NotNan::new(value).ok() else { + if value.is_nan() { num_nan_value += 1; return None; }; - metric_data.insert("value".into(), Value::from(value)); - metric_data.insert( - "timestamp".into(), - Value::from( - data.time - .timestamp - .unwrap_or_else(|| DateTime::::from(SystemTime::now())) - .timestamp(), - ), - ); - if let Some(tags) = series.tags { - metric_data.insert( - "attributes".into(), - Value::from( - tags.iter_single() - .map(|(key, value)| (key.into(), Value::from(value))) - .collect::>(), - ), - ); - } - Some(metric_data) + let timestamp = data.time.timestamp.unwrap_or_else(Utc::now); + Some(MetricData { + interval_ms, + name: series.name.name, + r#type: metric_type, + value, + timestamp: timestamp.timestamp_millis(), + attributes: series.tags.map(|tags| tags.into_iter_single().collect()), + }) }) .collect(); diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs index 0c82141776b4c..5fbd4f35fb54d 100644 --- a/src/sinks/new_relic/tests.rs +++ b/src/sinks/new_relic/tests.rs @@ -1,6 +1,6 @@ -use std::{convert::TryFrom, num::NonZeroU32, time::SystemTime}; +use std::{convert::TryFrom, num::NonZeroU32}; -use chrono::{DateTime, Utc}; +use chrono::Utc; use futures::{future::ready, stream}; use serde::Deserialize; use serde_json::{json, to_value}; @@ -194,7 +194,7 @@ fn generates_log_api_model_with_dotted_fields() { #[test] fn generates_log_api_model_with_timestamp() { - let stamp = DateTime::::from(SystemTime::now()); + let stamp = Utc::now(); let event = Event::Log(LogEvent::from(value!({ "timestamp": stamp, "tag_key": "tag_value", @@ -231,7 +231,7 @@ fn generates_metric_api_model_without_timestamp() { "metrics": [{ "name": "my_metric", "value": 100.0, - "timestamp": metrics[0].get("timestamp").unwrap().clone(), + "timestamp": metrics[0].timestamp, "type": "gauge", }] }]) @@ -240,7 +240,7 @@ fn generates_metric_api_model_without_timestamp() { #[test] fn generates_metric_api_model_with_timestamp() { - let stamp = DateTime::::from(SystemTime::now()); + let stamp = Utc::now(); let m = Metric::new( "my_metric", MetricKind::Absolute, @@ -252,12 +252,12 @@ fn generates_metric_api_model_with_timestamp() { MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); assert_eq!( - to_value(&model).unwrap(), + to_value(model).unwrap(), json!([{ "metrics": [{ "name": "my_metric", "value": 100.0, - "timestamp": stamp.timestamp(), + "timestamp": stamp.timestamp_millis(), "type": "gauge", }] }]) @@ -266,7 +266,7 @@ fn generates_metric_api_model_with_timestamp() { #[test] fn generates_metric_api_model_incremental_counter() { - let stamp = DateTime::::from(SystemTime::now()); + let stamp = Utc::now(); let m = Metric::new( "my_metric", MetricKind::Incremental, @@ -279,13 +279,13 @@ fn generates_metric_api_model_incremental_counter() { MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model"); assert_eq!( - to_value(&model).unwrap(), + to_value(model).unwrap(), json!([{ "metrics": [{ "name": "my_metric", "value": 100.0, "interval.ms": 1000, - "timestamp": stamp.timestamp(), + "timestamp": stamp.timestamp_millis(), "type": "count", }] }])