From 28ad4b09800919587edc037db539b23c4688f653 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Thu, 19 Dec 2024 02:06:18 -0800 Subject: [PATCH 01/15] initial commit --- opentelemetry-sdk/src/trace/mod.rs | 1 + opentelemetry-sdk/src/trace/span_processor.rs | 199 ++++++++++++++++++ 2 files changed, 200 insertions(+) diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 43445c4a4c..2582e12f14 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -28,6 +28,7 @@ pub use span::Span; pub use span_limit::SpanLimits; pub use span_processor::{ BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder, + BatchSpanProcessorDedicatedThread, BatchSpanProcessorDedicatedThreadBuilder, SimpleSpanProcessor, SpanProcessor, }; pub use tracer::Tracer; diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 5023ca2bc5..53dcb413c4 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -55,6 +55,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::{env, fmt, str::FromStr, time::Duration}; +use std::sync::atomic::AtomicBool; +use std::thread; +use std::time::Instant; + /// Delay interval between two consecutive exports. const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; /// Default delay interval between two consecutive exports. @@ -166,6 +170,201 @@ impl SpanProcessor for SimpleSpanProcessor { } } +use futures_executor::block_on; +use std::sync::mpsc::sync_channel; +use std::sync::mpsc::RecvTimeoutError; +use std::sync::mpsc::SyncSender; + +/// Messages exchanged between the main thread and the background thread. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessageDedicatedThread { + ExportSpan(SpanData), + ForceFlush(SyncSender>), + Shutdown(SyncSender>), +} + +/// A batch span processor with a dedicated background thread. +#[derive(Debug)] +pub struct BatchSpanProcessorDedicatedThread { + message_sender: SyncSender, + handle: Mutex>>, + shutdown_timeout: Duration, + is_shutdown: AtomicBool, + dropped_span_count: Arc, +} + +impl BatchSpanProcessorDedicatedThread { + /// Creates a new instance of `BatchSpanProcessorDedicatedThread`. + pub fn new( + mut exporter: E, + max_queue_size: usize, + scheduled_delay: Duration, + shutdown_timeout: Duration, + ) -> Self + where + E: SpanExporter + Send + 'static, + { + let (message_sender, message_receiver) = sync_channel(max_queue_size); + + let handle = thread::Builder::new() + .name("BatchSpanProcessorThread".to_string()) + .spawn(move || { + let mut spans = Vec::new(); + let mut last_export_time = Instant::now(); + + loop { + let timeout = scheduled_delay.saturating_sub(last_export_time.elapsed()); + match message_receiver.recv_timeout(timeout) { + Ok(message) => match message { + BatchMessageDedicatedThread::ExportSpan(span) => { + spans.push(span); + if spans.len() >= max_queue_size + || last_export_time.elapsed() >= scheduled_delay + { + if let Err(err) = block_on(exporter.export(spans.split_off(0))) + { + eprintln!("Export error: {:?}", err); + } + last_export_time = Instant::now(); + } + } + BatchMessageDedicatedThread::ForceFlush(sender) => { + let result = block_on(exporter.export(spans.split_off(0))); + let _ = sender.send(result); + } + BatchMessageDedicatedThread::Shutdown(sender) => { + let result = block_on(exporter.export(spans.split_off(0))); + let _ = sender.send(result); + break; + } + }, + Err(RecvTimeoutError::Timeout) => { + if last_export_time.elapsed() >= scheduled_delay { + if let Err(err) = block_on(exporter.export(spans.split_off(0))) { + eprintln!("Export error: {:?}", err); + } + last_export_time = Instant::now(); + } + } + Err(RecvTimeoutError::Disconnected) => { + eprintln!("Channel disconnected, shutting down processor thread."); + break; + } + } + } + }) + .expect("Failed to spawn thread"); + + Self { + message_sender, + handle: Mutex::new(Some(handle)), + shutdown_timeout, + is_shutdown: AtomicBool::new(false), + dropped_span_count: Arc::new(AtomicBool::new(false)), + } + } + + /// Handles span end. + pub fn on_end(&self, span: SpanData) { + if self.is_shutdown.load(Ordering::Relaxed) { + eprintln!("Processor is shutdown. Dropping span."); + return; + } + if self + .message_sender + .try_send(BatchMessageDedicatedThread::ExportSpan(span)) + .is_err() && !self.dropped_span_count.load(Ordering::Relaxed) { + eprintln!("Queue is full, dropping spans."); + self.dropped_span_count.store(true, Ordering::Relaxed); + } + } + + /// Flushes all pending spans. + pub fn force_flush(&self) -> TraceResult<()> { + if self.is_shutdown.load(Ordering::Relaxed) { + return Err(TraceError::Other("Processor already shutdown".into())); + } + let (sender, receiver) = sync_channel(1); + self.message_sender + .try_send(BatchMessageDedicatedThread::ForceFlush(sender)) + .map_err(|_| TraceError::Other("Failed to send ForceFlush message".into()))?; + + receiver + .recv_timeout(self.shutdown_timeout) + .map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))? + } + + /// Shuts down the processor. + pub fn shutdown(&self) -> TraceResult<()> { + if self.is_shutdown.swap(true, Ordering::Relaxed) { + return Err(TraceError::Other("Processor already shutdown".into())); + } + let (sender, receiver) = sync_channel(1); + self.message_sender + .try_send(BatchMessageDedicatedThread::Shutdown(sender)) + .map_err(|_| TraceError::Other("Failed to send Shutdown message".into()))?; + + let result = receiver + .recv_timeout(self.shutdown_timeout) + .map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?; + if let Some(handle) = self.handle.lock().unwrap().take() { + handle.join().expect("Failed to join thread"); + } + result + } +} + +/// Builder for `BatchSpanProcessorDedicatedThread`. +#[derive(Debug, Default)] +pub struct BatchSpanProcessorDedicatedThreadBuilder { + max_queue_size: usize, + scheduled_delay: Duration, + shutdown_timeout: Duration, +} + +impl BatchSpanProcessorDedicatedThreadBuilder { + /// Creates a new builder with default values. + pub fn new() -> Self { + Self { + max_queue_size: 2048, + scheduled_delay: Duration::from_secs(5), + shutdown_timeout: Duration::from_secs(5), + } + } + + /// Sets the maximum queue size for spans. + pub fn with_max_queue_size(mut self, size: usize) -> Self { + self.max_queue_size = size; + self + } + + /// Sets the delay between exports. + pub fn with_scheduled_delay(mut self, delay: Duration) -> Self { + self.scheduled_delay = delay; + self + } + + /// Sets the timeout for shutdown and flush operations. + pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self { + self.shutdown_timeout = timeout; + self + } + + /// Builds the `BatchSpanProcessorDedicatedThread` instance. + pub fn build(self, exporter: E) -> BatchSpanProcessorDedicatedThread + where + E: SpanExporter + Send + 'static, + { + BatchSpanProcessorDedicatedThread::new( + exporter, + self.max_queue_size, + self.scheduled_delay, + self.shutdown_timeout, + ) + } +} + /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports /// them at a preconfigured interval. /// From b15b124b3e1eb47b4a9fef98e6f9187d2ff356f9 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 04:30:43 -0800 Subject: [PATCH 02/15] fix --- opentelemetry-sdk/src/trace/span_processor.rs | 158 +++++++++++++++++- 1 file changed, 151 insertions(+), 7 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 53dcb413c4..718762b4a5 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -191,7 +191,7 @@ pub struct BatchSpanProcessorDedicatedThread { handle: Mutex>>, shutdown_timeout: Duration, is_shutdown: AtomicBool, - dropped_span_count: Arc, + dropped_span_count: Arc, } impl BatchSpanProcessorDedicatedThread { @@ -261,7 +261,7 @@ impl BatchSpanProcessorDedicatedThread { handle: Mutex::new(Some(handle)), shutdown_timeout, is_shutdown: AtomicBool::new(false), - dropped_span_count: Arc::new(AtomicBool::new(false)), + dropped_span_count: Arc::new(AtomicUsize::new(0)), } } @@ -271,13 +271,19 @@ impl BatchSpanProcessorDedicatedThread { eprintln!("Processor is shutdown. Dropping span."); return; } - if self + let result = self .message_sender - .try_send(BatchMessageDedicatedThread::ExportSpan(span)) - .is_err() && !self.dropped_span_count.load(Ordering::Relaxed) { - eprintln!("Queue is full, dropping spans."); - self.dropped_span_count.store(true, Ordering::Relaxed); + .try_send(BatchMessageDedicatedThread::ExportSpan(span)); + + // TODO - Implement throttling to prevent error flooding when the queue is full or closed. + if result.is_err() { + // Increment dropped logs count. The first time we have to drop a log, + // emit a warning. + if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted", + message = "BatchSpanProcessorDedicatedThread dropped a Span due to queue full/internal errors. No further span will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); } + } } /// Flushes all pending spans. @@ -1285,4 +1291,142 @@ mod tests { let shutdown_res = processor.shutdown(); assert!(shutdown_res.is_ok()); } + + // Helper function to create a default test span + fn create_test_span(name: &str) -> SpanData { + SpanData { + span_context: SpanContext::empty_context(), + parent_span_id: SpanId::INVALID, + span_kind: SpanKind::Internal, + name: name.to_string().into(), + start_time: opentelemetry::time::now(), + end_time: opentelemetry::time::now(), + attributes: Vec::new(), + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + } + } + + use crate::trace::BatchSpanProcessorDedicatedThread; + use futures_util::future::BoxFuture; + use futures_util::FutureExt; + use std::sync::Arc; + use std::sync::Mutex; + + // Mock exporter to test functionality + #[derive(Debug)] + struct MockSpanExporter { + exported_spans: Arc>>, + } + + impl MockSpanExporter { + fn new() -> Self { + Self { + exported_spans: Arc::new(Mutex::new(Vec::new())), + } + } + } + + impl SpanExporter for MockSpanExporter { + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { + let exported_spans = self.exported_spans.clone(); + async move { + exported_spans.lock().unwrap().extend(batch); + Ok(()) + } + .boxed() + } + + fn shutdown(&mut self) {} + } + + #[test] + fn batchspanprocessor_dedicatedthread_handles_on_end() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let processor = BatchSpanProcessorDedicatedThread::new( + exporter, + 10, // max queue size + Duration::from_secs(5), + Duration::from_secs(2), + ); + + let test_span = create_test_span("test_span"); + processor.on_end(test_span.clone()); + + // Wait for flush interval to ensure the span is processed + std::thread::sleep(Duration::from_secs(6)); + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + assert_eq!(exported_spans[0].name, "test_span"); + } + + #[test] + fn batchspanprocessor_deficatedthread_force_flush() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans + let processor = BatchSpanProcessorDedicatedThread::new( + exporter, + 10, // max queue size + Duration::from_secs(5), + Duration::from_secs(2), + ); + + // Create a test span and send it to the processor + let test_span = create_test_span("force_flush_span"); + processor.on_end(test_span.clone()); + + // Call force_flush to immediately export the spans + let flush_result = processor.force_flush(); + assert!(flush_result.is_ok(), "Force flush failed unexpectedly"); + + // Verify the exported spans in the mock exporter + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!( + exported_spans.len(), + 1, + "Unexpected number of exported spans" + ); + assert_eq!(exported_spans[0].name, "force_flush_span"); + } + + #[test] + fn batchspanprocessor_shutdown() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans + let processor = BatchSpanProcessorDedicatedThread::new( + exporter, + 10, // max queue size + Duration::from_secs(5), + Duration::from_secs(2), + ); + + // Create a test span and send it to the processor + let test_span = create_test_span("shutdown_span"); + processor.on_end(test_span.clone()); + + // Call shutdown to flush and export all pending spans + let shutdown_result = processor.shutdown(); + assert!(shutdown_result.is_ok(), "Shutdown failed unexpectedly"); + + // Verify the exported spans in the mock exporter + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!( + exported_spans.len(), + 1, + "Unexpected number of exported spans" + ); + assert_eq!(exported_spans[0].name, "shutdown_span"); + + // Ensure further calls to shutdown are idempotent + let second_shutdown_result = processor.shutdown(); + assert!( + second_shutdown_result.is_err(), + "Shutdown should fail when called a second time" + ); + } } From 7268568a2af69bd0a96e201490b0a85ec522e6fa Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Fri, 20 Dec 2024 06:55:28 -0800 Subject: [PATCH 03/15] cont.. --- opentelemetry-sdk/src/trace/span_processor.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 718762b4a5..7204f877cd 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -208,7 +208,7 @@ impl BatchSpanProcessorDedicatedThread { let (message_sender, message_receiver) = sync_channel(max_queue_size); let handle = thread::Builder::new() - .name("BatchSpanProcessorThread".to_string()) + .name("BatchSpanProcessorDedicatedThread".to_string()) .spawn(move || { let mut spans = Vec::new(); let mut last_export_time = Instant::now(); @@ -1366,7 +1366,7 @@ mod tests { } #[test] - fn batchspanprocessor_deficatedthread_force_flush() { + fn batchspanprocessor_dedicatedthread_force_flush() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans let processor = BatchSpanProcessorDedicatedThread::new( @@ -1395,7 +1395,7 @@ mod tests { } #[test] - fn batchspanprocessor_shutdown() { + fn batchspanprocessor_dedicatedthread_shutdown() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans let processor = BatchSpanProcessorDedicatedThread::new( From f1a3a129b44d8ebca6c971f1b0673a156e5fb183 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 22 Dec 2024 02:46:36 -0800 Subject: [PATCH 04/15] move async batch span processor to separate module --- opentelemetry-sdk/Cargo.toml | 2 + opentelemetry-sdk/src/trace/mod.rs | 9 +- opentelemetry-sdk/src/trace/provider.rs | 2 +- opentelemetry-sdk/src/trace/span_processor.rs | 710 ++---------------- .../span_processor_with_async_runtime.rs | 618 +++++++++++++++ 5 files changed, 697 insertions(+), 644 deletions(-) create mode 100644 opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index ac33f369dc..648f472936 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -56,6 +56,8 @@ internal-logs = ["tracing"] experimental_metrics_periodicreader_with_async_runtime = ["metrics"] spec_unstable_metrics_views = ["metrics"] experimental_logs_batch_log_processor_with_async_runtime = ["logs"] +experimental_trace_batch_span_processor_with_async_runtime = ["trace"] + [[bench]] name = "context" diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 2582e12f14..5f5de58c7a 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -15,6 +15,8 @@ mod sampler; mod span; mod span_limit; mod span_processor; +mod span_processor_with_async_runtime; + mod tracer; pub use config::{config, Config}; @@ -28,9 +30,14 @@ pub use span::Span; pub use span_limit::SpanLimits; pub use span_processor::{ BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder, - BatchSpanProcessorDedicatedThread, BatchSpanProcessorDedicatedThreadBuilder, SimpleSpanProcessor, SpanProcessor, }; + +/// Re-export asynchronous runtime span processor components. +// TODO: make them available without re-export +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] +pub mod span_processor_with_async_runtime; + pub use tracer::Tracer; #[cfg(feature = "jaeger_remote_sampler")] diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 4820d7e929..d90c649c9c 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -301,7 +301,7 @@ impl Builder { exporter: T, runtime: R, ) -> Self { - let batch = BatchSpanProcessor::builder(exporter, runtime).build(); + let batch = BatchSpanProcessor::builder(exporter).build(); self.with_span_processor(batch) } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 7204f877cd..4e945e4f3c 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -34,18 +34,10 @@ //! [`is_recording`]: opentelemetry::trace::Span::is_recording() //! [`TracerProvider`]: opentelemetry::trace::TracerProvider -use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::export::trace::{SpanData, SpanExporter}; use crate::resource::Resource; -use crate::runtime::{RuntimeChannel, TrySend}; use crate::trace::Span; -use futures_channel::oneshot; -use futures_util::{ - future::{self, BoxFuture, Either}, - select, - stream::{self, FusedStream, FuturesUnordered}, - StreamExt as _, -}; -use opentelemetry::{otel_debug, otel_error, otel_warn}; +use opentelemetry::{otel_debug, otel_warn}; use opentelemetry::{ trace::{TraceError, TraceResult}, Context, @@ -53,33 +45,33 @@ use opentelemetry::{ use std::cmp::min; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use std::{env, fmt, str::FromStr, time::Duration}; +use std::{env, str::FromStr, time::Duration}; use std::sync::atomic::AtomicBool; use std::thread; use std::time::Instant; /// Delay interval between two consecutive exports. -const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; +pub(crate) const OTEL_BSP_SCHEDULE_DELAY: &str = "OTEL_BSP_SCHEDULE_DELAY"; /// Default delay interval between two consecutive exports. -const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000; +pub(crate) const OTEL_BSP_SCHEDULE_DELAY_DEFAULT: u64 = 5_000; /// Maximum queue size -const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE"; +pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE: &str = "OTEL_BSP_MAX_QUEUE_SIZE"; /// Default maximum queue size -const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; +pub(crate) const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; /// Maximum batch size, must be less than or equal to OTEL_BSP_MAX_QUEUE_SIZE -const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; +pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size -const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +pub(crate) const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; /// Maximum allowed time to export data. -const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; +pub(crate) const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; /// Default maximum allowed time to export data. -const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; +pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; /// Environment variable to configure max concurrent exports for batch span /// processor. -const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS"; +pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS"; /// Default max concurrent exports for BSP -const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1; +pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -178,7 +170,7 @@ use std::sync::mpsc::SyncSender; /// Messages exchanged between the main thread and the background thread. #[allow(clippy::large_enum_variant)] #[derive(Debug)] -enum BatchMessageDedicatedThread { +enum BatchMessage { ExportSpan(SpanData), ForceFlush(SyncSender>), Shutdown(SyncSender>), @@ -186,16 +178,16 @@ enum BatchMessageDedicatedThread { /// A batch span processor with a dedicated background thread. #[derive(Debug)] -pub struct BatchSpanProcessorDedicatedThread { - message_sender: SyncSender, +pub struct BatchSpanProcessor { + message_sender: SyncSender, handle: Mutex>>, shutdown_timeout: Duration, is_shutdown: AtomicBool, dropped_span_count: Arc, } -impl BatchSpanProcessorDedicatedThread { - /// Creates a new instance of `BatchSpanProcessorDedicatedThread`. +impl BatchSpanProcessor { + /// Creates a new instance of `BatchSpanProcessor`. pub fn new( mut exporter: E, max_queue_size: usize, @@ -217,7 +209,7 @@ impl BatchSpanProcessorDedicatedThread { let timeout = scheduled_delay.saturating_sub(last_export_time.elapsed()); match message_receiver.recv_timeout(timeout) { Ok(message) => match message { - BatchMessageDedicatedThread::ExportSpan(span) => { + BatchMessage::ExportSpan(span) => { spans.push(span); if spans.len() >= max_queue_size || last_export_time.elapsed() >= scheduled_delay @@ -229,11 +221,11 @@ impl BatchSpanProcessorDedicatedThread { last_export_time = Instant::now(); } } - BatchMessageDedicatedThread::ForceFlush(sender) => { + BatchMessage::ForceFlush(sender) => { let result = block_on(exporter.export(spans.split_off(0))); let _ = sender.send(result); } - BatchMessageDedicatedThread::Shutdown(sender) => { + BatchMessage::Shutdown(sender) => { let result = block_on(exporter.export(spans.split_off(0))); let _ = sender.send(result); break; @@ -265,15 +257,28 @@ impl BatchSpanProcessorDedicatedThread { } } + /// builder + pub fn builder(exporter: E) -> BatchSpanProcessorBuilder + where + E: SpanExporter + Send + 'static, + { + BatchSpanProcessorBuilder::new(exporter) + } +} + +impl SpanProcessor for BatchSpanProcessor { + /// Handles span start. + fn on_start(&self, _span: &mut Span, _cx: &Context) { + // Ignored + } + /// Handles span end. - pub fn on_end(&self, span: SpanData) { + fn on_end(&self, span: SpanData) { if self.is_shutdown.load(Ordering::Relaxed) { eprintln!("Processor is shutdown. Dropping span."); return; } - let result = self - .message_sender - .try_send(BatchMessageDedicatedThread::ExportSpan(span)); + let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if result.is_err() { @@ -287,13 +292,13 @@ impl BatchSpanProcessorDedicatedThread { } /// Flushes all pending spans. - pub fn force_flush(&self) -> TraceResult<()> { + fn force_flush(&self) -> TraceResult<()> { if self.is_shutdown.load(Ordering::Relaxed) { return Err(TraceError::Other("Processor already shutdown".into())); } let (sender, receiver) = sync_channel(1); self.message_sender - .try_send(BatchMessageDedicatedThread::ForceFlush(sender)) + .try_send(BatchMessage::ForceFlush(sender)) .map_err(|_| TraceError::Other("Failed to send ForceFlush message".into()))?; receiver @@ -302,13 +307,13 @@ impl BatchSpanProcessorDedicatedThread { } /// Shuts down the processor. - pub fn shutdown(&self) -> TraceResult<()> { + fn shutdown(&self) -> TraceResult<()> { if self.is_shutdown.swap(true, Ordering::Relaxed) { return Err(TraceError::Other("Processor already shutdown".into())); } let (sender, receiver) = sync_channel(1); self.message_sender - .try_send(BatchMessageDedicatedThread::Shutdown(sender)) + .try_send(BatchMessage::Shutdown(sender)) .map_err(|_| TraceError::Other("Failed to send Shutdown message".into()))?; let result = receiver @@ -323,16 +328,24 @@ impl BatchSpanProcessorDedicatedThread { /// Builder for `BatchSpanProcessorDedicatedThread`. #[derive(Debug, Default)] -pub struct BatchSpanProcessorDedicatedThreadBuilder { +pub struct BatchSpanProcessorBuilder +where + E: SpanExporter + Send + 'static, +{ + exporter: E, max_queue_size: usize, scheduled_delay: Duration, shutdown_timeout: Duration, } -impl BatchSpanProcessorDedicatedThreadBuilder { +impl BatchSpanProcessorBuilder +where + E: SpanExporter + Send + 'static, +{ /// Creates a new builder with default values. - pub fn new() -> Self { + pub fn new(exporter: E) -> Self { Self { + exporter, max_queue_size: 2048, scheduled_delay: Duration::from_secs(5), shutdown_timeout: Duration::from_secs(5), @@ -358,12 +371,9 @@ impl BatchSpanProcessorDedicatedThreadBuilder { } /// Builds the `BatchSpanProcessorDedicatedThread` instance. - pub fn build(self, exporter: E) -> BatchSpanProcessorDedicatedThread - where - E: SpanExporter + Send + 'static, - { - BatchSpanProcessorDedicatedThread::new( - exporter, + pub fn build(self) -> BatchSpanProcessor { + BatchSpanProcessor::new( + self.exporter, self.max_queue_size, self.scheduled_delay, self.shutdown_timeout, @@ -371,403 +381,33 @@ impl BatchSpanProcessorDedicatedThreadBuilder { } } -/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports -/// them at a preconfigured interval. -/// -/// Batch span processors need to run a background task to collect and send -/// spans. Different runtimes need different ways to handle the background task. -/// -/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the -/// underlying runtime can cause deadlocks (see tokio section). -/// -/// ### Use with Tokio -/// -/// Tokio currently offers two different schedulers. One is -/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both -/// of them default to use batch span processors to install span exporters. -/// -/// Tokio's `current_thread_scheduler` can cause the program to hang forever if -/// blocking work is scheduled with other tasks in the same runtime. To avoid -/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate -/// if you are using that runtime (e.g. users of actix-web), and blocking tasks -/// will then be scheduled on a different thread. -/// -/// # Examples -/// -/// This processor can be configured with an [`executor`] of your choice to -/// batch and upload spans asynchronously when they end. If you have added a -/// library like [`tokio`] or [`async-std`], you can pass in their respective -/// `spawn` and `interval` functions to have batching performed in those -/// contexts. -/// -/// ``` -/// # #[cfg(feature="tokio")] -/// # { -/// use opentelemetry::global; -/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace}; -/// use opentelemetry_sdk::trace::BatchConfigBuilder; -/// use std::time::Duration; -/// -/// #[tokio::main] -/// async fn main() { -/// // Configure your preferred exporter -/// let exporter = NoopSpanExporter::new(); -/// -/// // Create a batch span processor using an exporter and a runtime -/// let batch = trace::BatchSpanProcessor::builder(exporter, runtime::Tokio) -/// .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build()) -/// .build(); -/// -/// // Then use the `with_batch_exporter` method to have the provider export spans in batches. -/// let provider = trace::TracerProvider::builder() -/// .with_span_processor(batch) -/// .build(); -/// -/// let _ = global::set_tracer_provider(provider); -/// } -/// # } -/// ``` -/// -/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html -/// [`tokio`]: https://tokio.rs -/// [`async-std`]: https://async.rs -pub struct BatchSpanProcessor { - message_sender: R::Sender, - - // Track dropped spans - dropped_spans_count: AtomicUsize, - - // Track the maximum queue size that was configured for this processor - max_queue_size: usize, -} - -impl fmt::Debug for BatchSpanProcessor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BatchSpanProcessor") - .field("message_sender", &self.message_sender) - .finish() - } -} - -impl SpanProcessor for BatchSpanProcessor { - fn on_start(&self, _span: &mut Span, _cx: &Context) { - // Ignored - } - - fn on_end(&self, span: SpanData) { - if !span.span_context.is_sampled() { - return; - } - - let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); - - // If the queue is full, and we can't buffer a span - if result.is_err() { - // Increment the number of dropped spans. If this is the first time we've had to drop, - // emit a warning. - if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 { - otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", - message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped."); - } - } - } - - fn force_flush(&self) -> TraceResult<()> { - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Flush(Some(res_sender))) - .map_err(|err| TraceError::Other(err.into()))?; - - futures_executor::block_on(res_receiver) - .map_err(|err| TraceError::Other(err.into())) - .and_then(|identity| identity) - } - - fn shutdown(&self) -> TraceResult<()> { - let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed); - let max_queue_size = self.max_queue_size; - if dropped_spans > 0 { - otel_warn!( - name: "BatchSpanProcessor.Shutdown", - dropped_spans = dropped_spans, - max_queue_size = max_queue_size, - message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." - ); - } - - let (res_sender, res_receiver) = oneshot::channel(); - self.message_sender - .try_send(BatchMessage::Shutdown(res_sender)) - .map_err(|err| TraceError::Other(err.into()))?; - - futures_executor::block_on(res_receiver) - .map_err(|err| TraceError::Other(err.into())) - .and_then(|identity| identity) - } - - fn set_resource(&mut self, resource: &Resource) { - let resource = Arc::new(resource.clone()); - let _ = self - .message_sender - .try_send(BatchMessage::SetResource(resource)); - } -} - -/// Messages sent between application thread and batch span processor's work thread. -// In this enum the size difference is not a concern because: -// 1. If we wrap SpanData into a pointer, it will add overhead when processing. -// 2. Most of the messages will be ExportSpan. -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum BatchMessage { - /// Export spans, usually called when span ends - ExportSpan(SpanData), - /// Flush the current buffer to the backend, it can be triggered by - /// pre configured interval or a call to `force_push` function. - Flush(Option>), - /// Shut down the worker thread, push all spans in buffer to the backend. - Shutdown(oneshot::Sender), - /// Set the resource for the exporter. - SetResource(Arc), -} - -struct BatchSpanProcessorInternal { - spans: Vec, - export_tasks: FuturesUnordered>, - runtime: R, - exporter: Box, - config: BatchConfig, -} - -impl BatchSpanProcessorInternal { - async fn flush(&mut self, res_channel: Option>) { - let export_task = self.export(); - let task = Box::pin(async move { - let result = export_task.await; - - if let Some(channel) = res_channel { - // If a response channel is provided, attempt to send the export result through it. - if let Err(result) = channel.send(result) { - otel_debug!( - name: "BatchSpanProcessor.Flush.SendResultError", - reason = format!("{:?}", result) - ); - } - } else if let Err(err) = result { - // If no channel is provided and the export operation encountered an error, - // log the error directly here. - // TODO: Consider returning the status instead of logging it. - otel_error!( - name: "BatchSpanProcessor.Flush.ExportError", - reason = format!("{:?}", err), - message = "Failed during the export process" - ); - } - - Ok(()) - }); - - if self.config.max_concurrent_exports == 1 { - let _ = task.await; - } else { - self.export_tasks.push(task); - while self.export_tasks.next().await.is_some() {} - } - } - - /// Process a single message - /// - /// A return value of false indicates shutdown - async fn process_message(&mut self, message: BatchMessage) -> bool { - match message { - // Span has finished, add to buffer of pending spans. - BatchMessage::ExportSpan(span) => { - self.spans.push(span); - - if self.spans.len() == self.config.max_export_batch_size { - // If concurrent exports are saturated, wait for one to complete. - if !self.export_tasks.is_empty() - && self.export_tasks.len() == self.config.max_concurrent_exports - { - self.export_tasks.next().await; - } - - let export_task = self.export(); - let task = async move { - if let Err(err) = export_task.await { - otel_error!( - name: "BatchSpanProcessor.Export.Error", - reason = format!("{}", err) - ); - } - - Ok(()) - }; - // Special case when not using concurrent exports - if self.config.max_concurrent_exports == 1 { - let _ = task.await; - } else { - self.export_tasks.push(Box::pin(task)); - } - } - } - // Span batch interval time reached or a force flush has been invoked, export - // current spans. - // - // This is a hint to ensure that any tasks associated with Spans for which the - // SpanProcessor had already received events prior to the call to ForceFlush - // SHOULD be completed as soon as possible, preferably before returning from - // this method. - // - // In particular, if any SpanProcessor has any associated exporter, it SHOULD - // try to call the exporter's Export with all spans for which this was not - // already done and then invoke ForceFlush on it. The built-in SpanProcessors - // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST - // prioritize honoring the timeout over finishing all calls. It MAY skip or - // abort some or all Export or ForceFlush calls it has made to achieve this - // goal. - // - // NB: `force_flush` is not currently implemented on exporters; the equivalent - // would be waiting for exporter tasks to complete. In the case of - // channel-coupled exporters, they will need a `force_flush` implementation to - // properly block. - BatchMessage::Flush(res_channel) => { - self.flush(res_channel).await; - } - // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { - self.flush(Some(ch)).await; - self.exporter.shutdown(); - return false; - } - // propagate the resource - BatchMessage::SetResource(resource) => { - self.exporter.set_resource(&resource); - } - } - true - } - - fn export(&mut self) -> BoxFuture<'static, ExportResult> { - // Batch size check for flush / shutdown. Those methods may be called - // when there's no work to do. - if self.spans.is_empty() { - return Box::pin(future::ready(Ok(()))); - } - - let export = self.exporter.export(self.spans.split_off(0)); - let timeout = self.runtime.delay(self.config.max_export_timeout); - let time_out = self.config.max_export_timeout; - - Box::pin(async move { - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)), - } - }) - } - - async fn run(mut self, mut messages: impl FusedStream + Unpin) { - loop { - select! { - // FuturesUnordered implements Fuse intelligently such that it - // will become eligible again once new tasks are added to it. - _ = self.export_tasks.next() => { - // An export task completed; do we need to do anything with it? - }, - message = messages.next() => { - match message { - Some(message) => { - if !self.process_message(message).await { - break; - } - }, - None => break, - } - }, - } - } - } -} - -impl BatchSpanProcessor { - pub(crate) fn new(exporter: Box, config: BatchConfig, runtime: R) -> Self { - let (message_sender, message_receiver) = - runtime.batch_message_channel(config.max_queue_size); - - let max_queue_size = config.max_queue_size; - - let inner_runtime = runtime.clone(); - // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { - // Timer will take a reference to the current runtime, so its important we do this within the - // runtime.spawn() - let ticker = inner_runtime - .interval(config.scheduled_delay) - .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. - .map(|_| BatchMessage::Flush(None)); - let timeout_runtime = inner_runtime.clone(); - - let messages = Box::pin(stream::select(message_receiver, ticker)); - let processor = BatchSpanProcessorInternal { - spans: Vec::new(), - export_tasks: FuturesUnordered::new(), - runtime: timeout_runtime, - config, - exporter, - }; - - processor.run(messages).await - })); - - // Return batch processor with link to worker - BatchSpanProcessor { - message_sender, - dropped_spans_count: AtomicUsize::new(0), - max_queue_size, - } - } - - /// Create a new batch processor builder - pub fn builder(exporter: E, runtime: R) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - { - BatchSpanProcessorBuilder { - exporter, - config: Default::default(), - runtime, - } - } -} - /// Batch span processor configuration. /// Use [`BatchConfigBuilder`] to configure your own instance of [`BatchConfig`]. #[derive(Debug)] pub struct BatchConfig { /// The maximum queue size to buffer spans for delayed processing. If the /// queue gets full it drops the spans. The default value of is 2048. - max_queue_size: usize, + pub(crate) max_queue_size: usize, /// The delay interval in milliseconds between two consecutive processing /// of batches. The default value is 5 seconds. - scheduled_delay: Duration, + pub(crate) scheduled_delay: Duration, /// The maximum number of spans to process in a single batch. If there are /// more than one batch worth of spans then it processes multiple batches /// of spans one batch after the other without any delay. The default value /// is 512. - max_export_batch_size: usize, + pub(crate) max_export_batch_size: usize, /// The maximum duration to export a batch of data. - max_export_timeout: Duration, + pub(crate) max_export_timeout: Duration, /// Maximum number of concurrent exports /// /// Limits the number of spawned tasks for exports and thus memory consumed /// by an exporter. A value of 1 will cause exports to be performed /// synchronously on the BatchSpanProcessor task. - max_concurrent_exports: usize, + pub(crate) max_concurrent_exports: usize, } impl Default for BatchConfig { @@ -916,31 +556,6 @@ impl BatchConfigBuilder { } } -/// A builder for creating [`BatchSpanProcessor`] instances. -/// -#[derive(Debug)] -pub struct BatchSpanProcessorBuilder { - exporter: E, - config: BatchConfig, - runtime: R, -} - -impl BatchSpanProcessorBuilder -where - E: SpanExporter + 'static, - R: RuntimeChannel, -{ - /// Set the BatchConfig for [BatchSpanProcessorBuilder] - pub fn with_batch_config(self, config: BatchConfig) -> Self { - BatchSpanProcessorBuilder { config, ..self } - } - - /// Build a batch processor - pub fn build(self) -> BatchSpanProcessor { - BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime) - } -} - #[cfg(all(test, feature = "testing", feature = "trace"))] mod tests { // cargo test trace::span_processor::tests:: --features=testing @@ -950,10 +565,7 @@ mod tests { OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, }; use crate::export::trace::{ExportResult, SpanData, SpanExporter}; - use crate::runtime; - use crate::testing::trace::{ - new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder, - }; + use crate::testing::trace::{new_test_export_span_data, InMemorySpanExporterBuilder}; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS, OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, @@ -961,7 +573,6 @@ mod tests { use crate::trace::{BatchConfig, BatchConfigBuilder, SpanEvents, SpanLinks}; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status}; use std::fmt::Debug; - use std::future::Future; use std::time::Duration; #[test] @@ -1108,190 +719,6 @@ mod tests { assert_eq!(batch.max_queue_size, 10); } - #[test] - fn test_build_batch_span_processor_builder() { - let mut env_vars = vec![ - (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")), - (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")), - (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), - ]; - temp_env::with_vars(env_vars.clone(), || { - let builder = BatchSpanProcessor::builder( - InMemorySpanExporterBuilder::new().build(), - runtime::Tokio, - ); - // export batch size cannot exceed max queue size - assert_eq!(builder.config.max_export_batch_size, 500); - assert_eq!( - builder.config.scheduled_delay, - Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT) - ); - assert_eq!( - builder.config.max_queue_size, - OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT - ); - assert_eq!( - builder.config.max_export_timeout, - Duration::from_millis(2046) - ); - }); - - env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); - - temp_env::with_vars(env_vars, || { - let builder = BatchSpanProcessor::builder( - InMemorySpanExporterBuilder::new().build(), - runtime::Tokio, - ); - assert_eq!(builder.config.max_export_batch_size, 120); - assert_eq!(builder.config.max_queue_size, 120); - }); - } - - #[tokio::test] - async fn test_batch_span_processor() { - let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); - let config = BatchConfig { - scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush - ..Default::default() - }; - let processor = - BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); - let handle = tokio::spawn(async move { - loop { - if let Some(span) = export_receiver.recv().await { - assert_eq!(span.span_context, new_test_export_span_data().span_context); - break; - } - } - }); - tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - assert!(flush_res.is_ok()); - let _shutdown_result = processor.shutdown(); - - assert!( - tokio::time::timeout(Duration::from_secs(5), handle) - .await - .is_ok(), - "timed out in 5 seconds. force_flush may not export any data when called" - ); - } - - struct BlockingExporter { - delay_for: Duration, - delay_fn: D, - } - - impl Debug for BlockingExporter - where - D: Fn(Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static, - { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("blocking exporter for testing") - } - } - - impl SpanExporter for BlockingExporter - where - D: Fn(Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static, - { - fn export( - &mut self, - _batch: Vec, - ) -> futures_util::future::BoxFuture<'static, ExportResult> { - use futures_util::FutureExt; - Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(()))) - } - } - - #[test] - fn test_timeout_tokio_timeout() { - // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. - // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. - // Either way, the test should be finished within 5s. - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - runtime.block_on(timeout_test_tokio(true)); - } - - #[test] - fn test_timeout_tokio_not_timeout() { - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - runtime.block_on(timeout_test_tokio(false)); - } - - #[test] - #[cfg(feature = "rt-async-std")] - fn test_timeout_async_std_timeout() { - async_std::task::block_on(timeout_test_std_async(true)); - } - - #[test] - #[cfg(feature = "rt-async-std")] - fn test_timeout_async_std_not_timeout() { - async_std::task::block_on(timeout_test_std_async(false)); - } - - // If the time_out is true, then the result suppose to ended with timeout. - // otherwise the exporter should be able to export within time out duration. - #[cfg(feature = "rt-async-std")] - async fn timeout_test_std_async(time_out: bool) { - let config = BatchConfig { - max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), - scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush - ..Default::default() - }; - let exporter = BlockingExporter { - delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), - delay_fn: async_std::task::sleep, - }; - let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd); - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - if time_out { - assert!(flush_res.is_err()); - } else { - assert!(flush_res.is_ok()); - } - let shutdown_res = processor.shutdown(); - assert!(shutdown_res.is_ok()); - } - - // If the time_out is true, then the result suppose to ended with timeout. - // otherwise the exporter should be able to export within time out duration. - async fn timeout_test_tokio(time_out: bool) { - let config = BatchConfig { - max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), - scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush, - ..Default::default() - }; - let exporter = BlockingExporter { - delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), - delay_fn: tokio::time::sleep, - }; - let processor = - BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); - tokio::time::sleep(Duration::from_secs(1)).await; // skip the first - processor.on_end(new_test_export_span_data()); - let flush_res = processor.force_flush(); - if time_out { - assert!(flush_res.is_err()); - } else { - assert!(flush_res.is_ok()); - } - let shutdown_res = processor.shutdown(); - assert!(shutdown_res.is_ok()); - } - // Helper function to create a default test span fn create_test_span(name: &str) -> SpanData { SpanData { @@ -1310,7 +737,6 @@ mod tests { } } - use crate::trace::BatchSpanProcessorDedicatedThread; use futures_util::future::BoxFuture; use futures_util::FutureExt; use std::sync::Arc; @@ -1344,10 +770,10 @@ mod tests { } #[test] - fn batchspanprocessor_dedicatedthread_handles_on_end() { + fn batchspanprocessor_handles_on_end() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); - let processor = BatchSpanProcessorDedicatedThread::new( + let processor = BatchSpanProcessor::new( exporter, 10, // max queue size Duration::from_secs(5), @@ -1366,10 +792,10 @@ mod tests { } #[test] - fn batchspanprocessor_dedicatedthread_force_flush() { + fn batchspanprocessor_force_flush() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans - let processor = BatchSpanProcessorDedicatedThread::new( + let processor = BatchSpanProcessor::new( exporter, 10, // max queue size Duration::from_secs(5), @@ -1395,10 +821,10 @@ mod tests { } #[test] - fn batchspanprocessor_dedicatedthread_shutdown() { + fn batchspanprocessor_shutdown() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans - let processor = BatchSpanProcessorDedicatedThread::new( + let processor = BatchSpanProcessor::new( exporter, 10, // max queue size Duration::from_secs(5), diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs new file mode 100644 index 0000000000..32d6d134e0 --- /dev/null +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -0,0 +1,618 @@ +use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::resource::Resource; +use crate::runtime::{RuntimeChannel, TrySend}; +use crate::trace::BatchConfig; +use crate::trace::Span; +use crate::trace::SpanProcessor; +use futures_channel::oneshot; +use futures_util::{ + future::{self, BoxFuture, Either}, + select, + stream::{self, FusedStream, FuturesUnordered}, + StreamExt as _, +}; +use opentelemetry::{otel_debug, otel_error, otel_warn}; +use opentelemetry::{ + trace::{TraceError, TraceResult}, + Context, +}; +use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports +/// them at a preconfigured interval. +/// +/// Batch span processors need to run a background task to collect and send +/// spans. Different runtimes need different ways to handle the background task. +/// +/// Note: Configuring an opentelemetry `Runtime` that's not compatible with the +/// underlying runtime can cause deadlocks (see tokio section). +/// +/// ### Use with Tokio +/// +/// Tokio currently offers two different schedulers. One is +/// `current_thread_scheduler`, the other is `multiple_thread_scheduler`. Both +/// of them default to use batch span processors to install span exporters. +/// +/// Tokio's `current_thread_scheduler` can cause the program to hang forever if +/// blocking work is scheduled with other tasks in the same runtime. To avoid +/// this, be sure to enable the `rt-tokio-current-thread` feature in this crate +/// if you are using that runtime (e.g. users of actix-web), and blocking tasks +/// will then be scheduled on a different thread. +/// +/// # Examples +/// +/// This processor can be configured with an [`executor`] of your choice to +/// batch and upload spans asynchronously when they end. If you have added a +/// library like [`tokio`] or [`async-std`], you can pass in their respective +/// `spawn` and `interval` functions to have batching performed in those +/// contexts. +/// +/// ``` +/// # #[cfg(feature="tokio")] +/// # { +/// use opentelemetry::global; +/// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace}; +/// use opentelemetry_sdk::trace::BatchConfigBuilder; +/// use std::time::Duration; +/// +/// #[tokio::main] +/// async fn main() { +/// // Configure your preferred exporter +/// let exporter = NoopSpanExporter::new(); +/// +/// // Create a batch span processor using an exporter and a runtime +/// let batch = trace::BatchSpanProcessor::builder(exporter, runtime::Tokio) +/// .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build()) +/// .build(); +/// +/// // Then use the `with_batch_exporter` method to have the provider export spans in batches. +/// let provider = trace::TracerProvider::builder() +/// .with_span_processor(batch) +/// .build(); +/// +/// let _ = global::set_tracer_provider(provider); +/// } +/// # } +/// ``` +/// +/// [`executor`]: https://docs.rs/futures/0.3/futures/executor/index.html +/// [`tokio`]: https://tokio.rs +/// [`async-std`]: https://async.rs +pub struct BatchSpanProcessor { + message_sender: R::Sender, + + // Track dropped spans + dropped_spans_count: AtomicUsize, + + // Track the maximum queue size that was configured for this processor + max_queue_size: usize, +} + +impl fmt::Debug for BatchSpanProcessor { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BatchSpanProcessor") + .field("message_sender", &self.message_sender) + .finish() + } +} + +impl SpanProcessor for BatchSpanProcessor { + fn on_start(&self, _span: &mut Span, _cx: &Context) { + // Ignored + } + + fn on_end(&self, span: SpanData) { + if !span.span_context.is_sampled() { + return; + } + + let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); + + // If the queue is full, and we can't buffer a span + if result.is_err() { + // Increment the number of dropped spans. If this is the first time we've had to drop, + // emit a warning. + if self.dropped_spans_count.fetch_add(1, Ordering::Relaxed) == 0 { + otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted", + message = "Beginning to drop span messages due to full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total spans dropped."); + } + } + } + + fn force_flush(&self) -> TraceResult<()> { + let (res_sender, res_receiver) = oneshot::channel(); + self.message_sender + .try_send(BatchMessage::Flush(Some(res_sender))) + .map_err(|err| TraceError::Other(err.into()))?; + + futures_executor::block_on(res_receiver) + .map_err(|err| TraceError::Other(err.into())) + .and_then(|identity| identity) + } + + fn shutdown(&self) -> TraceResult<()> { + let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed); + let max_queue_size = self.max_queue_size; + if dropped_spans > 0 { + otel_warn!( + name: "BatchSpanProcessor.Shutdown", + dropped_spans = dropped_spans, + max_queue_size = max_queue_size, + message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } + + let (res_sender, res_receiver) = oneshot::channel(); + self.message_sender + .try_send(BatchMessage::Shutdown(res_sender)) + .map_err(|err| TraceError::Other(err.into()))?; + + futures_executor::block_on(res_receiver) + .map_err(|err| TraceError::Other(err.into())) + .and_then(|identity| identity) + } + + fn set_resource(&mut self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } +} + +/// Messages sent between application thread and batch span processor's work thread. +// In this enum the size difference is not a concern because: +// 1. If we wrap SpanData into a pointer, it will add overhead when processing. +// 2. Most of the messages will be ExportSpan. +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum BatchMessage { + /// Export spans, usually called when span ends + ExportSpan(SpanData), + /// Flush the current buffer to the backend, it can be triggered by + /// pre configured interval or a call to `force_push` function. + Flush(Option>), + /// Shut down the worker thread, push all spans in buffer to the backend. + Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), +} + +struct BatchSpanProcessorInternal { + spans: Vec, + export_tasks: FuturesUnordered>, + runtime: R, + exporter: Box, + config: BatchConfig, +} + +impl BatchSpanProcessorInternal { + async fn flush(&mut self, res_channel: Option>) { + let export_task = self.export(); + let task = Box::pin(async move { + let result = export_task.await; + + if let Some(channel) = res_channel { + // If a response channel is provided, attempt to send the export result through it. + if let Err(result) = channel.send(result) { + otel_debug!( + name: "BatchSpanProcessor.Flush.SendResultError", + reason = format!("{:?}", result) + ); + } + } else if let Err(err) = result { + // If no channel is provided and the export operation encountered an error, + // log the error directly here. + // TODO: Consider returning the status instead of logging it. + otel_error!( + name: "BatchSpanProcessor.Flush.ExportError", + reason = format!("{:?}", err), + message = "Failed during the export process" + ); + } + + Ok(()) + }); + + if self.config.max_concurrent_exports == 1 { + let _ = task.await; + } else { + self.export_tasks.push(task); + while self.export_tasks.next().await.is_some() {} + } + } + + /// Process a single message + /// + /// A return value of false indicates shutdown + async fn process_message(&mut self, message: BatchMessage) -> bool { + match message { + // Span has finished, add to buffer of pending spans. + BatchMessage::ExportSpan(span) => { + self.spans.push(span); + + if self.spans.len() == self.config.max_export_batch_size { + // If concurrent exports are saturated, wait for one to complete. + if !self.export_tasks.is_empty() + && self.export_tasks.len() == self.config.max_concurrent_exports + { + self.export_tasks.next().await; + } + + let export_task = self.export(); + let task = async move { + if let Err(err) = export_task.await { + otel_error!( + name: "BatchSpanProcessor.Export.Error", + reason = format!("{}", err) + ); + } + + Ok(()) + }; + // Special case when not using concurrent exports + if self.config.max_concurrent_exports == 1 { + let _ = task.await; + } else { + self.export_tasks.push(Box::pin(task)); + } + } + } + // Span batch interval time reached or a force flush has been invoked, export + // current spans. + // + // This is a hint to ensure that any tasks associated with Spans for which the + // SpanProcessor had already received events prior to the call to ForceFlush + // SHOULD be completed as soon as possible, preferably before returning from + // this method. + // + // In particular, if any SpanProcessor has any associated exporter, it SHOULD + // try to call the exporter's Export with all spans for which this was not + // already done and then invoke ForceFlush on it. The built-in SpanProcessors + // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST + // prioritize honoring the timeout over finishing all calls. It MAY skip or + // abort some or all Export or ForceFlush calls it has made to achieve this + // goal. + // + // NB: `force_flush` is not currently implemented on exporters; the equivalent + // would be waiting for exporter tasks to complete. In the case of + // channel-coupled exporters, they will need a `force_flush` implementation to + // properly block. + BatchMessage::Flush(res_channel) => { + self.flush(res_channel).await; + } + // Stream has terminated or processor is shutdown, return to finish execution. + BatchMessage::Shutdown(ch) => { + self.flush(Some(ch)).await; + self.exporter.shutdown(); + return false; + } + // propagate the resource + BatchMessage::SetResource(resource) => { + self.exporter.set_resource(&resource); + } + } + true + } + + fn export(&mut self) -> BoxFuture<'static, ExportResult> { + // Batch size check for flush / shutdown. Those methods may be called + // when there's no work to do. + if self.spans.is_empty() { + return Box::pin(future::ready(Ok(()))); + } + + let export = self.exporter.export(self.spans.split_off(0)); + let timeout = self.runtime.delay(self.config.max_export_timeout); + let time_out = self.config.max_export_timeout; + + Box::pin(async move { + match future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)), + } + }) + } + + async fn run(mut self, mut messages: impl FusedStream + Unpin) { + loop { + select! { + // FuturesUnordered implements Fuse intelligently such that it + // will become eligible again once new tasks are added to it. + _ = self.export_tasks.next() => { + // An export task completed; do we need to do anything with it? + }, + message = messages.next() => { + match message { + Some(message) => { + if !self.process_message(message).await { + break; + } + }, + None => break, + } + }, + } + } + } +} + +impl BatchSpanProcessor { + pub(crate) fn new(exporter: Box, config: BatchConfig, runtime: R) -> Self { + let (message_sender, message_receiver) = + runtime.batch_message_channel(config.max_queue_size); + + let max_queue_size = config.max_queue_size; + + let inner_runtime = runtime.clone(); + // Spawn worker process via user-defined spawn function. + runtime.spawn(Box::pin(async move { + // Timer will take a reference to the current runtime, so its important we do this within the + // runtime.spawn() + let ticker = inner_runtime + .interval(config.scheduled_delay) + .skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval. + .map(|_| BatchMessage::Flush(None)); + let timeout_runtime = inner_runtime.clone(); + + let messages = Box::pin(stream::select(message_receiver, ticker)); + let processor = BatchSpanProcessorInternal { + spans: Vec::new(), + export_tasks: FuturesUnordered::new(), + runtime: timeout_runtime, + config, + exporter, + }; + + processor.run(messages).await + })); + + // Return batch processor with link to worker + BatchSpanProcessor { + message_sender, + dropped_spans_count: AtomicUsize::new(0), + max_queue_size, + } + } + + /// Create a new batch processor builder + pub fn builder(exporter: E, runtime: R) -> BatchSpanProcessorBuilder + where + E: SpanExporter, + { + BatchSpanProcessorBuilder { + exporter, + config: Default::default(), + runtime, + } + } +} + +/// A builder for creating [`BatchSpanProcessor`] instances. +/// +#[derive(Debug)] +pub struct BatchSpanProcessorBuilder { + exporter: E, + config: BatchConfig, + runtime: R, +} + +impl BatchSpanProcessorBuilder +where + E: SpanExporter + 'static, + R: RuntimeChannel, +{ + /// Set the BatchConfig for [BatchSpanProcessorBuilder] + pub fn with_batch_config(self, config: BatchConfig) -> Self { + BatchSpanProcessorBuilder { config, ..self } + } + + /// Build a batch processor + pub fn build(self) -> BatchSpanProcessor { + BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime) + } +} + +#[cfg(all(test, feature = "testing", feature = "trace"))] +mod tests { + // cargo test trace::span_processor::tests:: --features=testing + use super::{BatchSpanProcessor, SpanProcessor}; + use crate::export::trace::{ExportResult, SpanData, SpanExporter}; + use crate::runtime; + use crate::testing::trace::{ + new_test_export_span_data, new_tokio_test_exporter, InMemorySpanExporterBuilder, + }; + use crate::trace::span_processor::{ + OTEL_BSP_EXPORT_TIMEOUT, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, + OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, + }; + use crate::trace::{BatchConfig, BatchConfigBuilder}; + use futures_util::Future; + use std::fmt::Debug; + use std::time::Duration; + + struct BlockingExporter { + delay_for: Duration, + delay_fn: D, + } + + impl Debug for BlockingExporter + where + D: Fn(Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("blocking exporter for testing") + } + } + + impl SpanExporter for BlockingExporter + where + D: Fn(Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { + fn export( + &mut self, + _batch: Vec, + ) -> futures_util::future::BoxFuture<'static, ExportResult> { + use futures_util::FutureExt; + Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(()))) + } + } + + #[test] + fn test_build_batch_span_processor_builder() { + let mut env_vars = vec![ + (OTEL_BSP_MAX_EXPORT_BATCH_SIZE, Some("500")), + (OTEL_BSP_SCHEDULE_DELAY, Some("I am not number")), + (OTEL_BSP_EXPORT_TIMEOUT, Some("2046")), + ]; + temp_env::with_vars(env_vars.clone(), || { + let builder = BatchSpanProcessor::builder( + InMemorySpanExporterBuilder::new().build(), + runtime::Tokio, + ); + // export batch size cannot exceed max queue size + assert_eq!(builder.config.max_export_batch_size, 500); + assert_eq!( + builder.config.scheduled_delay, + Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT) + ); + assert_eq!( + builder.config.max_queue_size, + OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT + ); + assert_eq!( + builder.config.max_export_timeout, + Duration::from_millis(2046) + ); + }); + + env_vars.push((OTEL_BSP_MAX_QUEUE_SIZE, Some("120"))); + + temp_env::with_vars(env_vars, || { + let builder = BatchSpanProcessor::builder( + InMemorySpanExporterBuilder::new().build(), + runtime::Tokio, + ); + assert_eq!(builder.config.max_export_batch_size, 120); + assert_eq!(builder.config.max_queue_size, 120); + }); + } + + #[tokio::test] + async fn test_batch_span_processor() { + let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); + let config = BatchConfigBuilder::default() + .with_scheduled_delay(Duration::from_secs(60 * 60 * 24)) // set the tick to 24 hours so we know the span must be exported via force_flush + .build(); + let processor = + BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); + let handle = tokio::spawn(async move { + loop { + if let Some(span) = export_receiver.recv().await { + assert_eq!(span.span_context, new_test_export_span_data().span_context); + break; + } + } + }); + tokio::time::sleep(Duration::from_secs(1)).await; // skip the first + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + assert!(flush_res.is_ok()); + let _shutdown_result = processor.shutdown(); + + assert!( + tokio::time::timeout(Duration::from_secs(5), handle) + .await + .is_ok(), + "timed out in 5 seconds. force_flush may not export any data when called" + ); + } + + // If the time_out is true, then the result suppose to ended with timeout. + // otherwise the exporter should be able to export within time out duration. + async fn timeout_test_tokio(time_out: bool) { + let config = BatchConfig { + max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), + scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush, + ..Default::default() + }; + let exporter = BlockingExporter { + delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), + delay_fn: tokio::time::sleep, + }; + let processor = + BatchSpanProcessor::new(Box::new(exporter), config, runtime::TokioCurrentThread); + tokio::time::sleep(Duration::from_secs(1)).await; // skip the first + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + if time_out { + assert!(flush_res.is_err()); + } else { + assert!(flush_res.is_ok()); + } + let shutdown_res = processor.shutdown(); + assert!(shutdown_res.is_ok()); + } + + #[test] + fn test_timeout_tokio_timeout() { + // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. + // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. + // Either way, the test should be finished within 5s. + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(timeout_test_tokio(true)); + } + + #[test] + fn test_timeout_tokio_not_timeout() { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + runtime.block_on(timeout_test_tokio(false)); + } + + #[test] + #[cfg(feature = "rt-async-std")] + fn test_timeout_async_std_timeout() { + async_std::task::block_on(timeout_test_std_async(true)); + } + + #[test] + #[cfg(feature = "rt-async-std")] + fn test_timeout_async_std_not_timeout() { + async_std::task::block_on(timeout_test_std_async(false)); + } + + // If the time_out is true, then the result suppose to ended with timeout. + // otherwise the exporter should be able to export within time out duration. + #[cfg(feature = "rt-async-std")] + async fn timeout_test_std_async(time_out: bool) { + let config = BatchConfig { + max_export_timeout: Duration::from_millis(if time_out { 5 } else { 60 }), + scheduled_delay: Duration::from_secs(60 * 60 * 24), // set the tick to 24 hours so we know the span must be exported via force_flush + ..Default::default() + }; + let exporter = BlockingExporter { + delay_for: Duration::from_millis(if !time_out { 5 } else { 60 }), + delay_fn: async_std::task::sleep, + }; + let processor = BatchSpanProcessor::new(Box::new(exporter), config, runtime::AsyncStd); + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + if time_out { + assert!(flush_res.is_err()); + } else { + assert!(flush_res.is_ok()); + } + let shutdown_res = processor.shutdown(); + assert!(shutdown_res.is_ok()); + } +} From a062c9c9905c5c7ba4ed7fb896101a29a5427695 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 22 Dec 2024 05:29:20 -0800 Subject: [PATCH 05/15] cont... --- .../benches/batch_span_processor.rs | 23 ++-- opentelemetry-sdk/src/trace/mod.rs | 10 +- opentelemetry-sdk/src/trace/provider.rs | 9 ++ opentelemetry-sdk/src/trace/runtime_tests.rs | 1 + opentelemetry-sdk/src/trace/span_processor.rs | 113 ++++++++---------- 5 files changed, 70 insertions(+), 86 deletions(-) diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index ed20c45a06..d57ef26157 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -3,7 +3,6 @@ use opentelemetry::trace::{ SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, }; use opentelemetry_sdk::export::trace::SpanData; -use opentelemetry_sdk::runtime::Tokio; use opentelemetry_sdk::testing::trace::NoopSpanExporter; use opentelemetry_sdk::trace::{ BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor, @@ -49,14 +48,13 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(|| { let rt = Runtime::new().unwrap(); rt.block_on(async move { - let span_processor = - BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio) - .with_batch_config( - BatchConfigBuilder::default() - .with_max_queue_size(10_000) - .build(), - ) - .build(); + let span_processor = BatchSpanProcessor::builder(NoopSpanExporter::new()) + .with_batch_config( + BatchConfigBuilder::default() + .with_max_queue_size(10_000) + .build(), + ) + .build(); let mut shared_span_processor = Arc::new(span_processor); let mut handles = Vec::with_capacity(10); for _ in 0..task_num { @@ -70,10 +68,9 @@ fn criterion_benchmark(c: &mut Criterion) { })); } futures_util::future::join_all(handles).await; - let _ = - Arc::>::get_mut(&mut shared_span_processor) - .unwrap() - .shutdown(); + let _ = Arc::::get_mut(&mut shared_span_processor) + .unwrap() + .shutdown(); }); }) }, diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index 5f5de58c7a..b6017ebfdd 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -15,8 +15,8 @@ mod sampler; mod span; mod span_limit; mod span_processor; -mod span_processor_with_async_runtime; - +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] +pub mod span_processor_with_async_runtime; mod tracer; pub use config::{config, Config}; @@ -33,16 +33,12 @@ pub use span_processor::{ SimpleSpanProcessor, SpanProcessor, }; -/// Re-export asynchronous runtime span processor components. -// TODO: make them available without re-export -#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] -pub mod span_processor_with_async_runtime; - pub use tracer::Tracer; #[cfg(feature = "jaeger_remote_sampler")] pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder}; +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] #[cfg(test)] mod runtime_tests; diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index d90c649c9c..e2f0bde216 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -62,6 +62,7 @@ /// provider.shutdown(); /// } /// ``` +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] use crate::runtime::RuntimeChannel; use crate::trace::{ BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer, @@ -295,12 +296,20 @@ impl Builder { Builder { processors, ..self } } + #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use. pub fn with_batch_exporter( self, exporter: T, runtime: R, ) -> Self { + let batch = BatchSpanProcessor::builder(exporter, runtime).build(); + self.with_span_processor(batch) + } + + #[cfg(not(feature = "experimental_trace_batch_span_processor_with_async_runtime"))] + /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use. + pub fn with_batch_exporter(self, exporter: T) -> Self { let batch = BatchSpanProcessor::builder(exporter).build(); self.with_span_processor(batch) } diff --git a/opentelemetry-sdk/src/trace/runtime_tests.rs b/opentelemetry-sdk/src/trace/runtime_tests.rs index 75cb1b4475..87143c75bc 100644 --- a/opentelemetry-sdk/src/trace/runtime_tests.rs +++ b/opentelemetry-sdk/src/trace/runtime_tests.rs @@ -46,6 +46,7 @@ impl SpanCountExporter { } } +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] #[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] fn build_batch_tracer_provider( exporter: SpanCountExporter, diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 4e945e4f3c..4e04db6dc6 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -181,6 +181,7 @@ enum BatchMessage { pub struct BatchSpanProcessor { message_sender: SyncSender, handle: Mutex>>, + forceflush_timeout: Duration, shutdown_timeout: Duration, is_shutdown: AtomicBool, dropped_span_count: Arc, @@ -190,14 +191,15 @@ impl BatchSpanProcessor { /// Creates a new instance of `BatchSpanProcessor`. pub fn new( mut exporter: E, - max_queue_size: usize, - scheduled_delay: Duration, - shutdown_timeout: Duration, + config: BatchConfig, + //ax_queue_size: usize, + //scheduled_delay: Duration, + //shutdown_timeout: Duration, ) -> Self where E: SpanExporter + Send + 'static, { - let (message_sender, message_receiver) = sync_channel(max_queue_size); + let (message_sender, message_receiver) = sync_channel(config.max_queue_size); let handle = thread::Builder::new() .name("BatchSpanProcessorDedicatedThread".to_string()) @@ -206,13 +208,15 @@ impl BatchSpanProcessor { let mut last_export_time = Instant::now(); loop { - let timeout = scheduled_delay.saturating_sub(last_export_time.elapsed()); + let timeout = config + .scheduled_delay + .saturating_sub(last_export_time.elapsed()); match message_receiver.recv_timeout(timeout) { Ok(message) => match message { BatchMessage::ExportSpan(span) => { spans.push(span); - if spans.len() >= max_queue_size - || last_export_time.elapsed() >= scheduled_delay + if spans.len() >= config.max_queue_size + || last_export_time.elapsed() >= config.scheduled_delay { if let Err(err) = block_on(exporter.export(spans.split_off(0))) { @@ -232,7 +236,7 @@ impl BatchSpanProcessor { } }, Err(RecvTimeoutError::Timeout) => { - if last_export_time.elapsed() >= scheduled_delay { + if last_export_time.elapsed() >= config.scheduled_delay { if let Err(err) = block_on(exporter.export(spans.split_off(0))) { eprintln!("Export error: {:?}", err); } @@ -251,7 +255,8 @@ impl BatchSpanProcessor { Self { message_sender, handle: Mutex::new(Some(handle)), - shutdown_timeout, + forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable + shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable is_shutdown: AtomicBool::new(false), dropped_span_count: Arc::new(AtomicUsize::new(0)), } @@ -262,7 +267,10 @@ impl BatchSpanProcessor { where E: SpanExporter + Send + 'static, { - BatchSpanProcessorBuilder::new(exporter) + BatchSpanProcessorBuilder { + exporter, + config: BatchConfig::default(), + } } } @@ -302,8 +310,8 @@ impl SpanProcessor for BatchSpanProcessor { .map_err(|_| TraceError::Other("Failed to send ForceFlush message".into()))?; receiver - .recv_timeout(self.shutdown_timeout) - .map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))? + .recv_timeout(self.forceflush_timeout) + .map_err(|_| TraceError::ExportTimedOut(self.forceflush_timeout))? } /// Shuts down the processor. @@ -333,51 +341,21 @@ where E: SpanExporter + Send + 'static, { exporter: E, - max_queue_size: usize, - scheduled_delay: Duration, - shutdown_timeout: Duration, + config: BatchConfig, } impl BatchSpanProcessorBuilder where E: SpanExporter + Send + 'static, { - /// Creates a new builder with default values. - pub fn new(exporter: E) -> Self { - Self { - exporter, - max_queue_size: 2048, - scheduled_delay: Duration::from_secs(5), - shutdown_timeout: Duration::from_secs(5), - } + /// Set the BatchConfig for [BatchSpanProcessorBuilder] + pub fn with_batch_config(self, config: BatchConfig) -> Self { + BatchSpanProcessorBuilder { config, ..self } } - /// Sets the maximum queue size for spans. - pub fn with_max_queue_size(mut self, size: usize) -> Self { - self.max_queue_size = size; - self - } - - /// Sets the delay between exports. - pub fn with_scheduled_delay(mut self, delay: Duration) -> Self { - self.scheduled_delay = delay; - self - } - - /// Sets the timeout for shutdown and flush operations. - pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self { - self.shutdown_timeout = timeout; - self - } - - /// Builds the `BatchSpanProcessorDedicatedThread` instance. + /// Build a new instance of `BatchSpanProcessor`. pub fn build(self) -> BatchSpanProcessor { - BatchSpanProcessor::new( - self.exporter, - self.max_queue_size, - self.scheduled_delay, - self.shutdown_timeout, - ) + BatchSpanProcessor::new(self.exporter, self.config) } } @@ -773,12 +751,13 @@ mod tests { fn batchspanprocessor_handles_on_end() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); - let processor = BatchSpanProcessor::new( - exporter, - 10, // max queue size - Duration::from_secs(5), - Duration::from_secs(2), - ); + let config = BatchConfigBuilder::default() + .with_max_queue_size(10) + .with_max_export_batch_size(10) + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); let test_span = create_test_span("test_span"); processor.on_end(test_span.clone()); @@ -795,12 +774,13 @@ mod tests { fn batchspanprocessor_force_flush() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans - let processor = BatchSpanProcessor::new( - exporter, - 10, // max queue size - Duration::from_secs(5), - Duration::from_secs(2), - ); + let config = BatchConfigBuilder::default() + .with_max_queue_size(10) + .with_max_export_batch_size(10) + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); // Create a test span and send it to the processor let test_span = create_test_span("force_flush_span"); @@ -824,12 +804,13 @@ mod tests { fn batchspanprocessor_shutdown() { let exporter = MockSpanExporter::new(); let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans - let processor = BatchSpanProcessor::new( - exporter, - 10, // max queue size - Duration::from_secs(5), - Duration::from_secs(2), - ); + let config = BatchConfigBuilder::default() + .with_max_queue_size(10) + .with_max_export_batch_size(10) + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); // Create a test span and send it to the processor let test_span = create_test_span("shutdown_span"); From 83f9d2eded2ccd0c9fb0ca1c8939b0bc3ab5b935 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 22 Dec 2024 06:02:39 -0800 Subject: [PATCH 06/15] fix test --- opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs | 2 +- opentelemetry-sdk/src/trace/mod.rs | 1 + opentelemetry-sdk/src/trace/provider.rs | 8 +++++--- opentelemetry-sdk/src/trace/span_processor.rs | 3 +++ .../src/trace/span_processor_with_async_runtime.rs | 3 ++- 5 files changed, 12 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs index 0ae261916a..3645d9f6c2 100644 --- a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs @@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex}; ///# async fn main() { /// let exporter = InMemorySpanExporterBuilder::new().build(); /// let provider = TracerProvider::builder() -/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone(), runtime::Tokio).build()) +/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone()).build()) /// .build(); /// /// global::set_tracer_provider(provider.clone()); diff --git a/opentelemetry-sdk/src/trace/mod.rs b/opentelemetry-sdk/src/trace/mod.rs index b6017ebfdd..4acf809022 100644 --- a/opentelemetry-sdk/src/trace/mod.rs +++ b/opentelemetry-sdk/src/trace/mod.rs @@ -16,6 +16,7 @@ mod span; mod span_limit; mod span_processor; #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] +/// Experimental feature to use async runtime with batch span processor. pub mod span_processor_with_async_runtime; mod tracer; diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index e2f0bde216..253d76d148 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -64,9 +64,11 @@ /// ``` #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] use crate::runtime::RuntimeChannel; -use crate::trace::{ - BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer, -}; +#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] +use crate::trace::span_processor_with_async_runtime::BatchSpanProcessor; +#[cfg(not(feature = "experimental_trace_batch_span_processor_with_async_runtime"))] +use crate::trace::BatchSpanProcessor; +use crate::trace::{Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer}; use crate::Resource; use crate::{export::trace::SpanExporter, trace::SpanProcessor}; use opentelemetry::trace::TraceError; diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 4e04db6dc6..7bdc659939 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -371,15 +371,18 @@ pub struct BatchConfig { /// of batches. The default value is 5 seconds. pub(crate) scheduled_delay: Duration, + #[allow(dead_code)] /// The maximum number of spans to process in a single batch. If there are /// more than one batch worth of spans then it processes multiple batches /// of spans one batch after the other without any delay. The default value /// is 512. pub(crate) max_export_batch_size: usize, + #[allow(dead_code)] /// The maximum duration to export a batch of data. pub(crate) max_export_timeout: Duration, + #[allow(dead_code)] /// Maximum number of concurrent exports /// /// Limits the number of spawned tasks for exports and thus memory consumed diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 32d6d134e0..02576e3f7d 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -56,6 +56,7 @@ use std::sync::Arc; /// use opentelemetry_sdk::{runtime, testing::trace::NoopSpanExporter, trace}; /// use opentelemetry_sdk::trace::BatchConfigBuilder; /// use std::time::Duration; +/// use opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor; /// /// #[tokio::main] /// async fn main() { @@ -63,7 +64,7 @@ use std::sync::Arc; /// let exporter = NoopSpanExporter::new(); /// /// // Create a batch span processor using an exporter and a runtime -/// let batch = trace::BatchSpanProcessor::builder(exporter, runtime::Tokio) +/// let batch = BatchSpanProcessor::builder(exporter, runtime::Tokio) /// .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build()) /// .build(); /// From d41ff4d8efd2d8e7832d829bf8667f113103c14c Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Sun, 22 Dec 2024 07:15:08 -0800 Subject: [PATCH 07/15] cont.. --- examples/tracing-grpc/src/client.rs | 2 +- examples/tracing-grpc/src/server.rs | 2 +- examples/tracing-jaeger/src/main.rs | 2 +- .../examples/basic-otlp/src/main.rs | 2 +- .../tests/integration_test/tests/traces.rs | 2 +- opentelemetry-otlp/tests/smoke.rs | 1 - opentelemetry-sdk/src/trace/provider.rs | 22 +++---------------- opentelemetry-sdk/src/trace/runtime_tests.rs | 7 ++++-- opentelemetry-sdk/src/trace/span_processor.rs | 3 +-- opentelemetry-zipkin/src/exporter/mod.rs | 6 ++--- 10 files changed, 16 insertions(+), 33 deletions(-) diff --git a/examples/tracing-grpc/src/client.rs b/examples/tracing-grpc/src/client.rs index 29a9353621..18c42cec6f 100644 --- a/examples/tracing-grpc/src/client.rs +++ b/examples/tracing-grpc/src/client.rs @@ -13,7 +13,7 @@ fn init_tracer() -> sdktrace::TracerProvider { global::set_text_map_propagator(TraceContextPropagator::new()); // Install stdout exporter pipeline to be able to retrieve the collected spans. let provider = sdktrace::TracerProvider::builder() - .with_batch_exporter(SpanExporter::default(), Tokio) + .with_batch_exporter(SpanExporter::default()) .build(); global::set_tracer_provider(provider.clone()); diff --git a/examples/tracing-grpc/src/server.rs b/examples/tracing-grpc/src/server.rs index 24a9e09481..3dbb012321 100644 --- a/examples/tracing-grpc/src/server.rs +++ b/examples/tracing-grpc/src/server.rs @@ -15,7 +15,7 @@ fn init_tracer() -> TracerProvider { global::set_text_map_propagator(TraceContextPropagator::new()); // Install stdout exporter pipeline to be able to retrieve the collected spans. let provider = TracerProvider::builder() - .with_batch_exporter(SpanExporter::default(), Tokio) + .with_batch_exporter(SpanExporter::default()) .build(); global::set_tracer_provider(provider.clone()); diff --git a/examples/tracing-jaeger/src/main.rs b/examples/tracing-jaeger/src/main.rs index e015f9ab9f..7e06d8e921 100644 --- a/examples/tracing-jaeger/src/main.rs +++ b/examples/tracing-jaeger/src/main.rs @@ -14,7 +14,7 @@ fn init_tracer_provider() -> Result Result { .build()?; Ok(sdktrace::TracerProvider::builder() .with_resource(RESOURCE.clone()) - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .build()) } diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index 1601e04132..5ce10bf944 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -35,7 +35,7 @@ fn init_tracer_provider() -> Result { let exporter = exporter_builder.build()?; Ok(opentelemetry_sdk::trace::TracerProvider::builder() - .with_batch_exporter(exporter, runtime::Tokio) + .with_batch_exporter(exporter) .with_resource( Resource::builder_empty() .with_service_name("basic-otlp-tracing-example") diff --git a/opentelemetry-otlp/tests/smoke.rs b/opentelemetry-otlp/tests/smoke.rs index ba09407e1e..e9cd0da165 100644 --- a/opentelemetry-otlp/tests/smoke.rs +++ b/opentelemetry-otlp/tests/smoke.rs @@ -100,7 +100,6 @@ async fn smoke_tracer() { .with_metadata(metadata) .build() .expect("NON gzip-tonic SpanExporter failed to build"), - opentelemetry_sdk::runtime::Tokio, ) .build(); diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 253d76d148..141850a759 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -62,13 +62,9 @@ /// provider.shutdown(); /// } /// ``` -#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] -use crate::runtime::RuntimeChannel; -#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] -use crate::trace::span_processor_with_async_runtime::BatchSpanProcessor; -#[cfg(not(feature = "experimental_trace_batch_span_processor_with_async_runtime"))] -use crate::trace::BatchSpanProcessor; -use crate::trace::{Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer}; +use crate::trace::{ + BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer, +}; use crate::Resource; use crate::{export::trace::SpanExporter, trace::SpanProcessor}; use opentelemetry::trace::TraceError; @@ -298,18 +294,6 @@ impl Builder { Builder { processors, ..self } } - #[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] - /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use. - pub fn with_batch_exporter( - self, - exporter: T, - runtime: R, - ) -> Self { - let batch = BatchSpanProcessor::builder(exporter, runtime).build(); - self.with_span_processor(batch) - } - - #[cfg(not(feature = "experimental_trace_batch_span_processor_with_async_runtime"))] /// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use. pub fn with_batch_exporter(self, exporter: T) -> Self { let batch = BatchSpanProcessor::builder(exporter).build(); diff --git a/opentelemetry-sdk/src/trace/runtime_tests.rs b/opentelemetry-sdk/src/trace/runtime_tests.rs index 87143c75bc..ed1a8325d1 100644 --- a/opentelemetry-sdk/src/trace/runtime_tests.rs +++ b/opentelemetry-sdk/src/trace/runtime_tests.rs @@ -46,15 +46,18 @@ impl SpanCountExporter { } } -#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")] #[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))] fn build_batch_tracer_provider( exporter: SpanCountExporter, runtime: R, ) -> crate::trace::TracerProvider { use crate::trace::TracerProvider; + let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder( + exporter, runtime, + ) + .build(); TracerProvider::builder() - .with_batch_exporter(exporter, runtime) + .with_span_processor(processor) .build() } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 7bdc659939..8d0d32e8e1 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -720,8 +720,7 @@ mod tests { use futures_util::future::BoxFuture; use futures_util::FutureExt; - use std::sync::Arc; - use std::sync::Mutex; + use std::sync::{Arc, Mutex}; // Mock exporter to test functionality #[derive(Debug)] diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index bb18c1473b..53890c02f0 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -11,7 +11,6 @@ use opentelemetry_http::HttpClient; use opentelemetry_sdk::{ export::{trace, ExportError}, resource::{ResourceDetector, SdkProvidedResourceDetector}, - runtime::RuntimeChannel, trace::{Config, Tracer, TracerProvider}, Resource, }; @@ -165,13 +164,12 @@ impl ZipkinPipelineBuilder { /// Install the Zipkin trace exporter pipeline with a batch span processor using the specified /// runtime. #[allow(deprecated)] - pub fn install_batch( + pub fn install_batch( mut self, - runtime: R, ) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> { let (config, endpoint) = self.init_config_and_endpoint(); let exporter = self.init_exporter_with_endpoint(endpoint)?; - let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter, runtime); + let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter); provider_builder = provider_builder.with_config(config); let provider = provider_builder.build(); let scope = InstrumentationScope::builder("opentelemetry-zipkin") From 3feaddcefe3d2f798cca477ddc69911f1134fd34 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 22 Dec 2024 17:10:15 +0000 Subject: [PATCH 08/15] add integration tst --- .../tests/integration_test/tests/traces.rs | 45 +++++++ opentelemetry-sdk/src/trace/span_processor.rs | 119 +++++++++++++++++- 2 files changed, 159 insertions(+), 5 deletions(-) diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index 5ce10bf944..69798033bb 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -141,6 +141,51 @@ pub fn test_serde() -> Result<()> { Ok(()) } +#[test] +#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] +pub fn span_batch_non_tokio_main() -> Result<()> { + // Initialize the tracer provider inside a tokio runtime + // as this allows tonic client to capture the runtime, + // but actual export occurs from the dedicated std::thread + // created by BatchSpanProcessor. + + use anyhow::Ok; + let rt = tokio::runtime::Runtime::new()?; + let tracer_provider = rt.block_on(async { + // While we're here setup our collector container too, as this needs tokio to run + let _ = test_utils::start_collector_container().await; + init_tracer_provider() + })?; + + let tracer = global::tracer("ex.com/basic"); + + tracer.in_span("operation", |cx| { + let span = cx.span(); + span.add_event( + "Nice operation!".to_string(), + vec![KeyValue::new("bogons", 100)], + ); + span.set_attribute(KeyValue::new(ANOTHER_KEY, "yes")); + + tracer.in_span("Sub operation...", |cx| { + let span = cx.span(); + span.set_attribute(KeyValue::new(LEMONS_KEY, "five")); + + span.add_event("Sub span event", vec![]); + }); + }); + + tracer_provider.shutdown()?; + + // Give it a second to flush + std::thread::sleep(Duration::from_secs(2)); + + // Validate results + // TODO: Fix this test + // assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?; + Ok(()) +} + /// /// Make sure we stop the collector container, otherwise it will sit around hogging our /// ports and subsequent test runs will fail. diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 8d0d32e8e1..1a1ce0791d 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -37,6 +37,7 @@ use crate::export::trace::{SpanData, SpanExporter}; use crate::resource::Resource; use crate::trace::Span; +use opentelemetry::otel_error; use opentelemetry::{otel_debug, otel_warn}; use opentelemetry::{ trace::{TraceError, TraceResult}, @@ -162,6 +163,60 @@ impl SpanProcessor for SimpleSpanProcessor { } } +/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them +/// in batches to the configured `SpanExporter`. This processor is ideal for +/// high-throughput environments, as it minimizes the overhead of exporting spans +/// individually. It uses a **dedicated background thread** to manage and export spans +/// asynchronously, ensuring that the application's main execution flow is not blocked. +/// +/// /// # Example +/// +/// This example demonstrates how to configure and use the `BatchSpanProcessor` +/// with a custom configuration. Note that a dedicated thread is used internally +/// to manage the export process. +/// +/// ```rust +/// use opentelemetry::global; +/// use opentelemetry_sdk::{ +/// trace::{BatchSpanProcessor, BatchConfigBuilder, TracerProvider}, +/// runtime, +/// testing::trace::NoopSpanExporter, +/// }; +/// use opentelemetry::trace::Tracer as _; +/// use opentelemetry::trace::Span; +/// use std::time::Duration; +/// +/// fn main() { +/// // Step 1: Create an exporter (e.g., a No-Op Exporter for demonstration). +/// let exporter = NoopSpanExporter::new(); +/// +/// // Step 2: Configure the BatchSpanProcessor. +/// let batch_processor = BatchSpanProcessor::builder(exporter) +/// .with_batch_config( +/// BatchConfigBuilder::default() +/// .with_max_queue_size(1024) // Buffer up to 1024 spans. +/// .with_max_export_batch_size(256) // Export in batches of up to 256 spans. +/// .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds. +/// .with_max_export_timeout(Duration::from_secs(10)) // Timeout after 10 seconds. +/// .build(), +/// ) +/// .build(); +/// +/// // Step 3: Set up a TracerProvider with the configured processor. +/// let provider = TracerProvider::builder() +/// .with_span_processor(batch_processor) +/// .build(); +/// global::set_tracer_provider(provider.clone()); +/// +/// // Step 4: Create spans and record operations. +/// let tracer = global::tracer("example-tracer"); +/// let mut span = tracer.start("example-span"); +/// span.end(); // Mark the span as completed. +/// +/// // Step 5: Ensure all spans are flushed before exiting. +/// provider.shutdown(); +/// } +/// ``` use futures_executor::block_on; use std::sync::mpsc::sync_channel; use std::sync::mpsc::RecvTimeoutError; @@ -220,7 +275,10 @@ impl BatchSpanProcessor { { if let Err(err) = block_on(exporter.export(spans.split_off(0))) { - eprintln!("Export error: {:?}", err); + otel_error!( + name: "BatchSpanProcessor.ExportError", + error = format!("{}", err) + ); } last_export_time = Instant::now(); } @@ -238,19 +296,25 @@ impl BatchSpanProcessor { Err(RecvTimeoutError::Timeout) => { if last_export_time.elapsed() >= config.scheduled_delay { if let Err(err) = block_on(exporter.export(spans.split_off(0))) { - eprintln!("Export error: {:?}", err); + otel_error!( + name: "BatchSpanProcessor.ExportError", + error = format!("{}", err) + ); } last_export_time = Instant::now(); } } Err(RecvTimeoutError::Disconnected) => { - eprintln!("Channel disconnected, shutting down processor thread."); + otel_error!( + name: "BatchLogProcessor.InternalError.ChannelDisconnected", + message = "Channel disconnected, shutting down processor thread." + ); break; } } } }) - .expect("Failed to spawn thread"); + .expect("Failed to spawn thread"); //TODO: Handle thread spawn failure Self { message_sender, @@ -720,7 +784,7 @@ mod tests { use futures_util::future::BoxFuture; use futures_util::FutureExt; - use std::sync::{Arc, Mutex}; + use std::sync::{atomic::Ordering, Arc, Mutex}; // Mock exporter to test functionality #[derive(Debug)] @@ -838,4 +902,49 @@ mod tests { "Shutdown should fail when called a second time" ); } + + #[test] + fn batchspanprocessor_handles_dropped_spans() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans + let config = BatchConfigBuilder::default() + .with_max_queue_size(2) // Small queue size to test span dropping + .with_scheduled_delay(Duration::from_secs(5)) + .with_max_export_timeout(Duration::from_secs(2)) + .build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create test spans and send them to the processor + let span1 = create_test_span("span1"); + let span2 = create_test_span("span2"); + let span3 = create_test_span("span3"); // This span should be dropped + + processor.on_end(span1.clone()); + processor.on_end(span2.clone()); + processor.on_end(span3.clone()); // This span exceeds the queue size + + // Wait for the scheduled delay to expire + std::thread::sleep(Duration::from_secs(3)); + + let exported_spans = exporter_shared.lock().unwrap(); + + // Verify that only the first two spans are exported + assert_eq!( + exported_spans.len(), + 2, + "Unexpected number of exported spans" + ); + assert!(exported_spans.iter().any(|s| s.name == "span1")); + assert!(exported_spans.iter().any(|s| s.name == "span2")); + + // Ensure the third span is dropped + assert!( + !exported_spans.iter().any(|s| s.name == "span3"), + "Span3 should have been dropped" + ); + + // Verify dropped spans count (if accessible in your implementation) + let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed); + assert_eq!(dropped_count, 1, "Unexpected number of dropped spans"); + } } From 0d97896efc10ba8844f4d527c561b1cf68bc46f5 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 23 Dec 2024 05:21:05 +0000 Subject: [PATCH 09/15] fix integration test, and enable batch in otlp-http example --- .../examples/basic-otlp-http/src/main.rs | 5 +- .../tests/integration_test/tests/traces.rs | 3 +- opentelemetry-sdk/src/trace/span_processor.rs | 85 +++++++++++++++++++ 3 files changed, 87 insertions(+), 6 deletions(-) diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 3e564ee5c5..bf33828091 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -49,9 +49,7 @@ fn init_traces() -> Result { .build()?; Ok(TracerProvider::builder() - // TODO: Enable BatchExporter after - // https://github.com/open-telemetry/opentelemetry-rust/pull/2456 - .with_simple_exporter(exporter) + .with_batch_exporter(exporter) .with_resource(RESOURCE.clone()) .build()) } @@ -73,7 +71,6 @@ fn init_metrics() -> Result Result<(), Box> { let logger_provider = init_logs()?; diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index 69798033bb..29bc93d77f 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -181,8 +181,7 @@ pub fn span_batch_non_tokio_main() -> Result<()> { std::thread::sleep(Duration::from_secs(2)); // Validate results - // TODO: Fix this test - // assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?; + assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?; Ok(()) } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 1a1ce0791d..a066b4cf13 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -229,6 +229,7 @@ enum BatchMessage { ExportSpan(SpanData), ForceFlush(SyncSender>), Shutdown(SyncSender>), + SetResource(Arc), } /// A batch span processor with a dedicated background thread. @@ -292,6 +293,9 @@ impl BatchSpanProcessor { let _ = sender.send(result); break; } + BatchMessage::SetResource(resource) => { + exporter.set_resource(&resource); + } }, Err(RecvTimeoutError::Timeout) => { if last_export_time.elapsed() >= config.scheduled_delay { @@ -396,6 +400,14 @@ impl SpanProcessor for BatchSpanProcessor { } result } + + /// Set the resource for the processor. + fn set_resource(&mut self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } } /// Builder for `BatchSpanProcessorDedicatedThread`. @@ -782,20 +794,24 @@ mod tests { } } + use crate::Resource; use futures_util::future::BoxFuture; use futures_util::FutureExt; + use opentelemetry::{Key, KeyValue, Value}; use std::sync::{atomic::Ordering, Arc, Mutex}; // Mock exporter to test functionality #[derive(Debug)] struct MockSpanExporter { exported_spans: Arc>>, + exported_resource: Arc>>, } impl MockSpanExporter { fn new() -> Self { Self { exported_spans: Arc::new(Mutex::new(Vec::new())), + exported_resource: Arc::new(Mutex::new(None)), } } } @@ -811,6 +827,10 @@ mod tests { } fn shutdown(&mut self) {} + fn set_resource(&mut self, resource: &Resource) { + let mut exported_resource = self.exported_resource.lock().unwrap(); + *exported_resource = Some(resource.clone()); + } } #[test] @@ -947,4 +967,69 @@ mod tests { let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed); assert_eq!(dropped_count, 1, "Unexpected number of dropped spans"); } + + #[test] + fn validate_span_attributes_exported_correctly() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default().build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create a span with attributes + let mut span_data = create_test_span("attribute_validation"); + span_data.attributes = vec![ + KeyValue::new("key1", "value1"), + KeyValue::new("key2", "value2"), + ]; + processor.on_end(span_data.clone()); + + // Force flush to export the span + let _ = processor.force_flush(); + + // Validate the exported attributes + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + let exported_span = &exported_spans[0]; + assert!(exported_span + .attributes + .contains(&KeyValue::new("key1", "value1"))); + assert!(exported_span + .attributes + .contains(&KeyValue::new("key2", "value2"))); + } + + #[test] + fn batchspanprocessor_sets_and_exports_with_resource() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let resource_shared = exporter.exported_resource.clone(); + let config = BatchConfigBuilder::default().build(); + let mut processor = BatchSpanProcessor::new(exporter, config); + + // Set a resource for the processor + let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]); + processor.set_resource(&resource); + + // Create a span and send it to the processor + let test_span = create_test_span("resource_test"); + processor.on_end(test_span.clone()); + + // Force flush to ensure the span is exported + let _ = processor.force_flush(); + + // Validate spans are exported + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + + // Validate the resource is correctly set in the exporter + let exported_resource = resource_shared.lock().unwrap(); + assert!(exported_resource.is_some()); + assert_eq!( + exported_resource + .as_ref() + .unwrap() + .get(Key::new("service.name")), + Some(Value::from("test_service")) + ); + } } From 0a8288727f6ebccc1cd2c30c7ed9ba44ed9abde0 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 23 Dec 2024 05:24:17 +0000 Subject: [PATCH 10/15] nit typo --- opentelemetry-sdk/src/trace/span_processor.rs | 4 ++-- .../src/trace/span_processor_with_async_runtime.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index a066b4cf13..d4e1b05837 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -93,7 +93,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// /// Implementation should make sure shutdown can be called multiple times. fn shutdown(&self) -> TraceResult<()>; - /// Set the resource for the log processor. + /// Set the resource for the span processor. fn set_resource(&mut self, _resource: &Resource) {} } @@ -310,7 +310,7 @@ impl BatchSpanProcessor { } Err(RecvTimeoutError::Disconnected) => { otel_error!( - name: "BatchLogProcessor.InternalError.ChannelDisconnected", + name: "BatchSpanProcessor.InternalError.ChannelDisconnected", message = "Channel disconnected, shutting down processor thread." ); break; diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index 02576e3f7d..c3c241c776 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -141,7 +141,7 @@ impl SpanProcessor for BatchSpanProcessor { name: "BatchSpanProcessor.Shutdown", dropped_spans = dropped_spans, max_queue_size = max_queue_size, - message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals." + message = "Spans were dropped due to a full or closed queue. The count represents the total count of span records dropped in the lifetime of the BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." ); } From 394ca5e091c2040004b47de2ac99eb14d681c52a Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 23 Dec 2024 07:05:51 +0000 Subject: [PATCH 11/15] add tokiocontext test --- opentelemetry-sdk/src/trace/span_processor.rs | 121 ++++++++++++++++-- 1 file changed, 113 insertions(+), 8 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index d4e1b05837..48ddd7a462 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -248,7 +248,7 @@ impl BatchSpanProcessor { pub fn new( mut exporter: E, config: BatchConfig, - //ax_queue_size: usize, + //max_queue_size: usize, //scheduled_delay: Duration, //shutdown_timeout: Duration, ) -> Self @@ -258,16 +258,21 @@ impl BatchSpanProcessor { let (message_sender, message_receiver) = sync_channel(config.max_queue_size); let handle = thread::Builder::new() - .name("BatchSpanProcessorDedicatedThread".to_string()) + .name("BatchSpanProcessorThread".to_string()) .spawn(move || { let mut spans = Vec::new(); + spans.reserve(config.max_export_batch_size); let mut last_export_time = Instant::now(); loop { - let timeout = config + let remaining_time_option = config .scheduled_delay - .saturating_sub(last_export_time.elapsed()); - match message_receiver.recv_timeout(timeout) { + .checked_sub(last_export_time.elapsed()); + let remaining_time = match remaining_time_option { + Some(remaining_time) => remaining_time, + None => config.scheduled_delay, + }; + match message_receiver.recv_timeout(remaining_time) { Ok(message) => match message { BatchMessage::ExportSpan(span) => { spans.push(span); @@ -351,14 +356,17 @@ impl SpanProcessor for BatchSpanProcessor { /// Handles span end. fn on_end(&self, span: SpanData) { if self.is_shutdown.load(Ordering::Relaxed) { - eprintln!("Processor is shutdown. Dropping span."); + // this is a warning, as the user is trying to emit after the processor has been shutdown + otel_warn!( + name: "BatchSpanProcessor.Emit.ProcessorShutdown", + ); return; } let result = self.message_sender.try_send(BatchMessage::ExportSpan(span)); // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if result.is_err() { - // Increment dropped logs count. The first time we have to drop a log, + // Increment dropped span count. The first time we have to drop a span, // emit a warning. if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 { otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted", @@ -384,6 +392,14 @@ impl SpanProcessor for BatchSpanProcessor { /// Shuts down the processor. fn shutdown(&self) -> TraceResult<()> { + let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed); + if dropped_spans > 0 { + otel_warn!( + name: "BatchSpanProcessor.LogsDropped", + dropped_span_count = dropped_spans, + message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals." + ); + } if self.is_shutdown.swap(true, Ordering::Relaxed) { return Err(TraceError::Other("Processor already shutdown".into())); } @@ -396,7 +412,12 @@ impl SpanProcessor for BatchSpanProcessor { .recv_timeout(self.shutdown_timeout) .map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?; if let Some(handle) = self.handle.lock().unwrap().take() { - handle.join().expect("Failed to join thread"); + if let Err(err) = handle.join() { + return Err(TraceError::Other(format!( + "Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}", + err + ).into())); + } } result } @@ -1032,4 +1053,88 @@ mod tests { Some(Value::from("test_service")) ); } + + #[tokio::test(flavor = "current_thread")] + async fn test_batch_processor_current_thread_runtime() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + + let config = BatchConfigBuilder::default() + .with_max_queue_size(5) + .with_max_export_batch_size(3) + .with_scheduled_delay(Duration::from_millis(50)) + .build(); + + let processor = BatchSpanProcessor::new(exporter, config); + + for _ in 0..4 { + let span = new_test_export_span_data(); + processor.on_end(span); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 4); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn test_batch_processor_multi_thread_count_1_runtime() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + + let config = BatchConfigBuilder::default() + .with_max_queue_size(5) + .with_max_export_batch_size(3) + .with_scheduled_delay(Duration::from_millis(50)) + .build(); + + let processor = BatchSpanProcessor::new(exporter, config); + + for _ in 0..4 { + let span = new_test_export_span_data(); + processor.on_end(span); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 4); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_batch_processor_multi_thread() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + + let config = BatchConfigBuilder::default() + .with_max_queue_size(20) + .with_max_export_batch_size(5) + .with_scheduled_delay(Duration::from_millis(50)) + .build(); + + // Create the processor with the thread-safe exporter + let processor = Arc::new(BatchSpanProcessor::new(exporter, config)); + + let mut handles = vec![]; + for _ in 0..10 { + let processor_clone = Arc::clone(&processor); + let handle = tokio::spawn(async move { + let span = new_test_export_span_data(); + processor_clone.on_end(span); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.unwrap(); + } + + // Allow time for batching and export + tokio::time::sleep(Duration::from_millis(200)).await; + + // Verify exported spans + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 10); + } } From 7163918eff8f209e13e8e20e39e146cc102bb82c Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 23 Dec 2024 07:10:52 +0000 Subject: [PATCH 12/15] fix lint --- opentelemetry-sdk/src/trace/span_processor.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 48ddd7a462..e085a5e36e 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -260,8 +260,7 @@ impl BatchSpanProcessor { let handle = thread::Builder::new() .name("BatchSpanProcessorThread".to_string()) .spawn(move || { - let mut spans = Vec::new(); - spans.reserve(config.max_export_batch_size); + let mut spans = Vec::with_capacity(config.max_export_batch_size); let mut last_export_time = Instant::now(); loop { From 252623f7ce4d8646426f94b123b43238a359026a Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 23 Dec 2024 08:04:39 +0000 Subject: [PATCH 13/15] increate timeout for current_thread test --- opentelemetry-sdk/src/trace/span_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index e085a5e36e..4f20b1f9d3 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -1071,7 +1071,7 @@ mod tests { processor.on_end(span); } - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(200)).await; let exported_spans = exporter_shared.lock().unwrap(); assert_eq!(exported_spans.len(), 4); From 2ab143c758d7cd717a0ca4c290fae74b3e595f7f Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 23 Dec 2024 02:14:55 -0800 Subject: [PATCH 14/15] fix ci - comment-out reqwest and tokio integration tests, and increase wait time for span export --- opentelemetry-sdk/src/trace/span_processor.rs | 2 +- scripts/integration_tests.sh | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 4f20b1f9d3..3972ac65ee 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -1095,7 +1095,7 @@ mod tests { processor.on_end(span); } - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(200)).await; let exported_spans = exporter_shared.lock().unwrap(); assert_eq!(exported_spans.len(), 4); diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh index b984cc023f..9835ffe5b8 100755 --- a/scripts/integration_tests.sh +++ b/scripts/integration_tests.sh @@ -19,10 +19,16 @@ if [ -d "$TEST_DIR" ]; then echo Integration Tests: Reqwest Client echo #### echo - cargo test --no-default-features --features "reqwest-client","internal-logs" + # TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported. + #cargo test --no-default-features --features "reqwest-client","internal-logs" - # TODO - Uncomment the following lines once the reqwest-blocking-client feature is working. - # cargo test --no-default-features --features "reqwest-blocking-client" + # Run tests with the reqwest-client feature + echo + echo #### + echo Integration Tests: Reqwest Blocking Client + echo #### + echo + cargo test --no-default-features --features "reqwest-blocking-client" # Run tests with the hyper-client feature echo @@ -30,7 +36,8 @@ if [ -d "$TEST_DIR" ]; then echo Integration Tests: Hyper Client echo #### echo - cargo test --no-default-features --features "hyper-client","internal-logs" + # TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported. + #cargo test --no-default-features --features "hyper-client","internal-logs" else echo "Directory $TEST_DIR does not exist. Skipping tests." exit 1 From a16e50c5d165f24fc5a8002dc102c1d2514860a7 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Mon, 23 Dec 2024 02:46:30 -0800 Subject: [PATCH 15/15] add changelog --- opentelemetry-sdk/CHANGELOG.md | 52 ++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index df8a7aa2f0..140f41ed1f 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -159,6 +159,58 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope - Continue enabling one of the async runtime feature flags: `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std`. +- **Breaking** [#2456](https://github.com/open-telemetry/opentelemetry-rust/pull/2456) + + `BatchSpanProcessor` no longer requires an async runtime by default. Instead, a dedicated + background thread is created to do the batch processing and exporting. + + For users who prefer the previous behavior of relying on a specific + `Runtime`, they can do so by enabling the feature flag + **`experimental_trace_batch_span_processor_with_async_runtime`**. + + 1. *Default Implementation, requires no async runtime* (**Recommended**) The + new default implementation does not require a runtime argument. Replace the + builder method accordingly: + - *Before:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter).build()) + .build(); + ``` + + 2. *Async Runtime Support* + If your application cannot spin up new threads or you prefer using async + runtimes, enable the + "experimental_trace_batch_span_processor_with_async_runtime" feature flag and + adjust code as below. + + - *Before:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + - *After:* + ```rust + let tracer_provider = TracerProvider::builder() + .with_span_processor(span_processor_with_async_runtime::BatchSpanProcessor::builder(exporter, runtime::Tokio).build()) + .build(); + ``` + + *Requirements:* + - Enable the feature flag: + `experimental_trace_batch_span_processor_with_async_runtime`. + - Continue enabling one of the async runtime feature flags: `rt-tokio`, + `rt-tokio-current-thread`, or `rt-async-std`. + ## 0.27.1 Released 2024-Nov-27