diff --git a/.cspell.json b/.cspell.json
index fd5a476fb7..e620df293b 100644
--- a/.cspell.json
+++ b/.cspell.json
@@ -60,10 +60,13 @@
     "reqwest",
    "runtimes",
    "rustc",
+    "serde",
    "shoppingcart",
    "struct",
    "Tescher",
+    "testcontainers",
    "testresults",
+    "thiserror",
    "tracerprovider",
    "updown",
    "Zhongyang",
diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md
index ce4b760618..b7156e3ab6 100644
--- a/opentelemetry-otlp/CHANGELOG.md
+++ b/opentelemetry-otlp/CHANGELOG.md
@@ -6,7 +6,10 @@
 - Feature flag "populate-logs-event-name" is removed as no longer relevant.
   LogRecord's `event_name()` is now automatically populated on the newly
   added "event_name" field in LogRecord proto definition.
-
+- Remove the "grpc-tonic" feature from the default features, and instead add
+  "http-proto" and "reqwest-blocking-client" as default features, to align
+  with the specification.
+  [#2516](https://github.com/open-telemetry/opentelemetry-rust/pull/2516)

 ## 0.27.0
diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml
index cebe075082..0c7ee8fe94 100644
--- a/opentelemetry-otlp/Cargo.toml
+++ b/opentelemetry-otlp/Cargo.toml
@@ -62,7 +62,7 @@ internal-logs = ["tracing", "opentelemetry/internal-logs"]
 # add ons
 serialize = ["serde", "serde_json"]

-default = ["grpc-tonic", "trace", "metrics", "logs", "internal-logs"]
+default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"]

 # grpc using tonic
 grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"]
diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
index 4974c8a223..ef1595b0ae 100644
--- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
+++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
@@ -8,13 +8,12 @@ publish = false
 [features]
 default = ["reqwest-blocking"]
 reqwest-blocking = ["opentelemetry-otlp/reqwest-blocking-client"]
-hyper = ["opentelemetry-otlp/hyper-client"]

 [dependencies]
 once_cell = { workspace = true }
 opentelemetry = { path = "../../../opentelemetry" }
-opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "experimental_metrics_periodicreader_with_async_runtime"]}
-opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs", "internal-logs"] , default-features = false}
+opentelemetry_sdk = { path = "../../../opentelemetry-sdk" }
+opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs", "internal-logs"], default-features = false}
 opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false}
 tokio = { workspace = true, features = ["full"] }
diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Dockerfile b/opentelemetry-otlp/examples/basic-otlp-http/Dockerfile
deleted file mode 100644
index f88c276a55..0000000000
--- a/opentelemetry-otlp/examples/basic-otlp-http/Dockerfile
+++ /dev/null
@@ -1,6 +0,0 @@
-FROM rust:1.51
-COPY . /usr/src/basic-otlp-http/
-WORKDIR /usr/src/basic-otlp-http/
-RUN cargo build --release
-RUN cargo install --path .
-CMD ["/usr/local/cargo/bin/basic-otlp-http"]
diff --git a/opentelemetry-otlp/examples/basic-otlp-http/README.md b/opentelemetry-otlp/examples/basic-otlp-http/README.md
index 2d06e6a8fe..78ff779a66 100644
--- a/opentelemetry-otlp/examples/basic-otlp-http/README.md
+++ b/opentelemetry-otlp/examples/basic-otlp-http/README.md
@@ -16,46 +16,25 @@ recommended approach when using OTLP exporters.
While it can be modified to use a `SimpleExporter`, this requires making the
main function a regular main and *not* tokio main.

-// TODO: Document `hyper` feature flag when using SimpleProcessor.
+// TODO: Document how to use the hyper client.

 ## Usage

-### `docker-compose`
-
-By default runs against the `otel/opentelemetry-collector:latest` image, and uses `reqwest-client`
-as the http client, using http as the transport.
-
-```shell
-docker-compose up
-```
-
-In another terminal run the application `cargo run`
-
-The docker-compose terminal will display logs, traces, metrics.
-
-Press Ctrl+C to stop the collector, and then tear it down:
-
-```shell
-docker-compose down
-```
-
-### Manual
-
-If you don't want to use `docker-compose`, you can manually run the `otel/opentelemetry-collector` container
-and inspect the logs to see traces being transferred.
+Run the `otel/opentelemetry-collector` container using docker
+and inspect the logs to see the exported telemetry.

 On Unix based systems use:

 ```shell
 # From the current directory, run `opentelemetry-collector`
-docker run --rm -it -p 4318:4318 -v $(pwd):/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml
+docker run --rm -it -p 4317:4317 -p 4318:4318 -v $(pwd):/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml
 ```

 On Windows use:

 ```shell
 # From the current directory, run `opentelemetry-collector`
-docker run --rm -it -p 4318:4318 -v "%cd%":/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml
+docker run --rm -it -p 4317:4317 -p 4318:4318 -v "%cd%":/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml
 ```

 Run the app which exports logs, metrics and traces via OTLP to the collector

@@ -64,11 +43,7 @@ Run the app which exports logs, metrics and traces via OTLP to the collector
 cargo run
 ```

-By default the app will use a `reqwest` client to send. A hyper 0.14 client can be used with the `hyper` feature enabled
-
-```shell
-cargo run --no-default-features --features=hyper
-```
+The app will use the `reqwest` blocking client to send the telemetry.

 ## View results
diff --git a/opentelemetry-otlp/examples/basic-otlp-http/docker-compose.yaml b/opentelemetry-otlp/examples/basic-otlp-http/docker-compose.yaml
deleted file mode 100644
index dc9d1e7a5d..0000000000
--- a/opentelemetry-otlp/examples/basic-otlp-http/docker-compose.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-version: "2"
-services:
-
-  # Collector
-  otel-collector:
-    image: otel/opentelemetry-collector:latest
-    command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
-    volumes:
-      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
-    ports:
-      - "4318:4318" # OTLP HTTP receiver
-
-
-
-
diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
index bf33828091..763add8b50 100644
--- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
+++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
@@ -69,9 +69,8 @@ fn init_metrics() -> Result<SdkMeterProvider, MetricError> {
-fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
     let logger_provider = init_logs()?;

     // Create a new OpenTelemetryTracingBridge using the above LoggerProvider.
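Since "grpc-tonic" is no longer among `opentelemetry-otlp`'s default features, the HTTP path shown in this example becomes the out-of-the-box experience. A minimal sketch of what initialization looks like with the new defaults ("http-proto" + "reqwest-blocking-client"); the service name, endpoint comment, and error type here are illustrative, not taken from this patch:

```rust
use opentelemetry_otlp::LogExporter;
use opentelemetry_sdk::{logs::LoggerProvider, Resource};

// With the new defaults, building and using the exporter requires
// no async runtime and no extra feature flags.
fn init_logs() -> Result<LoggerProvider, Box<dyn std::error::Error>> {
    let exporter = LogExporter::builder()
        .with_http() // assumed default endpoint: http://localhost:4318
        .build()?;
    Ok(LoggerProvider::builder()
        .with_batch_exporter(exporter)
        .with_resource(
            Resource::builder_empty()
                .with_service_name("basic-otlp-http")
                .build(),
        )
        .build())
}
```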
diff --git a/opentelemetry-otlp/examples/basic-otlp/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp/Cargo.toml index ad050bc338..f841ae5374 100644 --- a/opentelemetry-otlp/examples/basic-otlp/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp/Cargo.toml @@ -8,8 +8,8 @@ publish = false [dependencies] once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } -opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio"] } -opentelemetry-otlp = { path = "../../../opentelemetry-otlp" } +opentelemetry_sdk = { path = "../../../opentelemetry-sdk" } +opentelemetry-otlp = { path = "../../../opentelemetry-otlp", features = ["grpc-tonic"] } tokio = { version = "1.0", features = ["full"] } opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} tracing = { workspace = true, features = ["std"]} diff --git a/opentelemetry-otlp/examples/basic-otlp/Dockerfile b/opentelemetry-otlp/examples/basic-otlp/Dockerfile deleted file mode 100644 index b63241e283..0000000000 --- a/opentelemetry-otlp/examples/basic-otlp/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM rust:1.51 -COPY . /usr/src/basic-otlp/ -WORKDIR /usr/src/basic-otlp/ -RUN cargo build --release -RUN cargo install --path . -CMD ["/usr/local/cargo/bin/basic-otlp"] diff --git a/opentelemetry-otlp/examples/basic-otlp/README.md b/opentelemetry-otlp/examples/basic-otlp/README.md index ca02018ad5..f4ebe150fb 100644 --- a/opentelemetry-otlp/examples/basic-otlp/README.md +++ b/opentelemetry-otlp/examples/basic-otlp/README.md @@ -49,42 +49,21 @@ fn main() -> Result<(), Box> { ## Usage -### `docker-compose` - -By default runs against the `otel/opentelemetry-collector:latest` image, and uses the `tonic`'s -`grpc` example as the transport. - -```shell -docker-compose up -``` - -In another terminal run the application `cargo run` - -The docker-compose terminal will display logs, traces, metrics. - -Press Ctrl+C to stop the collector, and then tear it down: - -```shell -docker-compose down -``` - -### Manual - -If you don't want to use `docker-compose`, you can manually run the `otel/opentelemetry-collector` container -and inspect the logs to see traces being transferred. +Run the `otel/opentelemetry-collector` container using docker +and inspect the logs to see the exported telemetry. 
On Unix based systems use: ```shell # From the current directory, run `opentelemetry-collector` -docker run --rm -it -p 4317:4317 -v $(pwd):/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml +docker run --rm -it -p 4317:4317 -p 4318:4318 -v $(pwd):/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml ``` On Windows use: ```shell # From the current directory, run `opentelemetry-collector` -docker run --rm -it -p 4317:4317 -v "%cd%":/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml +docker run --rm -it -p 4317:4317 -p 4318:4318 -v "%cd%":/cfg otel/opentelemetry-collector:latest --config=/cfg/otel-collector-config.yaml ``` Run the app which exports logs, metrics and traces via OTLP to the collector diff --git a/opentelemetry-otlp/examples/basic-otlp/docker-compose.yaml b/opentelemetry-otlp/examples/basic-otlp/docker-compose.yaml deleted file mode 100644 index fc9b3f1948..0000000000 --- a/opentelemetry-otlp/examples/basic-otlp/docker-compose.yaml +++ /dev/null @@ -1,15 +0,0 @@ -version: "2" -services: - - # Collector - otel-collector: - image: otel/opentelemetry-collector:latest - command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"] - volumes: - - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml - ports: - - "4317:4317" # OTLP gRPC receiver - - - - diff --git a/opentelemetry-otlp/tests/integration_test/Cargo.toml b/opentelemetry-otlp/tests/integration_test/Cargo.toml index 5314c1fe61..dc58d5d44b 100644 --- a/opentelemetry-otlp/tests/integration_test/Cargo.toml +++ b/opentelemetry-otlp/tests/integration_test/Cargo.toml @@ -14,6 +14,7 @@ testcontainers = { version = "0.23.1", features = ["http_wait"]} once_cell.workspace = true anyhow = "1.0.94" ctor = "0.2.9" +uuid = { version = "1.3", features = ["v4"] } tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } tracing = {workspace = true} diff --git a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs index bd62674868..b0264a5b76 100644 --- a/opentelemetry-otlp/tests/integration_test/src/test_utils.rs +++ b/opentelemetry-otlp/tests/integration_test/src/test_utils.rs @@ -52,7 +52,7 @@ fn init_tracing() { // Initialize the tracing subscriber with the OpenTelemetry layer and the // Fmt layer. tracing_subscriber::registry().with(fmt_layer).init(); - otel_info!(name: "tracing initializing completed!"); + otel_info!(name: "tracing::fmt initializing completed! 
SDK internal logs will be printed to stdout.");
    });
}
diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
index af78ee9005..eb5bc7170b 100644
--- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs
+++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs
@@ -2,15 +2,14 @@
 use anyhow::Result;
 use ctor::dtor;
-use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
 use integration_test_runner::test_utils;
 use opentelemetry_otlp::LogExporter;
 use opentelemetry_sdk::logs::LoggerProvider;
 use opentelemetry_sdk::{logs as sdklogs, Resource};
 use std::fs::File;
-use std::os::unix::fs::MetadataExt;
+use std::io::Read;

-fn init_logs() -> Result<sdklogs::LoggerProvider> {
+fn init_logs(is_simple: bool) -> Result<sdklogs::LoggerProvider> {
     let exporter_builder = LogExporter::builder();
     #[cfg(feature = "tonic-client")]
     let exporter_builder = exporter_builder.with_tonic();
@@ -24,24 +23,43 @@ fn init_logs() -> Result<sdklogs::LoggerProvider> {

     let exporter = exporter_builder.build()?;

-    Ok(LoggerProvider::builder()
-        .with_batch_exporter(exporter)
+    let mut logger_provider_builder = LoggerProvider::builder();
+    if is_simple {
+        logger_provider_builder = logger_provider_builder.with_simple_exporter(exporter)
+    } else {
+        logger_provider_builder = logger_provider_builder.with_batch_exporter(exporter)
+    };
+
+    let logger_provider = logger_provider_builder
         .with_resource(
             Resource::builder_empty()
                 .with_service_name("logs-integration-test")
                 .build(),
         )
-        .build())
+        .build();
+
+    Ok(logger_provider)
 }

 #[cfg(test)]
 mod logtests {
+    // TODO: The tests in this mod work as follows: emit a log with a UUID,
+    // then read the logs from the file and check if the UUID is present in
+    // the logs. This makes it easy to validate with a single collector and
+    // its output. This is a very simple test, but good enough to validate
+    // that the OTLP exporter works. A more comprehensive test would be to
+    // validate the entire payload. The infra for it already exists
+    // (logs_asserter.rs); the TODO here is to write a test that validates
+    // the entire payload.
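+    //
+    // An illustrative sketch (not wired up in this PR) of such a payload
+    // test, reusing the existing asserter infra:
+    //
+    //     let left = read_logs_from_json(File::open("expected/logs.json")?)?;
+    //     let right = read_logs_from_json(File::open(test_utils::LOGS_FILE)?)?;
+    //     LogsAsserter::new(left, right).assert();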
+
     use super::*;
     use integration_test_runner::logs_asserter::{read_logs_from_json, LogsAsserter};
+    use integration_test_runner::test_utils;
+    use opentelemetry_appender_tracing::layer;
     use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
     use std::{fs::File, time::Duration};
     use tracing::info;
     use tracing_subscriber::layer::SubscriberExt;
+    use uuid::Uuid;

     #[test]
     #[should_panic(expected = "assertion `left == right` failed: body does not match")]
@@ -68,41 +86,91 @@ mod logtests {
     }

     #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
-    #[cfg(not(feature = "hyper-client"))]
-    #[cfg(not(feature = "reqwest-client"))]
-    pub async fn test_logs() -> Result<()> {
-        // Make sure the container is running
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
+    pub async fn logs_batch_tokio_multi_thread() -> Result<()> {
+        logs_batch_tokio_helper().await
+    }

-        use integration_test_runner::test_utils;
-        use opentelemetry_appender_tracing::layer;
-        use tracing::info;
-        use tracing_subscriber::layer::SubscriberExt;
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
+    pub async fn logs_batch_tokio_multi_with_one_worker() -> Result<()> {
+        logs_batch_tokio_helper().await
+    }

+    #[tokio::test(flavor = "current_thread")]
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
+    pub async fn logs_batch_tokio_current() -> Result<()> {
+        logs_batch_tokio_helper().await
+    }
+
+    async fn logs_batch_tokio_helper() -> Result<()> {
         use crate::{assert_logs_results, init_logs};
         test_utils::start_collector_container().await?;

-        let logger_provider = init_logs().unwrap();
-        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
+        let logger_provider = init_logs(false).unwrap();
+        let layer = OpenTelemetryTracingBridge::new(&logger_provider);
         let subscriber = tracing_subscriber::registry().with(layer);
+        // Generate a random UUID and use it as the expected value to look
+        // for in the exported logs.
+        let expected_uuid = Uuid::new_v4().to_string();
         {
             let _guard = tracing::subscriber::set_default(subscriber);
-            info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
+            info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
         }
-        // TODO: remove below wait before calling logger_provider.shutdown()
-        // tokio::time::sleep(Duration::from_secs(10)).await;
+        let _ = logger_provider.shutdown();
+        tokio::time::sleep(Duration::from_secs(5)).await;
+        assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
+        Ok(())
+    }

-        tokio::time::sleep(Duration::from_secs(10)).await;
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
+    pub async fn logs_simple_tokio_multi_thread() -> Result<()> {
+        logs_simple_tokio_helper().await
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
+    pub async fn logs_simple_tokio_multi_with_one_worker() -> Result<()> {
+        logs_simple_tokio_helper().await
+    }
+
+    // Ignored; to be investigated.
+    #[ignore]
+    #[tokio::test(flavor = "current_thread")]
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-client"))]
+    pub async fn logs_simple_tokio_current() -> Result<()> {
+        logs_simple_tokio_helper().await
+    }
+
+    async fn logs_simple_tokio_helper() -> Result<()> {
+        use crate::{assert_logs_results, init_logs};
+        test_utils::start_collector_container().await?;

-        assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?;
+        let logger_provider = init_logs(true).unwrap();
+        let layer = OpenTelemetryTracingBridge::new(&logger_provider);
+        let subscriber = tracing_subscriber::registry().with(layer);
+        info!("Tracing initialized");
+        // Generate a random UUID and use it as the expected value to look
+        // for in the exported logs.
+        let expected_uuid = Uuid::new_v4().to_string();
+        {
+            let _guard = tracing::subscriber::set_default(subscriber);
+            info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
+        }
+        let _ = logger_provider.shutdown();
+        tokio::time::sleep(Duration::from_secs(5)).await;
+        assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
         Ok(())
     }

-    #[ignore = "TODO: [Fix Me] Failing on CI. Needs to be investigated and resolved."]
     #[test]
     #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
     pub fn logs_batch_non_tokio_main() -> Result<()> {
+        logs_batch_non_tokio_helper()
+    }
+
+    fn logs_batch_non_tokio_helper() -> Result<()> {
         // Initialize the logger provider inside a tokio runtime
         // as this allows tonic client to capture the runtime,
         // but actual export occurs from the dedicated std::thread
@@ -111,31 +179,61 @@ mod logtests {
         let logger_provider = rt.block_on(async {
             // While we're here setup our collector container too, as this needs tokio to run
             test_utils::start_collector_container().await?;
-            init_logs()
+            init_logs(false)
         })?;
-
-        info!("LoggerProvider created");
-        let layer = OpenTelemetryTracingBridge::new(&logger_provider);
+        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
         let subscriber = tracing_subscriber::registry().with(layer);
+        // Generate a random UUID and use it as the expected value to look
+        // for in the exported logs.
+        let expected_uuid = Uuid::new_v4().to_string();
         {
             let _guard = tracing::subscriber::set_default(subscriber);
-            info!(target: "my-target", "hello from {}. My price is {}.", "banana", 2.99);
+            info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
         }
-        let _ = logger_provider.shutdown();
-        // tokio::time::sleep(Duration::from_secs(10)).await;
-        assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json")?;
+        let _ = logger_provider.shutdown();
+        std::thread::sleep(Duration::from_secs(5));
+        assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
         Ok(())
     }
-}

+    #[test]
+    #[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))]
+    pub fn logs_simple_non_tokio_main() -> Result<()> {
+        logs_simple_non_tokio_helper()
+    }

-pub fn assert_logs_results(result: &str, expected: &str) -> Result<()> {
-    let left = read_logs_from_json(File::open(expected)?)?;
-    let right = read_logs_from_json(File::open(result)?)?;
+    fn logs_simple_non_tokio_helper() -> Result<()> {
+        // Initialize the logger provider inside a tokio runtime
+        // as this allows tonic client to capture the runtime,
+        // but actual export occurs from the main non-tokio thread.
+        let rt = tokio::runtime::Runtime::new()?;
+        let logger_provider = rt.block_on(async {
+            // While we're here setup our collector container too, as this needs tokio to run
+            test_utils::start_collector_container().await?;
+            init_logs(true)
+        })?;
+        let layer = layer::OpenTelemetryTracingBridge::new(&logger_provider);
+        let subscriber = tracing_subscriber::registry().with(layer);
+        // Generate a random UUID and use it as the expected value to look
+        // for in the exported logs.
+        let expected_uuid = Uuid::new_v4().to_string();
+        {
+            let _guard = tracing::subscriber::set_default(subscriber);
+            info!(target: "my-target", uuid = expected_uuid, "hello from {}. My price is {}.", "banana", 2.99);
+        }
+
+        let _ = logger_provider.shutdown();
+        std::thread::sleep(Duration::from_secs(5));
+        assert_logs_results(test_utils::LOGS_FILE, expected_uuid.as_str())?;
+        Ok(())
+    }
+}

-    LogsAsserter::new(left, right).assert();
+pub fn assert_logs_results(result: &str, expected_content: &str) -> Result<()> {
+    let file = File::open(result)?;
+    let mut contents = String::new();
+    let mut reader = std::io::BufReader::new(&file);
+    reader.read_to_string(&mut contents)?;
+    assert!(contents.contains(expected_content));

-    assert!(File::open(result).unwrap().metadata().unwrap().size() > 0);
     Ok(())
 }
diff --git a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs
index 125c501e14..311ddbfae7 100644
--- a/opentelemetry-otlp/tests/integration_test/tests/metrics.rs
+++ b/opentelemetry-otlp/tests/integration_test/tests/metrics.rs
@@ -189,9 +189,8 @@ pub fn validate_metrics_against_results(scope_name: &str) -> Result<()> {

/// TODO - fix this asynchronously.
/// #[cfg(test)] -#[cfg(not(feature = "hyper-client"))] -#[cfg(not(feature = "reqwest-client"))] -mod tests { +#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] +mod metrictests { use super::*; use opentelemetry::metrics::MeterProvider; @@ -246,7 +245,6 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - // #[ignore] // skip when running unit test async fn test_histogram() -> Result<()> { _ = setup_metrics_test().await; const METER_NAME: &str = "test_histogram_meter"; @@ -263,7 +261,6 @@ mod tests { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - // #[ignore] // skip when running unit test async fn test_up_down_counter() -> Result<()> { _ = setup_metrics_test().await; const METER_NAME: &str = "test_up_down_meter"; diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index e137fa1cad..65f42402e8 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -48,6 +48,7 @@ const LEMONS_KEY: Key = Key::from_static_str("lemons"); const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another"); #[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[cfg(any(feature = "tonic-client", feature = "reqwest-blocking-client"))] pub async fn traces() -> Result<()> { test_utils::start_collector_container().await?; diff --git a/opentelemetry-proto/Cargo.toml b/opentelemetry-proto/Cargo.toml index 5ab08257f3..ff7e428405 100644 --- a/opentelemetry-proto/Cargo.toml +++ b/opentelemetry-proto/Cargo.toml @@ -30,7 +30,7 @@ path = "tests/json_serde.rs" [features] default = ["full"] -full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"] +full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde", "internal-logs"] # crates used to generate rs files gen-tonic = ["gen-tonic-messages", "tonic/transport"] @@ -44,6 +44,7 @@ zpages = ["trace"] testing = ["opentelemetry/testing"] # add ons +internal-logs = ["tracing"] with-schemars = ["schemars"] with-serde = ["serde", "hex"] @@ -55,6 +56,7 @@ opentelemetry_sdk = { version = "0.27", default-features = false, path = "../ope schemars = { version = "0.8", optional = true } serde = { workspace = true, optional = true, features = ["serde_derive"] } hex = { version = "0.4.3", optional = true } +tracing = {workspace = true, optional = true} # optional for opentelemetry internal logging [dev-dependencies] opentelemetry = { features = ["testing"], path = "../opentelemetry" } diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index b7a3f2a0da..d1e4910523 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -44,7 +44,7 @@ - *Breaking* Removed the following deprecated methods: - `Logger::provider()` : Previously deprecated in version 0.27.1 - `Logger::instrumentation_scope()` : Previously deprecated in version 0.27.1. - Migration Guidance: + Migration Guidance: - These methods were intended for log appenders. Keep the clone of the provider handle, instead of depending on above methods. - *Breaking* - `PeriodicReader` Updates @@ -58,11 +58,10 @@ **`experimental_metrics_periodicreader_with_async_runtime`**. Migration Guide: - - 1. *Default Implementation, requires no async runtime* (**Recommended**) The + 1. *Default Implementation, requires no async runtime* (**Recommended**) The new default implementation does not require a runtime argument. 
    Replace the builder method accordingly:
-      - *Before:*
+      - *Before:*

      ```rust
      let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio).build();
      ```
@@ -71,25 +70,35 @@
      let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter).build();
      ```

+     The new PeriodicReader can be used with the OTLP Exporter, and supports
+     the following exporter features:
+     - `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
+       runtime.
+     - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
+
+     In other words, other clients like `reqwest` and `hyper` are not supported.
+
   2. *Async Runtime Support* If your application cannot spin up new threads
      or you prefer using async runtimes, enable the
      "experimental_metrics_periodicreader_with_async_runtime" feature flag and
-     adjust code as below.
+     adjust code as below.

      - *Before:*
+
      ```rust
      let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(exporter, runtime::Tokio).build();
      ```

      - *After:*
+
      ```rust
      let reader = opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader::builder(exporter, runtime::Tokio).build();
-      ```
+      ```

      *Requirements:*
      - Enable the feature flag:
-        `experimental_metrics_periodicreader_with_async_runtime`.
+        `experimental_metrics_periodicreader_with_async_runtime`.
      - Continue enabling one of the async runtime feature flags:
        `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std`.
@@ -104,11 +113,10 @@
 - Getter methods have been introduced to access field values.
   This change impacts custom exporter and processor developers by requiring updates to code that directly accessed LogRecord fields. They must now use the provided getter methods (e.g., `log_record.event_name()` instead of `log_record.event_name`).

-- Upgrade the tracing crate used for internal logging to version 0.1.40 or later. This is necessary because the internal logging macros utilize the name field as
+- Upgrade the tracing crate used for internal logging to version 0.1.40 or later. This is necessary because the internal logging macros utilize the name field as
  metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/open-telemetry/opentelemetry-rust/pull/2418)

-- **Breaking** [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436)
-
+- *Breaking* - `BatchLogProcessor` Updates [#2436](https://github.com/open-telemetry/opentelemetry-rust/pull/2436)

  `BatchLogProcessor` no longer requires an async runtime by default. Instead, a dedicated
  background thread is created to do the batch processing and exporting.
@@ -120,6 +128,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
     new default implementation does not require a runtime argument. Replace the builder method
     accordingly:
     - *Before:*
+
      ```rust
      let logger_provider = LoggerProvider::builder()
          .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
@@ -127,12 +136,21 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
      ```
     - *After:*
+
      ```rust
      let logger_provider = LoggerProvider::builder()
          .with_log_processor(BatchLogProcessor::builder(exporter).build())
          .build();
      ```

+     The new BatchLogProcessor can be used with the OTLP Exporter, and supports
+     the following exporter features:
+     - `grpc-tonic`: This requires `LoggerProvider` to be created within a tokio
+       runtime.
+     - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
+
+     In other words, other clients like `reqwest` and `hyper` are not supported.
+
  2. *Async Runtime Support* If your application cannot spin up
     new threads or you prefer using async runtimes, enable the
@@ -140,6 +158,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
     adjust code as below.

     - *Before:*
+
      ```rust
      let logger_provider = LoggerProvider::builder()
          .with_log_processor(BatchLogProcessor::builder(exporter, runtime::Tokio).build())
@@ -147,6 +166,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
      ```
     - *After:*
+
      ```rust
      let logger_provider = LoggerProvider::builder()
          .with_log_processor(log_processor_with_async_runtime::BatchLogProcessor::builder(exporter, runtime::Tokio).build())
@@ -155,11 +175,11 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope

     *Requirements:*
     - Enable the feature flag:
-        `experimental_logs_batch_log_processor_with_async_runtime`.
+        `experimental_logs_batch_log_processor_with_async_runtime`.
     - Continue enabling one of the async runtime feature flags:
       `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std`.

-- **Breaking** [#2456](https://github.com/open-telemetry/opentelemetry-rust/pull/2456)
+- *Breaking* - `BatchSpanProcessor` Updates [#2456](https://github.com/open-telemetry/opentelemetry-rust/pull/2456)

  `BatchSpanProcessor` no longer requires an async runtime by default. Instead, a dedicated
  background thread is created to do the batch processing and exporting.
@@ -172,6 +192,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
     new default implementation does not require a runtime argument. Replace the builder method
     accordingly:
     - *Before:*
+
      ```rust
      let tracer_provider = TracerProvider::builder()
          .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
@@ -179,12 +200,21 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
      ```
     - *After:*
+
      ```rust
      let tracer_provider = TracerProvider::builder()
          .with_span_processor(BatchSpanProcessor::builder(exporter).build())
          .build();
      ```

+     The new BatchSpanProcessor can be used with the OTLP Exporter, and supports
+     the following exporter features:
+     - `grpc-tonic`: This requires `TracerProvider` to be created within a tokio
+       runtime.
+     - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
+
+     In other words, other clients like `reqwest` and `hyper` are not supported.
+
  2. *Async Runtime Support* If your application cannot spin up
     new threads or you prefer using async runtimes, enable the
@@ -192,6 +222,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
     adjust code as below.

     - *Before:*
+
      ```rust
      let tracer_provider = TracerProvider::builder()
          .with_span_processor(BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
@@ -199,6 +230,7 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope
      ```
     - *After:*
+
      ```rust
      let tracer_provider = TracerProvider::builder()
          .with_span_processor(span_processor_with_async_runtime::BatchSpanProcessor::builder(exporter, runtime::Tokio).build())
@@ -207,13 +239,23 @@ metadata, a feature introduced in version 0.1.40. [#2418](https://github.com/ope

     *Requirements:*
     - Enable the feature flag:
-        `experimental_trace_batch_span_processor_with_async_runtime`.
+        `experimental_trace_batch_span_processor_with_async_runtime`.
    - Continue enabling one of the async runtime feature flags:
      `rt-tokio`, `rt-tokio-current-thread`, or `rt-async-std`.

 - Bug fix: Empty Tracer names are retained as-is instead of replacing with
   "rust.opentelemetry.io/sdk/tracer"
   [#2486](https://github.com/open-telemetry/opentelemetry-rust/pull/2486)

+- Update `EnvResourceDetector` to allow resource attribute values containing
+  equal signs (`"="`). [#2120](https://github.com/open-telemetry/opentelemetry-rust/pull/2120)
+
+- **Breaking** Introduced `experimental_async_runtime` feature for runtime-specific traits.
+  - Runtime-specific features (`rt-tokio`, `rt-tokio-current-thread`, and `rt-async-std`)
+    now depend on the `experimental_async_runtime` feature.
+  - For most users, no action is required. Enabling runtime features such as `rt-tokio`, `rt-tokio-current-thread`,
+    or `rt-async-std` will automatically enable the `experimental_async_runtime` feature.
+  - If you're implementing a custom runtime, you must explicitly enable the `experimental_async_runtime` feature in your
+    `Cargo.toml` and implement the required `Runtime` traits.

 ## 0.27.1

@@ -242,10 +284,10 @@ Released 2024-Nov-27

 - Bug fix: Empty Logger names are retained as-is instead of replacing with
   "rust.opentelemetry.io/sdk/logger"
   [#2316](https://github.com/open-telemetry/opentelemetry-rust/pull/2316)
-
+
 - `Logger::provider`: This method is deprecated as of version `0.27.1`. To be removed in `0.28.0`.
 - `Logger::instrumentation_scope`: This method is deprecated as of version `0.27.1`. To be removed in `0.28.0`
-  Migration Guidance:
+  Migration Guidance:
     - These methods are intended for log appenders. Keep the clone of the provider handle, instead of depending
       on above methods.

@@ -271,7 +313,7 @@ Released 2024-Nov-11
 - **Replaced**
   - ([#2217](https://github.com/open-telemetry/opentelemetry-rust/pull/2217)): Removed `{Delta,Cumulative}TemporalitySelector::new()` in favor of directly using `Temporality` enum to simplify the configuration of MetricsExporterBuilder with different temporalities.
 - **Renamed**
-  - ([#2232](https://github.com/open-telemetry/opentelemetry-rust/pull/2232)): The `init` method used to create instruments has been renamed to `build`.
+  - ([#2232](https://github.com/open-telemetry/opentelemetry-rust/pull/2232)): The `init` method used to create instruments has been renamed to `build`.
    Before:

    ```rust
    let counter = meter.u64_counter("my_counter").init();
diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml
index 167846dce1..ca21fd55dd 100644
--- a/opentelemetry-sdk/Cargo.toml
+++ b/opentelemetry-sdk/Cargo.toml
@@ -49,9 +49,10 @@ logs = ["opentelemetry/logs", "serde_json"]
 spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"]
 metrics = ["opentelemetry/metrics", "glob", "async-trait"]
 testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
-rt-tokio = ["tokio", "tokio-stream"]
-rt-tokio-current-thread = ["tokio", "tokio-stream"]
-rt-async-std = ["async-std"]
+experimental_async_runtime = []
+rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"]
+rt-tokio-current-thread = ["tokio", "tokio-stream", "experimental_async_runtime"]
+rt-async-std = ["async-std", "experimental_async_runtime"]
 internal-logs = ["tracing"]
 experimental_metrics_periodicreader_with_async_runtime = ["metrics"]
 spec_unstable_metrics_views = ["metrics"]
diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs
index 4afda7deb7..4e2ef47ba7 100644
--- a/opentelemetry-sdk/src/lib.rs
+++ b/opentelemetry-sdk/src/lib.rs
@@ -94,6 +94,7 @@
 //! Support for recording and exporting telemetry asynchronously and perform
 //! metrics aggregation can be added via the following flags:
 //!
+//! * `experimental_async_runtime`: Enables the experimental `Runtime` trait and related functionality.
 //! * `rt-tokio`: Spawn telemetry tasks using [tokio]'s multi-thread runtime.
 //! * `rt-tokio-current-thread`: Spawn telemetry tasks on a separate runtime so that the main runtime won't be blocked.
 //! * `rt-async-std`: Spawn telemetry tasks using [async-std]'s runtime.
@@ -133,6 +134,7 @@ pub mod metrics;
 #[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
 pub mod propagation;
 pub mod resource;
+#[cfg(feature = "experimental_async_runtime")]
 pub mod runtime;
 #[cfg(any(feature = "testing", test))]
 #[cfg_attr(docsrs, doc(cfg(any(feature = "testing", test))))]
diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs
index b6284b87c6..9135141574 100644
--- a/opentelemetry-sdk/src/logs/log_processor.rs
+++ b/opentelemetry-sdk/src/logs/log_processor.rs
@@ -199,8 +199,8 @@ impl LogProcessor for SimpleLogProcessor {
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 enum BatchMessage {
-    /// Export logs, called when the log is emitted.
-    ExportLog(Box<(LogRecord, InstrumentationScope)>),
+    /// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`.
+    ExportLog(Arc<AtomicBool>),
     /// ForceFlush flushes the current buffer to the exporter.
     ForceFlush(mpsc::SyncSender<ExportResult>),
     /// Shut down the worker thread, push all logs in buffer to the exporter.
@@ -209,6 +209,8 @@ enum BatchMessage {
     SetResource(Arc<Resource>),
 }

+type LogsData = Box<(LogRecord, InstrumentationScope)>;
+
 /// The `BatchLogProcessor` collects finished logs in a buffer and exports them
 /// in batches to the configured `LogExporter`. This processor is ideal for
 /// high-throughput environments, as it minimizes the overhead of exporting logs
@@ -221,6 +223,14 @@ enum BatchMessage {
 /// - **Export timeout**: Maximum duration allowed for an export operation.
 /// - **Scheduled delay**: Frequency at which the batch is exported.
 ///
+/// When using this processor with the OTLP Exporter, the following exporter
+/// features are supported:
+/// - `grpc-tonic`: This requires `LoggerProvider` to be created within a tokio
+///   runtime.
+/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
+///
+/// In other words, other clients like `reqwest` and `hyper` are not supported.
+///
 /// ### Using a BatchLogProcessor:
 ///
 /// ```rust
@@ -246,11 +256,15 @@ enum BatchMessage {
 ///     .build();
 ///
 pub struct BatchLogProcessor {
-    message_sender: SyncSender<BatchMessage>,
+    logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
+    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
     is_shutdown: AtomicBool,
+    export_log_message_sent: Arc<AtomicBool>,
+    current_batch_size: Arc<AtomicUsize>,
+    max_export_batch_size: usize,

     // Track dropped logs - we'll log this at shutdown
     dropped_logs_count: AtomicUsize,
@@ -279,11 +293,8 @@ impl LogProcessor for BatchLogProcessor {
         }

         let result = self
-            .message_sender
-            .try_send(BatchMessage::ExportLog(Box::new((
-                record.clone(),
-                instrumentation.clone(),
-            ))));
+            .logs_sender
+            .try_send(Box::new((record.clone(), instrumentation.clone())));

         if result.is_err() {
             // Increment dropped logs count. The first time we have to drop a log,
@@ -292,6 +303,37 @@ impl LogProcessor for BatchLogProcessor {
                 otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                     message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
             }
+            return;
+        }
+
+        // At this point, sending the log record to the data channel was successful.
+        // Increment the current batch size and check if it has reached the max export batch size.
+        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
+        {
+            // Check whether a control message to export logs has already been sent to the
+            // worker thread. If not, send one.
+            // `export_log_message_sent` is reset to false ONLY when the worker thread has
+            // processed the control message.
+
+            if !self.export_log_message_sent.load(Ordering::Relaxed) {
+                // This is a cost-efficient check, as atomic load operations do not require
+                // exclusive access to the cache line.
+                // Perform the atomic swap on `export_log_message_sent` ONLY when the atomic
+                // load above returns false. Atomic swap/compare_exchange operations require
+                // exclusive access to the cache line on most processor architectures.
+                // We could have used compare_exchange here as well, but it's more verbose than swap.
+                if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
+                    match self.message_sender.try_send(BatchMessage::ExportLog(
+                        self.export_log_message_sent.clone(),
+                    )) {
+                        Ok(_) => {
+                            // Control message sent successfully.
+                        }
+                        Err(_err) => {
+                            // TODO: Log error
+                            // If the control message could not be sent, reset the
+                            // `export_log_message_sent` flag.
+                            self.export_log_message_sent.store(false, Ordering::Relaxed);
+                        }
+                    }
+                }
+            }
+        }
     }
@@ -318,19 +360,9 @@ impl LogProcessor for BatchLogProcessor {
     }

     fn shutdown(&self) -> LogResult<()> {
-        // test and set is_shutdown flag if it is not set
-        if self
-            .is_shutdown
-            .swap(true, std::sync::atomic::Ordering::Relaxed)
-        {
-            otel_warn!(
-                name: "BatchLogProcessor.Shutdown.ProcessorShutdown",
-                message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
-            );
-            return LogResult::Err(LogError::AlreadyShutdown(
-                "BatchLogProcessor is already shutdown".into(),
-            ));
-        }
+        // Set is_shutdown to true
+        self.is_shutdown
+            .store(true, std::sync::atomic::Ordering::Relaxed);

         let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
         let max_queue_size = self.max_queue_size;
@@ -388,8 +420,12 @@ impl BatchLogProcessor {
     where
         E: LogExporter + Send + Sync + 'static,
     {
-        let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
+        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
+        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
         let max_queue_size = config.max_queue_size;
+        let max_export_batch_size = config.max_export_batch_size;
+        let current_batch_size = Arc::new(AtomicUsize::new(0));
+        let current_batch_size_for_thread = current_batch_size.clone();

         let handle = thread::Builder::new()
             .name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -402,6 +438,50 @@ impl BatchLogProcessor {
                 );
                 let mut last_export_time = Instant::now();
                 let mut logs = Vec::with_capacity(config.max_export_batch_size);
+                let current_batch_size = current_batch_size_for_thread;
+
+                // This function gets up to `max_export_batch_size` logs from the channel and exports them.
+                // It returns the result of the export operation.
+                // It expects the logs vec to be empty when it's called.
+                #[inline]
+                fn get_logs_and_export<E>(
+                    logs_receiver: &mpsc::Receiver<LogsData>,
+                    exporter: &E,
+                    logs: &mut Vec<LogsData>,
+                    last_export_time: &mut Instant,
+                    current_batch_size: &AtomicUsize,
+                    config: &BatchConfig,
+                ) -> ExportResult
+                where
+                    E: LogExporter + Send + Sync + 'static,
+                {
+                    let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
+                    let mut result = LogResult::Ok(());
+                    let mut total_exported_logs: usize = 0;
+
+                    while target > 0 && total_exported_logs < target {
+                        // Get up to `max_export_batch_size` log records from the channel and push them to the logs vec
+                        while let Ok(log) = logs_receiver.try_recv() {
+                            logs.push(log);
+                            if logs.len() == config.max_export_batch_size {
+                                break;
+                            }
+                        }
+
+                        let count_of_logs = logs.len(); // Count of logs that will be exported
+                        total_exported_logs += count_of_logs;
+
+                        result = export_with_timeout_sync(
+                            config.max_export_timeout,
+                            exporter,
+                            logs,
+                            last_export_time,
+                        ); // This method clears the logs vec after exporting
+
+                        current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
+                    }
+                    result
+                }

                 loop {
                     let remaining_time = config
                         .scheduled_delay
                         .checked_sub(last_export_time.elapsed())
                         .unwrap_or(config.scheduled_delay);

                     match message_receiver.recv_timeout(remaining_time) {
-                        Ok(BatchMessage::ExportLog(log)) => {
-                            logs.push(log);
-                            if logs.len() == config.max_export_batch_size {
-                                otel_debug!(
-                                    name: "BatchLogProcessor.ExportingDueToBatchSize",
-                                );
-                                let _ = export_with_timeout_sync(
-                                    config.max_export_timeout,
-                                    &mut exporter,
-                                    &mut logs,
-                                    &mut last_export_time,
-                                );
-                            }
+                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
+                            // Reset the export log message sent flag now that it has been processed.
+                            export_log_message_sent.store(false, Ordering::Relaxed);
+
+                            otel_debug!(
+                                name: "BatchLogProcessor.ExportingDueToBatchSize",
+                            );
+
+                            let _ = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
+                                &mut logs,
+                                &mut last_export_time,
+                                &current_batch_size,
+                                &config,
+                            );
                         }
                         Ok(BatchMessage::ForceFlush(sender)) => {
                             otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
-                            let result = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+                            let result = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                             let _ = sender.send(result);
                         }
                         Ok(BatchMessage::Shutdown(sender)) => {
                             otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
-                            let result = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+                            let result = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                             let _ = sender.send(result);
@@ -460,11 +547,14 @@ impl BatchLogProcessor {
                             otel_debug!(
                                 name: "BatchLogProcessor.ExportingDueToTimer",
                             );
-                            let _ = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+
+                            let _ = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                         }
                         Err(RecvTimeoutError::Disconnected) => {
@@ -486,6 +576,7 @@ impl BatchLogProcessor {

         // Return batch processor with link to worker
         BatchLogProcessor {
+            logs_sender,
             message_sender,
             handle: Mutex::new(Some(handle)),
             forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
@@ -493,6 +584,9 @@ impl BatchLogProcessor {
             is_shutdown: AtomicBool::new(false),
             dropped_logs_count: AtomicUsize::new(0),
             max_queue_size,
+            export_log_message_sent: Arc::new(AtomicBool::new(false)),
+            current_batch_size,
+            max_export_batch_size,
         }
     }
@@ -511,7 +605,7 @@ impl BatchLogProcessor {
 #[allow(clippy::vec_box)]
 fn export_with_timeout_sync<E>(
     _: Duration, // TODO, enforcing timeout in exporter.
-    exporter: &mut E,
+    exporter: &E,
     batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>,
     last_export_time: &mut Instant,
 ) -> ExportResult
diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs
index 2cee6c4d0d..09d380b658 100644
--- a/opentelemetry-sdk/src/metrics/periodic_reader.rs
+++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs
@@ -26,20 +26,6 @@ const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
 const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";

 /// Configuration options for [PeriodicReader].
-///
-/// A periodic reader is a [MetricReader] that collects and exports metric data
-/// to the exporter at a defined interval.
-///
-/// By default, the returned [MetricReader] will collect and export data every
-/// 60 seconds. The export time is not counted towards the interval between
-/// attempts. PeriodicReader itself does not enforce timeout. Instead timeout
-/// is passed on to the exporter for each export attempt.
-///
-/// The [collect] method of the returned [MetricReader] continues to gather and
-/// return metric data to the user. It will not automatically send that data to
-/// the exporter outside of the predefined interval.
-///
-/// [collect]: MetricReader::collect
 #[derive(Debug)]
 pub struct PeriodicReaderBuilder<E> {
     interval: Duration,
@@ -104,20 +90,25 @@ where
     }
 }

-/// A [MetricReader] that continuously collects and exports metric data at a set
+/// A [MetricReader] that continuously collects and exports metrics at a set
 /// interval.
 ///
-/// By default, PeriodicReader will collect and export data every
-/// 60 seconds. The export time is not counted towards the interval between
-/// attempts. PeriodicReader itself does not enforce timeout.
-/// Instead timeout is passed on to the exporter for each export attempt.
+/// By default, `PeriodicReader` will collect and export metrics every 60
+/// seconds. The export time is not counted towards the interval between
+/// attempts. `PeriodicReader` itself does not enforce a timeout. Instead, the
+/// timeout is passed on to the configured exporter for each export attempt.
 ///
-/// The [collect] method of the returned continues to gather and
-/// return metric data to the user. It will not automatically send that data to
-/// the exporter outside of the predefined interval.
+/// `PeriodicReader` spawns a background thread to handle the periodic
+/// collection and export of metrics. The background thread will continue to run
+/// until `shutdown()` is called.
 ///
+/// When using this reader with the OTLP Exporter, the following exporter
+/// features are supported:
+/// - `grpc-tonic`: This requires `MeterProvider` to be created within a tokio
+///   runtime.
+/// - `reqwest-blocking-client`: Works with a regular `main` or `tokio::main`.
 ///
-/// [collect]: MetricReader::collect
+/// In other words, other clients like `reqwest` and `hyper` are not supported.
 ///
 /// # Example
 ///
@@ -415,14 +406,21 @@ impl PeriodicReaderInner {
             .send(Message::Shutdown(response_tx))
             .map_err(|e| MetricError::Other(e.to_string()))?;

-        if let Ok(response) = response_rx.recv() {
-            if response {
-                Ok(())
-            } else {
+        // TODO: Make this timeout configurable.
+        match response_rx.recv_timeout(Duration::from_secs(5)) {
+            Ok(response) => {
+                if response {
+                    Ok(())
+                } else {
+                    Err(MetricError::Other("Failed to shutdown".into()))
+                }
+            }
+            Err(mpsc::RecvTimeoutError::Timeout) => Err(MetricError::Other(
+                "Failed to shutdown due to Timeout".into(),
+            )),
+            Err(mpsc::RecvTimeoutError::Disconnected) => {
                 Err(MetricError::Other("Failed to shutdown".into()))
             }
-        } else {
-            Err(MetricError::Other("Failed to shutdown".into()))
         }
     }
 }
diff --git a/opentelemetry-sdk/src/resource/env.rs b/opentelemetry-sdk/src/resource/env.rs
index ac5ee0c034..38801228c0 100644
--- a/opentelemetry-sdk/src/resource/env.rs
+++ b/opentelemetry-sdk/src/resource/env.rs
@@ -45,12 +45,12 @@ impl Default for EnvResourceDetector {
 fn construct_otel_resources(s: String) -> Resource {
     Resource::builder_empty()
         .with_attributes(s.split_terminator(',').filter_map(|entry| {
-            let mut parts = entry.splitn(2, '=');
-            let key = parts.next()?.trim();
-            let value = parts.next()?.trim();
-            if value.find('=').is_some() {
-                return None;
-            }
+            let (key, value) = match entry.split_once('=') {
+                Some(p) => p,
+                None => return None,
+            };
+            let key = key.trim();
+            let value = value.trim();

             Some(KeyValue::new(key.to_owned(), value.to_owned()))
         }))
@@ -106,7 +106,7 @@ mod tests {
         [
             (
                 "OTEL_RESOURCE_ATTRIBUTES",
-                Some("key=value, k = v , a= x, a=z"),
+                Some("key=value, k = v , a= x, a=z,base64=SGVsbG8sIFdvcmxkIQ=="),
             ),
             ("IRRELEVANT", Some("20200810")),
         ],
@@ -121,6 +121,7 @@ mod tests {
                 KeyValue::new("k", "v"),
                 KeyValue::new("a", "x"),
                 KeyValue::new("a", "z"),
+                KeyValue::new("base64", "SGVsbG8sIFdvcmxkIQ=="), // base64('Hello, World!')
             ])
             .build()
         );
diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs
index 7705c10e91..00720e0892 100644
--- a/opentelemetry-sdk/src/runtime.rs
+++ b/opentelemetry-sdk/src/runtime.rs
@@ -15,6 +15,7 @@ use thiserror::Error;
 ///
 /// [Tokio]: https://crates.io/crates/tokio
 /// [async-std]: https://crates.io/crates/async-std
+#[cfg(feature = "experimental_async_runtime")]
 pub trait Runtime: Clone + Send + Sync + 'static {
     /// A future stream, which returns items in a previously specified interval. The item type is
     /// not important.
@@ -44,13 +45,19 @@ pub trait Runtime: Clone + Send + Sync + 'static {
 }

 /// Runtime implementation, which works with Tokio's multi thread runtime.
-#[cfg(feature = "rt-tokio")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
+)]
 #[derive(Debug, Clone)]
 pub struct Tokio;

-#[cfg(feature = "rt-tokio")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
+)]
 impl Runtime for Tokio {
     type Interval = tokio_stream::wrappers::IntervalStream;
     type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
@@ -71,13 +78,31 @@ impl Runtime for Tokio {
 }

 /// Runtime implementation, which works with Tokio's current thread runtime.
-#[cfg(feature = "rt-tokio-current-thread")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
+#[cfg(all(
+    feature = "experimental_async_runtime",
+    feature = "rt-tokio-current-thread"
+))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(
+        feature = "experimental_async_runtime",
+        feature = "rt-tokio-current-thread"
+    )))
+)]
 #[derive(Debug, Clone)]
 pub struct TokioCurrentThread;

-#[cfg(feature = "rt-tokio-current-thread")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
+#[cfg(all(
+    feature = "experimental_async_runtime",
+    feature = "rt-tokio-current-thread"
+))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(
+        feature = "experimental_async_runtime",
+        feature = "rt-tokio-current-thread"
+    )))
+)]
 impl Runtime for TokioCurrentThread {
     type Interval = tokio_stream::wrappers::IntervalStream;
     type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;
@@ -108,13 +133,19 @@ impl Runtime for TokioCurrentThread {
 }

 /// Runtime implementation, which works with async-std.
-#[cfg(feature = "rt-async-std")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
+#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
+)]
 #[derive(Debug, Clone)]
 pub struct AsyncStd;

-#[cfg(feature = "rt-async-std")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
+#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
+)]
 impl Runtime for AsyncStd {
     type Interval = async_std::stream::Interval;
     type Delay = BoxFuture<'static, ()>;
@@ -138,6 +169,7 @@ impl Runtime for AsyncStd {
 ///
 /// [log]: crate::logs::BatchLogProcessor
 /// [span]: crate::trace::BatchSpanProcessor
+#[cfg(feature = "experimental_async_runtime")]
 pub trait RuntimeChannel: Runtime {
     /// A future stream to receive batch messages from channels.
     type Receiver<T: Debug + Send>: Stream<Item = T> + Send;
@@ -152,6 +184,7 @@ pub trait RuntimeChannel: Runtime {
 }

 /// Error returned by a [`TrySend`] implementation.
+#[cfg(feature = "experimental_async_runtime")]
 #[derive(Debug, Error)]
 pub enum TrySendError {
     /// Send failed due to the channel being full.
@@ -166,6 +199,7 @@ pub enum TrySendError {
 }

 /// TrySend is an abstraction of `Sender` that is capable of sending messages through a reference.
+#[cfg(feature = "experimental_async_runtime")]
 pub trait TrySend: Sync + Send {
     /// The message that will be sent.
    type Message;
@@ -176,7 +210,10 @@ pub trait TrySend: Sync + Send {
     fn try_send(&self, item: Self::Message) -> Result<(), TrySendError>;
 }

-#[cfg(any(feature = "rt-tokio", feature = "rt-tokio-current-thread"))]
+#[cfg(all(
+    feature = "experimental_async_runtime",
+    any(feature = "rt-tokio", feature = "rt-tokio-current-thread")
+))]
 impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
     type Message = T;

@@ -188,8 +225,11 @@ impl<T: Send> TrySend for tokio::sync::mpsc::Sender<T> {
     }
 }

-#[cfg(feature = "rt-tokio")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
+#[cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio"))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
+)]
 impl RuntimeChannel for Tokio {
     type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
     type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
@@ -206,8 +246,17 @@ impl RuntimeChannel for Tokio {
     }
 }

-#[cfg(feature = "rt-tokio-current-thread")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))]
+#[cfg(all(
+    feature = "experimental_async_runtime",
+    feature = "rt-tokio-current-thread"
+))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(
+        feature = "experimental_async_runtime",
+        feature = "rt-tokio-current-thread"
+    )))
+)]
 impl RuntimeChannel for TokioCurrentThread {
     type Receiver<T: Debug + Send> = tokio_stream::wrappers::ReceiverStream<T>;
     type Sender<T: Debug + Send> = tokio::sync::mpsc::Sender<T>;
@@ -224,7 +273,7 @@ impl RuntimeChannel for TokioCurrentThread {
     }
 }

-#[cfg(feature = "rt-async-std")]
+#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
 impl<T: Send> TrySend for async_std::channel::Sender<T> {
     type Message = T;

@@ -236,8 +285,11 @@ impl<T: Send> TrySend for async_std::channel::Sender<T> {
     }
 }

-#[cfg(feature = "rt-async-std")]
-#[cfg_attr(docsrs, doc(cfg(feature = "rt-async-std")))]
+#[cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std"))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
+)]
 impl RuntimeChannel for AsyncStd {
     type Receiver<T: Debug + Send> = async_std::channel::Receiver<T>;
     type Sender<T: Debug + Send> = async_std::channel::Sender<T>;
diff --git a/scripts/integration_tests.sh b/scripts/integration_tests.sh
index 36aec3100b..44dd6afa74 100755
--- a/scripts/integration_tests.sh
+++ b/scripts/integration_tests.sh
@@ -16,11 +16,10 @@ if [ -d "$TEST_DIR" ]; then
     # Run tests with the reqwest-client feature
     echo
     echo ####
-    echo "Integration Tests: Reqwest Client (Disabled now)"
+    echo "Integration Tests: Reqwest Client"
     echo ####
     echo
-    # TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
-    #cargo test --no-default-features --features "reqwest-client","internal-logs"
+    cargo test --no-default-features --features "reqwest-client","internal-logs"

     # Run tests with the reqwest-blocking-client feature
     echo
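The `EnvResourceDetector` change above (allowing `=` inside attribute values) lends itself to a quick standalone check. A minimal sketch of the new parsing rule; the `parse_entry` helper and the `main` scaffolding are illustrative, not part of this patch:

```rust
// Sketch of the OTEL_RESOURCE_ATTRIBUTES parsing rule after the change:
// each entry is split on the *first* '=', so values may themselves
// contain '=' (e.g. base64 payloads).
fn parse_entry(entry: &str) -> Option<(String, String)> {
    let (key, value) = entry.split_once('=')?;
    Some((key.trim().to_owned(), value.trim().to_owned()))
}

fn main() {
    assert_eq!(
        parse_entry("base64=SGVsbG8sIFdvcmxkIQ=="),
        Some(("base64".to_owned(), "SGVsbG8sIFdvcmxkIQ==".to_owned()))
    );
    // Entries without any '=' are still skipped, as before.
    assert_eq!(parse_entry("no-equals-here"), None);
}
```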