Skip to content

Commit

Permalink
Add extra logging and metrics to allow users to determine which exp…
Browse files Browse the repository at this point in the history
…orters are falling behind.

The message now contains the provider: `OpenTelemetry trace error occurred: cannot send message to batch processor '<provider>-tracing' as the channel is full`

In addition, a new metric, `apollo.router.telemetry.batch_processor.errors`, has been added. It allows users to determine which batch processor needs tuning and gives an indication of by how much.
  • Loading branch information
bryn committed Jan 18, 2025
1 parent 4fc63d5 commit 2e90737
Show file tree
Hide file tree
Showing 23 changed files with 367 additions and 61 deletions.
13 changes: 13 additions & 0 deletions .changesets/feat_bryn_named_runtime_channel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
### Improved BatchProcessor observability ([Issue #6558](https://github.com/apollographql/router/issues/6558))

A new metric has been introduced to allow observation of how many spans are being dropped by a telemetry batch processor.

- `apollo.router.telemetry.batch_processor.errors` - The number of errors encountered by exporter batch processors.
- `name`: One of `apollo-tracing`, `datadog-tracing`, `jaeger-agent`, `jaeger-collector`, `otlp-tracing`, `zipkin-tracing`.
- `error`: One of `channel closed`, `channel full`.

By observing the number of spans dropped it is possible to estimate what batch processor settings will work for you.

In addition, the log message for dropped spans will now indicate which batch processor is affected.

