Skip to content

Commit

Permalink
cont...
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Dec 22, 2024
1 parent 993dcbc commit a062c9c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 86 deletions.
23 changes: 10 additions & 13 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
Expand Down Expand Up @@ -49,14 +48,13 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let span_processor =
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let span_processor = BatchSpanProcessor::builder(NoopSpanExporter::new())
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let mut shared_span_processor = Arc::new(span_processor);
let mut handles = Vec::with_capacity(10);
for _ in 0..task_num {
Expand All @@ -70,10 +68,9 @@ fn criterion_benchmark(c: &mut Criterion) {
}));
}
futures_util::future::join_all(handles).await;
let _ =
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
let _ = Arc::<BatchSpanProcessor>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
});
})
},
Expand Down
10 changes: 3 additions & 7 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ mod sampler;
mod span;
mod span_limit;
mod span_processor;
mod span_processor_with_async_runtime;

#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
pub mod span_processor_with_async_runtime;
mod tracer;

pub use config::{config, Config};
Expand All @@ -33,16 +33,12 @@ pub use span_processor::{
SimpleSpanProcessor, SpanProcessor,
};

/// Re-export asynchronous runtime span processor components.
// TODO: make them available without re-export
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
pub mod span_processor_with_async_runtime;

pub use tracer::Tracer;

#[cfg(feature = "jaeger_remote_sampler")]
pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder};

#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
#[cfg(test)]
mod runtime_tests;

Expand Down
9 changes: 9 additions & 0 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
/// provider.shutdown();
/// }
/// ```
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
use crate::runtime::RuntimeChannel;
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
Expand Down Expand Up @@ -295,12 +296,20 @@ impl Builder {
Builder { processors, ..self }
}

#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
) -> Self {
let batch = BatchSpanProcessor::builder(exporter, runtime).build();
self.with_span_processor(batch)
}

#[cfg(not(feature = "experimental_trace_batch_span_processor_with_async_runtime"))]
/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
let batch = BatchSpanProcessor::builder(exporter).build();
self.with_span_processor(batch)
}
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/src/trace/runtime_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl SpanCountExporter {
}
}

