Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add force_flush to LogExporter #2276

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
905ffbc
Add force_flush to LogExporter
ThomsonTan Nov 5, 2024
7db1bd9
Merge branch 'main' into add_force_flush_log
lalitb Nov 6, 2024
9cc40c1
Fix outdated comments
ThomsonTan Nov 6, 2024
29f7ae6
Merge branch 'main' into add_force_flush_log
lalitb Nov 7, 2024
d67c975
Change force_flush to async
ThomsonTan Nov 7, 2024
4ed3b73
Merge branch 'main' into add_force_flush_log
lalitb Nov 7, 2024
125af29
Merge branch 'main' into add_force_flush_log
ThomsonTan Nov 7, 2024
3aa6f8d
Remove async from force_flush in LogExporter
ThomsonTan Nov 14, 2024
9984bec
Merge branch 'main' into add_force_flush_log
lalitb Nov 14, 2024
6974ced
Merge branch 'main' into add_force_flush_log
ThomsonTan Nov 18, 2024
9290828
Add force_flush exporter call to SimpleLogProcessor
ThomsonTan Nov 18, 2024
f1e861e
Merge branch 'main' into add_force_flush_log
ThomsonTan Nov 20, 2024
b258265
Merge branch 'main' into add_force_flush_log
ThomsonTan Nov 27, 2024
71d4b40
Merge branch 'main' into add_force_flush_log
ThomsonTan Dec 4, 2024
2987161
Merge remote-tracking branch 'origin' into add_force_flush_log
ThomsonTan Jan 21, 2025
9b661f4
Merge branch 'main' into add_force_flush_log
ThomsonTan Jan 21, 2025
1bf3d45
Address feedback
ThomsonTan Jan 21, 2025
55dfd62
Fix build
ThomsonTan Jan 21, 2025
eae55fe
Remvoe force_flush call after export
ThomsonTan Jan 21, 2025
bacb628
Fix build
ThomsonTan Jan 21, 2025
3d02d35
Merge branch 'main' into add_force_flush_log
lalitb Jan 22, 2025
b67e2b0
Fix parameters
ThomsonTan Jan 22, 2025
a154ce8
Remove unneeded reference
ThomsonTan Jan 22, 2025
54bb9fa
Remove unnecessary mut
ThomsonTan Jan 22, 2025
86d7058
Remove runtime::tokio
ThomsonTan Jan 22, 2025
6e7e418
Fix test
ThomsonTan Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ pub trait LogExporter: Send + Sync + Debug {
// By default, all logs are enabled
true
}

///
/// This method SHOULD block the current thread until all pending log records are exported.
/// If the export was not successful, an error is returned.
///
fn force_flush(&mut self) -> ExportResult {
Ok(())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not have a default implementation for this method. Otherwise, we could run into a situation where an exporter user goes through the documentation and calls force_flush with certain expectations that haven't been agreed upon by the exporter author.

It looks like we have a default implementation for a few other methods as well such as shutdown and set_resource. We should revisit them based on what we decide.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, we could run into a situation where an exporter user goes through the documentation and calls force_flush with certain expectations that haven't been agreed upon by the exporter author.

If the exporter author does not provide an implementation for flush, then they can document that right? Or the doc for Provider can be updated to merely state that it'll invoke flush on the processor, and the same for processor can state they will invoke the flush for exporter.

Default implementation seems reasonable to me, as not every exporter need to do something for flush.

}
/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}
Expand Down
20 changes: 19 additions & 1 deletion opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@
}

fn force_flush(&self) -> LogResult<()> {
Ok(())
if let Ok(mut exporter) = self.exporter.lock() {
exporter.force_flush()

Check warning on line 178 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L177-L178

Added lines #L177 - L178 were not covered by tests
} else {
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))

Check warning on line 180 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L180

Added line #L180 was not covered by tests
}
}

fn shutdown(&self) -> LogResult<()> {
Expand Down Expand Up @@ -1063,6 +1067,20 @@
let _ = provider.shutdown();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_forceflush() {
let exporter = InMemoryLogExporterBuilder::default().build();

let processor = BatchLogProcessor::new(exporter.clone(), BatchConfig::default());

let mut record = LogRecord::default();
let instrumentation = InstrumentationScope::default();

processor.emit(&mut record, &instrumentation);
processor.force_flush().unwrap();
assert_eq!(1, exporter.get_emitted_logs().unwrap().len());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_batch_shutdown() {
// assert we will receive an error
Expand Down
Loading