diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 4ad4c0342d..be83244719 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -199,7 +199,7 @@ impl LogProcessor for SimpleLogProcessor { #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum BatchMessage { - /// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`. + /// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`. ExportLog(Arc), /// ForceFlush flushes the current buffer to the exporter. ForceFlush(mpsc::SyncSender), @@ -457,23 +457,31 @@ impl BatchLogProcessor { where E: LogExporter + Send + Sync + 'static, { - // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec - while let Ok(log) = logs_receiver.try_recv() { - logs.push(log); - if logs.len() == config.max_export_batch_size { - break; + let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs. + let mut result = LogResult::Ok(()); + let mut total_exported_logs: usize = 0; + + while target > 0 && total_exported_logs < target { + // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec + while let Ok(log) = logs_receiver.try_recv() { + logs.push(log); + if logs.len() == config.max_export_batch_size { + break; + } } - } - let count_of_logs = logs.len(); // Count of logs that will be exported - let result = export_with_timeout_sync( - config.max_export_timeout, - exporter, - logs, - last_export_time, - ); // This method clears the logs vec after exporting + let count_of_logs = logs.len(); // Count of logs that will be exported + total_exported_logs += count_of_logs; + + result = export_with_timeout_sync( + config.max_export_timeout, + exporter, + logs, + last_export_time, + ); // This method clears the logs vec after exporting - current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); + current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); + } result } @@ -485,6 +493,9 @@ impl BatchLogProcessor { match message_receiver.recv_timeout(remaining_time) { Ok(BatchMessage::ExportLog(export_log_message_sent)) => { + // Reset the export log message sent flag now it has has been processed. + export_log_message_sent.store(false, Ordering::Relaxed); + otel_debug!( name: "BatchLogProcessor.ExportingDueToBatchSize", ); @@ -497,9 +508,6 @@ impl BatchLogProcessor { ¤t_batch_size, &config, ); - - // Reset the export log message sent flag now it has has been processed. - export_log_message_sent.store(false, Ordering::Relaxed); } Ok(BatchMessage::ForceFlush(sender)) => { otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");