Skip to content

Commit

Permalink
Nit improvements to batchprocessors
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas committed Dec 23, 2024
1 parent ef49833 commit 3cc9b14
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
36 changes: 24 additions & 12 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,35 +337,42 @@ impl BatchLogProcessor {
match message_receiver.recv_timeout(remaining_time) {
Ok(BatchMessage::ExportLog(log)) => {
logs.push(log);
if logs.len() == config.max_export_batch_size
|| last_export_time.elapsed() >= config.scheduled_delay
{
if logs.len() == config.max_export_batch_size {
otel_debug!(
name: "BatchLogProcessor.ExportingDueToBatchSize",
);

Check warning on line 343 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L341-L343

Added lines #L341 - L343 were not covered by tests
let _ = export_with_timeout_sync(
remaining_time,
config.max_export_timeout,

Check warning on line 345 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L345

Added line #L345 was not covered by tests
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
}
}
Ok(BatchMessage::ForceFlush(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
let result = export_with_timeout_sync(
remaining_time,
config.max_export_timeout,
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);
}
Ok(BatchMessage::Shutdown(sender)) => {
otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
let result = export_with_timeout_sync(
remaining_time,
config.max_export_timeout,
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
let _ = sender.send(result);

otel_debug!(
name: "BatchLogProcessor.ThreadExiting",
reason = "ShutdownRequested"
);
//
// break out the loop and return from the current background thread.
//
Expand All @@ -375,19 +382,24 @@ impl BatchLogProcessor {
exporter.set_resource(&resource);
}
Err(RecvTimeoutError::Timeout) => {
otel_debug!(
name: "BatchLogProcessor.ExportingDueToTimer",
);

Check warning on line 387 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L385-L387

Added lines #L385 - L387 were not covered by tests
let _ = export_with_timeout_sync(
remaining_time,
config.max_export_timeout,

Check warning on line 389 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L389

Added line #L389 was not covered by tests
&mut exporter,
logs.split_off(0),
&mut last_export_time,
);
}
Err(err) => {
// TODO: this should not happen! Log the error and continue for now.
otel_error!(
name: "BatchLogProcessor.InternalError",
error = format!("{}", err)
Err(RecvTimeoutError::Disconnected) => {
// Channel disconnected, only thing to do is break
// out (i.e exit the thread)
otel_debug!(
name: "BatchLogProcessor.ThreadExiting",
reason = "MessageReceiverDisconnected"

Check warning on line 400 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L398-L400

Added lines #L398 - L400 were not covered by tests
);
break;

Check warning on line 402 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L402

Added line #L402 was not covered by tests
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,11 @@ impl BatchSpanProcessor {
}
}
Err(RecvTimeoutError::Disconnected) => {
otel_error!(
name: "BatchSpanProcessor.InternalError.ChannelDisconnected",
message = "Channel disconnected, shutting down processor thread."
// Channel disconnected, only thing to do is break
// out (i.e exit the thread)
otel_debug!(
name: "BatchSpanProcessor.ThreadExiting",
reason = "MessageReceiverDisconnected"
);
break;
}
Expand Down

0 comments on commit 3cc9b14

Please sign in to comment.