Commit

Merge branch 'main' into generic-metrics-collector
cijothomas authored Jan 14, 2025
2 parents 09ebfdc + 888d5a3 commit 01cd8a0
Showing 2 changed files with 123 additions and 33 deletions.
4 changes: 3 additions & 1 deletion opentelemetry-proto/Cargo.toml
@@ -30,7 +30,7 @@ path = "tests/json_serde.rs"

[features]
default = ["full"]
full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"]
full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde", "internal-logs"]

# crates used to generate rs files
gen-tonic = ["gen-tonic-messages", "tonic/transport"]
@@ -44,6 +44,7 @@ zpages = ["trace"]
testing = ["opentelemetry/testing"]

# add ons
internal-logs = ["tracing"]
with-schemars = ["schemars"]
with-serde = ["serde", "hex"]

@@ -55,6 +56,7 @@ opentelemetry_sdk = { version = "0.27", default-features = false, path = "../ope
schemars = { version = "0.8", optional = true }
serde = { workspace = true, optional = true, features = ["serde_derive"] }
hex = { version = "0.4.3", optional = true }
tracing = {workspace = true, optional = true} # optional for opentelemetry internal logging

[dev-dependencies]
opentelemetry = { features = ["testing"], path = "../opentelemetry" }
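
Aside (not part of this commit): the new `internal-logs` feature simply turns on the optional `tracing` dependency. A minimal sketch of how a crate typically gates internal diagnostics on such a feature follows; the function name is made up for illustration.

// Illustrative only: emits a tracing event when the `internal-logs` feature
// (and therefore the optional `tracing` dependency) is enabled, and compiles
// to a no-op otherwise.
#[cfg(feature = "internal-logs")]
fn internal_warn(msg: &str) {
    tracing::warn!("{}", msg);
}

#[cfg(not(feature = "internal-logs"))]
fn internal_warn(_msg: &str) {}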
152 changes: 120 additions & 32 deletions opentelemetry-sdk/src/logs/log_processor.rs
@@ -199,8 +199,8 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
enum BatchMessage {
/// Export logs, called when the log is emitted.
ExportLog(Box<(LogRecord, InstrumentationScope)>),
/// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`.
ExportLog(Arc<AtomicBool>),
/// ForceFlush flushes the current buffer to the exporter.
ForceFlush(mpsc::SyncSender<ExportResult>),
/// Shut down the worker thread, push all logs in buffer to the exporter.
@@ -209,6 +209,8 @@ enum BatchMessage {
SetResource(Arc<Resource>),
}

type LogsData = Box<(LogRecord, InstrumentationScope)>;

/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
/// in batches to the configured `LogExporter`. This processor is ideal for
/// high-throughput environments, as it minimizes the overhead of exporting logs
@@ -246,11 +248,15 @@ enum BatchMessage {
/// .build();
///
pub struct BatchLogProcessor {
message_sender: SyncSender<BatchMessage>,
logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
message_sender: SyncSender<BatchMessage>, // Control channel to send control messages to the worker thread
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,
export_log_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,

// Track dropped logs - we'll log this at shutdown
dropped_logs_count: AtomicUsize,
@@ -279,11 +285,8 @@ impl LogProcessor for BatchLogProcessor {
}

let result = self
.message_sender
.try_send(BatchMessage::ExportLog(Box::new((
record.clone(),
instrumentation.clone(),
))));
.logs_sender
.try_send(Box::new((record.clone(), instrumentation.clone())));

if result.is_err() {
// Increment dropped logs count. The first time we have to drop a log,
@@ -292,6 +295,37 @@ impl LogProcessor for BatchLogProcessor {
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
}
return;
}

// At this point, sending the log record to the data channel was successful.
// Increment the current batch size and check if it has reached the max export batch size.
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
{
// Check if a control message for exporting logs has already been sent to the worker thread.
// If not, send a control message to export logs.
// `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.

if !self.export_log_message_sent.load(Ordering::Relaxed) {
// This is a cost-efficient check, as an atomic load does not require exclusive access to the cache line.
// Perform an atomic swap on `export_log_message_sent` ONLY when the load above returns false.
// Atomic swap/compare_exchange operations require exclusive access to the cache line on most processor architectures.
// We could have used compare_exchange as well here, but it's more verbose than swap.
if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
match self.message_sender.try_send(BatchMessage::ExportLog(
self.export_log_message_sent.clone(),
)) {
Ok(_) => {
// Control message sent successfully.
}
Err(_err) => {
// TODO: Log error
// If the control message could not be sent, reset the `export_log_message_sent` flag.
self.export_log_message_sent.store(false, Ordering::Relaxed);
}
}
}
}
}
}
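
Aside (not part of this commit): a standalone sketch of the "load, then swap" pattern used in emit() above. The cheap relaxed load filters out the common case; the swap, which needs exclusive cache-line access, runs only when the flag appears unset, and exactly one caller observes false from the swap and wins the right to send the control message.

use std::sync::atomic::{AtomicBool, Ordering};

// Returns true for exactly one caller while the flag is unset; all others
// see true and back off until the flag is reset by the worker thread.
fn try_claim(flag: &AtomicBool) -> bool {
    if !flag.load(Ordering::Relaxed) {
        // swap returns the previous value: false means this caller flipped it.
        return !flag.swap(true, Ordering::Relaxed);
    }
    false
}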

@@ -388,8 +422,12 @@ impl BatchLogProcessor {
where
E: LogExporter + Send + Sync + 'static,
{
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
let max_queue_size = config.max_queue_size;
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();

let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -402,6 +440,42 @@ impl BatchLogProcessor {
);
let mut last_export_time = Instant::now();
let mut logs = Vec::with_capacity(config.max_export_batch_size);
let current_batch_size = current_batch_size_for_thread;

// This method gets up to `max_export_batch_size` log records from the channel and exports them.
// It returns the result of the export operation.
// It expects the logs vec to be empty when it's called.
#[inline]
fn get_logs_and_export<E>(
logs_receiver: &mpsc::Receiver<LogsData>,
exporter: &E,
logs: &mut Vec<LogsData>,
last_export_time: &mut Instant,
current_batch_size: &AtomicUsize,
config: &BatchConfig,
) -> ExportResult
where
E: LogExporter + Send + Sync + 'static,
{
// Get up to `max_export_batch_size` log records from the channel and push them into the logs vec
while let Ok(log) = logs_receiver.try_recv() {
logs.push(log);
if logs.len() == config.max_export_batch_size {
break;
}
}

let count_of_logs = logs.len(); // Count of logs that will be exported
let result = export_with_timeout_sync(
config.max_export_timeout,
exporter,
logs,
last_export_time,
); // This method clears the logs vec after exporting

current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
result
}

loop {
let remaining_time = config
@@ -410,37 +484,44 @@
.unwrap_or(config.scheduled_delay);

match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(log)) => {
logs.push(log);
if logs.len() == config.max_export_batch_size {
otel_debug!(
name: "BatchLogProcessor.ExportingDueToBatchSize",
);
let _ = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,
&mut logs,
&mut last_export_time,
);
}
Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
otel_debug!(
name: "BatchLogProcessor.ExportingDueToBatchSize",
);

let _ = get_logs_and_export(
&logs_receiver,
&exporter,
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,
);

// Reset the export log message sent flag now that it has been processed.
export_log_message_sent.store(false, Ordering::Relaxed);
}
Ok(BatchMessage::ForceFlush(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
let result = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,
let result = get_logs_and_export(
&logs_receiver,
&exporter,
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);
}
Ok(BatchMessage::Shutdown(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
let result = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,
let result = get_logs_and_export(
&logs_receiver,
&exporter,
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,
);
let _ = sender.send(result);

@@ -460,11 +541,14 @@
otel_debug!(
name: "BatchLogProcessor.ExportingDueToTimer",
);
let _ = export_with_timeout_sync(
config.max_export_timeout,
&mut exporter,

let _ = get_logs_and_export(
&logs_receiver,
&exporter,
&mut logs,
&mut last_export_time,
&current_batch_size,
&config,
);
}
Err(RecvTimeoutError::Disconnected) => {
@@ -486,13 +570,17 @@

// Return batch processor with link to worker
BatchLogProcessor {
logs_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
export_log_message_sent: Arc::new(AtomicBool::new(false)),
current_batch_size,
max_export_batch_size,
}
}

@@ -511,7 +599,7 @@ impl BatchLogProcessor {
#[allow(clippy::vec_box)]
fn export_with_timeout_sync<E>(
_: Duration, // TODO, enforcing timeout in exporter.
exporter: &mut E,
exporter: &E,
batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>,
last_export_time: &mut Instant,
) -> ExportResult
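
Aside (not part of this commit): a self-contained sketch of the worker loop's timing strategy shown above — wait on the control channel for whatever remains of the scheduled delay, and treat a timeout as "the timer fired, export whatever is buffered". Only std APIs are used, and the channel payload is simplified to ().

use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::{Duration, Instant};

// One iteration of a worker loop driven by a control channel plus a periodic timer.
fn run_once(
    ctrl_rx: &mpsc::Receiver<()>,
    scheduled_delay: Duration,
    last_export_time: &mut Instant,
) {
    let remaining = scheduled_delay
        .checked_sub(last_export_time.elapsed())
        .unwrap_or(scheduled_delay);
    match ctrl_rx.recv_timeout(remaining) {
        // A control message arrived: export due to batch size, flush, or shutdown.
        Ok(()) => {}
        // Timer fired with no control message: export whatever is buffered.
        Err(RecvTimeoutError::Timeout) => *last_export_time = Instant::now(),
        // All senders are gone: the processor was dropped; exit the loop.
        Err(RecvTimeoutError::Disconnected) => {}
    }
}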