#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
fn build_batch_tracer_provider<R: RuntimeChannel>(
exporter: SpanCountExporter,
Expand Down
113 changes: 47 additions & 66 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ enum BatchMessage {
pub struct BatchSpanProcessor {
message_sender: SyncSender<BatchMessage>,
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
dropped_span_count: Arc<AtomicUsize>,
Expand All @@ -190,14 +191,15 @@ impl BatchSpanProcessor {
/// Creates a new instance of `BatchSpanProcessor`.
pub fn new<E>(
mut exporter: E,
max_queue_size: usize,
scheduled_delay: Duration,
shutdown_timeout: Duration,
config: BatchConfig,
//ax_queue_size: usize,
//scheduled_delay: Duration,
//shutdown_timeout: Duration,
) -> Self
where
E: SpanExporter + Send + 'static,
{
let (message_sender, message_receiver) = sync_channel(max_queue_size);
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);

let handle = thread::Builder::new()
.name("BatchSpanProcessorDedicatedThread".to_string())
Expand All @@ -206,13 +208,15 @@ impl BatchSpanProcessor {
let mut last_export_time = Instant::now();

loop {
let timeout = scheduled_delay.saturating_sub(last_export_time.elapsed());
let timeout = config
.scheduled_delay
.saturating_sub(last_export_time.elapsed());
match message_receiver.recv_timeout(timeout) {
Ok(message) => match message {
BatchMessage::ExportSpan(span) => {
spans.push(span);
if spans.len() >= max_queue_size
|| last_export_time.elapsed() >= scheduled_delay
if spans.len() >= config.max_queue_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
if let Err(err) = block_on(exporter.export(spans.split_off(0)))
{
Expand All @@ -232,7 +236,7 @@ impl BatchSpanProcessor {
}
},
Err(RecvTimeoutError::Timeout) => {
if last_export_time.elapsed() >= scheduled_delay {
if last_export_time.elapsed() >= config.scheduled_delay {
if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
eprintln!("Export error: {:?}", err);
}
Expand All @@ -251,7 +255,8 @@ impl BatchSpanProcessor {
Self {
message_sender,
handle: Mutex::new(Some(handle)),
shutdown_timeout,
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_span_count: Arc::new(AtomicUsize::new(0)),
}
Expand All @@ -262,7 +267,10 @@ impl BatchSpanProcessor {
where
E: SpanExporter + Send + 'static,
{
BatchSpanProcessorBuilder::new(exporter)
BatchSpanProcessorBuilder {
exporter,
config: BatchConfig::default(),
}
}
}

Expand Down Expand Up @@ -302,8 +310,8 @@ impl SpanProcessor for BatchSpanProcessor {
.map_err(|_| TraceError::Other("Failed to send ForceFlush message".into()))?;

receiver
.recv_timeout(self.shutdown_timeout)
.map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?
.recv_timeout(self.forceflush_timeout)
.map_err(|_| TraceError::ExportTimedOut(self.forceflush_timeout))?
}

/// Shuts down the processor.
Expand Down Expand Up @@ -333,51 +341,21 @@ where
E: SpanExporter + Send + 'static,
{
exporter: E,
max_queue_size: usize,
scheduled_delay: Duration,
shutdown_timeout: Duration,
config: BatchConfig,
}

impl<E> BatchSpanProcessorBuilder<E>
where
E: SpanExporter + Send + 'static,
{
/// Creates a new builder with default values.
pub fn new(exporter: E) -> Self {
Self {
exporter,
max_queue_size: 2048,
scheduled_delay: Duration::from_secs(5),
shutdown_timeout: Duration::from_secs(5),
}
/// Set the BatchConfig for [BatchSpanProcessorBuilder]
pub fn with_batch_config(self, config: BatchConfig) -> Self {
BatchSpanProcessorBuilder { config, ..self }
}

/// Sets the maximum queue size for spans.
pub fn with_max_queue_size(mut self, size: usize) -> Self {
self.max_queue_size = size;
self
}

/// Sets the delay between exports.
pub fn with_scheduled_delay(mut self, delay: Duration) -> Self {
self.scheduled_delay = delay;
self
}

/// Sets the timeout for shutdown and flush operations.
pub fn with_shutdown_timeout(mut self, timeout: Duration) -> Self {
self.shutdown_timeout = timeout;
self
}

/// Builds the `BatchSpanProcessorDedicatedThread` instance.
/// Build a new instance of `BatchSpanProcessor`.
pub fn build(self) -> BatchSpanProcessor {
BatchSpanProcessor::new(
self.exporter,
self.max_queue_size,
self.scheduled_delay,
self.shutdown_timeout,
)
BatchSpanProcessor::new(self.exporter, self.config)
}
}

Expand Down Expand Up @@ -773,12 +751,13 @@ mod tests {
fn batchspanprocessor_handles_on_end() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let processor = BatchSpanProcessor::new(
exporter,
10, // max queue size
Duration::from_secs(5),
Duration::from_secs(2),
);
let config = BatchConfigBuilder::default()
.with_max_queue_size(10)
.with_max_export_batch_size(10)
.with_scheduled_delay(Duration::from_secs(5))
.with_max_export_timeout(Duration::from_secs(2))
.build();
let processor = BatchSpanProcessor::new(exporter, config);

let test_span = create_test_span("test_span");
processor.on_end(test_span.clone());
Expand All @@ -795,12 +774,13 @@ mod tests {
fn batchspanprocessor_force_flush() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
let processor = BatchSpanProcessor::new(
exporter,
10, // max queue size
Duration::from_secs(5),
Duration::from_secs(2),
);
let config = BatchConfigBuilder::default()
.with_max_queue_size(10)
.with_max_export_batch_size(10)
.with_scheduled_delay(Duration::from_secs(5))
.with_max_export_timeout(Duration::from_secs(2))
.build();
let processor = BatchSpanProcessor::new(exporter, config);

// Create a test span and send it to the processor
let test_span = create_test_span("force_flush_span");
Expand All @@ -824,12 +804,13 @@ mod tests {
fn batchspanprocessor_shutdown() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone(); // Shared access to verify exported spans
let processor = BatchSpanProcessor::new(
exporter,
10, // max queue size
Duration::from_secs(5),
Duration::from_secs(2),
);
let config = BatchConfigBuilder::default()
.with_max_queue_size(10)
.with_max_export_batch_size(10)
.with_scheduled_delay(Duration::from_secs(5))
.with_max_export_timeout(Duration::from_secs(2))
.build();
let processor = BatchSpanProcessor::new(exporter, config);

// Create a test span and send it to the processor
let test_span = create_test_span("shutdown_span");
Expand Down

0 comments on commit a062c9c

Please sign in to comment.