diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 3e564ee5c5..bf33828091 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -49,9 +49,7 @@ fn init_traces() -> Result { .build()?; Ok(TracerProvider::builder() - // TODO: Enable BatchExporter after - // https://github.com/open-telemetry/opentelemetry-rust/pull/2456 - .with_simple_exporter(exporter) + .with_batch_exporter(exporter) .with_resource(RESOURCE.clone()) .build()) } @@ -73,7 +71,6 @@ fn init_metrics() -> Result Result<(), Box> { let logger_provider = init_logs()?; diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index 69798033bb..29bc93d77f 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -181,8 +181,7 @@ pub fn span_batch_non_tokio_main() -> Result<()> { std::thread::sleep(Duration::from_secs(2)); // Validate results - // TODO: Fix this test - // assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?; + assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?; Ok(()) } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 1a1ce0791d..a066b4cf13 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -229,6 +229,7 @@ enum BatchMessage { ExportSpan(SpanData), ForceFlush(SyncSender>), Shutdown(SyncSender>), + SetResource(Arc), } /// A batch span processor with a dedicated background thread. @@ -292,6 +293,9 @@ impl BatchSpanProcessor { let _ = sender.send(result); break; } + BatchMessage::SetResource(resource) => { + exporter.set_resource(&resource); + } }, Err(RecvTimeoutError::Timeout) => { if last_export_time.elapsed() >= config.scheduled_delay { @@ -396,6 +400,14 @@ impl SpanProcessor for BatchSpanProcessor { } result } + + /// Set the resource for the processor. + fn set_resource(&mut self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } } /// Builder for `BatchSpanProcessorDedicatedThread`. @@ -782,20 +794,24 @@ mod tests { } } + use crate::Resource; use futures_util::future::BoxFuture; use futures_util::FutureExt; + use opentelemetry::{Key, KeyValue, Value}; use std::sync::{atomic::Ordering, Arc, Mutex}; // Mock exporter to test functionality #[derive(Debug)] struct MockSpanExporter { exported_spans: Arc>>, + exported_resource: Arc>>, } impl MockSpanExporter { fn new() -> Self { Self { exported_spans: Arc::new(Mutex::new(Vec::new())), + exported_resource: Arc::new(Mutex::new(None)), } } } @@ -811,6 +827,10 @@ mod tests { } fn shutdown(&mut self) {} + fn set_resource(&mut self, resource: &Resource) { + let mut exported_resource = self.exported_resource.lock().unwrap(); + *exported_resource = Some(resource.clone()); + } } #[test] @@ -947,4 +967,69 @@ mod tests { let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed); assert_eq!(dropped_count, 1, "Unexpected number of dropped spans"); } + + #[test] + fn validate_span_attributes_exported_correctly() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let config = BatchConfigBuilder::default().build(); + let processor = BatchSpanProcessor::new(exporter, config); + + // Create a span with attributes + let mut span_data = create_test_span("attribute_validation"); + span_data.attributes = vec![ + KeyValue::new("key1", "value1"), + KeyValue::new("key2", "value2"), + ]; + processor.on_end(span_data.clone()); + + // Force flush to export the span + let _ = processor.force_flush(); + + // Validate the exported attributes + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + let exported_span = &exported_spans[0]; + assert!(exported_span + .attributes + .contains(&KeyValue::new("key1", "value1"))); + assert!(exported_span + .attributes + .contains(&KeyValue::new("key2", "value2"))); + } + + #[test] + fn batchspanprocessor_sets_and_exports_with_resource() { + let exporter = MockSpanExporter::new(); + let exporter_shared = exporter.exported_spans.clone(); + let resource_shared = exporter.exported_resource.clone(); + let config = BatchConfigBuilder::default().build(); + let mut processor = BatchSpanProcessor::new(exporter, config); + + // Set a resource for the processor + let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]); + processor.set_resource(&resource); + + // Create a span and send it to the processor + let test_span = create_test_span("resource_test"); + processor.on_end(test_span.clone()); + + // Force flush to ensure the span is exported + let _ = processor.force_flush(); + + // Validate spans are exported + let exported_spans = exporter_shared.lock().unwrap(); + assert_eq!(exported_spans.len(), 1); + + // Validate the resource is correctly set in the exporter + let exported_resource = resource_shared.lock().unwrap(); + assert!(exported_resource.is_some()); + assert_eq!( + exported_resource + .as_ref() + .unwrap() + .get(Key::new("service.name")), + Some(Value::from("test_service")) + ); + } }