Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchSpanProcessor with dedicated thread. #2456

Merged
Merged
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn init_tracer() -> sdktrace::TracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
let provider = sdktrace::TracerProvider::builder()
.with_batch_exporter(SpanExporter::default(), Tokio)
.with_batch_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider.clone());
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing-grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn init_tracer() -> TracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
let provider = TracerProvider::builder()
.with_batch_exporter(SpanExporter::default(), Tokio)
.with_batch_exporter(SpanExporter::default())
.build();

global::set_tracer_provider(provider.clone());
Expand Down
2 changes: 1 addition & 1 deletion examples/tracing-jaeger/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn init_tracer_provider() -> Result<opentelemetry_sdk::trace::TracerProvider, Tr
.build()?;

Ok(TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(
Resource::builder()
.with_service_name("tracing-jaeger")
Expand Down
5 changes: 1 addition & 4 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
.build()?;

Ok(TracerProvider::builder()
// TODO: Enable BatchExporter after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
.with_simple_exporter(exporter)
.with_batch_exporter(exporter)
.with_resource(RESOURCE.clone())
.build())
}
Expand All @@ -73,7 +71,6 @@ fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, Metric

// #[tokio::main]
// TODO: Re-enable tokio::main, if needed, after
// https://github.com/open-telemetry/opentelemetry-rust/pull/2456
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let logger_provider = init_logs()?;

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn init_traces() -> Result<sdktrace::TracerProvider, TraceError> {
.build()?;
Ok(sdktrace::TracerProvider::builder()
.with_resource(RESOURCE.clone())
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.build())
}

Expand Down
46 changes: 45 additions & 1 deletion opentelemetry-otlp/tests/integration_test/tests/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
let exporter = exporter_builder.build()?;

Ok(opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, runtime::Tokio)
.with_batch_exporter(exporter)
.with_resource(
Resource::builder_empty()
.with_service_name("basic-otlp-tracing-example")
Expand Down Expand Up @@ -141,6 +141,50 @@ pub fn test_serde() -> Result<()> {
Ok(())
}

#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn span_batch_non_tokio_main() -> Result<()> {
// Initialize the tracer provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the dedicated std::thread
// created by BatchSpanProcessor.

use anyhow::Ok;
let rt = tokio::runtime::Runtime::new()?;
let tracer_provider = rt.block_on(async {
// While we're here setup our collector container too, as this needs tokio to run
let _ = test_utils::start_collector_container().await;
init_tracer_provider()
})?;

let tracer = global::tracer("ex.com/basic");

tracer.in_span("operation", |cx| {
let span = cx.span();
span.add_event(
"Nice operation!".to_string(),
vec![KeyValue::new("bogons", 100)],
);
span.set_attribute(KeyValue::new(ANOTHER_KEY, "yes"));

tracer.in_span("Sub operation...", |cx| {
let span = cx.span();
span.set_attribute(KeyValue::new(LEMONS_KEY, "five"));

span.add_event("Sub span event", vec![]);
});
});

tracer_provider.shutdown()?;

// Give it a second to flush
std::thread::sleep(Duration::from_secs(2));

// Validate results
assert_traces_results(test_utils::TRACES_FILE, "./expected/traces.json")?;
Ok(())
}

///
/// Make sure we stop the collector container, otherwise it will sit around hogging our
/// ports and subsequent test runs will fail.
Expand Down
1 change: 0 additions & 1 deletion opentelemetry-otlp/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ async fn smoke_tracer() {
.with_metadata(metadata)
.build()
.expect("NON gzip-tonic SpanExporter failed to build"),
opentelemetry_sdk::runtime::Tokio,
)
.build();

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ internal-logs = ["tracing"]
experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
spec_unstable_metrics_views = ["metrics"]
experimental_logs_batch_log_processor_with_async_runtime = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["trace"]


[[bench]]
name = "context"
Expand Down
23 changes: 10 additions & 13 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
Expand Down Expand Up @@ -49,14 +48,13 @@ fn criterion_benchmark(c: &mut Criterion) {
b.iter(|| {
let rt = Runtime::new().unwrap();
rt.block_on(async move {
let span_processor =
BatchSpanProcessor::builder(NoopSpanExporter::new(), Tokio)
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let span_processor = BatchSpanProcessor::builder(NoopSpanExporter::new())
.with_batch_config(
BatchConfigBuilder::default()
.with_max_queue_size(10_000)
.build(),
)
.build();
let mut shared_span_processor = Arc::new(span_processor);
let mut handles = Vec::with_capacity(10);
for _ in 0..task_num {
Expand All @@ -70,10 +68,9 @@ fn criterion_benchmark(c: &mut Criterion) {
}));
}
futures_util::future::join_all(handles).await;
let _ =
Arc::<BatchSpanProcessor<Tokio>>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
let _ = Arc::<BatchSpanProcessor>::get_mut(&mut shared_span_processor)
.unwrap()
.shutdown();
});
})
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::{Arc, Mutex};
///# async fn main() {
/// let exporter = InMemorySpanExporterBuilder::new().build();
/// let provider = TracerProvider::builder()
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone(), runtime::Tokio).build())
/// .with_span_processor(BatchSpanProcessor::builder(exporter.clone()).build())
/// .build();
///
/// global::set_tracer_provider(provider.clone());
Expand Down
5 changes: 5 additions & 0 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ mod sampler;
mod span;
mod span_limit;
mod span_processor;
#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
/// Experimental feature to use async runtime with batch span processor.
pub mod span_processor_with_async_runtime;
mod tracer;

