From 61b99c8223f10bcc8002a4e71da212ccc390aa82 Mon Sep 17 00:00:00 2001 From: Dotan Simha Date: Sun, 11 Feb 2024 11:31:25 +0200 Subject: [PATCH] Telemetry on WASM runtime (#362) --- .github/workflows/ci.yaml | 10 +- Cargo.lock | 120 +++++++++++- Cargo.toml | 5 +- bin/cloudflare_worker/Cargo.toml | 5 +- bin/cloudflare_worker/src/http_tracing.rs | 117 +++++++++++ bin/cloudflare_worker/src/lib.rs | 45 ++++- bin/cloudflare_worker/wrangler.toml | 2 +- bin/conductor/Cargo.toml | 1 + bin/conductor/src/lib.rs | 7 +- bin/conductor/src/minitrace_actix.rs | 4 +- libs/benches/bench.rs | 5 +- libs/common/src/http.rs | 2 +- libs/config/conductor.schema.json | 27 ++- libs/e2e_tests/tests/plugin_telemetry.rs | 18 +- libs/engine/src/gateway.rs | 5 +- libs/engine/src/plugin_manager.rs | 16 +- libs/federation_query_planner/src/executor.rs | 4 +- libs/smoke_tests/Cargo.toml | 1 + libs/smoke_tests/docker-compose.yaml | 6 + ...smoke_telemetry__telemetry_otlp_grpc.snap} | 0 ..._smoke_telemetry__telemetry_otlp_http.snap | 9 + ...ry__smoke_telemetry__telemetry_zipkin.snap | 9 + libs/smoke_tests/src/telemetry.rs | 146 +++++++++++--- libs/smoke_tests/test_gw.yaml | 30 ++- libs/tracing/Cargo.toml | 2 + libs/tracing/src/lib.rs | 3 + libs/tracing/src/minitrace_mgr.rs | 182 +++-------------- libs/tracing/src/reporters.rs | 54 +++++ libs/tracing/src/routed_reporter.rs | 168 ++++++++++++++++ libs/tracing/src/trace_id.rs | 11 ++ plugins/telemetry/Cargo.toml | 30 ++- plugins/telemetry/src/config.rs | 46 +++-- plugins/telemetry/src/lib.rs | 3 + plugins/telemetry/src/plugin.rs | 185 ++++++++++++++---- .../src/wasm_reporter/console_reporter.rs | 11 ++ .../src/wasm_reporter/datadog_reporter.rs | 112 +++++++++++ plugins/telemetry/src/wasm_reporter/mod.rs | 3 + .../src/wasm_reporter/otlp_reporter.rs | 160 +++++++++++++++ test_config/config.yaml | 2 +- test_config/worker.yaml | 6 +- 40 files changed, 1270 insertions(+), 302 deletions(-) create mode 100644 bin/cloudflare_worker/src/http_tracing.rs rename libs/smoke_tests/src/snapshots/{smoke_tests__telemetry__smoke_telemetry__telemetry_otlp.snap => smoke_tests__telemetry__smoke_telemetry__telemetry_otlp_grpc.snap} (100%) create mode 100644 libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp_http.snap create mode 100644 libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_zipkin.snap create mode 100644 libs/tracing/src/reporters.rs create mode 100644 libs/tracing/src/routed_reporter.rs create mode 100644 libs/tracing/src/trace_id.rs create mode 100644 plugins/telemetry/src/wasm_reporter/console_reporter.rs create mode 100644 plugins/telemetry/src/wasm_reporter/datadog_reporter.rs create mode 100644 plugins/telemetry/src/wasm_reporter/mod.rs create mode 100644 plugins/telemetry/src/wasm_reporter/otlp_reporter.rs diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4cef384d..3c3cdd42 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -61,7 +61,7 @@ jobs: - name: run containers working-directory: libs/smoke_tests/ - run: docker compose -f docker-compose.yaml up -d + run: docker compose -f docker-compose.yaml up -d --remove-orphans --wait --force-recreate - uses: JarvusInnovations/background-action@v1 name: run test server in background @@ -89,6 +89,13 @@ jobs: run: cargo test --features binary -- --nocapture env: CONDUCTOR_URL: http://127.0.0.1:9000 + RUST_BACKTRACE: full + + - name: restart containers + working-directory: libs/smoke_tests/ + run: | + docker compose down + docker compose -f docker-compose.yaml up -d --remove-orphans --wait --force-recreate - uses: the-guild-org/shared-config/setup@main name: setup env (wasm) @@ -114,6 +121,7 @@ jobs: run: cargo test --features wasm -- --nocapture env: CONDUCTOR_URL: http://127.0.0.1:8787 + RUST_BACKTRACE: full graphql-over-http: runs-on: ubuntu-22.04 diff --git a/Cargo.lock b/Cargo.lock index b7ec4267..8d761321 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1228,6 +1228,7 @@ dependencies = [ "futures-util", "minitrace", "openssl", + "tokio", "tracing", "tracing-subscriber", "ulid", @@ -1241,7 +1242,9 @@ dependencies = [ "conductor_config", "conductor_engine", "conductor_logger", + "conductor_tracing", "console_error_panic_hook", + "minitrace", "time", "tracing", "tracing-subscriber", @@ -1352,6 +1355,7 @@ dependencies = [ name = "conductor_tracing" version = "0.0.0" dependencies = [ + "async-trait", "conductor_common", "minitrace", "opentelemetry", @@ -1363,6 +1367,7 @@ dependencies = [ "serde", "serde_json", "task-local-extensions", + "tokio", "tracing", "wasm_polyfills", ] @@ -1658,6 +1663,19 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.5.0" @@ -2883,9 +2901,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "minitrace" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0902c19dd1eaeb26d77bf2984475bfcc2185b31f139696d0502e673c6038d56e" +checksum = "c2df1d765f7ec35138abeefde2a023c3b26b8d9bb2e4a3b98ed132acf2d755a7" dependencies = [ "futures", "minitrace-macro", @@ -2899,10 +2917,11 @@ dependencies = [ [[package]] name = "minitrace-datadog" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "053aa8b0f3fcf782ff9c1465456da1c09a893fbe50ed8a5163f0dec9f96981c3" +checksum = "970001ca92870038d46118732289f041d3a81dbd1cb59d58cc3a7ea2443fe620" dependencies = [ + "log", "minitrace", "reqwest", "rmp-serde", @@ -2911,9 +2930,9 @@ dependencies = [ [[package]] name = "minitrace-jaeger" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176d22d3e7a751a662804b4b925e69045f57d6f58096e2e083f418dd6d6ea54e" +checksum = "77b4d7da9760b1c17119341a59efca097ea8f9bc943a369598162f2ef25c4e74" dependencies = [ "log", "minitrace", @@ -2922,9 +2941,9 @@ dependencies = [ [[package]] name = "minitrace-macro" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93114a0fc311242ab7f05d1b2f9c46dab4e24342df9fcdc7296ce368f71675fc" +checksum = "36aca96c5da5b6a8c7f75910fb52c8d5aecb70f27d821adeae06ba54d2cf74b0" dependencies = [ "proc-macro-error", "proc-macro2", @@ -2934,9 +2953,9 @@ dependencies = [ [[package]] name = "minitrace-opentelemetry" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4fa7ea2c3666a86e9bbd1ac50e13cb01f6b397a722022790bf573f39ee324c" +checksum = "bc2ec34c8f759ef45261cc2c5b089cdaf07b43efb6c047b112b19d5f6f3d3bfe" dependencies = [ "futures", "log", @@ -3303,6 +3322,19 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "opentelemetry-http" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f51189ce8be654f9b5f7e70e49967ed894e84a06fc35c6c042e64ac1fc5399e" +dependencies = [ + "async-trait", + "bytes", + "http 0.2.11", + "opentelemetry", + "reqwest", +] + [[package]] name = "opentelemetry-otlp" version = "0.14.0" @@ -3313,6 +3345,7 @@ dependencies = [ "futures-core", "http 0.2.11", "opentelemetry", + "opentelemetry-http", "opentelemetry-proto", "opentelemetry-semantic-conventions", "opentelemetry_sdk", @@ -3343,6 +3376,27 @@ dependencies = [ "opentelemetry", ] +[[package]] +name = "opentelemetry-zipkin" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c2bee3ec1be4d0088378e0eb1dd54c113cbd7ec5622cc4f26181debf1d4d7b5" +dependencies = [ + "async-trait", + "futures-core", + "http 0.2.11", + "once_cell", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "reqwest", + "serde", + "serde_json", + "thiserror", + "typed-builder", +] + [[package]] name = "opentelemetry_sdk" version = "0.21.2" @@ -3354,6 +3408,7 @@ dependencies = [ "futures-channel", "futures-executor", "futures-util", + "glob", "once_cell", "opentelemetry", "ordered-float", @@ -4397,6 +4452,31 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serial_test" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "953ad9342b3aaca7cb43c45c097dd008d4907070394bd0751a0aa8817e5a018d" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -4513,6 +4593,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serial_test", "tokio", ] @@ -4724,17 +4805,25 @@ dependencies = [ "async-trait", "conductor_common", "conductor_tracing", + "http 0.2.11", "humantime-serde", "minitrace", "minitrace-datadog", "minitrace-jaeger", "minitrace-opentelemetry", "opentelemetry", + "opentelemetry-http", "opentelemetry-otlp", + "opentelemetry-zipkin", "opentelemetry_sdk", + "reqwest", + "rmp-serde", "schemars", "serde", "serde_json", + "tracing", + "wasm_polyfills", + "web-time", ] [[package]] @@ -5167,6 +5256,17 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typed-builder" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6179333b981641242a768f30f371c9baccbfcc03749627000c500ab88bf4528b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "typenum" version = "1.17.0" diff --git a/Cargo.toml b/Cargo.toml index 5bb3729a..2bf32ee7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,11 +26,8 @@ vrl = { git = "https://github.com/dotansimha/vrl.git", rev = "d59b2f66727d3c345b "value", "stdlib", ] } -minitrace = "0.6.3" +minitrace = "0.6.4" [profile.release.package.conductor-cf-worker] strip = true codegen-units = 1 - -[profile.release.graphql-conductor_lib] -lto = true diff --git a/bin/cloudflare_worker/Cargo.toml b/bin/cloudflare_worker/Cargo.toml index 8853d613..994ed41d 100644 --- a/bin/cloudflare_worker/Cargo.toml +++ b/bin/cloudflare_worker/Cargo.toml @@ -6,9 +6,8 @@ description = "Conductor Cloudflare Worker runtime" license = "MIT" repository = "https://github.com/the-guild-org/conductor" -# https://github.com/rustwasm/wasm-pack/issues/1247 [package.metadata.wasm-pack.profile.release] -wasm-opt = false +wasm-opt = true [lib] crate-type = ["cdylib"] @@ -19,8 +18,10 @@ conductor_config = { path = "../../libs/config" } conductor_engine = { path = "../../libs/engine" } conductor_common = { path = "../../libs/common" } conductor_logger = { path = "../../libs/logger" } +conductor_tracing = { path = "../../libs/tracing" } tracing = { workspace = true } tracing-web = "0.1.3" tracing-subscriber = { workspace = true, features = ['time', 'json'] } time = { version = "0.3.34", features = ['wasm-bindgen'] } console_error_panic_hook = "0.1.7" +minitrace = { workspace = true, features = ["enable"] } diff --git a/bin/cloudflare_worker/src/http_tracing.rs b/bin/cloudflare_worker/src/http_tracing.rs new file mode 100644 index 00000000..ce5added --- /dev/null +++ b/bin/cloudflare_worker/src/http_tracing.rs @@ -0,0 +1,117 @@ +use conductor_common::http::{header::*, StatusCode}; +use conductor_tracing::{otel_attrs::*, trace_id::generate_trace_id}; +use minitrace::{ + collector::{SpanContext, SpanId}, + Span, +}; +use worker::*; + +#[inline] +fn strip_http_flavor(value: &String) -> String { + value.split('/').nth(1).unwrap_or(value).to_owned() +} + +#[inline] +fn build_request_properties(req: &Request) -> Vec<(&'static str, String)> { + let headers = req.headers(); + let user_agent = headers + .get(USER_AGENT.as_str()) + // @expected: it panics only if the header name is not valid, and we know it is. + .unwrap() + .unwrap_or_default(); + // @expected: it panics only if the URL source is not valid, and it's already validated before. + let url = req.url().unwrap(); + let http_route = url.path(); + let http_method = req.method().to_string(); + // @expected: it only panics if we are not running in a CF context, should be safe. + let cf_info = req.cf().unwrap(); + let request_id = headers + .get("x-request-id") + .unwrap() + .or_else(|| headers.get("cf-ray").unwrap()) + .unwrap_or_default(); + let http_flavor = strip_http_flavor(&cf_info.http_protocol()); + let http_scheme = url.scheme().to_owned(); + // @expected: unwraps only in special cases where "data:text" is used. + let http_host = url.host().unwrap().to_string(); + // See https://developers.cloudflare.com/fundamentals/reference/http-request-headers/#true-client-ip-enterprise-plan-only + let client_id = headers + .get("true-client-ip") + .unwrap() + .or_else(|| headers.get("cf-connecting-ip").unwrap()) + .unwrap_or_default(); + + let http_target = format!( + "{}{}", + http_route, + url.query().map(|v| format!("?{}", v)).unwrap_or_default() + ); + + vec![ + (HTTP_METHOD, http_method), + (HTTP_ROUTE, http_route.to_string()), + (HTTP_FLAVOR, http_flavor), + (HTTP_SCHEME, http_scheme), + (HTTP_HOST, http_host), + (HTTP_CLIENT_IP, client_id), + (HTTP_USER_AGENT, user_agent), + (HTTP_TARGET, http_target), + (OTEL_KIND, "server".to_string()), + (REQUEST_ID, request_id), + // Specific to DataDog + (SPAN_TYPE, "web".to_string()), + ] +} + +#[inline] +pub fn build_request_root_span(tenant_id: u32, endpoint_identifier: &str, req: &Request) -> Span { + let method = req.method().to_string(); + let span_name = format!("HTTP {} {}", method, req.path()); + let mut properties: Vec<(&str, String)> = build_request_properties(req); + properties.push((CONDUCTOR_ENDPOINT, endpoint_identifier.to_owned())); + + let span_context = SpanContext::new(generate_trace_id(tenant_id), SpanId::default()); + + Span::root(span_name, span_context).with_properties(|| properties) +} + +#[inline] +fn handle_error(response_error: &worker::Error) -> Vec<(&'static str, String)> { + let mut properties: Vec<(&'static str, String)> = vec![]; + + let display = format!("{response_error}"); + let debug = format!("{response_error:?}"); + properties.push((EXCEPTION_MESSAGE, display)); + properties.push((EXCEPTION_DETAILS, debug)); + properties.push((ERROR_INDICATOR, "true".to_string())); + + properties +} + +#[inline] +pub fn build_response_properties(res: &Result) -> Vec<(&'static str, String)> { + let mut properties: Vec<(&'static str, String)> = vec![]; + + match res { + Ok(res) => { + let status_code = StatusCode::from_u16(res.status_code()).unwrap(); + + if status_code == StatusCode::OK || status_code.is_client_error() { + properties.push((OTEL_STATUS_CODE, "OK".to_string())); + } else { + properties.push((OTEL_STATUS_CODE, "ERROR".to_string())); + } + + properties.push((HTTP_STATUS_CODE, status_code.as_u16().to_string())); + } + Err(e) => { + properties.append(&mut handle_error(e)); + properties.push(( + HTTP_STATUS_CODE, + StatusCode::INTERNAL_SERVER_ERROR.as_u16().to_string(), + )); + } + } + + properties +} diff --git a/bin/cloudflare_worker/src/lib.rs b/bin/cloudflare_worker/src/lib.rs index 8b4e65bf..73f559fe 100644 --- a/bin/cloudflare_worker/src/lib.rs +++ b/bin/cloudflare_worker/src/lib.rs @@ -1,3 +1,4 @@ +mod http_tracing; use std::str::FromStr; use conductor_common::http::{ @@ -5,10 +6,14 @@ use conductor_common::http::{ }; use conductor_config::parse_config_contents; use conductor_engine::gateway::{ConductorGateway, GatewayError}; +use conductor_tracing::minitrace_mgr::MinitraceManager; +use http_tracing::{build_request_root_span, build_response_properties}; +use minitrace::{collector::Config, trace}; use std::panic; use tracing_subscriber::prelude::*; use worker::*; +#[trace(name = "transform_request")] async fn transform_req(url: &Url, mut req: Request) -> Result { let mut headers_map = HttpHeadersMap::new(); @@ -32,7 +37,7 @@ async fn transform_req(url: &Url, mut req: Request) -> Result Result { let mut response_headers = Headers::new(); for (k, v) in conductor_response.headers.into_iter() { @@ -49,7 +54,11 @@ fn transform_res(conductor_response: ConductorHttpResponse) -> Result }) } -async fn run_flow(req: Request, env: Env) -> Result { +async fn run_flow( + req: Request, + env: Env, + minitrace_mgr: &mut MinitraceManager, +) -> Result { let conductor_config_str = env.var("CONDUCTOR_CONFIG").map(|v| v.to_string()); let get_env_value = |key: &str| env.var(key).map(|s| s.to_string()).ok(); @@ -69,17 +78,26 @@ async fn run_flow(req: Request, env: Env) -> Result { ) .unwrap_or_else(|e| panic!("failed to build logger: {}", e)); - match ConductorGateway::new(&conductor_config, &mut None).await { + let result = match ConductorGateway::new(&conductor_config, minitrace_mgr).await { Ok(gw) => { let _ = tracing_subscriber::registry().with(logger).try_init(); + let root_reporter = minitrace_mgr.build_root_reporter(); + minitrace::set_reporter(root_reporter, Config::default()); + let url = req.url()?; match gw.match_route(&url) { Ok(route_data) => { + let root_span = + build_request_root_span(route_data.tenant_id, &route_data.endpoint, &req); + let _guard = root_span.set_local_parent(); let conductor_req = transform_req(&url, req).await?; let conductor_response = ConductorGateway::execute(conductor_req, route_data).await; + let http_response = transform_res(conductor_response); + let res_properties = build_response_properties(&http_response); + let _ = root_span.with_properties(|| res_properties); - transform_res(conductor_response) + http_response } Err(GatewayError::MissingEndpoint(_)) => { Response::error("failed to locate endpoint".to_string(), 404) @@ -88,9 +106,11 @@ async fn run_flow(req: Request, env: Env) -> Result { } } Err(_) => Response::error("gateway is not ready".to_string(), 500), - } + }; + + result } - Err(e) => Response::error(e.to_string(), 500), + Err(e) => Response::error(format!("failed to read conductor config: {}", e), 500), } } @@ -101,11 +121,18 @@ fn start() { } #[event(fetch, respond_with_errors)] -async fn main(req: Request, env: Env, _ctx: Context) -> Result { - let result = run_flow(req, env).await; +async fn main(req: Request, env: Env, context: Context) -> Result { + let mut minitrace_mgr = MinitraceManager::default(); + let result = run_flow(req, env, &mut minitrace_mgr).await; match result { - Ok(response) => Ok(response), + Ok(response) => { + context.wait_until(async move { + minitrace_mgr.shutdown().await; + }); + + Ok(response) + } Err(e) => Response::error(e.to_string(), 500), } } diff --git a/bin/cloudflare_worker/wrangler.toml b/bin/cloudflare_worker/wrangler.toml index 1cd097db..821be322 100644 --- a/bin/cloudflare_worker/wrangler.toml +++ b/bin/cloudflare_worker/wrangler.toml @@ -3,4 +3,4 @@ main = "build/worker/shim.mjs" compatibility_date = "2023-03-22" [build] -command = "cargo install -q worker-build && worker-build --release" +command = "cargo install -q worker-build && worker-build" diff --git a/bin/conductor/Cargo.toml b/bin/conductor/Cargo.toml index 1cb0ac64..543b7869 100644 --- a/bin/conductor/Cargo.toml +++ b/bin/conductor/Cargo.toml @@ -12,6 +12,7 @@ name = "conductor" path = "src/lib.rs" [dependencies] +tokio = { workspace = true, features = ["full"] } conductor_config = { path = "../../libs/config" } conductor_engine = { path = "../../libs/engine" } conductor_common = { path = "../../libs/common" } diff --git a/bin/conductor/src/lib.rs b/bin/conductor/src/lib.rs index 8f3a8a97..6691b987 100644 --- a/bin/conductor/src/lib.rs +++ b/bin/conductor/src/lib.rs @@ -30,12 +30,13 @@ pub async fn run_services(config_file_path: &String) -> std::io::Result<()> { .unwrap_or_else(|e| panic!("failed to build logger: {}", e)); let mut tracing_manager = MinitraceManager::default(); - match ConductorGateway::new(&config, &mut Some(&mut tracing_manager)).await { + match ConductorGateway::new(&config, &mut tracing_manager).await { Ok(gw) => { let subscriber = registry::Registry::default().with(logger); // @expected: we need to exit the process, if the logger can't be correctly set. tracing::subscriber::set_global_default(subscriber).expect("failed to set up tracing"); - minitrace::set_reporter(tracing_manager.build_reporter(), Config::default()); + let tracing_reporter = tracing_manager.build_root_reporter(); + minitrace::set_reporter(tracing_reporter, Config::default()); let gateway = Arc::new(gw); let http_server = HttpServer::new(move || { @@ -64,7 +65,7 @@ pub async fn run_services(config_file_path: &String) -> std::io::Result<()> { .run() .await; - minitrace::flush(); + tracing_manager.shutdown().await; server_instance } diff --git a/bin/conductor/src/minitrace_actix.rs b/bin/conductor/src/minitrace_actix.rs index 23409788..e7536f19 100644 --- a/bin/conductor/src/minitrace_actix.rs +++ b/bin/conductor/src/minitrace_actix.rs @@ -4,7 +4,7 @@ use actix_web::{ web, Error, ResponseError, }; use conductor_engine::gateway::ConductorGatewayRouteData; -use conductor_tracing::{minitrace_mgr::MinitraceManager, otel_attrs::*}; +use conductor_tracing::{otel_attrs::*, trace_id::generate_trace_id}; use futures_util::future::LocalBoxFuture; use minitrace::{ collector::{SpanContext, SpanId}, @@ -52,7 +52,7 @@ fn build_request_root_span(req: &ServiceRequest) -> Span { properties.push((CONDUCTOR_ENDPOINT, endpoint_data.endpoint.clone())); let span_context = SpanContext::new( - MinitraceManager::generate_trace_id(endpoint_data.tenant_id), + generate_trace_id(endpoint_data.tenant_id), SpanId::default(), ); diff --git a/libs/benches/bench.rs b/libs/benches/bench.rs index fe49605d..d81fe96a 100644 --- a/libs/benches/bench.rs +++ b/libs/benches/bench.rs @@ -5,6 +5,7 @@ use conductor_config::{ ConductorConfig, EndpointDefinition, GraphQLSourceConfig, SourceDefinition, }; use conductor_engine::gateway::ConductorGateway; +use conductor_tracing::minitrace_mgr::MinitraceManager; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use futures::future::join_all; use hyper::Client; @@ -92,8 +93,8 @@ fn criterion_benchmark(c: &mut Criterion) { plugins: None, }; - let t = &mut None; - let gw_future = ConductorGateway::new(&config, t); + let mut tracing_mgr = MinitraceManager::default(); + let gw_future = ConductorGateway::new(&config, &mut tracing_mgr); let rt = Runtime::new().unwrap(); let gw = rt.block_on(gw_future).unwrap(); let route_data = gw diff --git a/libs/common/src/http.rs b/libs/common/src/http.rs index 9e6b677f..fd206e9e 100644 --- a/libs/common/src/http.rs +++ b/libs/common/src/http.rs @@ -72,7 +72,7 @@ impl ConductorHttpRequest { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConductorHttpResponse { pub body: Bytes, pub status: StatusCode, diff --git a/libs/config/conductor.schema.json b/libs/config/conductor.schema.json index d8066dcf..82b99171 100644 --- a/libs/config/conductor.schema.json +++ b/libs/config/conductor.schema.json @@ -1787,7 +1787,7 @@ ] }, "TelemetryPluginConfig": { - "description": "The `telemetry` plugin exports traces information about Conductor to a telemetry backend.\n\n\n\nAt the moment, this plugin is not supported on WASM (CloudFlare Worker) runtime.\n\nYou may follow [this GitHub issue](https://github.com/the-guild-org/conductor/issues/354) for additional information.\n\n\n\nThe telemetry plugin exports traces information about the following aspects of Conductor:\n\n- GraphQL parser (timing)\n\n- GraphQL execution (operation type, operation body, operation name, timing, errors)\n\n- Query planning (timing, operation body, operation name)\n\n- Incoming HTTP requests (attributes, timing, errors)\n\n- Outgoing HTTP requests (attributes, timing, errors)\n\nWhen used with a telemtry backend, you can expect to see the following information:\n\n![img](/assets/telemetry.png)", + "description": "The `telemetry` plugin exports traces information about Conductor to a telemetry backend.\n\nThe telemetry plugin exports traces information about the following aspects of Conductor:\n\n- GraphQL parser (timing)\n\n- GraphQL execution (operation type, operation body, operation name, timing, errors)\n\n- Query planning (timing, operation body, operation name)\n\n- Incoming HTTP requests (attributes, timing, errors)\n\n- Outgoing HTTP requests (attributes, timing, errors)\n\nWhen used with a telemtry backend, you can expect to see the following information:\n\n![img](https://raw.githubusercontent.com/the-guild-org/conductor/master/website/public/assets/telemetry.png)", "type": "object", "required": [ "targets" @@ -1825,6 +1825,27 @@ } } }, + { + "title": "zipkin", + "description": "Sends telemetry traces data to a [Zipkin](https://zipkin.io/) collector, using the HTTP protocol.\n\nTo get started with Zipkin, use the following command to start the Zipkin collector and UI in your local machine, using Docker:\n\n`docker run -d -p 9411:9411 openzipkin/zipkin`", + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "zipkin" + ] + }, + "collector_endpoint": { + "description": "The Zipkin endpoint. Please use full URL endpoint format, e.g. `http://127.0.0.1:9411/api/v2/spans`.", + "default": "http://127.0.0.1:9411/api/v2/spans", + "type": "string" + } + } + }, { "title": "Open Telemetry (OTLP)", "description": "Sends telemetry traces data to an [OpenTelemetry](https://opentelemetry.io/) backend, using the [OTLP protocol](https://opentelemetry.io/docs/specs/otel/protocol/).\n\nYou can find [here a list backends that supports the OTLP format](https://github.com/magsther/awesome-opentelemetry#open-source).", @@ -1845,7 +1866,7 @@ "type": "string" }, "protocol": { - "description": "The OTLP transport to use to export telemetry data.", + "description": "The OTLP transport to use to export telemetry data.\n\n> ❗️ The gRPC transport is not supported on WASM runtime (CloudFlare Worker).", "default": "grpc", "$ref": "#/definitions/OtlpProtcol" }, @@ -1884,7 +1905,7 @@ }, { "title": "Jaeger", - "description": "Sends telemetry traces data to a [Jaeger](https://www.jaegertracing.io/) backend, using the native protocol of [Jaeger (UDP) using `thrift`](https://www.jaegertracing.io/docs/next-release/getting-started/).\n\n> Note: Jaeger also [supports OTLP format](https://opentelemetry.io/blog/2022/jaeger-native-otlp/), so it's preferred to use the `otlp` target.\n\nTo get started with Jaeger, make sure you have a Jaeger backend running, and then use the following command to start the Jaeger backend and UI in your local machine, using Docker:\n\n`docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest`", + "description": "Sends telemetry traces data to a [Jaeger](https://www.jaegertracing.io/) backend, using the native protocol of [Jaeger (UDP) using `thrift`](https://www.jaegertracing.io/docs/next-release/getting-started/).\n\n> Note: Jaeger also [supports OTLP format](https://opentelemetry.io/blog/2022/jaeger-native-otlp/), so it's preferred to use the `otlp` target.\n\n> ❗️ This target is not available on WASM runtime (CloudFlare Worker).\n\nTo get started with Jaeger, use the following command to start the Jaeger backend and UI in your local machine, using Docker:\n\n`docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest`", "type": "object", "required": [ "type" diff --git a/libs/e2e_tests/tests/plugin_telemetry.rs b/libs/e2e_tests/tests/plugin_telemetry.rs index ee621905..d4f1f533 100644 --- a/libs/e2e_tests/tests/plugin_telemetry.rs +++ b/libs/e2e_tests/tests/plugin_telemetry.rs @@ -1,6 +1,10 @@ pub mod telemetry { use conductor_common::{graphql::GraphQLRequest, plugin::CreatablePlugin}; - use conductor_tracing::{minitrace_mgr::MinitraceManager, otel_attrs::*}; + use conductor_tracing::reporters::TracingReporter; + use conductor_tracing::routed_reporter::test_utils::TestReporter; + use conductor_tracing::{ + minitrace_mgr::MinitraceManager, otel_attrs::*, trace_id::generate_trace_id, + }; use e2e::suite::TestSuite; use minitrace::{ collector::{Config, SpanContext, SpanId}, @@ -11,7 +15,7 @@ pub mod telemetry { #[test] async fn spans() { - let (spans, reporter) = conductor_tracing::minitrace_mgr::test_utils::TestReporter::new(); + let (spans, reporter) = TestReporter::new(); let plugin = telemetry_plugin::Plugin::create(telemetry_plugin::Config { targets: vec![telemetry_plugin::Target::Stdout], ..Default::default() @@ -20,15 +24,19 @@ pub mod telemetry { .unwrap(); let mut minitrace_mgr = MinitraceManager::default(); - plugin.configure_tracing_for_test(0, Box::new(reporter), &mut minitrace_mgr); - minitrace::set_reporter(minitrace_mgr.build_reporter(), Config::default()); + plugin.configure_tracing_for_test( + 0, + TracingReporter::Simple(Box::new(reporter)), + &mut minitrace_mgr, + ); + minitrace::set_reporter(minitrace_mgr.build_root_reporter(), Config::default()); let test = TestSuite { plugins: vec![plugin], ..Default::default() }; - let span_context = SpanContext::new(MinitraceManager::generate_trace_id(0), SpanId::default()); + let span_context = SpanContext::new(generate_trace_id(0), SpanId::default()); let root_span = Span::root("root", span_context); test .run_graphql_request(GraphQLRequest::default()) diff --git a/libs/engine/src/gateway.rs b/libs/engine/src/gateway.rs index b99baf2f..a1fc87c3 100644 --- a/libs/engine/src/gateway.rs +++ b/libs/engine/src/gateway.rs @@ -92,7 +92,7 @@ impl ConductorGateway { tenant_id: u32, config_object: &ConductorConfig, endpoint_config: &EndpointDefinition, - tracing_manager: &mut Option<&mut MinitraceManager>, + tracing_manager: &mut MinitraceManager, ) -> Result { let global_plugins = &config_object.plugins; let combined_plugins = global_plugins @@ -124,7 +124,7 @@ impl ConductorGateway { pub async fn new( config_object: &ConductorConfig, - tracing_manager: &mut Option<&mut MinitraceManager>, + tracing_manager: &mut MinitraceManager, ) -> Result { let mut route_mapping: Vec = vec![]; @@ -269,6 +269,7 @@ impl ConductorGateway { let upstream_span = Span::enter_with_parent("upstream_call", &_graphql_span) .with_property(|| (CONDUCTOR_SOURCE, route_data.to.name().to_string())); + let upstream_response = route_data .to .execute(route_data, &mut request_ctx) diff --git a/libs/engine/src/plugin_manager.rs b/libs/engine/src/plugin_manager.rs index 3f685d68..974dff79 100644 --- a/libs/engine/src/plugin_manager.rs +++ b/libs/engine/src/plugin_manager.rs @@ -31,7 +31,7 @@ impl PluginManager { pub async fn new( plugins_config: &Option>, - tracing_manager: &mut Option<&mut MinitraceManager>, + tracing_manager: &mut MinitraceManager, tenant_id: u32, ) -> Result { let mut instance = PluginManager::default(); @@ -84,16 +84,10 @@ impl PluginManager { enabled: Some(true), config, } => { - if tracing_manager.is_some() { - let plugin = Self::create_plugin::(config.clone()).await?; - plugin.configure_tracing(tenant_id, tracing_manager.as_mut().unwrap())?; - - plugin - } else { - return Err(PluginError::PluginNotSupportedInRuntime { - name: "telemetry".to_string(), - }); - } + let plugin = Self::create_plugin::(config.clone()).await?; + plugin.configure_tracing(tenant_id, tracing_manager)?; + + plugin } // In case plugin is not enabled, we are skipping it. Also when we don't have a match, so watch out for this one if you add a new plugin. _ => continue, diff --git a/libs/federation_query_planner/src/executor.rs b/libs/federation_query_planner/src/executor.rs index a9ebeda3..83ead620 100644 --- a/libs/federation_query_planner/src/executor.rs +++ b/libs/federation_query_planner/src/executor.rs @@ -9,7 +9,6 @@ use anyhow::{anyhow, Result}; use async_graphql::{dynamic::*, Error, Value}; use futures::future::join_all; use futures::Future; -use minitrace::future::FutureExt; use minitrace::Span; use serde::{Deserialize, Serialize}; use serde_json::Value as SerdeValue; @@ -220,7 +219,7 @@ async fn execute_query_step( extensions: None, }) } else { - let span = Span::enter_with_local_parent(format!("subgraph {}", query_step.service_name)) + let _span = Span::enter_with_local_parent(format!("subgraph {}", query_step.service_name)) .with_properties(|| { [ ("service_name", query_step.service_name.clone()), @@ -247,7 +246,6 @@ async fn execute_query_step( .to_string(), ) .send() - .in_span(span) .await { Ok(resp) => resp, diff --git a/libs/smoke_tests/Cargo.toml b/libs/smoke_tests/Cargo.toml index bf1859da..2f94ab4a 100644 --- a/libs/smoke_tests/Cargo.toml +++ b/libs/smoke_tests/Cargo.toml @@ -14,6 +14,7 @@ serde = { workspace = true } serde_json = { workspace = true } insta = "1.34.0" lazy_static = "1.4.0" +serial_test = "3.0.0" [features] binary = [] diff --git a/libs/smoke_tests/docker-compose.yaml b/libs/smoke_tests/docker-compose.yaml index 1e54443a..7cc3163c 100644 --- a/libs/smoke_tests/docker-compose.yaml +++ b/libs/smoke_tests/docker-compose.yaml @@ -24,3 +24,9 @@ services: - 6831:6831/udp # Jaeger Thrift over compact thrift protocol, UDP - 6832:6832/udp # Jaeger Thrift over binary thrift protocol, UDP - 16686:16686 # Jaeger UI / API + + # Zipkin (Telemetry) + zipkin: + image: openzipkin/zipkin + ports: + - 9411:9411 diff --git a/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp.snap b/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp_grpc.snap similarity index 100% rename from libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp.snap rename to libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp_grpc.snap diff --git a/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp_http.snap b/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp_http.snap new file mode 100644 index 00000000..3c447443 --- /dev/null +++ b/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_otlp_http.snap @@ -0,0 +1,9 @@ +--- +source: libs/smoke_tests/src/telemetry.rs +expression: json_body +--- +Object { + "data": Object { + "__typename": String("Query"), + }, +} diff --git a/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_zipkin.snap b/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_zipkin.snap new file mode 100644 index 00000000..3c447443 --- /dev/null +++ b/libs/smoke_tests/src/snapshots/smoke_tests__telemetry__smoke_telemetry__telemetry_zipkin.snap @@ -0,0 +1,9 @@ +--- +source: libs/smoke_tests/src/telemetry.rs +expression: json_body +--- +Object { + "data": Object { + "__typename": String("Query"), + }, +} diff --git a/libs/smoke_tests/src/telemetry.rs b/libs/smoke_tests/src/telemetry.rs index e037acbd..427e298e 100644 --- a/libs/smoke_tests/src/telemetry.rs +++ b/libs/smoke_tests/src/telemetry.rs @@ -1,13 +1,13 @@ #[cfg(test)] -#[cfg(not(feature = "wasm"))] // TODO: Remove this when Telemtry will be fully functional on WASM runtime mod smoke_telemetry { use conductor_common::http::ConductorHttpRequest; use insta::assert_debug_snapshot; use lazy_static::lazy_static; use reqwest::Response; use serde_json::Value; + use serial_test::serial; use std::env::var; - use std::time::{Duration, SystemTime, UNIX_EPOCH}; + use std::time::Duration; use tokio::time::sleep; lazy_static! { @@ -15,6 +15,7 @@ mod smoke_telemetry { } static JAEGER_API: &str = "localhost:16686"; + static ZIPKIN_API: &str = "localhost:9411"; async fn make_graphql_request(req: ConductorHttpRequest) -> Response { let req_builder = reqwest::Client::new() @@ -44,8 +45,8 @@ mod smoke_telemetry { pub operation_name: String, } - async fn fetch_traces(service: &str, start: u128, end: u128) -> Vec { - let url = format!("http://{JAEGER_API}/api/traces?end={end}&limit=20&lookback=1h&maxDuration&minDuration&service={service}&start={start}"); + async fn fetch_jaeger_traces(service: &str) -> Vec { + let url = format!("http://{JAEGER_API}/api/traces?service={service}"); let response = reqwest::Client::new() .get(url) @@ -61,28 +62,44 @@ mod smoke_telemetry { response.data[0].spans.clone() } + #[derive(Clone, Debug, serde::Deserialize)] + struct ZipkinSpan { + name: String, + } + + async fn fetch_zipkin_traces(service: &str) -> Vec { + let url = format!("http://{ZIPKIN_API}/api/v2/traces?serviceName={service}"); + + let response = reqwest::Client::new() + .get(url) + .send() + .await + .expect("failed to fetch zipkin traces") + .json::>>() + .await + .expect("failed to get zipkin response"); + + assert_eq!(response.len(), 1); + + response[0].clone() + } + + #[cfg(feature = "binary")] #[tokio::test] + #[serial] async fn telemetry_jaeger() { let mut req = ConductorHttpRequest::default(); req.method = reqwest::Method::POST; req.uri = format!("{}/telemetry-jaeger-udp", CONDUCTOR_URL.as_str()) .parse() .unwrap(); - let start_timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros(); let gql_response: Response = make_graphql_request(req).await; - let end_timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros(); assert_eq!(gql_response.status(), 200); let json_body = gql_response.json::().await.unwrap(); assert_debug_snapshot!(json_body); sleep(Duration::from_secs(5)).await; // Jaeger needs some processing time... - let traces = fetch_traces("conductor-jaeger-test", start_timestamp, end_timestamp).await; + let traces = fetch_jaeger_traces("conductor-jaeger-test").await; assert!(traces .iter() @@ -119,27 +136,71 @@ mod smoke_telemetry { } #[tokio::test] - async fn telemetry_otlp() { + #[serial] + #[cfg(feature = "binary")] + async fn telemetry_otlp_grpc() { + let mut req = ConductorHttpRequest::default(); + req.method = reqwest::Method::POST; + req.uri = format!("{}/telemetry-jaeger-otlp-grpc", CONDUCTOR_URL.as_str()) + .parse() + .unwrap(); + let gql_response: Response = make_graphql_request(req).await; + assert_eq!(gql_response.status(), 200); + let json_body = gql_response.json::().await.unwrap(); + assert_debug_snapshot!(json_body); + + sleep(Duration::from_secs(5)).await; // Jaeger needs some processing time... + let traces = fetch_jaeger_traces("conductor-otlp-test-grpc").await; + + assert!(traces + .iter() + .find(|v| v.operation_name == "transform_request") + .is_some()); + assert!(traces + .iter() + .find(|v| v.operation_name == "transform_response") + .is_some()); + assert!(traces + .iter() + .find(|v| v.operation_name == "query") + .is_some()); + assert!(traces + .iter() + .find(|v| v.operation_name == "execute") + .is_some()); + assert!(traces + .iter() + .find(|v| v.operation_name == "POST /") + .is_some()); + assert!(traces + .iter() + .find(|v| v.operation_name == "upstream_call") + .is_some()); + assert!(traces + .iter() + .find(|v| v.operation_name == "graphql_parse") + .is_some()); + assert!(traces + .iter() + .find(|v| v.operation_name == "HTTP POST /telemetry-jaeger-otlp-grpc") + .is_some()); + } + + #[tokio::test] + #[serial] + async fn telemetry_otlp_http() { let mut req = ConductorHttpRequest::default(); req.method = reqwest::Method::POST; - req.uri = format!("{}/telemetry-jaeger-otlp", CONDUCTOR_URL.as_str()) + req.uri = format!("{}/telemetry-jaeger-otlp-http", CONDUCTOR_URL.as_str()) .parse() .unwrap(); - let start_timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros(); let gql_response: Response = make_graphql_request(req).await; - let end_timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros(); assert_eq!(gql_response.status(), 200); let json_body = gql_response.json::().await.unwrap(); assert_debug_snapshot!(json_body); sleep(Duration::from_secs(5)).await; // Jaeger needs some processing time... - let traces = fetch_traces("conductor-otlp-test", start_timestamp, end_timestamp).await; + let traces = fetch_jaeger_traces("conductor-otlp-test-http").await; assert!(traces .iter() @@ -171,7 +232,42 @@ mod smoke_telemetry { .is_some()); assert!(traces .iter() - .find(|v| v.operation_name == "HTTP POST /telemetry-jaeger-otlp") + .find(|v| v.operation_name == "HTTP POST /telemetry-jaeger-otlp-http") + .is_some()); + } + + #[tokio::test] + #[serial] + async fn telemetry_zipkin() { + let mut req = ConductorHttpRequest::default(); + req.method = reqwest::Method::POST; + req.uri = format!("{}/telemetry-zipkin", CONDUCTOR_URL.as_str()) + .parse() + .unwrap(); + let gql_response: Response = make_graphql_request(req).await; + assert_eq!(gql_response.status(), 200); + let json_body = gql_response.json::().await.unwrap(); + assert_debug_snapshot!(json_body); + + sleep(Duration::from_secs(5)).await; // Zipkin needs some processing time... + let traces = fetch_zipkin_traces("conductor-zipkin").await; + + assert!(traces + .iter() + .find(|v| v.name == "transform_request") + .is_some()); + assert!(traces + .iter() + .find(|v| v.name == "transform_response") + .is_some()); + assert!(traces.iter().find(|v| v.name == "query").is_some()); + assert!(traces.iter().find(|v| v.name == "execute").is_some()); + assert!(traces.iter().find(|v| v.name == "post /").is_some()); + assert!(traces.iter().find(|v| v.name == "upstream_call").is_some()); + assert!(traces.iter().find(|v| v.name == "graphql_parse").is_some()); + assert!(traces + .iter() + .find(|v| v.name == "http post /telemetry-zipkin") .is_some()); } } diff --git a/libs/smoke_tests/test_gw.yaml b/libs/smoke_tests/test_gw.yaml index 55703b0f..39f4dd01 100644 --- a/libs/smoke_tests/test_gw.yaml +++ b/libs/smoke_tests/test_gw.yaml @@ -68,19 +68,41 @@ endpoints: config: endpoint: "127.0.0.1:6831" - # Jaeger with OTLP endpoint - - path: /telemetry-jaeger-otlp + # Jaeger with OTLP endpoint (over gRPC) + - path: /telemetry-jaeger-otlp-grpc from: upstream plugins: - type: telemetry enabled: ${ENABLE_WASM_FEATURES:true} config: - service_name: conductor-otlp-test + service_name: conductor-otlp-test-grpc targets: - type: otlp - endpoint: http://localhost:4317/v1/traces + endpoint: http://localhost:4317 protocol: grpc + # Jaeger with OTLP endpoint (over HTTP) + - path: /telemetry-jaeger-otlp-http + from: upstream + plugins: + - type: telemetry + config: + service_name: conductor-otlp-test-http + targets: + - type: otlp + endpoint: http://localhost:4318 + protocol: http + + # Zipkin (over HTTP) + - path: /telemetry-zipkin + from: upstream + plugins: + - type: telemetry + config: + service_name: conductor-zipkin + targets: + - type: zipkin + # HTTP GET - path: /http-get from: upstream diff --git a/libs/tracing/Cargo.toml b/libs/tracing/Cargo.toml index ad6f5c00..e07fb24e 100644 --- a/libs/tracing/Cargo.toml +++ b/libs/tracing/Cargo.toml @@ -10,6 +10,8 @@ path = "src/lib.rs" test_utils = [] [dependencies] +tokio = { workspace = true } +async-trait = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } schemars = { workspace = true } diff --git a/libs/tracing/src/lib.rs b/libs/tracing/src/lib.rs index 391dff05..1e6a8dbc 100644 --- a/libs/tracing/src/lib.rs +++ b/libs/tracing/src/lib.rs @@ -1,3 +1,6 @@ pub mod minitrace_mgr; pub mod otel_attrs; pub mod otel_utils; +pub mod reporters; +pub mod routed_reporter; +pub mod trace_id; diff --git a/libs/tracing/src/minitrace_mgr.rs b/libs/tracing/src/minitrace_mgr.rs index 3279026d..1fa65de6 100644 --- a/libs/tracing/src/minitrace_mgr.rs +++ b/libs/tracing/src/minitrace_mgr.rs @@ -1,10 +1,16 @@ -use std::collections::HashMap; +use minitrace::collector::Reporter; -use minitrace::collector::{Reporter, SpanRecord, TraceId}; +use crate::{ + reporters::TracingReporter, routed_reporter::RoutedReporter, trace_id::extract_tenant_id, +}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; #[derive(Default)] pub struct MinitraceManager { - reporters: HashMap>, + reporters: HashMap>>, } impl std::fmt::Debug for MinitraceManager { @@ -14,170 +20,32 @@ impl std::fmt::Debug for MinitraceManager { } impl MinitraceManager { - pub fn add_reporter(&mut self, tenant_id: u32, reporter: Box) { - self.reporters.insert(tenant_id, reporter); - } - - pub fn generate_trace_id(tenant_id: u32) -> TraceId { - let uniq: u32 = rand::random(); - - TraceId(((tenant_id as u128) << 32) | (uniq as u128)) - } - - pub fn extract_tenant_id(trace_id: TraceId) -> u32 { - (trace_id.0 >> 32) as u32 - } - - pub fn build_reporter(self) -> impl Reporter { - let mut routed_reporter = - RoutedReporter::new(|span| Some(Self::extract_tenant_id(span.trace_id))); - - for (tenant_id, reporter) in self.reporters { - routed_reporter = routed_reporter.with_reporter(tenant_id, reporter); - } - - routed_reporter - } -} - -type RouterFn = fn(&SpanRecord) -> Option; - -struct RoutedReporter { - reporters: HashMap>, - router_fn: RouterFn, -} - -impl RoutedReporter { - pub fn new(router_fn: RouterFn) -> Self { - Self { - reporters: HashMap::new(), - router_fn, - } - } - - pub fn with_reporter(mut self, tenant_id: u32, reporter: Box) -> Self { - self.reporters.insert(tenant_id, reporter); - + pub fn add_reporter(&mut self, tenant_id: u32, reporter: TracingReporter) { self + .reporters + .insert(tenant_id, Arc::new(Mutex::new(reporter))); } -} -impl Reporter for RoutedReporter { - fn report(&mut self, spans: &[SpanRecord]) { - let mut chunks: HashMap> = HashMap::new(); + pub fn build_root_reporter(&self) -> impl Reporter { + let mut routed_reporter = RoutedReporter::new(|span| Some(extract_tenant_id(span.trace_id))); - for span in spans { - if let Some(key) = (self.router_fn)(span) { - let chunk = chunks.entry(key).or_default(); - chunk.push(span.clone()); - } else { - tracing::warn!("no key for span: {:?}, dropping span", span); - } + for (tenant_id, reporter) in &self.reporters { + routed_reporter = routed_reporter.with_reporter(*tenant_id, reporter.clone()); } - for (key, chunk) in chunks { - if let Some(reporter) = self.reporters.get_mut(&key) { - reporter.report(chunk.as_slice()); - } - } + routed_reporter } -} -#[cfg(feature = "test_utils")] -pub mod test_utils { - use std::sync::{Arc, Mutex}; + pub async fn shutdown(self) { + tracing::info!("Shutting down tracing reporters..."); + minitrace::flush(); - use minitrace::collector::{Reporter, SpanRecord}; + for (_, reporter) in self.reporters { + let mut reporter = reporter + .lock() + .expect("failed to acquire lock for tracing reporter"); - pub struct TestReporter { - captured_spans: Arc>>, - } - - impl TestReporter { - pub fn new() -> (Arc>>, Self) { - let spans: Arc>> = Arc::new(Mutex::new(vec![])); - - ( - spans.clone(), - Self { - captured_spans: spans, - }, - ) + reporter.flush().await } } - - impl Reporter for TestReporter { - fn report(&mut self, spans: &[SpanRecord]) { - for span in spans.iter() { - self.captured_spans.lock().unwrap().push(span.clone()); - } - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn routed_reporter() { - let (spans0, reporter0) = test_utils::TestReporter::new(); - let (spans1, reporter1) = test_utils::TestReporter::new(); - let mut routed_reporter = - RoutedReporter::new(|span| Some(MinitraceManager::extract_tenant_id(span.trace_id))) - .with_reporter(0, Box::new(reporter0)) - .with_reporter(1, Box::new(reporter1)); - - routed_reporter.report(&vec![ - // This one goes to tenant 2, it does not exists - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(2), - ..Default::default() - }, - // This one goes to tenant 0 - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(0), - ..Default::default() - }, - // This one goes to tenant 0 - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(0), - ..Default::default() - }, - // This one goes to tenant 1 - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(1), - ..Default::default() - }, - // This one goes to tenant 2, it does not exists - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(2), - ..Default::default() - }, - ]); - - routed_reporter.report(&vec![ - // This one goes to tenant 1 - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(1), - ..Default::default() - }, - // This one goes to tenant 1 - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(1), - ..Default::default() - }, - // This one goes to tenant 2 - SpanRecord { - trace_id: MinitraceManager::generate_trace_id(2), - ..Default::default() - }, - ]); - - let spans0 = spans0.lock().unwrap(); - let spans1 = spans1.lock().unwrap(); - - assert_eq!(spans0.len(), 2); - assert_eq!(spans1.len(), 3); - } } diff --git a/libs/tracing/src/reporters.rs b/libs/tracing/src/reporters.rs new file mode 100644 index 00000000..c3a9d992 --- /dev/null +++ b/libs/tracing/src/reporters.rs @@ -0,0 +1,54 @@ +use minitrace::collector::{Reporter as MinitraceSyncReporter, SpanRecord}; + +#[async_trait::async_trait(?Send)] +pub trait AsyncReporter: Send + 'static { + async fn flush(&mut self, spans: &[SpanRecord]); +} + +pub struct AggregatingReporter { + collected_spans: Vec, + reporter: Box, +} + +impl AggregatingReporter { + pub fn new(reporter: Box) -> Self { + Self { + collected_spans: Vec::new(), + reporter, + } + } + + pub async fn flush(&mut self) { + self.reporter.flush(&self.collected_spans).await; + } +} + +impl MinitraceSyncReporter for AggregatingReporter { + fn report(&mut self, spans: &[SpanRecord]) { + self.collected_spans.extend_from_slice(spans); + } +} + +pub enum TracingReporter { + // A simple wrapper around a generic Reporter created from minitrace package. + Simple(Box), + // A special reporter that aggregates the spans in memory, and can later ship the spans on demand. + // This is a workaround that collects traces in-memory, and later ships them asynchronously, on a WASM runtime. + Aggregating(AggregatingReporter), +} + +impl TracingReporter { + pub fn report(&mut self, spans: &[SpanRecord]) { + match self { + TracingReporter::Aggregating(reporter) => reporter.report(spans), + TracingReporter::Simple(reporter) => reporter.report(spans), + } + } + + pub async fn flush(&mut self) { + // Only the AggregatingReporter needs to flush the spans at this point. + if let TracingReporter::Aggregating(reporter) = self { + reporter.flush().await; + } + } +} diff --git a/libs/tracing/src/routed_reporter.rs b/libs/tracing/src/routed_reporter.rs new file mode 100644 index 00000000..85ce5b3b --- /dev/null +++ b/libs/tracing/src/routed_reporter.rs @@ -0,0 +1,168 @@ +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use minitrace::collector::{Reporter, SpanRecord}; + +use crate::reporters::TracingReporter; + +pub type RouterFn = fn(&SpanRecord) -> Option; + +pub struct RoutedReporter { + reporters: HashMap>>, + router_fn: RouterFn, +} + +impl RoutedReporter { + pub fn new(router_fn: RouterFn) -> Self { + Self { + reporters: HashMap::new(), + router_fn, + } + } + + pub fn with_reporter(mut self, tenant_id: u32, reporter: Arc>) -> Self { + self.reporters.insert(tenant_id, reporter); + + self + } + + pub fn make_report(&mut self, spans: &[SpanRecord]) { + let mut chunks: HashMap> = HashMap::new(); + + for span in spans { + if let Some(key) = (self.router_fn)(span) { + let chunk = chunks.entry(key).or_default(); + chunk.push(span.clone()); + } else { + tracing::warn!("no key for span: {:?}, dropping span", span); + } + } + + for (key, chunk) in chunks { + if let Some(reporter) = self.reporters.get_mut(&key) { + let mut r = reporter.lock().unwrap(); + r.report(&chunk); + } + } + } +} + +impl Reporter for RoutedReporter { + #[cfg(target_arch = "wasm32")] + fn report(&mut self, spans: &[SpanRecord]) { + self.make_report(spans); + } + + #[cfg(not(target_arch = "wasm32"))] + fn report(&mut self, spans: &[SpanRecord]) { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + self.make_report(spans); + }); + } +} + +#[cfg(feature = "test_utils")] +pub mod test_utils { + use std::sync::{Arc, Mutex}; + + use minitrace::collector::{Reporter, SpanRecord}; + + pub struct TestReporter { + captured_spans: Arc>>, + } + + impl TestReporter { + pub fn new() -> (Arc>>, Self) { + let spans: Arc>> = Arc::new(Mutex::new(vec![])); + + ( + spans.clone(), + Self { + captured_spans: spans, + }, + ) + } + } + + impl Reporter for TestReporter { + fn report(&mut self, spans: &[SpanRecord]) { + for span in spans.iter() { + self.captured_spans.lock().unwrap().push(span.clone()); + } + } + } +} + +// #[cfg(test)] +// mod test { +// use super::*; + +// #[tokio::test] +// async fn routed_reporter() { +// let (spans0, reporter0) = test_utils::TestReporter::new(); +// let (spans1, reporter1) = test_utils::TestReporter::new(); +// let mut routed_reporter = +// RoutedReporter::new(|span| Some(MinitraceManager::extract_tenant_id(span.trace_id))) +// .with_reporter(0, Box::new(reporter0).into()) +// .with_reporter(1, Box::new(reporter1).into()); + +// routed_reporter +// .report(&vec![ +// // This one goes to tenant 2, it does not exists +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(2), +// ..Default::default() +// }, +// // This one goes to tenant 0 +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(0), +// ..Default::default() +// }, +// // This one goes to tenant 0 +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(0), +// ..Default::default() +// }, +// // This one goes to tenant 1 +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(1), +// ..Default::default() +// }, +// // This one goes to tenant 2, it does not exists +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(2), +// ..Default::default() +// }, +// ]) +// .await; + +// routed_reporter +// .report(&vec![ +// // This one goes to tenant 1 +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(1), +// ..Default::default() +// }, +// // This one goes to tenant 1 +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(1), +// ..Default::default() +// }, +// // This one goes to tenant 2 +// SpanRecord { +// trace_id: MinitraceManager::generate_trace_id(2), +// ..Default::default() +// }, +// ]) +// .await; + +// let spans0 = spans0.lock().unwrap(); +// let spans1 = spans1.lock().unwrap(); + +// assert_eq!(spans0.len(), 2); +// assert_eq!(spans1.len(), 3); +// } +// } diff --git a/libs/tracing/src/trace_id.rs b/libs/tracing/src/trace_id.rs new file mode 100644 index 00000000..ed926a2b --- /dev/null +++ b/libs/tracing/src/trace_id.rs @@ -0,0 +1,11 @@ +use minitrace::collector::TraceId; + +pub fn generate_trace_id(tenant_id: u32) -> TraceId { + let uniq: u32 = rand::random(); + + TraceId(((tenant_id as u128) << 32) | (uniq as u128)) +} + +pub fn extract_tenant_id(trace_id: TraceId) -> u32 { + (trace_id.0 >> 32) as u32 +} diff --git a/plugins/telemetry/Cargo.toml b/plugins/telemetry/Cargo.toml index 7fae8735..a59c5273 100644 --- a/plugins/telemetry/Cargo.toml +++ b/plugins/telemetry/Cargo.toml @@ -10,6 +10,8 @@ path = "src/lib.rs" test_utils = [] [dependencies] +reqwest = { workspace = true } +tracing = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } async-trait = { workspace = true } @@ -21,8 +23,28 @@ opentelemetry = { version = "0.21.0", features = ["trace"] } opentelemetry_sdk = { version = "0.21.2", features = ["trace"] } minitrace = { workspace = true } +[target.'cfg(target_arch = "wasm32")'.dependencies] +wasm_polyfills = { path = "../../libs/wasm_polyfills" } +http = { workspace = true } +opentelemetry-http = { version = "0.10.0", default-features = false } +rmp-serde = "1.1.2" +web-time = "1.0.0" +opentelemetry-otlp = { version = "0.14.0", features = [ + "http-proto", +], default-features = false } +opentelemetry-zipkin = { version = "0.19.0", default-features = false } + [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -minitrace-datadog = "0.6.3" -minitrace-jaeger = "0.6.3" -minitrace-opentelemetry = "0.6.3" -opentelemetry-otlp = { version = "0.14.0" } +opentelemetry-otlp = { version = "0.14.0", features = [ + "grpc-tonic", + "http-proto", +] } +minitrace-datadog = "0.6.4" +opentelemetry-zipkin = { version = "0.19.0", default-features = false, features = [ + "reqwest-client", +] } +minitrace-jaeger = "0.6.4" +minitrace-opentelemetry = "0.6.4" +opentelemetry-http = { version = "0.10.0", default-features = false, features = [ + "reqwest", +] } diff --git a/plugins/telemetry/src/config.rs b/plugins/telemetry/src/config.rs index bc8c6ef9..6be277a0 100644 --- a/plugins/telemetry/src/config.rs +++ b/plugins/telemetry/src/config.rs @@ -7,14 +7,6 @@ use std::net::SocketAddr; #[derive(Deserialize, Serialize, Debug, Clone, Default, JsonSchema)] /// The `telemetry` plugin exports traces information about Conductor to a telemetry backend. /// -/// -/// -/// At the moment, this plugin is not supported on WASM (CloudFlare Worker) runtime. -/// -/// You may follow [this GitHub issue](https://github.com/the-guild-org/conductor/issues/354) for additional information. -/// -/// -/// /// The telemetry plugin exports traces information about the following aspects of Conductor: /// /// - GraphQL parser (timing) @@ -29,7 +21,7 @@ use std::net::SocketAddr; /// /// When used with a telemtry backend, you can expect to see the following information: /// -/// ![img](/assets/telemetry.png) +/// ![img](https://raw.githubusercontent.com/the-guild-org/conductor/master/website/public/assets/telemetry.png) /// pub struct TelemetryPluginConfig { /// Configures the service name that reports the telemetry data. This will appear in the telemetry data as the `service.name` attribute. @@ -54,6 +46,18 @@ pub enum TelemetryTarget { #[serde(rename = "stdout")] #[schemars(title = "stdout")] Stdout, + /// Sends telemetry traces data to a [Zipkin](https://zipkin.io/) collector, using the HTTP protocol. + /// + /// To get started with Zipkin, use the following command to start the Zipkin collector and UI in your local machine, using Docker: + /// + /// `docker run -d -p 9411:9411 openzipkin/zipkin` + #[serde(rename = "zipkin")] + #[schemars(title = "zipkin")] + Zipkin { + #[serde(default = "default_zipkin_endpoint")] + /// The Zipkin endpoint. Please use full URL endpoint format, e.g. `http://127.0.0.1:9411/api/v2/spans`. + collector_endpoint: String, + }, /// Sends telemetry traces data to an [OpenTelemetry](https://opentelemetry.io/) backend, using the [OTLP protocol](https://opentelemetry.io/docs/specs/otel/protocol/). /// /// You can find [here a list backends that supports the OTLP format](https://github.com/magsther/awesome-opentelemetry#open-source). @@ -64,6 +68,8 @@ pub enum TelemetryTarget { endpoint: String, #[serde(default = "default_otlp_protocol")] /// The OTLP transport to use to export telemetry data. + /// + /// > ❗️ The gRPC transport is not supported on WASM runtime (CloudFlare Worker). protocol: OtlpProtcol, #[serde( deserialize_with = "humantime_serde::deserialize", @@ -93,8 +99,9 @@ pub enum TelemetryTarget { /// /// > Note: Jaeger also [supports OTLP format](https://opentelemetry.io/blog/2022/jaeger-native-otlp/), so it's preferred to use the `otlp` target. /// - /// To get started with Jaeger, make sure you have a Jaeger backend running, - /// and then use the following command to start the Jaeger backend and UI in your local machine, using Docker: + /// > ❗️ This target is not available on WASM runtime (CloudFlare Worker). + /// + /// To get started with Jaeger, use the following command to start the Jaeger backend and UI in your local machine, using Docker: /// /// `docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest` #[serde(rename = "jaeger")] @@ -110,6 +117,10 @@ fn default_jaeger_endpoint() -> SocketAddr { "127.0.0.1:6831".parse().unwrap() } +fn default_zipkin_endpoint() -> String { + "http://127.0.0.1:9411/api/v2/spans".parse().unwrap() +} + fn default_datadog_agent_endpoint() -> SocketAddr { "127.0.0.1:8126".parse().unwrap() } @@ -134,12 +145,23 @@ pub enum OtlpProtcol { Http, } -#[cfg(not(target_arch = "wasm32"))] impl From for opentelemetry_otlp::Protocol { + #[cfg(not(target_arch = "wasm32"))] fn from(value: OtlpProtcol) -> Self { match value { OtlpProtcol::Grpc => opentelemetry_otlp::Protocol::Grpc, OtlpProtcol::Http => opentelemetry_otlp::Protocol::HttpBinary, } } + + #[cfg(target_arch = "wasm32")] + fn from(value: OtlpProtcol) -> Self { + match value { + OtlpProtcol::Grpc => { + tracing::warn!("GRPC is not supported on WASM runtime. Using HTTP instead."); + opentelemetry_otlp::Protocol::HttpBinary + } + OtlpProtcol::Http => opentelemetry_otlp::Protocol::HttpBinary, + } + } } diff --git a/plugins/telemetry/src/lib.rs b/plugins/telemetry/src/lib.rs index 83661ce4..76d70b28 100644 --- a/plugins/telemetry/src/lib.rs +++ b/plugins/telemetry/src/lib.rs @@ -1,6 +1,9 @@ mod config; mod plugin; +#[cfg(target_arch = "wasm32")] +pub mod wasm_reporter; + pub use config::TelemetryPluginConfig as Config; pub use config::TelemetryTarget as Target; pub use plugin::TelemetryPlugin as Plugin; diff --git a/plugins/telemetry/src/plugin.rs b/plugins/telemetry/src/plugin.rs index 0904252d..a544c0f7 100644 --- a/plugins/telemetry/src/plugin.rs +++ b/plugins/telemetry/src/plugin.rs @@ -1,9 +1,14 @@ +use std::borrow::Cow; + use crate::config::{TelemetryPluginConfig, TelemetryTarget}; use conductor_common::plugin::{CreatablePlugin, Plugin, PluginError}; - use conductor_tracing::minitrace_mgr::MinitraceManager; -use minitrace::collector::Reporter; +use conductor_tracing::reporters::TracingReporter; +use opentelemetry::trace::SpanKind; use opentelemetry::trace::TraceError; +use opentelemetry::{InstrumentationLibrary, KeyValue}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::Resource; #[derive(Debug)] pub struct TelemetryPlugin { @@ -19,34 +24,122 @@ impl CreatablePlugin for TelemetryPlugin { } } -#[cfg(not(target_arch = "wasm32"))] static LIB_NAME: &str = "conductor"; impl TelemetryPlugin { #[cfg(target_arch = "wasm32")] fn compose_reporter( - _service_name: &String, - _target: &TelemetryTarget, - ) -> Result, TraceError> { - Err(TraceError::Other( - "plugin is not supported in this runtime".into(), - )) + service_name: &String, + target: &TelemetryTarget, + ) -> Result { + use crate::wasm_reporter::console_reporter::WasmConsoleReporter; + use crate::wasm_reporter::datadog_reporter::WasmDatadogReporter; + use crate::wasm_reporter::otlp_reporter::{WasmOtlpReporter, WasmTracingHttpClient}; + use conductor_tracing::reporters::AggregatingReporter; + + let reporter: TracingReporter = match target { + TelemetryTarget::Stdout => TracingReporter::Simple(Box::new(WasmConsoleReporter)), + TelemetryTarget::Jaeger { .. } => { + return Err(TraceError::Other( + "The \"jaeger\" target is not supported on WASM runtime. Please use the \"otlp\" target instead. See: https://opentelemetry.io/blog/2022/jaeger-native-otlp/".into(), + )) + }, + TelemetryTarget::Zipkin { collector_endpoint } => { + let (lib, resource) = Self::build_otlp_identifiers(service_name.clone()); + + let exporter = opentelemetry_zipkin::ZipkinPipelineBuilder::default() + .with_service_name(service_name) + .with_http_client(WasmTracingHttpClient) + .with_collector_endpoint(collector_endpoint) + .init_exporter()?; + + let reporter = Box::new(WasmOtlpReporter::new( + exporter, + SpanKind::Server, + resource, + lib, + )); + + TracingReporter::Aggregating(AggregatingReporter::new(reporter)) + } + TelemetryTarget::Datadog { agent_endpoint } => TracingReporter::Aggregating(AggregatingReporter::new(Box::new(WasmDatadogReporter::new(agent_endpoint, service_name, LIB_NAME, "web")))), + TelemetryTarget::Otlp { endpoint, + protocol, + timeout, + gzip_compression } => { + let (lib, resource) = Self::build_otlp_identifiers(service_name.clone()); + let exporter = opentelemetry_otlp::new_exporter() + .http() + .with_http_client(WasmTracingHttpClient) + .with_endpoint(endpoint) + .with_protocol(protocol.clone().into()) + .with_timeout(*timeout); + + if *gzip_compression { + tracing::warn!("Gzip compression is not supported on WASM runtime. Ignoring."); + } + + let reporter = Box::new(WasmOtlpReporter::new( + exporter.build_span_exporter()?, + SpanKind::Server, + resource, + lib, + )); + + TracingReporter::Aggregating(AggregatingReporter::new(reporter)) + }, + }; + + Ok(reporter) + } + + fn build_otlp_identifiers( + service_name: String, + ) -> (InstrumentationLibrary, Cow<'static, Resource>) { + let lib = + InstrumentationLibrary::new(LIB_NAME, None::<&'static str>, None::<&'static str>, None); + let resource = Cow::Owned(Resource::new([KeyValue::new("service.name", service_name)])); + + (lib, resource) } #[cfg(not(target_arch = "wasm32"))] fn compose_reporter( service_name: &String, target: &TelemetryTarget, - ) -> Result, TraceError> { + ) -> Result { use minitrace::collector::ConsoleReporter; + use minitrace::collector::Reporter; use minitrace_opentelemetry::OpenTelemetryReporter; + use crate::config::OtlpProtcol; + let reporter: Box = match target { TelemetryTarget::Stdout => Box::new(ConsoleReporter), - TelemetryTarget::Jaeger { endpoint } => Box::new(minitrace_jaeger::JaegerReporter::new( - *endpoint, - service_name, - )?), + TelemetryTarget::Zipkin { collector_endpoint } => { + let (lib, resource) = Self::build_otlp_identifiers(service_name.clone()); + + let exporter = opentelemetry_zipkin::ZipkinPipelineBuilder::default() + .with_service_name(service_name) + .with_http_client(reqwest::Client::new()) + .with_collector_endpoint(collector_endpoint) + .init_exporter()?; + + Box::new(OpenTelemetryReporter::new( + exporter, + SpanKind::Server, + resource, + lib, + )) + } + TelemetryTarget::Jaeger { endpoint } => { + tracing::warn!("The \"jaeger\" target is deprecated. Please use the \"otlp\" target instead. See: https://opentelemetry.io/blog/2022/jaeger-native-otlp/"); + + Box::new(minitrace_jaeger::JaegerReporter::new( + *endpoint, + service_name, + )?) + } TelemetryTarget::Datadog { agent_endpoint } => Box::new( minitrace_datadog::DatadogReporter::new(*agent_endpoint, service_name, LIB_NAME, "web"), ), @@ -56,31 +149,40 @@ impl TelemetryPlugin { timeout, gzip_compression, } => { - use opentelemetry::trace::SpanKind; - use opentelemetry::{InstrumentationLibrary, KeyValue}; - use opentelemetry_otlp::WithExportConfig; - use opentelemetry_sdk::Resource; - use std::borrow::Cow; - - let lib = - InstrumentationLibrary::new(LIB_NAME, None::<&'static str>, None::<&'static str>, None); - let resource = Cow::Owned(Resource::new([KeyValue::new( - "service.name", - service_name.clone(), - )])); - - let mut exporter = opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint(endpoint) - .with_protocol(protocol.clone().into()) - .with_timeout(*timeout); - - if *gzip_compression { - exporter = exporter.with_compression(opentelemetry_otlp::Compression::Gzip); - } + let exporter = match protocol { + OtlpProtcol::Http => { + let builder = opentelemetry_otlp::new_exporter() + .http() + .with_http_client(reqwest::Client::new()) + .with_endpoint(endpoint) + .with_protocol(protocol.clone().into()) + .with_timeout(*timeout); + + if *gzip_compression { + tracing::warn!("Gzip compression is not supported on HTTP protocol. Ignoring."); + } + + builder.build_span_exporter()? + } + OtlpProtcol::Grpc => { + let mut builder = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint) + .with_protocol(protocol.clone().into()) + .with_timeout(*timeout); + + if *gzip_compression { + builder = builder.with_compression(opentelemetry_otlp::Compression::Gzip); + } + + builder.build_span_exporter()? + } + }; + + let (lib, resource) = Self::build_otlp_identifiers(service_name.clone()); Box::new(OpenTelemetryReporter::new( - exporter.build_span_exporter()?, + exporter, SpanKind::Server, resource, lib, @@ -88,14 +190,14 @@ impl TelemetryPlugin { } }; - Ok(reporter) + Ok(TracingReporter::Simple(reporter)) } #[cfg(feature = "test_utils")] pub fn configure_tracing_for_test( &self, tenant_id: u32, - reporter: Box, + reporter: TracingReporter, tracing_manager: &mut MinitraceManager, ) { tracing_manager.add_reporter(tenant_id, reporter); @@ -106,6 +208,11 @@ impl TelemetryPlugin { tenant_id: u32, tracing_manager: &mut MinitraceManager, ) -> Result<(), PluginError> { + opentelemetry::global::set_error_handler(|error| { + tracing::error!("telemetry error: {:?}", error); + }) + .map_err(|e| PluginError::InitError { source: e.into() })?; + for target in &self.config.targets { let reporter = Self::compose_reporter(&self.config.service_name, target) .map_err(|e| PluginError::InitError { source: e.into() })?; diff --git a/plugins/telemetry/src/wasm_reporter/console_reporter.rs b/plugins/telemetry/src/wasm_reporter/console_reporter.rs new file mode 100644 index 00000000..b50c7178 --- /dev/null +++ b/plugins/telemetry/src/wasm_reporter/console_reporter.rs @@ -0,0 +1,11 @@ +use minitrace::collector::{Reporter, SpanRecord}; + +pub struct WasmConsoleReporter; + +impl Reporter for WasmConsoleReporter { + fn report(&mut self, spans: &[SpanRecord]) { + for span in spans { + tracing::info!("{:#?}", span); + } + } +} diff --git a/plugins/telemetry/src/wasm_reporter/datadog_reporter.rs b/plugins/telemetry/src/wasm_reporter/datadog_reporter.rs new file mode 100644 index 00000000..556bf1d8 --- /dev/null +++ b/plugins/telemetry/src/wasm_reporter/datadog_reporter.rs @@ -0,0 +1,112 @@ +use std::{collections::HashMap, net::SocketAddr}; + +use conductor_tracing::reporters::AsyncReporter; +use minitrace::collector::SpanRecord; +use rmp_serde::Serializer; +use serde::Serialize; + +pub struct WasmDatadogReporter { + agent_endpoint: SocketAddr, + service_name: String, + resource: String, + trace_type: String, +} + +#[derive(Serialize)] +struct DatadogSpan<'a> { + name: &'a str, + service: &'a str, + #[serde(rename = "type")] + trace_type: &'a str, + resource: &'a str, + start: i64, + duration: i64, + #[serde(skip_serializing_if = "Option::is_none")] + meta: Option>, + error_code: i32, + span_id: u64, + trace_id: u64, + parent_id: u64, +} + +impl WasmDatadogReporter { + pub fn new( + agent_endpoint: &SocketAddr, + service_name: impl Into, + resource: impl Into, + trace_type: impl Into, + ) -> Self { + Self { + agent_endpoint: agent_endpoint.clone(), + service_name: service_name.into(), + resource: resource.into(), + trace_type: trace_type.into(), + } + } + + fn convert<'a>(&'a self, spans: &'a [SpanRecord]) -> Vec> { + spans + .iter() + .map(move |s| DatadogSpan { + name: &s.name, + service: &self.service_name, + trace_type: &self.trace_type, + resource: &self.resource, + start: s.begin_time_unix_ns as i64, + duration: s.duration_ns as i64, + meta: if s.properties.is_empty() { + None + } else { + Some( + s.properties + .iter() + .map(|(k, v)| (k.as_ref(), v.as_ref())) + .collect(), + ) + }, + error_code: 0, + span_id: s.span_id.0, + trace_id: s.trace_id.0 as u64, + parent_id: s.parent_id.0, + }) + .collect() + } + + fn serialize(&self, spans: Vec) -> Result, Box> { + let mut buf = vec![0b10010001]; + spans.serialize(&mut Serializer::new(&mut buf).with_struct_map())?; + Ok(buf) + } + + async fn try_report(&self, spans: &[SpanRecord]) -> Result<(), Box> { + let datadog_spans = self.convert(spans); + let bytes = self.serialize(datadog_spans)?; + let client = reqwest::Client::new(); + let response = client + .post(format!("http://{}/v0.4/traces", self.agent_endpoint)) + .header("Datadog-Meta-Tracer-Version", "v1.27.0") + .header("Content-Type", "application/msgpack") + .body(bytes) + .send() + .await?; + + tracing::debug!("datadog report done with status: {:?}", response.status()); + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl AsyncReporter for WasmDatadogReporter { + async fn flush(&mut self, spans: &[SpanRecord]) { + if spans.is_empty() { + return; + } + + if let Err(err) = self.try_report(spans).await { + tracing::error!("report to datadog failed: {}", err); + } else { + tracing::debug!("flushed {} traces to datadog agent", spans.len()); + } + } +} diff --git a/plugins/telemetry/src/wasm_reporter/mod.rs b/plugins/telemetry/src/wasm_reporter/mod.rs new file mode 100644 index 00000000..266511df --- /dev/null +++ b/plugins/telemetry/src/wasm_reporter/mod.rs @@ -0,0 +1,3 @@ +pub mod console_reporter; +pub mod datadog_reporter; +pub mod otlp_reporter; diff --git a/plugins/telemetry/src/wasm_reporter/otlp_reporter.rs b/plugins/telemetry/src/wasm_reporter/otlp_reporter.rs new file mode 100644 index 00000000..4d7718a3 --- /dev/null +++ b/plugins/telemetry/src/wasm_reporter/otlp_reporter.rs @@ -0,0 +1,160 @@ +use std::borrow::Cow; + +use conductor_common::http::Bytes; +use conductor_tracing::reporters::AsyncReporter; +use http::{Request, Response}; +use minitrace::collector::{EventRecord, SpanRecord}; +use opentelemetry::{ + trace::{Event, SpanContext, SpanKind, Status, TraceFlags, TraceState}, + InstrumentationLibrary, Key, KeyValue, StringValue, Value, +}; +use opentelemetry_http::{HttpClient, HttpError}; +use opentelemetry_sdk::{ + export::trace::{SpanData, SpanExporter}, + trace::EvictedQueue, + Resource, +}; +use web_time::web::SystemTimeExt; +use web_time::{Duration, UNIX_EPOCH}; + +#[derive(Debug)] +pub struct WasmTracingHttpClient; + +#[async_trait::async_trait] +impl HttpClient for WasmTracingHttpClient { + async fn send(&self, request: Request>) -> Result, HttpError> { + wasm_polyfills::call_async(async move { + let request = request.try_into()?; + let client = wasm_polyfills::create_http_client().build().unwrap(); + + let mut response = client.execute(request).await?.error_for_status()?; + let headers = std::mem::take(response.headers_mut()); + let mut http_response = Response::builder() + .status(response.status()) + .body(response.bytes().await?)?; + + *http_response.headers_mut() = headers; + + Ok(http_response) + }) + .await + } +} + +pub struct WasmOtlpReporter { + span_kind: SpanKind, + resource: Cow<'static, Resource>, + instrumentation_lib: InstrumentationLibrary, + opentelemetry_exporter: Box, +} + +impl WasmOtlpReporter { + pub fn new( + exporter: impl SpanExporter + 'static, + span_kind: SpanKind, + resource: Cow<'static, Resource>, + instrumentation_lib: InstrumentationLibrary, + ) -> Self { + Self { + opentelemetry_exporter: Box::new(exporter), + span_kind, + resource, + instrumentation_lib, + } + } + + fn convert(&self, spans: &[SpanRecord]) -> Vec { + spans + .iter() + .map(move |span| SpanData { + span_context: SpanContext::new( + span.trace_id.0.into(), + span.span_id.0.into(), + TraceFlags::default(), + false, + TraceState::default(), + ), + dropped_attributes_count: 0, + parent_span_id: span.parent_id.0.into(), + name: span.name.clone(), + start_time: (UNIX_EPOCH + Duration::from_nanos(span.begin_time_unix_ns)).to_std(), + end_time: (UNIX_EPOCH + Duration::from_nanos(span.begin_time_unix_ns + span.duration_ns)) + .to_std(), + attributes: Self::convert_properties(&span.properties), + events: Self::convert_events(&span.events), + links: EvictedQueue::new(0), + status: Status::default(), + span_kind: self.span_kind.clone(), + resource: self.resource.clone(), + instrumentation_lib: self.instrumentation_lib.clone(), + }) + .collect() + } + + fn convert_properties(properties: &[(Cow<'static, str>, Cow<'static, str>)]) -> Vec { + let mut map = Vec::new(); + for (k, v) in properties { + map.push(KeyValue::new( + cow_to_otel_key(k.clone()), + cow_to_otel_value(v.clone()), + )); + } + map + } + + fn convert_events(events: &[EventRecord]) -> EvictedQueue { + let mut queue = EvictedQueue::new(u32::MAX); + queue.extend(events.iter().map(|event| { + Event::new( + event.name.clone(), + (UNIX_EPOCH + Duration::from_nanos(event.timestamp_unix_ns)).to_std(), + event + .properties + .iter() + .map(|(k, v)| KeyValue::new(cow_to_otel_key(k.clone()), cow_to_otel_value(v.clone()))) + .collect(), + 0, + ) + })); + queue + } + + async fn try_report(&mut self, spans: &[SpanRecord]) -> Result<(), Box> { + let opentelemetry_spans = self.convert(spans); + self + .opentelemetry_exporter + .export(opentelemetry_spans) + .await?; + + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl AsyncReporter for WasmOtlpReporter { + async fn flush(&mut self, spans: &[SpanRecord]) { + if spans.is_empty() { + return; + } + + if let Err(err) = self.try_report(spans).await { + tracing::error!("report to otlp failed: {:?}", err); + } else { + tracing::debug!("flushed {} traces to otlp", spans.len()); + } + } +} + +fn cow_to_otel_key(cow: Cow<'static, str>) -> Key { + match cow { + Cow::Borrowed(s) => Key::from_static_str(s), + Cow::Owned(s) => Key::from(s), + } +} + +fn cow_to_otel_value(cow: Cow<'static, str>) -> Value { + match cow { + Cow::Borrowed(s) => Value::String(StringValue::from(s)), + Cow::Owned(s) => Value::String(StringValue::from(s)), + } +} diff --git a/test_config/config.yaml b/test_config/config.yaml index fb9b40d1..80c159cf 100644 --- a/test_config/config.yaml +++ b/test_config/config.yaml @@ -31,7 +31,7 @@ endpoints: - type: telemetry config: targets: - - type: datadog + - type: zipkin - type: cors config: allow_credentials: true diff --git a/test_config/worker.yaml b/test_config/worker.yaml index 8e608c82..2d071779 100644 --- a/test_config/worker.yaml +++ b/test_config/worker.yaml @@ -13,4 +13,8 @@ endpoints: from: countries plugins: - type: http_get - - type: graphiql \ No newline at end of file + - type: graphiql + - type: telemetry + config: + targets: + - type: zipkin