By [@bryncooke](https://github.com/bryncooke) in https://github.com/apollographql/router/pull/6558
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ dependencies = [
"libc",
"libtest-mimic",
"linkme",
"log",
"lru",
"maplit",
"mediatype",
Expand Down
1 change: 1 addition & 0 deletions apollo-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ ahash = "0.8.11"
itoa = "1.0.9"
ryu = "1.0.15"
apollo-environment-detector = "0.1.0"
log = "0.4.22"

[target.'cfg(macos)'.dependencies]
uname = "0.1.1"
Expand Down
9 changes: 5 additions & 4 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1943,24 +1943,25 @@ fn handle_error_internal<T: Into<opentelemetry::global::Error>>(
.or_insert_with(|| now);

if last_logged == now {
// These events are logged with explicitly no parent. This allows them to be detached from traces.
match err {
opentelemetry::global::Error::Trace(err) => {
::tracing::error!("OpenTelemetry trace error occurred: {}", err)
}
opentelemetry::global::Error::Metric(err) => {
if let MetricsError::Other(msg) = &err {
if msg.contains("Warning") {
::tracing::warn!("OpenTelemetry metric warning occurred: {}", msg);
::tracing::warn!(parent: None, "OpenTelemetry metric warning occurred: {}", msg);
return;
}
}
::tracing::error!("OpenTelemetry metric error occurred: {}", err);
::tracing::error!(parent: None, "OpenTelemetry metric error occurred: {}", err);
}
opentelemetry::global::Error::Other(err) => {
::tracing::error!("OpenTelemetry error occurred: {}", err)
::tracing::error!(parent: None, "OpenTelemetry error occurred: {}", err)
}
other => {
::tracing::error!("OpenTelemetry error occurred: {:?}", other)
::tracing::error!(parent: None, "OpenTelemetry error occurred: {:?}", other)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions apollo-router/src/plugins/telemetry/otel/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/// Implementation of the trace::Layer as a source of OpenTelemetry data.
pub(crate) mod layer;
pub(crate) mod named_runtime_channel;
/// Span extension which enables OpenTelemetry context management.
pub(crate) mod span_ext;
/// Protocols for OpenTelemetry Tracers that are compatible with Tracing
Expand Down
181 changes: 181 additions & 0 deletions apollo-router/src/plugins/telemetry/otel/named_runtime_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::fmt::Debug;
use std::time::Duration;

use futures::future::BoxFuture;
use opentelemetry_sdk::runtime::Runtime;
use opentelemetry_sdk::runtime::RuntimeChannel;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::runtime::TrySend;
use opentelemetry_sdk::runtime::TrySendError;

/// An OpenTelemetry Tokio runtime tagged with a static name.
///
/// Runtime operations are delegated to the wrapped [`Tokio`] runtime. The name is
/// handed to every [`NamedSender`] this runtime creates so that error messages and
/// metrics can identify the batch processor involved.
#[derive(Clone, Debug)]
pub(crate) struct NamedTokioRuntime {
    /// Name given to the senders created by this runtime.
    name: &'static str,
    /// The wrapped OpenTelemetry Tokio runtime that does the actual work.
    parent: Tokio,
}

impl NamedTokioRuntime {
    /// Creates a runtime labelled `name` that wraps the OpenTelemetry [`Tokio`] runtime.
    pub(crate) fn new(name: &'static str) -> Self {
        let parent = Tokio;
        Self { name, parent }
    }
}

impl Runtime for NamedTokioRuntime {
type Interval = <Tokio as Runtime>::Interval;
type Delay = <Tokio as Runtime>::Delay;

fn interval(&self, duration: Duration) -> Self::Interval {
self.parent.interval(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
self.parent.spawn(future)
}

fn delay(&self, duration: Duration) -> Self::Delay {
self.parent.delay(duration)
}
}

impl<T: Debug + Send> RuntimeChannel<T> for NamedTokioRuntime {
    type Receiver = <Tokio as RuntimeChannel<T>>::Receiver;
    type Sender = NamedSender<T>;

    /// Creates a bounded Tokio mpsc channel holding up to `capacity` messages.
    ///
    /// The sending half is wrapped in a [`NamedSender`] carrying this runtime's name;
    /// the receiving half is exposed as a `ReceiverStream`.
    fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        let named_sender = NamedSender::new(self.name, tx);
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
        (named_sender, stream)
    }
}

/// Sending half of a batch processor channel, tagged with the processor's name.
///
/// Both error messages are formatted once by the constructor and stored here, so a
/// failed send only has to copy the relevant message into the returned error.
#[derive(Debug)]
pub(crate) struct NamedSender<T> {
    /// Batch processor name; reported as the `name` attribute of the error counter.
    name: &'static str,
    /// Message returned when the channel has no free capacity.
    channel_full_message: String,
    /// Message returned when the receiving half has gone away.
    channel_closed_message: String,
    /// The underlying Tokio channel sender.
    sender: tokio::sync::mpsc::Sender<T>,
}

impl<T: Send> NamedSender<T> {
    /// Wraps `sender` and pre-formats the "channel full" and "channel closed"
    /// messages so that they include the batch processor `name`.
    fn new(name: &'static str, sender: tokio::sync::mpsc::Sender<T>) -> Self {
        let channel_full_message = format!(
            "cannot send message to batch processor '{}' as the channel is full",
            name
        );
        let channel_closed_message = format!(
            "cannot send message to batch processor '{}' as the channel is closed",
            name
        );

        Self {
            name,
            channel_full_message,
            channel_closed_message,
            sender,
        }
    }
}

impl<T: Send> TrySend for NamedSender<T> {
type Message = T;

fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> {
// Convert the error into something that has a name
self.sender.try_send(item).map_err(|err| {
let error = match &err {
tokio::sync::mpsc::error::TrySendError::Full(_) => "channel full",
tokio::sync::mpsc::error::TrySendError::Closed(_) => "channel closed",
};
u64_counter!(
"apollo.router.telemetry.batch_processor.errors",
"Errors when sending to a batch processor",
1,
"name" = self.name,
"error" = error
);

match err {
tokio::sync::mpsc::error::TrySendError::Full(_) => {
TrySendError::Other(self.channel_full_message.as_str().into())
}
tokio::sync::mpsc::error::TrySendError::Closed(_) => {
TrySendError::Other(self.channel_closed_message.as_str().into())
}
}
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::FutureMetricsExt;

    /// Sending into a channel that is already at capacity fails and is counted
    /// under `error = "channel full"`.
    #[tokio::test]
    async fn test_channel_full_error_metrics() {
        async {
            let named_runtime = NamedTokioRuntime::new("test_processor");
            // The receiver is kept alive so the channel stays open but is never drained.
            let (tx, _rx) = named_runtime.batch_message_channel(1);

            // Occupy the single slot of the channel.
            tx.try_send("first").expect("should send first message");

            // With no capacity left, the next send must be rejected.
            assert!(tx.try_send("second").is_err());

            assert_counter!(
                "apollo.router.telemetry.batch_processor.errors",
                1,
                "name" = "test_processor",
                "error" = "channel full"
            );
        }
        .with_metrics()
        .await;
    }

    /// Sending after the receiver has been dropped fails and is counted under
    /// `error = "channel closed"`.
    #[tokio::test]
    async fn test_channel_closed_error_metrics() {
        async {
            let named_runtime = NamedTokioRuntime::new("test_processor");
            let (tx, rx) = named_runtime.batch_message_channel(1);

            // Dropping the receiving half closes the channel.
            drop(rx);

            assert!(tx.try_send("message").is_err());

            assert_counter!(
                "apollo.router.telemetry.batch_processor.errors",
                1,
                "name" = "test_processor",
                "error" = "channel closed"
            );
        }
        .with_metrics()
        .await;
    }

    /// A send that succeeds must not record the error counter at all.
    #[tokio::test]
    async fn test_successful_message_send() {
        async {
            let named_runtime = NamedTokioRuntime::new("test_processor");
            // The receiver is kept alive so the channel stays open.
            let (tx, _rx) = named_runtime.batch_message_channel(1);

            assert!(tx.try_send("message").is_ok());

            // The success path emits nothing, so the metric must be absent.
            let collected = crate::metrics::collect_metrics();
            assert!(collected
                .find("apollo.router.telemetry.batch_processor.errors")
                .is_none());
        }
        .with_metrics()
        .await;
    }
}
3 changes: 2 additions & 1 deletion apollo-router/src/plugins/telemetry/tracing/apollo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::plugins::telemetry::apollo::Config;
use crate::plugins::telemetry::apollo_exporter::proto::reports::Trace;
use crate::plugins::telemetry::config;
use crate::plugins::telemetry::config_new::spans::Spans;
use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime;
use crate::plugins::telemetry::span_factory::SpanMode;
use crate::plugins::telemetry::tracing::apollo_telemetry;
use crate::plugins::telemetry::tracing::TracingConfigurator;
Expand Down Expand Up @@ -48,7 +49,7 @@ impl TracingConfigurator for Config {
.metrics_reference_mode(self.metrics_reference_mode)
.build()?;
Ok(builder.with_span_processor(
BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio)
BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("apollo-tracing"))
.with_batch_config(self.batch_processor.clone().into())
.build(),
))
Expand Down
3 changes: 2 additions & 1 deletion apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::plugins::telemetry::consts::SUBGRAPH_REQUEST_SPAN_NAME;
use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME;
use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME;
use crate::plugins::telemetry::endpoint::UriEndpoint;
use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime;
use crate::plugins::telemetry::tracing::datadog_exporter;
use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
Expand Down Expand Up @@ -219,7 +220,7 @@ impl TracingConfigurator for Config {
delegate: exporter,
span_metrics,
},
opentelemetry::runtime::Tokio,
NamedTokioRuntime::new("datadog-tracing"),
)
.with_batch_config(self.batch_processor.clone().into())
.build()
Expand Down
14 changes: 9 additions & 5 deletions apollo-router/src/plugins/telemetry/tracing/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::plugins::telemetry::config::TracingCommon;
use crate::plugins::telemetry::config_new::spans::Spans;
use crate::plugins::telemetry::endpoint::SocketEndpoint;
use crate::plugins::telemetry::endpoint::UriEndpoint;
use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
use crate::plugins::telemetry::tracing::SpanProcessorExt;
use crate::plugins::telemetry::tracing::TracingConfigurator;
Expand Down Expand Up @@ -105,7 +106,7 @@ impl TracingConfigurator for Config {
.with(&agent.endpoint.to_socket(), |b, s| b.with_endpoint(s))
.build_async_agent_exporter(opentelemetry::runtime::Tokio)?;
Ok(builder.with_span_processor(
BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio)
BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("jaeger-agent"))
.with_batch_config(batch_processor.clone().into())
.build()
.filtered(),
Expand Down Expand Up @@ -137,10 +138,13 @@ impl TracingConfigurator for Config {
.with_batch_processor_config(batch_processor.clone().into())
.build_collector_exporter::<runtime::Tokio>()?;
Ok(builder.with_span_processor(
BatchSpanProcessor::builder(exporter, runtime::Tokio)
.with_batch_config(batch_processor.clone().into())
.build()
.filtered(),
BatchSpanProcessor::builder(
exporter,
NamedTokioRuntime::new("jaeger-collector"),
)
.with_batch_config(batch_processor.clone().into())
.build()
.filtered(),
))
}
_ => Ok(builder),
Expand Down
3 changes: 2 additions & 1 deletion apollo-router/src/plugins/telemetry/tracing/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tower::BoxError;

use crate::plugins::telemetry::config::TracingCommon;
use crate::plugins::telemetry::config_new::spans::Spans;
use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime;
use crate::plugins::telemetry::otlp::TelemetryDataKind;
use crate::plugins::telemetry::tracing::SpanProcessorExt;
use crate::plugins::telemetry::tracing::TracingConfigurator;
Expand All @@ -26,7 +27,7 @@ impl TracingConfigurator for super::super::otlp::Config {
let exporter: SpanExporterBuilder = self.exporter(TelemetryDataKind::Traces)?;
let batch_span_processor = BatchSpanProcessor::builder(
exporter.build_span_exporter()?,
opentelemetry::runtime::Tokio,
NamedTokioRuntime::new("otlp-tracing"),
)
.with_batch_config(self.batch_processor.clone().into())
.build()
Expand Down
3 changes: 2 additions & 1 deletion apollo-router/src/plugins/telemetry/tracing/zipkin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::plugins::telemetry::config::GenericWith;
use crate::plugins::telemetry::config::TracingCommon;
use crate::plugins::telemetry::config_new::spans::Spans;
use crate::plugins::telemetry::endpoint::UriEndpoint;
use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime;
use crate::plugins::telemetry::tracing::BatchProcessorConfig;
use crate::plugins::telemetry::tracing::SpanProcessorExt;
use crate::plugins::telemetry::tracing::TracingConfigurator;
Expand Down Expand Up @@ -65,7 +66,7 @@ impl TracingConfigurator for Config {
.init_exporter()?;

Ok(builder.with_span_processor(
BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio)
BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("zipkin-tracing"))
.with_batch_config(self.batch_processor.clone().into())
.build()
.filtered(),
Expand Down
Loading

0 comments on commit 2e90737

Please sign in to comment.