pub use config::{config, Config};
Expand All @@ -30,11 +33,13 @@ pub use span_processor::{
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
SimpleSpanProcessor, SpanProcessor,
};

pub use tracer::Tracer;

#[cfg(feature = "jaeger_remote_sampler")]
pub use sampler::{JaegerRemoteSampler, JaegerRemoteSamplerBuilder};

#[cfg(feature = "experimental_trace_batch_span_processor_with_async_runtime")]
#[cfg(test)]
mod runtime_tests;

Expand Down
9 changes: 2 additions & 7 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
/// provider.shutdown();
/// }
/// ```
use crate::runtime::RuntimeChannel;
use crate::trace::{
BatchSpanProcessor, Config, RandomIdGenerator, Sampler, SimpleSpanProcessor, SpanLimits, Tracer,
};
Expand Down Expand Up @@ -296,12 +295,8 @@
}

/// The [`SpanExporter`] setup using a default [`BatchSpanProcessor`] that this provider should use.
pub fn with_batch_exporter<T: SpanExporter + 'static, R: RuntimeChannel>(
self,
exporter: T,
runtime: R,
) -> Self {
let batch = BatchSpanProcessor::builder(exporter, runtime).build();
pub fn with_batch_exporter<T: SpanExporter + 'static>(self, exporter: T) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to called out in changelog.md file.

let batch = BatchSpanProcessor::builder(exporter).build();

Check warning on line 299 in opentelemetry-sdk/src/trace/provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/provider.rs#L298-L299

Added lines #L298 - L299 were not covered by tests
self.with_span_processor(batch)
}

Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-sdk/src/trace/runtime_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@
runtime: R,
) -> crate::trace::TracerProvider {
use crate::trace::TracerProvider;
let processor = crate::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder(
exporter, runtime,
)
.build();

Check warning on line 58 in opentelemetry-sdk/src/trace/runtime_tests.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/runtime_tests.rs#L55-L58

Added lines #L55 - L58 were not covered by tests
TracerProvider::builder()
.with_batch_exporter(exporter, runtime)
.with_span_processor(processor)

Check warning on line 60 in opentelemetry-sdk/src/trace/runtime_tests.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/runtime_tests.rs#L60

Added line #L60 was not covered by tests
.build()
}

Expand Down
Loading
Loading