Skip to content

Commit

Permalink
fix integration test, and enable batch in otlp-http example
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Dec 23, 2024
1 parent 3feaddc commit 0d97896
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
5 changes: 1 addition & 4 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
.build()?;

Ok(TracerProvider::builder()
// TODO: Enable BatchExporter after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
.with_simple_exporter(exporter)
.with_batch_exporter(exporter)
.with_resource(RESOURCE.clone())
.build())
}
Expand All @@ -73,7 +71,6 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric

// #[tokio::main]
// TODO: Re-enable tokio::main, if needed, after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let logger_provider = init_logs()?;

Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-otlp/tests/integration_test/tests/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ pub fn span_batch_non_tokio_main() -> Result<()> {
std::thread::sleep(Duration::from_secs(2));

// Validate results
// TODO: Fix this test
// assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?;
assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?;
Ok(())
}

Expand Down
85 changes: 85 additions & 0 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ enum BatchMessage {
ExportSpan(SpanData),
ForceFlush(SyncSender<TraceResult<()>>),
Shutdown(SyncSender<TraceResult<()>>),
SetResource(Arc<Resource>),
}

/// A batch span processor with a dedicated background thread.
Expand Down Expand Up @@ -292,6 +293,9 @@ impl BatchSpanProcessor {
let _ = sender.send(result);
break;
}
BatchMessage::SetResource(resource) => {
exporter.set_resource(&resource);
}
},
Err(RecvTimeoutError::Timeout) => {
if last_export_time.elapsed() >= config.scheduled_delay {
Expand Down Expand Up @@ -396,6 +400,14 @@ impl SpanProcessor for BatchSpanProcessor {
}
result
}

/// Set the resource for the processor.
fn set_resource(&mut self, resource: &Resource) {
let resource = Arc::new(resource.clone());
let _ = self
.message_sender
.try_send(BatchMessage::SetResource(resource));
}
}

/// Builder for `BatchSpanProcessorDedicatedThread`.
Expand Down Expand Up @@ -782,20 +794,24 @@ mod tests {
}
}

use crate::Resource;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use opentelemetry::{Key, KeyValue, Value};
use std::sync::{atomic::Ordering, Arc, Mutex};

// Mock exporter to test functionality
#[derive(Debug)]
struct MockSpanExporter {
exported_spans: Arc<Mutex<Vec<SpanData>>>,
exported_resource: Arc<Mutex<Option<Resource>>>,
}

impl MockSpanExporter {
fn new() -> Self {
Self {
exported_spans: Arc::new(Mutex::new(Vec::new())),
exported_resource: Arc::new(Mutex::new(None)),
}
}
}
Expand All @@ -811,6 +827,10 @@ mod tests {
}

fn shutdown(&mut self) {}
fn set_resource(&mut self, resource: &Resource) {
let mut exported_resource = self.exported_resource.lock().unwrap();
*exported_resource = Some(resource.clone());
}
}

#[test]
Expand Down Expand Up @@ -947,4 +967,69 @@ mod tests {
let dropped_count = processor.dropped_span_count.load(Ordering::Relaxed);
assert_eq!(dropped_count, 1, "Unexpected number of dropped spans");
}

#[test]
fn validate_span_attributes_exported_correctly() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let config = BatchConfigBuilder::default().build();
let processor = BatchSpanProcessor::new(exporter, config);

// Create a span with attributes
let mut span_data = create_test_span("attribute_validation");
span_data.attributes = vec![
KeyValue::new("key1", "value1"),
KeyValue::new("key2", "value2"),
];
processor.on_end(span_data.clone());

// Force flush to export the span
let _ = processor.force_flush();

// Validate the exported attributes
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 1);
let exported_span = &exported_spans[0];
assert!(exported_span
.attributes
.contains(&KeyValue::new("key1", "value1")));
assert!(exported_span
.attributes
.contains(&KeyValue::new("key2", "value2")));
}

#[test]
fn batchspanprocessor_sets_and_exports_with_resource() {
let exporter = MockSpanExporter::new();
let exporter_shared = exporter.exported_spans.clone();
let resource_shared = exporter.exported_resource.clone();
let config = BatchConfigBuilder::default().build();
let mut processor = BatchSpanProcessor::new(exporter, config);

// Set a resource for the processor
let resource = Resource::new(vec![KeyValue::new("service.name", "test_service")]);
processor.set_resource(&resource);

// Create a span and send it to the processor
let test_span = create_test_span("resource_test");
processor.on_end(test_span.clone());

// Force flush to ensure the span is exported
let _ = processor.force_flush();

// Validate spans are exported
let exported_spans = exporter_shared.lock().unwrap();
assert_eq!(exported_spans.len(), 1);

// Validate the resource is correctly set in the exporter
let exported_resource = resource_shared.lock().unwrap();
assert!(exported_resource.is_some());
assert_eq!(
exported_resource
.as_ref()
.unwrap()
.get(Key::new("service.name")),
Some(Value::from("test_service"))
);
}
}

0 comments on commit 0d97896

Please sign in to comment.