From 2a8c4e4cf95e34f4fb729ec4b72d254166ed4d8f Mon Sep 17 00:00:00 2001 From: Scott Fleener Date: Mon, 6 Jan 2025 13:37:23 +0000 Subject: [PATCH] Include extra trace attributes from env This allows us to include arbitrary values from the k8s downward API beyond just the pod labels that are included in the trace attributes file. See https://github.com/linkerd/linkerd2/pull/13544 for the corresponding control plane change. Signed-off-by: Scott Fleener --- linkerd/app/src/env.rs | 9 ++++- linkerd/app/src/env/trace.rs | 4 ++ linkerd/app/src/trace_collector.rs | 38 +++++++++++-------- .../app/src/trace_collector/otel_collector.rs | 34 +++++++++++------ 4 files changed, 58 insertions(+), 27 deletions(-) diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs index b90970dae8..320db6ebd5 100644 --- a/linkerd/app/src/env.rs +++ b/linkerd/app/src/env.rs @@ -149,6 +149,7 @@ const ENV_OUTBOUND_DISABLE_INFORMATIONAL_HEADERS: &str = const ENV_TRACE_ATTRIBUTES_PATH: &str = "LINKERD2_PROXY_TRACE_ATTRIBUTES_PATH"; const ENV_TRACE_PROTOCOL: &str = "LINKERD2_PROXY_TRACE_PROTOCOL"; const ENV_TRACE_SERVICE_NAME: &str = "LINKERD2_PROXY_TRACE_SERVICE_NAME"; +const ENV_TRACE_EXTRA_ATTRIBUTES: &str = "LINKERD2_PROXY_TRACE_EXTRA_ATTRIBUTES"; /// Constrains which destination names may be used for profile/route discovery. /// @@ -432,6 +433,7 @@ pub fn parse_config(strings: &S) -> Result let hostname = strings.get(ENV_HOSTNAME); let trace_attributes_file_path = strings.get(ENV_TRACE_ATTRIBUTES_PATH); + let trace_extra_attributes = strings.get(ENV_TRACE_EXTRA_ATTRIBUTES); let trace_protocol = strings.get(ENV_TRACE_PROTOCOL); let trace_service_name = strings.get(ENV_TRACE_SERVICE_NAME); @@ -831,12 +833,17 @@ pub fn parse_config(strings: &S) -> Result } else { outbound.http_request_queue.failfast_timeout }; - let attributes = trace_attributes_file_path + let mut attributes = trace_attributes_file_path .map(|path| match path.and_then(|p| p.parse::().ok()) { Some(path) => trace::read_trace_attributes(&path), None => HashMap::new(), }) .unwrap_or_default(); + if let Ok(Some(attrs)) = trace_extra_attributes { + if !attrs.is_empty() { + attributes.extend(trace::parse_env_trace_attributes(&attrs)); + } + } let trace_protocol = trace_protocol .map(|proto| proto.and_then(|p| p.parse::().ok())) diff --git a/linkerd/app/src/env/trace.rs b/linkerd/app/src/env/trace.rs index 98b9c4e3f0..ca1eab54a8 100644 --- a/linkerd/app/src/env/trace.rs +++ b/linkerd/app/src/env/trace.rs @@ -14,6 +14,10 @@ pub(super) fn read_trace_attributes(path: &std::path::Path) -> HashMap HashMap { + parse_attrs(attrs) +} + fn parse_attrs(attrs: &str) -> HashMap { attrs .lines() diff --git a/linkerd/app/src/trace_collector.rs b/linkerd/app/src/trace_collector.rs index ea362e9921..b173848c61 100644 --- a/linkerd/app/src/trace_collector.rs +++ b/linkerd/app/src/trace_collector.rs @@ -1,11 +1,14 @@ -use linkerd_app_core::http_tracing::{CollectorProtocol, SpanSink}; -use linkerd_app_core::metrics::ControlHttp as HttpMetrics; -use linkerd_app_core::svc::NewService; -use linkerd_app_core::{control, dns, identity, opencensus, opentelemetry}; +use linkerd_app_core::{ + control, dns, + http_tracing::{CollectorProtocol, SpanSink}, + identity, + metrics::ControlHttp as HttpMetrics, + opencensus, opentelemetry, + svc::NewService, +}; use linkerd_error::Error; -use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; +use otel_collector::OtelCollectorAttributes; +use std::{collections::HashMap, future::Future, pin::Pin}; pub mod oc_collector; pub mod otel_collector; @@ -92,14 +95,19 @@ impl Config { svc, legacy_oc_metrics, ), - CollectorProtocol::OpenTelemetry => otel_collector::create_collector( - addr.clone(), - inner.hostname, - svc_name, - inner.attributes, - svc, - legacy_otel_metrics, - ), + CollectorProtocol::OpenTelemetry => { + let attributes = OtelCollectorAttributes { + hostname: inner.hostname, + service_name: svc_name, + extra: inner.attributes, + }; + otel_collector::create_collector( + addr.clone(), + attributes, + svc, + legacy_otel_metrics, + ) + } }; Ok(TraceCollector::Enabled(Box::new(collector))) diff --git a/linkerd/app/src/trace_collector/otel_collector.rs b/linkerd/app/src/trace_collector/otel_collector.rs index f503fdcf80..0541aa75d6 100644 --- a/linkerd/app/src/trace_collector/otel_collector.rs +++ b/linkerd/app/src/trace_collector/otel_collector.rs @@ -4,20 +4,29 @@ use linkerd_app_core::{ }; use linkerd_opentelemetry::{ self as opentelemetry, metrics, - proto::proto::common::v1::{any_value, AnyValue, KeyValue}, - proto::transform::common::ResourceAttributesWithSchema, + proto::{ + proto::common::v1::{any_value, AnyValue, KeyValue}, + transform::common::ResourceAttributesWithSchema, + }, +}; +use std::{ + collections::HashMap, + time::{SystemTime, UNIX_EPOCH}, }; -use std::{collections::HashMap, time::SystemTime, time::UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{body::BoxBody, client::GrpcService}; use tracing::Instrument; +pub(super) struct OtelCollectorAttributes { + pub hostname: Option, + pub service_name: String, + pub extra: HashMap, +} + pub(super) fn create_collector( addr: ControlAddr, - hostname: Option, - service_name: String, - attributes: HashMap, + attributes: OtelCollectorAttributes, svc: S, legacy_metrics: metrics::Registry, ) -> EnabledCollector @@ -36,7 +45,7 @@ where resources .attributes .0 - .push(service_name.with_key("service.name")); + .push(attributes.service_name.with_key("service.name")); resources .attributes .0 @@ -49,13 +58,16 @@ where .unwrap_or_else(|e| -(e.duration().as_secs() as i64)) .with_key("process.start_timestamp"), ); - resources - .attributes - .0 - .push(hostname.unwrap_or_default().with_key("host.name")); + resources.attributes.0.push( + attributes + .hostname + .unwrap_or_default() + .with_key("host.name"), + ); resources.attributes.0.extend( attributes + .extra .into_iter() .map(|(key, value)| value.with_key(&key)), );