diff --git a/opentelemetry-appender-tracing/benches/logs.rs b/opentelemetry-appender-tracing/benches/logs.rs index 679315ef6b..e5fb98273c 100644 --- a/opentelemetry-appender-tracing/benches/logs.rs +++ b/opentelemetry-appender-tracing/benches/logs.rs @@ -33,10 +33,10 @@ struct NoopExporter { impl LogExporter for NoopExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { LogResult::Ok(()) } } diff --git a/opentelemetry-appender-tracing/src/layer.rs b/opentelemetry-appender-tracing/src/layer.rs index 48fffdf971..af752c5f8e 100644 --- a/opentelemetry-appender-tracing/src/layer.rs +++ b/opentelemetry-appender-tracing/src/layer.rs @@ -246,10 +246,10 @@ mod tests { impl LogExporter for ReentrantLogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { // This will cause a deadlock as the export itself creates a log // while still within the lock of the SimpleLogProcessor. diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index f6300d30ef..859055ba46 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -9,10 +9,10 @@ use super::OtlpHttpClient; impl LogExporter for OtlpHttpClient { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { let client = self .client @@ -23,7 +23,7 @@ impl LogExporter for OtlpHttpClient { _ => Err(LogError::Other("exporter is already shut down".into())), })?; - let (body, content_type) = { self.build_logs_export_body(batch)? }; + let (body, content_type) = { self.build_logs_export_body(&batch)? }; let mut request = http::Request::builder() .method(Method::POST) .uri(&self.collector_endpoint) diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 23198cfa7d..3f4049cfed 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -57,10 +57,10 @@ impl TonicLogsClient { impl LogExporter for TonicLogsClient { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { let (mut client, metadata, extensions) = match &self.inner { Some(inner) => { @@ -76,7 +76,7 @@ impl LogExporter for TonicLogsClient { None => return Err(LogError::Other("exporter is already shut down".into())), }; - let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource); + let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource); otel_debug!(name: "TonicsLogsClient.CallingExport"); diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index c765095664..c199aa5a96 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -141,10 +141,10 @@ impl LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { match &self.client { #[cfg(feature = "grpc-tonic")] diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 5ddcff29a5..902adb54dd 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -79,13 +79,10 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - /// Note: - /// The `Send` bound ensures the future can be safely moved across threads, which is crucial for multi-threaded async runtimes like Tokio. - /// Explicit lifetimes (`'a`) synchronize the lifetimes of `self`, `batch`, and the returned future. - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a; + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send; /// Shuts down the exporter. fn shutdown(&mut self) {} diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index e9c595112b..de3bf03387 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -118,7 +118,7 @@ impl LogProcessor for SimpleLogProcessor { .and_then(|exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; let log_batch = LogBatch::new(log_tuple); - futures_executor::block_on(exporter.export(&log_batch)) + futures_executor::block_on(exporter.export(log_batch)) }); // Handle errors with specific static names match result { @@ -447,7 +447,7 @@ where .map(|log_data| (&log_data.0, &log_data.1)) .collect(); let log_batch = LogBatch::new(log_vec.as_slice()); - let export = exporter.export(&log_batch); + let export = exporter.export(log_batch); let export_result = futures_executor::block_on(export); match export_result { @@ -717,7 +717,7 @@ where .map(|log_data| (&log_data.0, &log_data.1)) .collect(); let log_batch = LogBatch::new(log_vec.as_slice()); - let export = exporter.export(&log_batch); + let export = exporter.export(log_batch); let timeout = runtime.delay(time_out); pin_mut!(export); pin_mut!(timeout); @@ -937,10 +937,10 @@ mod tests { impl LogExporter for MockLogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { Ok(()) } } @@ -1443,10 +1443,10 @@ mod tests { impl LogExporter for LogExporterThatRequiresTokio { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { // Simulate minimal dependency on tokio by sleeping asynchronously for a short duration async move { tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 3cccfeaf92..dff6d93c7e 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -182,10 +182,10 @@ impl InMemoryLogExporter { impl LogExporter for InMemoryLogExporter { #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for (log_record, instrumentation) in batch.iter() { diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index f99636e356..56b6818e6c 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -32,10 +32,10 @@ impl fmt::Debug for LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout #[allow(clippy::manual_async_fn)] - fn export<'a>( - &'a self, - batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async move { if self.is_shutdown.load(atomic::Ordering::SeqCst) { Err("exporter is shut down".into()) @@ -46,7 +46,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { - print_logs(batch); + print_logs(&batch); } else { println!("Resource"); if let Some(schema_url) = self.resource.schema_url() { @@ -55,7 +55,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { self.resource.iter().for_each(|(k, v)| { println!("\t -> {}={:?}", k, v); }); - print_logs(batch); + print_logs(&batch); } Ok(()) diff --git a/stress/src/logs.rs b/stress/src/logs.rs index c9e91f418c..bcec774b30 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -23,10 +23,10 @@ mod throughput; struct MockLogExporter; impl LogExporter for MockLogExporter { - fn export<'a>( - &'a self, - _batch: &'a LogBatch<'a>, - ) -> impl std::future::Future> + Send + 'a { + fn export( + &self, + _batch: LogBatch<'_>, + ) -> impl std::future::Future> + Send { async { Ok(()) } } } @@ -40,7 +40,7 @@ impl LogProcessor for MockLogProcessor { fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) { let log_tuple = &[(record as &LogRecord, scope)]; let log_batch = LogBatch::new(log_tuple); - let _ = futures_executor::block_on(self.exporter.export(&log_batch)); + let _ = futures_executor::block_on(self.exporter.export(log_batch)); } fn force_flush(&self) -> LogResult<()> {