Skip to content

Commit

Permalink
Integration test for non tokio main (#2520)
Browse files Browse the repository at this point in the history
  • Loading branch information
cijothomas authored Jan 17, 2025
1 parent c51c4b2 commit 27410b0
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 43 deletions.
1 change: 1 addition & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"shoppingcart",
"struct",
"Tescher",
"testcontainers",
"testresults",
"thiserror",
"tracerprovider",
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-otlp/tests/integration_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ testcontainers = { version = "0.23.1", features = ["http_wait"]}
once_cell.workspace = true
anyhow = "1.0.94"
ctor = "0.2.9"
uuid = { version = "1.3", features = ["v4"] }
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] }
tracing = {workspace = true}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn init_tracing() {
// Initialize the tracing subscriber with the OpenTelemetry layer and the
// Fmt layer.
tracing_subscriber::registry().with(fmt_layer).init();
otel_info!(name: "tracing initializing completed!");
otel_info!(name: "tracing::fmt initializing completed! SDK internal logs will be printed to stdout.");
});
}

Expand Down
86 changes: 52 additions & 34 deletions opentelemetry-otlp/tests/integration_test/tests/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@

use anyhow::Result;
use ctor::dtor;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::logs::LoggerProvider;
use opentelemetry_sdk::{logs as sdklogs, Resource};
use std::fs::File;
use std::os::unix::fs::MetadataExt;
use std::io::Read;

