Skip to content

Commit

Permalink
enhancement(new_relic sink): Use millisecond timestamp with metrics
Browse files Browse the repository at this point in the history
This updates the metric data model for the `new_relic` sink to enhance the
resolution with millisecond timestamps.

Internally, it also updates the data model to use a fixed structure instead of a
dynamic map for efficiency.
  • Loading branch information
bruceg committed Sep 18, 2024
1 parent 162d9b5 commit 1d4bdb4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Timestamps on metrics are now sent to the New Relic metrics API with millisecond resolution.
65 changes: 29 additions & 36 deletions src/sinks/new_relic/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::{
collections::{BTreeMap, HashMap},
convert::TryFrom,
fmt::Debug,
time::SystemTime,
};

use chrono::{DateTime, Utc};
use chrono::Utc;
use ordered_float::NotNan;
use serde::Serialize;
use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
Expand All @@ -30,11 +29,23 @@ pub(super) struct MetricsApiModel(pub [MetricDataStore; 1]);

#[derive(Debug, Serialize)]
pub(super) struct MetricDataStore {
pub metrics: Vec<ObjectMap>,
pub metrics: Vec<MetricData>,
}

#[derive(Debug, Serialize)]
pub(super) struct MetricData {
#[serde(rename = "interval.ms", skip_serializing_if = "Option::is_none")]
pub interval_ms: Option<i64>,
pub name: String,
pub r#type: &'static str,
pub value: f64,
pub timestamp: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub attributes: Option<BTreeMap<String, String>>,
}

impl MetricsApiModel {
pub(super) fn new(metrics: Vec<ObjectMap>) -> Self {
pub(super) fn new(metrics: Vec<MetricData>) -> Self {
Self([MetricDataStore { metrics }])
}
}
Expand All @@ -59,23 +70,19 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {
// Generate Value::Object() from BTreeMap<String, String>
let (series, data, _) = metric.into_parts();

let mut metric_data = ObjectMap::new();

// We only handle gauge and counter metrics
// Extract value & type and set type-related attributes
let (value, metric_type) = match (data.value, &data.kind) {
let (value, metric_type, interval_ms) = match (data.value, &data.kind) {
(MetricValue::Counter { value }, MetricKind::Incremental) => {
let Some(interval_ms) = data.time.interval_ms else {
// Incremental counter without an interval is worthless, skip this metric
num_missing_interval += 1;
return None;
};
metric_data
.insert("interval.ms".into(), Value::from(interval_ms.get() as i64));
(value, "count")
(value, "count", Some(interval_ms.get() as i64))
}
(MetricValue::Counter { value }, MetricKind::Absolute) => (value, "gauge"),
(MetricValue::Gauge { value }, _) => (value, "gauge"),
(MetricValue::Counter { value }, MetricKind::Absolute)
| (MetricValue::Gauge { value }, _) => (value, "gauge", None),
_ => {
// Unsupported metric type
num_unsupported_metric_type += 1;
Expand All @@ -84,34 +91,20 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {
};

// Set name, type, value, timestamp, and attributes
metric_data.insert("name".into(), Value::from(series.name.name));
metric_data.insert("type".into(), Value::from(metric_type));
let Some(value) = NotNan::new(value).ok() else {
if value.is_nan() {
num_nan_value += 1;
return None;
};
metric_data.insert("value".into(), Value::from(value));
metric_data.insert(
"timestamp".into(),
Value::from(
data.time
.timestamp
.unwrap_or_else(|| DateTime::<Utc>::from(SystemTime::now()))
.timestamp(),
),
);
if let Some(tags) = series.tags {
metric_data.insert(
"attributes".into(),
Value::from(
tags.iter_single()
.map(|(key, value)| (key.into(), Value::from(value)))
.collect::<BTreeMap<_, _>>(),
),
);
}

Some(metric_data)
let timestamp = data.time.timestamp.unwrap_or_else(Utc::now);
Some(MetricData {
interval_ms,
name: series.name.name,
r#type: metric_type,
value,
timestamp: timestamp.timestamp_millis(),
attributes: series.tags.map(|tags| tags.into_iter_single().collect()),
})
})
.collect();

Expand Down
20 changes: 10 additions & 10 deletions src/sinks/new_relic/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{convert::TryFrom, num::NonZeroU32, time::SystemTime};
use std::{convert::TryFrom, num::NonZeroU32};

use chrono::{DateTime, Utc};
use chrono::Utc;
use futures::{future::ready, stream};
use serde::Deserialize;
use serde_json::{json, to_value};
Expand Down Expand Up @@ -194,7 +194,7 @@ fn generates_log_api_model_with_dotted_fields() {

#[test]
fn generates_log_api_model_with_timestamp() {
let stamp = DateTime::<Utc>::from(SystemTime::now());
let stamp = Utc::now();
let event = Event::Log(LogEvent::from(value!({
"timestamp": stamp,
"tag_key": "tag_value",
Expand Down Expand Up @@ -231,7 +231,7 @@ fn generates_metric_api_model_without_timestamp() {
"metrics": [{
"name": "my_metric",
"value": 100.0,
"timestamp": metrics[0].get("timestamp").unwrap().clone(),
"timestamp": metrics[0].timestamp,
"type": "gauge",
}]
}])
Expand All @@ -240,7 +240,7 @@ fn generates_metric_api_model_without_timestamp() {

#[test]
fn generates_metric_api_model_with_timestamp() {
let stamp = DateTime::<Utc>::from(SystemTime::now());
let stamp = Utc::now();
let m = Metric::new(
"my_metric",
MetricKind::Absolute,
Expand All @@ -252,12 +252,12 @@ fn generates_metric_api_model_with_timestamp() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");

assert_eq!(
to_value(&model).unwrap(),
to_value(model).unwrap(),
json!([{
"metrics": [{
"name": "my_metric",
"value": 100.0,
"timestamp": stamp.timestamp(),
"timestamp": stamp.timestamp_millis(),
"type": "gauge",
}]
}])
Expand All @@ -266,7 +266,7 @@ fn generates_metric_api_model_with_timestamp() {

#[test]
fn generates_metric_api_model_incremental_counter() {
let stamp = DateTime::<Utc>::from(SystemTime::now());
let stamp = Utc::now();
let m = Metric::new(
"my_metric",
MetricKind::Incremental,
Expand All @@ -279,13 +279,13 @@ fn generates_metric_api_model_incremental_counter() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");

assert_eq!(
to_value(&model).unwrap(),
to_value(model).unwrap(),
json!([{
"metrics": [{
"name": "my_metric",
"value": 100.0,
"interval.ms": 1000,
"timestamp": stamp.timestamp(),
"timestamp": stamp.timestamp_millis(),
"type": "count",
}]
}])
Expand Down

0 comments on commit 1d4bdb4

Please sign in to comment.