From 06faaab09cafd6b97b342a4a70cf8c29abc52402 Mon Sep 17 00:00:00 2001 From: bryn Date: Thu, 16 Jan 2025 15:45:14 +0000 Subject: [PATCH] Add extra logging and metrics to allow users to determine if what exporters are falling behind. The message now contains the provider: `OpenTelemetry trace error occurred: cannot send message to batch processor '-tracing' as the channel is full` In addition, a new metric has been added to `apollo.router.telemetry.batch_processor.errors`. Which allows users to determine which batch processor needs modifying and an indicator of how much. --- .../feat_bryn_named_runtime_channel.md | 13 ++ Cargo.lock | 1 + apollo-router/Cargo.toml | 1 + .../src/plugins/telemetry/formatters/json.rs | 5 +- .../src/plugins/telemetry/formatters/text.rs | 5 +- apollo-router/src/plugins/telemetry/mod.rs | 11 +- .../src/plugins/telemetry/otel/mod.rs | 1 + .../telemetry/otel/named_runtime_channel.rs | 181 ++++++++++++++++++ .../src/plugins/telemetry/tracing/apollo.rs | 3 +- .../plugins/telemetry/tracing/datadog/mod.rs | 3 +- .../src/plugins/telemetry/tracing/jaeger.rs | 14 +- .../src/plugins/telemetry/tracing/otlp.rs | 3 +- .../src/plugins/telemetry/tracing/zipkin.rs | 3 +- apollo-router/tests/common.rs | 26 ++- .../tests/integration/coprocessor.rs | 2 +- apollo-router/tests/integration/lifecycle.rs | 4 +- .../tests/integration/query_planner.rs | 6 +- apollo-router/tests/integration/supergraph.rs | 2 +- .../otlp_invalid_endpoint.router.yaml | 16 ++ .../tests/integration/telemetry/logging.rs | 68 ++++--- .../tests/integration/telemetry/otlp.rs | 49 +++++ .../integration/telemetry/propagation.rs | 4 +- .../tests/integration/traffic_shaping.rs | 2 +- docs/shared/batch-processor-preamble.mdx | 12 +- .../instrumentation/standard-instruments.mdx | 5 + 25 files changed, 370 insertions(+), 70 deletions(-) create mode 100644 .changesets/feat_bryn_named_runtime_channel.md create mode 100644 apollo-router/src/plugins/telemetry/otel/named_runtime_channel.rs create mode 100644 apollo-router/tests/integration/telemetry/fixtures/otlp_invalid_endpoint.router.yaml diff --git a/.changesets/feat_bryn_named_runtime_channel.md b/.changesets/feat_bryn_named_runtime_channel.md new file mode 100644 index 0000000000..a1cf96d520 --- /dev/null +++ b/.changesets/feat_bryn_named_runtime_channel.md @@ -0,0 +1,13 @@ +### Improved BatchProcessor observability ([Issue #6558](https://github.com/apollographql/router/issues/6558)) + +A new metric has been introduced to allow observation of how many spans are being dropped by an telemetry batch processor. + +- `apollo.router.telemetry.batch_processor.errors` - The number of errors encountered by exporter batch processors. + - `name`: One of `apollo-tracing`, `datadog-tracing`, `otlp-tracing`, `zipkin-tracing`. + - `error` = One of `channel closed`, `channel full`. + +By observing the number of spans dropped it is possible to estimate what batch processor settings will work for you. + +In addition, the log message for dropped spans will now indicate which batch processor is affected. + +By [@bryncooke](https://github.com/bryncooke) in https://github.com/apollographql/router/pull/6558 diff --git a/Cargo.lock b/Cargo.lock index e55d81f838..752439a82e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -324,6 +324,7 @@ dependencies = [ "libc", "libtest-mimic", "linkme", + "log", "lru", "maplit", "mediatype", diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index 5563e7d699..2987bc43a2 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -264,6 +264,7 @@ ahash = "0.8.11" itoa = "1.0.9" ryu = "1.0.15" apollo-environment-detector = "0.1.0" +log = "0.4.22" [target.'cfg(macos)'.dependencies] uname = "0.1.1" diff --git a/apollo-router/src/plugins/telemetry/formatters/json.rs b/apollo-router/src/plugins/telemetry/formatters/json.rs index 7062cf6254..934ee9afa1 100644 --- a/apollo-router/src/plugins/telemetry/formatters/json.rs +++ b/apollo-router/src/plugins/telemetry/formatters/json.rs @@ -224,10 +224,7 @@ where serializer.serialize_entry("level", &meta.level().as_serde())?; } - let current_span = event - .parent() - .and_then(|id| ctx.span(id)) - .or_else(|| ctx.lookup_current()); + let current_span = event.parent().and_then(|id| ctx.span(id)); if let Some(ref span) = current_span { if let Some((trace_id, span_id)) = get_trace_and_span_id(span) { diff --git a/apollo-router/src/plugins/telemetry/formatters/text.rs b/apollo-router/src/plugins/telemetry/formatters/text.rs index e26768146c..1917b2e1dc 100644 --- a/apollo-router/src/plugins/telemetry/formatters/text.rs +++ b/apollo-router/src/plugins/telemetry/formatters/text.rs @@ -319,10 +319,7 @@ where if self.config.display_level { self.format_level(meta.level(), &mut writer)?; } - let current_span = event - .parent() - .and_then(|id| ctx.span(id)) - .or_else(|| ctx.lookup_current()); + let current_span = event.parent().and_then(|id| ctx.span(id)); if let Some(ref span) = current_span { if let Some((trace_id, span_id)) = get_trace_and_span_id(span) { diff --git a/apollo-router/src/plugins/telemetry/mod.rs b/apollo-router/src/plugins/telemetry/mod.rs index 54622d43d2..cf9ad12bae 100644 --- a/apollo-router/src/plugins/telemetry/mod.rs +++ b/apollo-router/src/plugins/telemetry/mod.rs @@ -1943,24 +1943,25 @@ fn handle_error_internal>( .or_insert_with(|| now); if last_logged == now { + // These events are logged with explicitly no parent. This allows them to be detached from traces. match err { opentelemetry::global::Error::Trace(err) => { - ::tracing::error!("OpenTelemetry trace error occurred: {}", err) + ::tracing::error!(parent: None, "OpenTelemetry trace error occurred: {}", err) } opentelemetry::global::Error::Metric(err) => { if let MetricsError::Other(msg) = &err { if msg.contains("Warning") { - ::tracing::warn!("OpenTelemetry metric warning occurred: {}", msg); + ::tracing::warn!(parent: None, "OpenTelemetry metric warning occurred: {}", msg); return; } } - ::tracing::error!("OpenTelemetry metric error occurred: {}", err); + ::tracing::error!(parent: None, "OpenTelemetry metric error occurred: {}", err); } opentelemetry::global::Error::Other(err) => { - ::tracing::error!("OpenTelemetry error occurred: {}", err) + ::tracing::error!(parent: None, "OpenTelemetry error occurred: {}", err) } other => { - ::tracing::error!("OpenTelemetry error occurred: {:?}", other) + ::tracing::error!(parent: None, "OpenTelemetry error occurred: {:?}", other) } } } diff --git a/apollo-router/src/plugins/telemetry/otel/mod.rs b/apollo-router/src/plugins/telemetry/otel/mod.rs index 63a1ae72cb..7ad9cdd4f6 100644 --- a/apollo-router/src/plugins/telemetry/otel/mod.rs +++ b/apollo-router/src/plugins/telemetry/otel/mod.rs @@ -1,5 +1,6 @@ /// Implementation of the trace::Layer as a source of OpenTelemetry data. pub(crate) mod layer; +pub(crate) mod named_runtime_channel; /// Span extension which enables OpenTelemetry context management. pub(crate) mod span_ext; /// Protocols for OpenTelemetry Tracers that are compatible with Tracing diff --git a/apollo-router/src/plugins/telemetry/otel/named_runtime_channel.rs b/apollo-router/src/plugins/telemetry/otel/named_runtime_channel.rs new file mode 100644 index 0000000000..589396e036 --- /dev/null +++ b/apollo-router/src/plugins/telemetry/otel/named_runtime_channel.rs @@ -0,0 +1,181 @@ +use std::fmt::Debug; +use std::time::Duration; + +use futures::future::BoxFuture; +use opentelemetry_sdk::runtime::Runtime; +use opentelemetry_sdk::runtime::RuntimeChannel; +use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::runtime::TrySend; +use opentelemetry_sdk::runtime::TrySendError; + +/// Wraps an otel tokio runtime to provide a name in the error messages and metrics +#[derive(Debug, Clone)] +pub(crate) struct NamedTokioRuntime { + name: &'static str, + parent: Tokio, +} + +impl NamedTokioRuntime { + pub(crate) fn new(name: &'static str) -> Self { + Self { + name, + parent: Tokio, + } + } +} + +impl Runtime for NamedTokioRuntime { + type Interval = ::Interval; + type Delay = ::Delay; + + fn interval(&self, duration: Duration) -> Self::Interval { + self.parent.interval(duration) + } + + fn spawn(&self, future: BoxFuture<'static, ()>) { + self.parent.spawn(future) + } + + fn delay(&self, duration: Duration) -> Self::Delay { + self.parent.delay(duration) + } +} + +impl RuntimeChannel for NamedTokioRuntime { + type Receiver = >::Receiver; + type Sender = NamedSender; + + fn batch_message_channel(&self, capacity: usize) -> (Self::Sender, Self::Receiver) { + let (sender, receiver) = tokio::sync::mpsc::channel(capacity); + ( + NamedSender::new(self.name, sender), + tokio_stream::wrappers::ReceiverStream::new(receiver), + ) + } +} + +#[derive(Debug)] +pub(crate) struct NamedSender { + name: &'static str, + channel_full_message: String, + channel_closed_message: String, + sender: tokio::sync::mpsc::Sender, +} + +impl NamedSender { + fn new(name: &'static str, sender: tokio::sync::mpsc::Sender) -> NamedSender { + NamedSender { + name, + channel_full_message: format!( + "cannot send message to batch processor '{}' as the channel is full", + name + ), + channel_closed_message: format!( + "cannot send message to batch processor '{}' as the channel is closed", + name + ), + sender, + } + } +} + +impl TrySend for NamedSender { + type Message = T; + + fn try_send(&self, item: Self::Message) -> Result<(), TrySendError> { + // Convert the error into something that has a name + self.sender.try_send(item).map_err(|err| { + let error = match &err { + tokio::sync::mpsc::error::TrySendError::Full(_) => "channel full", + tokio::sync::mpsc::error::TrySendError::Closed(_) => "channel closed", + }; + u64_counter!( + "apollo.router.telemetry.batch_processor.errors", + "Errors when sending to a batch processor", + 1, + "name" = self.name, + "error" = error + ); + + match err { + tokio::sync::mpsc::error::TrySendError::Full(_) => { + TrySendError::Other(self.channel_full_message.as_str().into()) + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + TrySendError::Other(self.channel_closed_message.as_str().into()) + } + } + }) + } +} +#[cfg(test)] +mod tests { + use super::*; + use crate::metrics::FutureMetricsExt; + + #[tokio::test] + async fn test_channel_full_error_metrics() { + async { + let runtime = NamedTokioRuntime::new("test_processor"); + let (sender, mut _receiver) = runtime.batch_message_channel(1); + + // Fill the channel + sender.try_send("first").expect("should send first message"); + + // This should fail and emit metrics + let result = sender.try_send("second"); + assert!(result.is_err()); + + assert_counter!( + "apollo.router.telemetry.batch_processor.errors", + 1, + "name" = "test_processor", + "error" = "channel full" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_channel_closed_error_metrics() { + async { + let runtime = NamedTokioRuntime::new("test_processor"); + let (sender, receiver) = runtime.batch_message_channel(1); + + // Drop receiver to close channel + drop(receiver); + + let result = sender.try_send("message"); + assert!(result.is_err()); + + assert_counter!( + "apollo.router.telemetry.batch_processor.errors", + 1, + "name" = "test_processor", + "error" = "channel closed" + ); + } + .with_metrics() + .await; + } + + #[tokio::test] + async fn test_successful_message_send() { + async { + let runtime = NamedTokioRuntime::new("test_processor"); + let (sender, _receiver) = runtime.batch_message_channel(1); + + let result = sender.try_send("message"); + assert!(result.is_ok()); + + // No metrics should be emitted for success case + let metrics = crate::metrics::collect_metrics(); + assert!(metrics + .find("apollo.router.telemetry.batch_processor.errors") + .is_none()); + } + .with_metrics() + .await; + } +} diff --git a/apollo-router/src/plugins/telemetry/tracing/apollo.rs b/apollo-router/src/plugins/telemetry/tracing/apollo.rs index b4f6589e37..5cd2454363 100644 --- a/apollo-router/src/plugins/telemetry/tracing/apollo.rs +++ b/apollo-router/src/plugins/telemetry/tracing/apollo.rs @@ -8,6 +8,7 @@ use crate::plugins::telemetry::apollo::Config; use crate::plugins::telemetry::apollo_exporter::proto::reports::Trace; use crate::plugins::telemetry::config; use crate::plugins::telemetry::config_new::spans::Spans; +use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::span_factory::SpanMode; use crate::plugins::telemetry::tracing::apollo_telemetry; use crate::plugins::telemetry::tracing::TracingConfigurator; @@ -48,7 +49,7 @@ impl TracingConfigurator for Config { .metrics_reference_mode(self.metrics_reference_mode) .build()?; Ok(builder.with_span_processor( - BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio) + BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("apollo-tracing")) .with_batch_config(self.batch_processor.clone().into()) .build(), )) diff --git a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs index fd1b4447ae..48469d79f4 100644 --- a/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs +++ b/apollo-router/src/plugins/telemetry/tracing/datadog/mod.rs @@ -42,6 +42,7 @@ use crate::plugins::telemetry::consts::SUBGRAPH_REQUEST_SPAN_NAME; use crate::plugins::telemetry::consts::SUBGRAPH_SPAN_NAME; use crate::plugins::telemetry::consts::SUPERGRAPH_SPAN_NAME; use crate::plugins::telemetry::endpoint::UriEndpoint; +use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::tracing::datadog_exporter; use crate::plugins::telemetry::tracing::datadog_exporter::DatadogTraceState; use crate::plugins::telemetry::tracing::BatchProcessorConfig; @@ -219,7 +220,7 @@ impl TracingConfigurator for Config { delegate: exporter, span_metrics, }, - opentelemetry::runtime::Tokio, + NamedTokioRuntime::new("datadog-tracing"), ) .with_batch_config(self.batch_processor.clone().into()) .build() diff --git a/apollo-router/src/plugins/telemetry/tracing/jaeger.rs b/apollo-router/src/plugins/telemetry/tracing/jaeger.rs index bce59978ae..763f619555 100644 --- a/apollo-router/src/plugins/telemetry/tracing/jaeger.rs +++ b/apollo-router/src/plugins/telemetry/tracing/jaeger.rs @@ -15,6 +15,7 @@ use crate::plugins::telemetry::config::TracingCommon; use crate::plugins::telemetry::config_new::spans::Spans; use crate::plugins::telemetry::endpoint::SocketEndpoint; use crate::plugins::telemetry::endpoint::UriEndpoint; +use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; use crate::plugins::telemetry::tracing::TracingConfigurator; @@ -105,7 +106,7 @@ impl TracingConfigurator for Config { .with(&agent.endpoint.to_socket(), |b, s| b.with_endpoint(s)) .build_async_agent_exporter(opentelemetry::runtime::Tokio)?; Ok(builder.with_span_processor( - BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio) + BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("jaeger-agent")) .with_batch_config(batch_processor.clone().into()) .build() .filtered(), @@ -137,10 +138,13 @@ impl TracingConfigurator for Config { .with_batch_processor_config(batch_processor.clone().into()) .build_collector_exporter::()?; Ok(builder.with_span_processor( - BatchSpanProcessor::builder(exporter, runtime::Tokio) - .with_batch_config(batch_processor.clone().into()) - .build() - .filtered(), + BatchSpanProcessor::builder( + exporter, + NamedTokioRuntime::new("jaeger-collector"), + ) + .with_batch_config(batch_processor.clone().into()) + .build() + .filtered(), )) } _ => Ok(builder), diff --git a/apollo-router/src/plugins/telemetry/tracing/otlp.rs b/apollo-router/src/plugins/telemetry/tracing/otlp.rs index 9a61075e5f..15f341ffd6 100644 --- a/apollo-router/src/plugins/telemetry/tracing/otlp.rs +++ b/apollo-router/src/plugins/telemetry/tracing/otlp.rs @@ -8,6 +8,7 @@ use tower::BoxError; use crate::plugins::telemetry::config::TracingCommon; use crate::plugins::telemetry::config_new::spans::Spans; +use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::otlp::TelemetryDataKind; use crate::plugins::telemetry::tracing::SpanProcessorExt; use crate::plugins::telemetry::tracing::TracingConfigurator; @@ -26,7 +27,7 @@ impl TracingConfigurator for super::super::otlp::Config { let exporter: SpanExporterBuilder = self.exporter(TelemetryDataKind::Traces)?; let batch_span_processor = BatchSpanProcessor::builder( exporter.build_span_exporter()?, - opentelemetry::runtime::Tokio, + NamedTokioRuntime::new("otlp-tracing"), ) .with_batch_config(self.batch_processor.clone().into()) .build() diff --git a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs index 82160c3bef..f12c2b9a45 100644 --- a/apollo-router/src/plugins/telemetry/tracing/zipkin.rs +++ b/apollo-router/src/plugins/telemetry/tracing/zipkin.rs @@ -14,6 +14,7 @@ use crate::plugins::telemetry::config::GenericWith; use crate::plugins::telemetry::config::TracingCommon; use crate::plugins::telemetry::config_new::spans::Spans; use crate::plugins::telemetry::endpoint::UriEndpoint; +use crate::plugins::telemetry::otel::named_runtime_channel::NamedTokioRuntime; use crate::plugins::telemetry::tracing::BatchProcessorConfig; use crate::plugins::telemetry::tracing::SpanProcessorExt; use crate::plugins::telemetry::tracing::TracingConfigurator; @@ -65,7 +66,7 @@ impl TracingConfigurator for Config { .init_exporter()?; Ok(builder.with_span_processor( - BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio) + BatchSpanProcessor::builder(exporter, NamedTokioRuntime::new("zipkin-tracing")) .with_batch_config(self.batch_processor.clone().into()) .build() .filtered(), diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs index e70cf5f0de..e2f51875d8 100644 --- a/apollo-router/tests/common.rs +++ b/apollo-router/tests/common.rs @@ -159,6 +159,7 @@ pub struct IntegrationTest { redis_namespace: String, log: String, subgraph_context: Arc>>, + logs: Vec, } impl IntegrationTest { @@ -483,6 +484,7 @@ impl IntegrationTest { redis_namespace, log: log.unwrap_or_else(|| "error,apollo_router=info".to_owned()), subgraph_context, + logs: vec![], } } @@ -591,12 +593,12 @@ impl IntegrationTest { #[allow(dead_code)] pub async fn assert_started(&mut self) { - self.assert_log_contains("GraphQL endpoint exposed").await; + self.wait_for_log_message("GraphQL endpoint exposed").await; } #[allow(dead_code)] pub async fn assert_not_started(&mut self) { - self.assert_log_contains("no valid configuration").await; + self.wait_for_log_message("no valid configuration").await; } #[allow(dead_code)] @@ -853,25 +855,26 @@ impl IntegrationTest { #[allow(dead_code)] pub async fn assert_reloaded(&mut self) { - self.assert_log_contains("reload complete").await; + self.wait_for_log_message("reload complete").await; } #[allow(dead_code)] pub async fn assert_no_reload_necessary(&mut self) { - self.assert_log_contains("no reload necessary").await; + self.wait_for_log_message("no reload necessary").await; } #[allow(dead_code)] pub async fn assert_not_reloaded(&mut self) { - self.assert_log_contains("continuing with previous configuration") + self.wait_for_log_message("continuing with previous configuration") .await; } #[allow(dead_code)] - pub async fn assert_log_contains(&mut self, msg: &str) { + pub async fn wait_for_log_message(&mut self, msg: &str) { let now = Instant::now(); while now.elapsed() < Duration::from_secs(10) { if let Ok(line) = self.stdio_rx.try_recv() { + self.logs.push(line.to_string()); if line.contains(msg) { return; } @@ -882,6 +885,17 @@ impl IntegrationTest { panic!("'{msg}' not detected in logs"); } + #[allow(dead_code)] + pub async fn assert_log_contained(&self, msg: &str) { + for line in &self.logs { + if line.contains(msg) { + return; + } + } + + panic!("'{msg}' not detected in logs"); + } + #[allow(dead_code)] pub async fn assert_log_not_contains(&mut self, msg: &str) { let now = Instant::now(); diff --git a/apollo-router/tests/integration/coprocessor.rs b/apollo-router/tests/integration/coprocessor.rs index d82d15ca7c..791eb2141a 100644 --- a/apollo-router/tests/integration/coprocessor.rs +++ b/apollo-router/tests/integration/coprocessor.rs @@ -27,7 +27,7 @@ async fn test_error_not_propagated_to_client() -> Result<(), BoxError> { let (_trace_id, response) = router.execute_default_query().await; assert_eq!(response.status(), 500); assert_yaml_snapshot!(response.text().await?); - router.assert_log_contains("INTERNAL_SERVER_ERROR").await; + router.wait_for_log_message("INTERNAL_SERVER_ERROR").await; router.graceful_shutdown().await; Ok(()) } diff --git a/apollo-router/tests/integration/lifecycle.rs b/apollo-router/tests/integration/lifecycle.rs index 71af2dbcf8..cb5077f984 100644 --- a/apollo-router/tests/integration/lifecycle.rs +++ b/apollo-router/tests/integration/lifecycle.rs @@ -244,7 +244,9 @@ async fn test_experimental_notice() { router.start().await; router.assert_started().await; router - .assert_log_contains("You're using some \\\"experimental\\\" features of the Apollo Router") + .wait_for_log_message( + "You're using some \\\"experimental\\\" features of the Apollo Router", + ) .await; router.graceful_shutdown().await; } diff --git a/apollo-router/tests/integration/query_planner.rs b/apollo-router/tests/integration/query_planner.rs index 179ca1df9a..99af63820b 100644 --- a/apollo-router/tests/integration/query_planner.rs +++ b/apollo-router/tests/integration/query_planner.rs @@ -16,7 +16,7 @@ async fn fed1_schema_with_new_qp() { .await; router.start().await; router - .assert_log_contains( + .wait_for_log_message( "could not create router: \ failed to initialize the query planner: \ Supergraphs composed with federation version 1 are not supported.", @@ -69,7 +69,7 @@ async fn invalid_schema_with_new_qp_fails_startup() { .await; router.start().await; router - .assert_log_contains( + .wait_for_log_message( "could not create router: \ Federation error: Invalid supergraph: must be a core schema", ) @@ -97,7 +97,7 @@ async fn valid_schema_with_new_qp_change_to_broken_schema_keeps_old_config() { .update_schema(&PathBuf::from("tests/fixtures/broken-supergraph.graphql")) .await; router - .assert_log_contains("error while reloading, continuing with previous configuration") + .wait_for_log_message("error while reloading, continuing with previous configuration") .await; router.execute_default_query().await; router.graceful_shutdown().await; diff --git a/apollo-router/tests/integration/supergraph.rs b/apollo-router/tests/integration/supergraph.rs index 07b4c81089..7c652ab195 100644 --- a/apollo-router/tests/integration/supergraph.rs +++ b/apollo-router/tests/integration/supergraph.rs @@ -21,7 +21,7 @@ async fn test_supergraph_error_http1_max_headers_config() -> Result<(), BoxError .await; router.start().await; - router.assert_log_contains("'limits.http1_max_request_headers' requires 'hyper_header_limits' feature: enable 'hyper_header_limits' feature in order to use 'limits.http1_max_request_headers'").await; + router.wait_for_log_message("'limits.http1_max_request_headers' requires 'hyper_header_limits' feature: enable 'hyper_header_limits' feature in order to use 'limits.http1_max_request_headers'").await; router.assert_not_started().await; Ok(()) } diff --git a/apollo-router/tests/integration/telemetry/fixtures/otlp_invalid_endpoint.router.yaml b/apollo-router/tests/integration/telemetry/fixtures/otlp_invalid_endpoint.router.yaml new file mode 100644 index 0000000000..0358e9a560 --- /dev/null +++ b/apollo-router/tests/integration/telemetry/fixtures/otlp_invalid_endpoint.router.yaml @@ -0,0 +1,16 @@ +telemetry: + exporters: + tracing: + common: + service_name: router + otlp: + enabled: true + endpoint: + batch_processor: + scheduled_delay: 1s + max_queue_size: 1 + max_concurrent_exports: 1 + max_export_batch_size: 1 + metrics: + prometheus: + enabled: true diff --git a/apollo-router/tests/integration/telemetry/logging.rs b/apollo-router/tests/integration/telemetry/logging.rs index 59dc1c7ccd..22d27b089b 100644 --- a/apollo-router/tests/integration/telemetry/logging.rs +++ b/apollo-router/tests/integration/telemetry/logging.rs @@ -23,16 +23,16 @@ async fn test_json() -> Result<(), BoxError> { router.assert_started().await; router.execute_default_query().await; - router.assert_log_contains("trace_id").await; + router.wait_for_log_message("trace_id").await; router.execute_default_query().await; - router.assert_log_contains("span_id").await; + router.wait_for_log_message("span_id").await; router.execute_default_query().await; - router.assert_log_contains(r#""static_one":"test""#).await; + router.wait_for_log_message(r#""static_one":"test""#).await; #[cfg(unix)] { router.execute_default_query().await; router - .assert_log_contains( + .wait_for_log_message( r#""schema.id":"dd8960ccefda82ca58e8ac0bc266459fd49ee8215fd6b3cc72e7bc3d7f3464b9""#, ) .await; @@ -40,10 +40,12 @@ async fn test_json() -> Result<(), BoxError> { router.execute_default_query().await; router - .assert_log_contains(r#""on_supergraph_response_event":"on_supergraph_event""#) + .wait_for_log_message(r#""on_supergraph_response_event":"on_supergraph_event""#) .await; router.execute_default_query().await; - router.assert_log_contains(r#""response_status":200"#).await; + router + .wait_for_log_message(r#""response_status":200"#) + .await; router.graceful_shutdown().await; Ok(()) @@ -66,20 +68,22 @@ async fn test_json_promote_span_attributes() -> Result<(), BoxError> { router.assert_started().await; router.execute_default_query().await; - router.assert_log_contains("trace_id").await; + router.wait_for_log_message("trace_id").await; router.execute_query(Query::default()).await; - router.assert_log_contains("span_id").await; + router.wait_for_log_message("span_id").await; router.execute_default_query().await; - router.assert_log_contains(r#""static_one":"test""#).await; + router.wait_for_log_message(r#""static_one":"test""#).await; router.execute_default_query().await; - router.assert_log_contains(r#""response_status":200"#).await; + router + .wait_for_log_message(r#""response_status":200"#) + .await; router.execute_default_query().await; - router.assert_log_contains(r#""too_big":true"#).await; + router.wait_for_log_message(r#""too_big":true"#).await; router.execute_default_query().await; - router.assert_log_contains(r#""too_big":"nope""#).await; + router.wait_for_log_message(r#""too_big":"nope""#).await; router.execute_default_query().await; router - .assert_log_contains(r#""graphql.document":"query ExampleQuery {topProducts{name}}""#) + .wait_for_log_message(r#""graphql.document":"query ExampleQuery {topProducts{name}}""#) .await; router.execute_default_query().await; router.assert_log_not_contains(r#""should_not_log""#).await; @@ -106,13 +110,13 @@ async fn test_json_uuid_format() -> Result<(), BoxError> { router.assert_started().await; router.execute_default_query().await; - router.assert_log_contains("trace_id").await; + router.wait_for_log_message("trace_id").await; let (trace_id, _) = router.execute_default_query().await; router - .assert_log_contains(&format!("{}", Uuid::from_bytes(trace_id.to_bytes()))) + .wait_for_log_message(&format!("{}", Uuid::from_bytes(trace_id.to_bytes()))) .await; router.execute_default_query().await; - router.assert_log_contains("span_id").await; + router.wait_for_log_message("span_id").await; router.graceful_shutdown().await; Ok(()) @@ -135,13 +139,13 @@ async fn test_text_uuid_format() -> Result<(), BoxError> { router.assert_started().await; router.execute_default_query().await; - router.assert_log_contains("trace_id").await; + router.wait_for_log_message("trace_id").await; let (trace_id, _) = router.execute_default_query().await; router - .assert_log_contains(&format!("{}", Uuid::from_bytes(trace_id.to_bytes()))) + .wait_for_log_message(&format!("{}", Uuid::from_bytes(trace_id.to_bytes()))) .await; router.execute_default_query().await; - router.assert_log_contains("span_id").await; + router.wait_for_log_message("span_id").await; router.graceful_shutdown().await; Ok(()) @@ -163,17 +167,19 @@ async fn test_json_sampler_off() -> Result<(), BoxError> { router.assert_started().await; router.execute_default_query().await; - router.assert_log_contains("trace_id").await; + router.wait_for_log_message("trace_id").await; router.execute_default_query().await; - router.assert_log_contains("span_id").await; + router.wait_for_log_message("span_id").await; router.execute_default_query().await; - router.assert_log_contains(r#""static_one":"test""#).await; + router.wait_for_log_message(r#""static_one":"test""#).await; router.execute_default_query().await; router - .assert_log_contains(r#""on_supergraph_response_event":"on_supergraph_event""#) + .wait_for_log_message(r#""on_supergraph_response_event":"on_supergraph_event""#) .await; router.execute_default_query().await; - router.assert_log_contains(r#""response_status":200"#).await; + router + .wait_for_log_message(r#""response_status":200"#) + .await; router.graceful_shutdown().await; Ok(()) @@ -197,15 +203,15 @@ async fn test_text() -> Result<(), BoxError> { router.execute_query(Query::default()).await; router.execute_query(Query::default()).await; - router.assert_log_contains("trace_id").await; + router.wait_for_log_message("trace_id").await; router.execute_query(Query::default()).await; - router.assert_log_contains("span_id").await; + router.wait_for_log_message("span_id").await; router - .assert_log_contains(r#"on_supergraph_response_event=on_supergraph_event"#) + .wait_for_log_message(r#"on_supergraph_response_event=on_supergraph_event"#) .await; router.execute_query(Query::default()).await; router.execute_query(Query::default()).await; - router.assert_log_contains("response_status=200").await; + router.wait_for_log_message("response_status=200").await; router.graceful_shutdown().await; Ok(()) } @@ -227,11 +233,11 @@ async fn test_text_sampler_off() -> Result<(), BoxError> { router.assert_started().await; router.execute_default_query().await; router.execute_default_query().await; - router.assert_log_contains("trace_id").await; + router.wait_for_log_message("trace_id").await; router.execute_default_query().await; - router.assert_log_contains("span_id").await; + router.wait_for_log_message("span_id").await; router.execute_default_query().await; - router.assert_log_contains("response_status=200").await; + router.wait_for_log_message("response_status=200").await; router.graceful_shutdown().await; Ok(()) } diff --git a/apollo-router/tests/integration/telemetry/otlp.rs b/apollo-router/tests/integration/telemetry/otlp.rs index 9aab563ee1..74821b1832 100644 --- a/apollo-router/tests/integration/telemetry/otlp.rs +++ b/apollo-router/tests/integration/telemetry/otlp.rs @@ -2,6 +2,7 @@ extern crate core; use std::collections::HashSet; use std::ops::Deref; +use std::time::Duration; use anyhow::anyhow; use opentelemetry_api::trace::TraceId; @@ -26,6 +27,36 @@ use crate::integration::telemetry::TraceSpec; use crate::integration::IntegrationTest; use crate::integration::ValueExt; +#[tokio::test(flavor = "multi_thread")] +async fn test_trace_error() -> Result<(), BoxError> { + if !graph_os_enabled() { + return Ok(()); + } + if !graph_os_enabled() { + panic!("Error: test skipped because GraphOS is not enabled"); + } + let mock_server = mock_otlp_server_delayed().await; + let config = include_str!("fixtures/otlp_invalid_endpoint.router.yaml") + .replace("", &mock_server.uri()); + + let mut router = IntegrationTest::builder() + .telemetry(Telemetry::Otlp { + endpoint: Some(format!("{}/v1/traces", mock_server.uri())), + }) + .config(config) + .build() + .await; + + router.start().await; + router.assert_started().await; + router.assert_log_contained("OpenTelemetry trace error occurred: cannot send message to batch processor 'otlp-tracing' as the channel is full").await; + router.assert_metrics_contains(r#"apollo_router_telemetry_batch_processor_errors_total{error="channel full",name="otlp-tracing",otel_scope_name="apollo/router"}"#, None).await; + router.graceful_shutdown().await; + + drop(mock_server); + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_basic() -> Result<(), BoxError> { if !graph_os_enabled() { @@ -844,6 +875,24 @@ impl Verifier for OtlpTraceSpec<'_> { } } +async fn mock_otlp_server_delayed() -> MockServer { + let mock_server = wiremock::MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/traces")) + .respond_with( + ResponseTemplate::new(200) + .set_delay(Duration::from_secs(1)) + .set_body_raw( + ExportTraceServiceResponse::default().encode_to_vec(), + "application/x-protobuf", + ), + ) + .mount(&mock_server) + .await; + + mock_server +} + async fn mock_otlp_server + Clone>(expected_requests: T) -> MockServer { let mock_server = wiremock::MockServer::start().await; Mock::given(method("POST")) diff --git a/apollo-router/tests/integration/telemetry/propagation.rs b/apollo-router/tests/integration/telemetry/propagation.rs index 9505efa558..4c4affd3eb 100644 --- a/apollo-router/tests/integration/telemetry/propagation.rs +++ b/apollo-router/tests/integration/telemetry/propagation.rs @@ -27,12 +27,12 @@ async fn test_trace_id_via_header() -> Result<(), BoxError> { router.assert_started().await; make_call(&mut router, trace_id).await; router - .assert_log_contains(&format!("trace_id: {}", trace_id)) + .wait_for_log_message(&format!("trace_id: {}", trace_id)) .await; make_call(&mut router, trace_id).await; router - .assert_log_contains(&format!("\"id_from_header\": \"{}\"", trace_id)) + .wait_for_log_message(&format!("\"id_from_header\": \"{}\"", trace_id)) .await; router.graceful_shutdown().await; diff --git a/apollo-router/tests/integration/traffic_shaping.rs b/apollo-router/tests/integration/traffic_shaping.rs index 579cb2b2a5..1896f86d8d 100644 --- a/apollo-router/tests/integration/traffic_shaping.rs +++ b/apollo-router/tests/integration/traffic_shaping.rs @@ -113,7 +113,7 @@ async fn test_router_timeout_operation_name_in_tracing() -> Result<(), BoxError> assert!(response.contains("REQUEST_TIMEOUT")); router - .assert_log_contains(r#""otel.name":"query UniqueName""#) + .wait_for_log_message(r#""otel.name":"query UniqueName""#) .await; router.graceful_shutdown().await; diff --git a/docs/shared/batch-processor-preamble.mdx b/docs/shared/batch-processor-preamble.mdx index 978ba15c1f..77b32d252d 100644 --- a/docs/shared/batch-processor-preamble.mdx +++ b/docs/shared/batch-processor-preamble.mdx @@ -2,8 +2,16 @@ All exporters support configuration of a batch span processor with `batch_proces You must tune your `batch_processor` configuration if you see any of the following messages in your logs: -* `OpenTelemetry trace error occurred: cannot send span to the batch span processor because the channel is full` +* `OpenTelemetry trace error occurred: cannot send message to batch processor '-tracing' as the channel is full` * `OpenTelemetry metrics error occurred: cannot send span to the batch span processor because the channel is full` -The exact settings depend on the bandwidth available for you to send data to your application peformance monitor (APM) and the bandwidth configuration of your APM. Expect to tune these settings over time as your application changes. +The exact settings depend on the bandwidth available for you to send data to your application performance monitor (APM) and the bandwidth configuration of your APM. Expect to tune these settings over time as your application changes. + +You can see how many spans are being dropped by enabling metrics export and looking at the: + +- `apollo.router.telemetry.batch_processor.errors` - The number of errors encountered by exporter batch processors. + - `name`: One of `apollo-tracing`, `datadog-tracing`, `otlp-tracing`, `zipkin-tracing`. + - `error` = One of `channel closed`, `channel full`. + +By looking at the rate of batch processor errors you can decide how to tune your batch processor settings. diff --git a/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx b/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx index ec9fc56327..97d03f7b50 100644 --- a/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx +++ b/docs/source/reference/router/telemetry/instrumentation/standard-instruments.mdx @@ -117,6 +117,11 @@ The initial call to Uplink during router startup is not reflected in metrics. - `report.type`: The type of report submitted: "traces" or "metrics" - `report.protocol`: Either "apollo" or "otlp", depending on the experimental_otlp_tracing_sampler configuration. +### Telemetry +- `apollo.router.telemetry.batch_processor.errors` - The number of errors encountered by exporter batch processors. + - `name`: One of `apollo-tracing`, `datadog-tracing`, `otlp-tracing`, `zipkin-tracing`. + - `error` = One of `channel closed`, `channel full`. + ### Deprecated The following metrics have been deprecated and should not be used.