fn init_logs() -> Result<sdklogs::LoggerProvider> {
let exporter_builder = LogExporter::builder();
Expand Down Expand Up @@ -36,12 +35,23 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {

#[cfg(test)]
mod logtests {
// TODO: The tests in this mod works like below: Emit a log with a UUID,
// then read the logs from the file and check if the UUID is present in the
// logs. This makes it easy to validate with a single collector and its
// output. This is a very simple test but good enough to validate that OTLP
// Exporter did work! A more comprehensive test would be to validate the
// entire Payload. The infra for it already exists (logs_asserter.rs), the
// TODO here is to write a test that validates the entire payload.

use super::*;
use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::{fs::File, time::Duration};
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
use uuid::Uuid;

#[test]
#[should_panic(expected = "assertion `left == right` failed: body does not match")]
Expand All @@ -68,41 +78,50 @@ mod logtests {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[cfg(not(feature = "hyper-client"))]
#[cfg(not(feature = "reqwest-client"))]
pub async fn test_logs() -> Result<()> {
// Make sure the container is running
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_multi_thread() -> Result<()> {
logs_batch_tokio_helper().await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_multi_with_one_worker() -> Result<()> {
logs_batch_tokio_helper().await
}

use integration_test_runner::test_utils;
use opentelemetry_appender_tracing::layer;
use tracing::info;
use tracing_subscriber::layer::SubscriberExt;
#[tokio::test(flavor = "current_thread")]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub async fn logs_batch_tokio_current() -> Result<()> {
logs_batch_tokio_helper().await
}

async fn logs_batch_tokio_helper() -> Result<()> {
use crate::{assert_logs_results, init_logs};
test_utils::start_collector_container().await?;

let logger_provider = init_logs().unwrap();
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}
// TODO: remove below wait before calling logger_provider.shutdown()
// tokio::time::sleep(Duration::from_secs(10)).await;
let _ = logger_provider.shutdown();

tokio::time::sleep(Duration::from_secs(10)).await;

assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?;

let _ = logger_provider.shutdown();
tokio::time::sleep(Duration::from_secs(5)).await;
assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}

#[ignore = "TODO: [Fix Me] Failing on CI. Needs to be investigated and resolved."]
#[test]
#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
pub fn logs_batch_non_tokio_main() -> Result<()> {
logs_batch_non_tokio_helper()
}

fn logs_batch_non_tokio_helper() -> Result<()> {
// Initialize the logger provider inside a tokio runtime
// as this allows tonic client to capture the runtime,
// but actual export occurs from the dedicated std::thread
Expand All @@ -113,29 +132,28 @@ mod logtests {
test_utils::start_collector_container().await?;
init_logs()
})?;

info!("LoggerProvider created");
let layer = OpenTelemetryTracingBridge::new(&logger_provider);
let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
let subscriber = tracing_subscriber::registry().with(layer);
// generate a random uuid and store it to expected guid
let expected_uuid = Uuid::new_v4().to_string();
{
let _guard = tracing::subscriber::set_default(subscriber);
info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
}
let _ = logger_provider.shutdown();
// tokio::time::sleep(Duration::from_secs(10)).await;
assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?;

let _ = logger_provider.shutdown();
std::thread::sleep(Duration::from_secs(5));
assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
Ok(())
}
}

pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
let left = read_logs_from_json(File::open(expected)?)?;
let right = read_logs_from_json(File::open(result)?)?;

LogsAsserter::new(left, right).assert();

assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
pub fn assert_logs_results(result: &str, expected_content: &str) -> Result<()> {
let file = File::open(result)?;
let mut contents = String::new();
let mut reader = std::io::BufReader::new(&file);
reader.read_to_string(&mut contents)?;
assert!(contents.contains(expected_content));
Ok(())
}

Expand Down
4 changes: 1 addition & 3 deletions opentelemetry-otlp/tests/integration_test/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {
#[cfg(test)]
#[cfg(not(feature = "hyper-client"))]
#[cfg(not(feature = "reqwest-client"))]
mod tests {
mod metrictests {

use super::*;
use opentelemetry::metrics::MeterProvider;
Expand Down Expand Up @@ -246,7 +246,6 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
// #[ignore] // skip when running unit test
async fn test_histogram() -> Result<()> {
_ = setup_metrics_test().await;
const METER_NAME: &str = "test_histogram_meter";
Expand All @@ -263,7 +262,6 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
// #[ignore] // skip when running unit test
async fn test_up_down_counter() -> Result<()> {
_ = setup_metrics_test().await;
const METER_NAME: &str = "test_up_down_meter";
Expand Down
42 changes: 37 additions & 5 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@
**`experimental_metrics_periodicreader_with_async_runtime`**.

Migration Guide:

1. *Default Implementation, requires no async runtime* (**Recommended**) The
1. *Default Implementation, requires no async runtime* (**Recommended**) The
new default implementation does not require a runtime argument. Replace the
builder method accordingly:
- *Before:*
Expand All @@ -71,18 +70,28 @@
let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter).build();
```

The new PeriodicReader can be used with OTLP Exporter, and supports
following exporter features:
- `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
runtime.
- `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.

In other words, other clients like `reqwest` and `hyper` are not supported.

2. *Async Runtime Support*
If your application cannot spin up new threads or you prefer using async
runtimes, enable the
"experimental_metrics_periodicreader_with_async_runtime" feature flag and
adjust code as below.

- *Before:*

```rust
let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio).build();
```

- *After:*

```rust
let reader = opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader::builder(exporter, runtime::Tokio).build();
```
Expand All @@ -107,8 +116,7 @@
- Upgrade the tracing crate used for internal logging to version 0.1.40 or later. This is necessary because the internal logging macros utilize the name field as
metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/open-telemetry/opentelemetry-rust/pull/2418)

- **Breaking** [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436)

- *Breaking* - `BatchLogProcessor` Updates [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436)
`BatchLogProcessor` no longer requires an async runtime by default. Instead, a dedicated
background thread is created to do the batch processing and exporting.

Expand All @@ -120,33 +128,45 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
new default implementation does not require a runtime argument. Replace the
builder method accordingly:
- *Before:*

```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*

```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter).build())
.build();
```

The new BatchLogProcessor can be used with OTLP Exporter, and supports
following exporter features:
- `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
runtime.
- `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.

In other words, other clients like `reqwest` and `hyper` are not supported.

2. *Async Runtime Support*
If your application cannot spin up new threads or you prefer using async
runtimes, enable the
"experimental_logs_batch_log_processor_with_async_runtime" feature flag and
adjust code as below.

- *Before:*

```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*

```rust
let logger_provider = LoggerProvider::builder()
.with_log_processor(log_processor_with_async_runtime::BatchLogProcessor::builder(exporter, runtime::Tokio).build())
Expand All @@ -159,7 +179,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
- Continue enabling one of the async runtime feature flags: `rt-tokio`,
`rt-tokio-current-thread`, or `rt-async-std`.

- **Breaking** [#2456](https://github.com/open-telemetry/opentelemetry-rust/pull/2456)
- *Breaking* - `BatchSpanProcessor` Updates [#2435](https://github.com/open-telemetry/opentelemetry-rust/pull/2456)

`BatchSpanProcessor` no longer requires an async runtime by default. Instead, a dedicated
background thread is created to do the batch processing and exporting.
Expand All @@ -172,33 +192,45 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
new default implementation does not require a runtime argument. Replace the
builder method accordingly:
- *Before:*

```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*

```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(BatchSpanProcessor::builder(exporter).build())
.build();
```

The new BatchLogProcessor can be used with OTLP Exporter, and supports
following exporter features:
- `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
runtime.
- `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.

In other words, other clients like `reqwest` and `hyper` are not supported.

2. *Async Runtime Support*
If your application cannot spin up new threads or you prefer using async
runtimes, enable the
"experimental_trace_batch_span_processor_with_async_runtime" feature flag and
adjust code as below.

- *Before:*

```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
.build();
```

- *After:*

```rust
let tracer_provider = TracerProvider::builder()
.with_span_processor(span_processor_with_async_runtime::BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
Expand Down

0 comments on commit 27410b0

Please sign in to comment.