Skip to content

Commit

Permalink
update export to take batch by value
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Dec 20, 2024
1 parent 7627224 commit ddb15bf
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 55 deletions.
8 changes: 4 additions & 4 deletions opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ struct NoopExporter {

impl LogExporter for NoopExporter {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
_batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async { LogResult::Ok(()) }
}

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ mod tests {

impl LogExporter for ReentrantLogExporter {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
_batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async {
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use super::OtlpHttpClient;

impl LogExporter for OtlpHttpClient {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
let client = self
.client
Expand All @@ -23,7 +23,7 @@ impl LogExporter for OtlpHttpClient {
_ => Err(LogError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = { self.build_logs_export_body(batch)? };
let (body, content_type) = { self.build_logs_export_body(&batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ impl TonicLogsClient {

impl LogExporter for TonicLogsClient {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
let (mut client, metadata, extensions) = match &self.inner {
Some(inner) => {
Expand All @@ -76,7 +76,7 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

let resource_logs = group_logs_by_resource_and_scope(batch, &self.resource);
let resource_logs = group_logs_by_resource_and_scope(&batch, &self.resource);

otel_debug!(name: "TonicsLogsClient.CallingExport");

Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ impl LogExporter {

impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
match &self.client {
#[cfg(feature = "grpc-tonic")]
Expand Down
11 changes: 4 additions & 7 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,10 @@ pub trait LogExporter: Send + Sync + Debug {
/// A `LogResult<()>`, which is a result type indicating either a successful export (with
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
///
/// Note:
/// The `Send` bound ensures the future can be safely moved across threads, which is crucial for multi-threaded async runtimes like Tokio.
/// Explicit lifetimes (`'a`) synchronize the lifetimes of `self`, `batch`, and the returned future.
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a;
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send;

/// Shuts down the exporter.
fn shutdown(&mut self) {}
Expand Down
22 changes: 11 additions & 11 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
.and_then(|exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
let log_batch = LogBatch::new(log_tuple);
futures_executor::block_on(exporter.export(&log_batch))
futures_executor::block_on(exporter.export(log_batch))
});
// Handle errors with specific static names
match result {
Expand Down Expand Up @@ -447,7 +447,7 @@ where
.map(|log_data| (&log_data.0, &log_data.1))
.collect();
let log_batch = LogBatch::new(log_vec.as_slice());
let export = exporter.export(&log_batch);
let export = exporter.export(log_batch);
let export_result = futures_executor::block_on(export);

match export_result {
Expand Down Expand Up @@ -717,7 +717,7 @@ where
.map(|log_data| (&log_data.0, &log_data.1))
.collect();
let log_batch = LogBatch::new(log_vec.as_slice());
let export = exporter.export(&log_batch);
let export = exporter.export(log_batch);
let timeout = runtime.delay(time_out);
pin_mut!(export);
pin_mut!(timeout);
Expand Down Expand Up @@ -937,10 +937,10 @@ mod tests {

impl LogExporter for MockLogExporter {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
_batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async { Ok(()) }
}

Expand Down Expand Up @@ -1443,10 +1443,10 @@ mod tests {

impl LogExporter for LogExporterThatRequiresTokio {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
async move {
tokio::time::sleep(Duration::from_millis(50)).await;
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ impl InMemoryLogExporter {

impl LogExporter for InMemoryLogExporter {
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
for (log_record, instrumentation) in batch.iter() {
Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ impl fmt::Debug for LogExporter {
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
/// Export spans to stdout
#[allow(clippy::manual_async_fn)]
fn export<'a>(
&'a self,
batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async move {
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
Err("exporter is shut down".into())
Expand All @@ -46,7 +46,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
print_logs(batch);
print_logs(&batch);
} else {
println!("Resource");
if let Some(schema_url) = self.resource.schema_url() {
Expand All @@ -55,7 +55,7 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
self.resource.iter().for_each(|(k, v)| {
println!("\t -> {}={:?}", k, v);
});
print_logs(batch);
print_logs(&batch);
}

Ok(())
Expand Down
10 changes: 5 additions & 5 deletions stress/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ mod throughput;
struct MockLogExporter;

impl LogExporter for MockLogExporter {
fn export<'a>(
&'a self,
_batch: &'a LogBatch<'a>,
) -> impl std::future::Future<Output = LogResult<()>> + Send + 'a {
fn export(
&self,
_batch: LogBatch<'_>,
) -> impl std::future::Future<Output = LogResult<()>> + Send {
async { Ok(()) }
}
}
Expand All @@ -40,7 +40,7 @@ impl LogProcessor for MockLogProcessor {
fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) {
let log_tuple = &[(record as &LogRecord, scope)];
let log_batch = LogBatch::new(log_tuple);
let _ = futures_executor::block_on(self.exporter.export(&log_batch));
let _ = futures_executor::block_on(self.exporter.export(log_batch));
}

fn force_flush(&self) -> LogResult<()> {
Expand Down

0 comments on commit ddb15bf

Please sign in to comment.