Skip to content

feat: add shutdown with timeout for traces #2956

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use opentelemetry_sdk::{
};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::{LogExporter, SpanExporter};
use std::time::Duration;
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock};
use tokio::net::TcpListener;
use tracing::info;
Expand Down Expand Up @@ -131,7 +132,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ also modified to suppress telemetry before invoking exporters.
- Fixed the overflow attribute to correctly use the boolean value `true`
instead of the string `"true"`.
[#2878](https://github.com/open-telemetry/opentelemetry-rust/issues/2878)
- The `shutdown_with_timeout` method is added to SpanProcessor, SpanExporter trait and TracerProvider.
- The `shutdown_with_timeout` method is added to LogExporter trait.
- The `shutdown_with_timeout` method is added to LogProvider and LogProcessor trait.
- *Breaking* `MetricError`, `MetricResult` no longer public (except when
Expand Down
8 changes: 6 additions & 2 deletions opentelemetry-sdk/src/trace/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status};
use opentelemetry::{InstrumentationScope, KeyValue};
use std::borrow::Cow;
use std::fmt::Debug;
use std::time::SystemTime;
use std::time::{Duration, SystemTime};

/// `SpanExporter` defines the interface that protocol-specific exporters must
/// implement so that they can be plugged into OpenTelemetry SDK and support
Expand Down Expand Up @@ -43,9 +43,13 @@ pub trait SpanExporter: Send + Sync + Debug {
/// flush the data and the destination is unavailable). SDK authors
/// can decide if they want to make the shutdown timeout
/// configurable.
fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
/// Shuts down the exporter using the default timeout of 5 seconds.
///
/// This is a convenience wrapper that forwards to `shutdown_with_timeout`;
/// exporters that need custom shutdown behaviour should override that
/// method rather than this one.
fn shutdown(&mut self) -> OTelSdkResult {
    // The default is expressed in seconds, matching the 5-second default used
    // by the processor and provider shutdown paths. A nanosecond-scale budget
    // would expire before any exporter could flush.
    self.shutdown_with_timeout(Duration::from_secs(5))
}

/// This is a hint to ensure that the export of any Spans the exporter
/// has received prior to the call to this function SHOULD be completed
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::resource::Resource;
use crate::trace::{SpanData, SpanExporter};
use crate::InMemoryExporterError;
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// An in-memory span exporter that stores span data in memory.
///
Expand Down Expand Up @@ -140,7 +141,7 @@ impl SpanExporter for InMemorySpanExporter {
result
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
self.reset();
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ mod runtime_tests;

#[cfg(all(test, feature = "testing"))]
mod tests {

use super::*;
use crate::error::OTelSdkResult;
use crate::{
trace::span_limit::{DEFAULT_MAX_EVENT_PER_SPAN, DEFAULT_MAX_LINKS_PER_SPAN},
trace::{InMemorySpanExporter, InMemorySpanExporterBuilder},
Expand All @@ -76,6 +76,7 @@ mod tests {
},
Context, KeyValue,
};
use std::time::Duration;

#[test]
fn span_modification_via_context() {
Expand Down Expand Up @@ -146,7 +147,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> crate::error::OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
26 changes: 18 additions & 8 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::IdGenerator;
use crate::error::{OTelSdkError, OTelSdkResult};
/// # Trace Provider SDK
///
Expand Down Expand Up @@ -74,8 +75,7 @@ use opentelemetry::{otel_info, InstrumentationScope};
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};

use super::IdGenerator;
use std::time::Duration;

static PROVIDER_RESOURCE: OnceLock<Resource> = OnceLock::new();

Expand Down Expand Up @@ -112,10 +112,10 @@ pub(crate) struct TracerProviderInner {
impl TracerProviderInner {
/// Crate-private shutdown method to be called both from explicit shutdown
/// and from Drop when the last reference is released.
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
pub(crate) fn shutdown_with_timeout(&self, timeout: Duration) -> Vec<OTelSdkResult> {
let mut results = vec![];
for processor in &self.processors {
let result = processor.shutdown();
let result = processor.shutdown_with_timeout(timeout);
if let Err(err) = &result {
// Log at debug level because:
// - The error is also returned to the user for handling (if applicable)
Expand All @@ -128,6 +128,10 @@ impl TracerProviderInner {
}
results
}
/// Crate-private shutdown that applies the default 5-second timeout.
///
/// Forwards to `shutdown_with_timeout` and returns its per-processor results
/// unchanged.
pub(crate) fn shutdown(&self) -> Vec<OTelSdkResult> {
    let default_timeout = Duration::from_secs(5);
    self.shutdown_with_timeout(default_timeout)
}
}

impl Drop for TracerProviderInner {
Expand Down Expand Up @@ -239,15 +243,15 @@ impl SdkTracerProvider {
/// Shuts down the current `TracerProvider`.
///
/// Note that shutting down does not mean the TracerProvider has been dropped
pub fn shutdown(&self) -> OTelSdkResult {
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
if self
.inner
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
// propagate the shutdown signal to processors
let results = self.inner.shutdown();
let results = self.inner.shutdown_with_timeout(timeout);

if results.iter().all(|res| res.is_ok()) {
Ok(())
Expand All @@ -264,6 +268,11 @@ impl SdkTracerProvider {
Err(OTelSdkError::AlreadyShutdown)
}
}

/// Shuts down the current `TracerProvider` using the default timeout of
/// 5 seconds.
///
/// Equivalent to calling `shutdown_with_timeout(Duration::from_secs(5))`;
/// the result of that call is returned as-is.
pub fn shutdown(&self) -> OTelSdkResult {
    let default_timeout = Duration::from_secs(5);
    self.shutdown_with_timeout(default_timeout)
}
}

impl opentelemetry::trace::TracerProvider for SdkTracerProvider {
Expand Down Expand Up @@ -471,6 +480,7 @@ mod tests {
use std::env;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

// The fields below are wrapped in Arc so that tests can assert on them
#[derive(Default, Debug)]
Expand Down Expand Up @@ -528,7 +538,7 @@ mod tests {
}
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
if self.assert_info.0.is_shutdown.load(Ordering::SeqCst) {
Ok(())
} else {
Expand Down Expand Up @@ -787,7 +797,7 @@ mod tests {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
self.shutdown_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
Expand Down
18 changes: 10 additions & 8 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// opportunity for processors to do any cleanup required.
///
/// Implementation should make sure shutdown can be called multiple times.
fn shutdown(&self) -> OTelSdkResult;
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult;
/// Shuts down the processor using the default timeout of 5 seconds.
///
/// Provided method: it forwards to `shutdown_with_timeout`, which
/// implementors are required to supply.
fn shutdown(&self) -> OTelSdkResult {
    let default_timeout = Duration::from_secs(5);
    self.shutdown_with_timeout(default_timeout)
}
/// Set the resource for the span processor.
fn set_resource(&mut self, _resource: &Resource) {}
}
Expand Down Expand Up @@ -154,9 +158,9 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown()
exporter.shutdown_with_timeout(timeout)
} else {
Err(OTelSdkError::InternalFailure(
"SimpleSpanProcessor mutex poison at shutdown".into(),
Expand Down Expand Up @@ -285,7 +289,6 @@ pub struct BatchSpanProcessor {
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
dropped_span_count: Arc<AtomicUsize>,
export_span_message_sent: Arc<AtomicBool>,
Expand Down Expand Up @@ -424,7 +427,6 @@ impl BatchSpanProcessor {
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_span_count: Arc::new(AtomicUsize::new(0)),
max_queue_size,
Expand Down Expand Up @@ -580,7 +582,7 @@ impl SpanProcessor for BatchSpanProcessor {
}

/// Shuts down the processor.
fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
if self.is_shutdown.swap(true, Ordering::Relaxed) {
return Err(OTelSdkError::AlreadyShutdown);
}
Expand All @@ -601,8 +603,8 @@ impl SpanProcessor for BatchSpanProcessor {
.map_err(|e| OTelSdkError::InternalFailure(e.to_string()))?;

let result = receiver
.recv_timeout(self.shutdown_timeout)
.map_err(|_| OTelSdkError::Timeout(self.shutdown_timeout))?;
.recv_timeout(timeout)
.map_err(|_| OTelSdkError::Timeout(timeout))?;
if let Some(handle) = self.handle.lock().unwrap().take() {
if let Err(err) = handle.join() {
return Err(OTelSdkError::InternalFailure(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use opentelemetry::{otel_debug, otel_error, otel_warn};
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
/// them at a preconfigured interval.
Expand Down Expand Up @@ -133,7 +134,7 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
})?
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
let dropped_spans = self.dropped_spans_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_spans > 0 {
Expand Down
4 changes: 2 additions & 2 deletions stress/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
~10.6 M /sec
*/

use lazy_static::lazy_static;
use opentelemetry::{
trace::{Span, SpanBuilder, Tracer, TracerProvider},
Expand All @@ -18,6 +17,7 @@ use opentelemetry_sdk::{
error::OTelSdkResult,
trace::{self as sdktrace, SpanData, SpanProcessor},
};
use std::time::Duration;

mod throughput;

Expand Down Expand Up @@ -45,7 +45,7 @@ impl SpanProcessor for NoOpSpanProcessor {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
fn shutdown_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}
}
Expand Down
Loading