From 63abf5b2fcfc84655ca1538a4e3e30e2625d21a3 Mon Sep 17 00:00:00 2001 From: Yannick Poirier Date: Fri, 20 Sep 2024 09:46:57 +0200 Subject: [PATCH] Improve Opentelemetry support --- Cargo.toml | 6 +- src/common/fdtinstance.rs | 19 ++++- src/receiver/blockdecoder.rs | 34 --------- src/receiver/objectreceiver.rs | 97 +++++++++++++++++-------- src/receiver/objectreceiverlogger.rs | 105 ++++++++++++++++++++------- src/receiver/writer/mod.rs | 3 + src/sender/filedesc.rs | 8 ++ src/sender/objectdesc.rs | 5 ++ src/sender/objectsenderlogger.rs | 78 +++++++++++++------- src/sender/sendersession.rs | 1 + 10 files changed, 235 insertions(+), 121 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 68ffbf1..e5de2bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ path = "src/lib.rs" log = "0.4" chrono = "0.4.35" serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" quick-xml = { version = "0.36.0", features = ["serialize"] } base64 = "0.22" url = "2.5.0" @@ -32,7 +33,8 @@ pyo3 = { version = "0.20", features = ["extension-module"], optional = true } pyo3-log = { version = "0.9", optional = true } raptorq = "2.0" raptor-code = "1.0.6" -opentelemetry = { version = "0.22", optional = true } +opentelemetry = { version = "0.25", optional = true } +opentelemetry-semantic-conventions = { version = "0.25" , optional = true } rand = "0.8" [dev-dependencies] @@ -41,4 +43,4 @@ tempfile = "3.10.1" [features] python = ["pyo3", "pyo3-log"] -optel = ["opentelemetry"] +optel = ["opentelemetry", "opentelemetry-semantic-conventions"] diff --git a/src/common/fdtinstance.rs b/src/common/fdtinstance.rs index 983c38a..cae3cbf 100644 --- a/src/common/fdtinstance.rs +++ b/src/common/fdtinstance.rs @@ -4,6 +4,7 @@ use crate::tools::{ self, error::{FluteError, Result}, }; + use quick_xml::de::from_reader; use serde::{Deserialize, Serialize}; @@ -391,6 +392,9 @@ pub struct File { )] #[serde(alias = "@IndependentUnitPositions")] pub independent_unit_positions: Option, + + #[serde(alias = "@X-Optel-Propagator", skip_serializing_if = "Option::is_none")] + pub optel_propagator: Option, } fn reed_solomon_scheme_specific( @@ -433,7 +437,7 @@ impl FdtInstance { let tracer = opentelemetry::global::tracer("FdtInstance"); let mut span = tracer.start("FdtInstance"); let str = String::from_utf8_lossy(buffer); - span.add_event("fdt", vec![KeyValue::new("content", str.to_string())]); + span.set_attribute(KeyValue::new("content", str.to_string())); span } @@ -599,4 +603,17 @@ impl File { inband_fti: false, }) } + + #[cfg(feature = "opentelemetry")] + pub fn get_optel_propagator(&self) -> Option> { + use base64::Engine; + + self.optel_propagator.as_ref().and_then(|propagator| { + let decoded = base64::engine::general_purpose::STANDARD + .decode(propagator) + .ok()?; + let decoded = String::from_utf8_lossy(&decoded); + serde_json::from_str(&decoded).ok() + }) + } } diff --git a/src/receiver/blockdecoder.rs b/src/receiver/blockdecoder.rs index d8af8d7..63f955c 100644 --- a/src/receiver/blockdecoder.rs +++ b/src/receiver/blockdecoder.rs @@ -1,9 +1,3 @@ -#[cfg(feature = "opentelemetry")] -use opentelemetry::{ - trace::{Span, TraceContextExt}, - KeyValue, -}; - use crate::common::{ alc, oti::{self, SchemeSpecific}, @@ -21,8 +15,6 @@ pub struct BlockDecoder { pub initialized: bool, pub block_size: usize, decoder: Option>, - #[cfg(feature = "opentelemetry")] - telemetry_ctx: Option, } impl BlockDecoder { @@ -32,30 +24,9 @@ impl BlockDecoder { initialized: false, decoder: None, block_size: 0, - #[cfg(feature = "opentelemetry")] - telemetry_ctx: None, } } - #[cfg(feature = "opentelemetry")] - pub fn op_init( - &mut self, - mut telemetry_span: opentelemetry::global::BoxedSpan, - nb_source_symbols: u32, - block_size: usize, - sbn: u32, - ) { - telemetry_span.add_event( - "block", - vec![ - KeyValue::new("sbn", sbn as i64), - KeyValue::new("block_size", block_size as i64), - KeyValue::new("nb_source_symbols", nb_source_symbols as i64), - ], - ); - self.telemetry_ctx = Some(opentelemetry::Context::current_with_span(telemetry_span)); - } - pub fn init( &mut self, oti: &oti::Oti, @@ -139,11 +110,6 @@ impl BlockDecoder { return; } - /* - let tracer = opentelemetry::global::tracer("ObjectReceiverLogger"); - let _pkt_span = tracer.start_with_context("pkt", self.telemetry_ctx.as_ref().unwrap()); - */ - let payload = &pkt.data[pkt.data_payload_offset..]; let decoder = self.decoder.as_mut().unwrap(); decoder.push_symbol(payload, payload_id.esi); diff --git a/src/receiver/objectreceiver.rs b/src/receiver/objectreceiver.rs index 0bca881..689d71e 100644 --- a/src/receiver/objectreceiver.rs +++ b/src/receiver/objectreceiver.rs @@ -62,9 +62,8 @@ pub struct ObjectReceiver { pub content_location: Option, nb_allocated_blocks: usize, total_allocated_blocks_size: usize, - #[cfg(feature = "opentelemetry")] - logger: ObjectReceiverLogger, + logger: Option, } impl ObjectReceiver { @@ -108,7 +107,7 @@ impl ObjectReceiver { nb_allocated_blocks: 0, total_allocated_blocks_size: 0, #[cfg(feature = "opentelemetry")] - logger: ObjectReceiverLogger::new(endpoint, tsi, toi.clone(), _fdt_instance_id, _now), + logger: None, } } @@ -132,7 +131,7 @@ impl ObjectReceiver { self.last_activity = Instant::now(); self.set_fdt_id_from_pkt(pkt); self.set_cenc_from_pkt(pkt); - self.set_oti_from_pkt(pkt); + self.set_oti_from_pkt(pkt, now); self.init_blocks_partitioning(); self.init_object_writer(now); @@ -140,12 +139,12 @@ impl ObjectReceiver { if self.oti.is_none() { self.cache(pkt) - .unwrap_or_else(|_| self.error("Fail to push pkt to cache")); + .unwrap_or_else(|_| self.error("Fail to push pkt to cache", now)); return; } self.push_to_block(pkt, now) - .unwrap_or_else(|_| self.error("Fail to push pkt to block")); + .unwrap_or_else(|_| self.error("Fail to push pkt to block", now)); } fn push_to_block(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> { @@ -234,14 +233,6 @@ impl ObjectReceiver { } self.nb_allocated_blocks += 1; self.total_allocated_blocks_size += block_length; - - #[cfg(feature = "opentelemetry")] - block.op_init( - self.logger.block_span(), - source_block_length, - block_length, - payload_id.sbn, - ); } block.push(pkt, &payload_id); @@ -253,6 +244,26 @@ impl ObjectReceiver { Ok(()) } + #[cfg(feature = "opentelemetry")] + fn init_logger( + &mut self, + propagator: Option<&std::collections::HashMap>, + now: std::time::SystemTime, + ) { + if self.logger.is_some() { + return; + } + + self.logger = Some(ObjectReceiverLogger::new( + &self.endpoint, + self.tsi, + self.toi, + self.fdt_instance_id, + now, + propagator, + )) + } + pub fn attach_fdt( &mut self, fdt_instance_id: u32, @@ -270,6 +281,12 @@ impl ObjectReceiver { None => return false, }; + #[cfg(feature = "opentelemetry")] + if self.logger.is_none() { + let propagator = file.get_optel_propagator(); + self.init_logger(propagator.as_ref(), now); + } + if self.cenc.is_none() { self.cenc = match &file.content_encoding { Some(str) => Some(str.as_str().try_into().unwrap_or(lct::Cenc::Null)), @@ -297,10 +314,13 @@ impl ObjectReceiver { "Fail to parse content-location {} to URL", file.content_location ); - self.error(&format!( - "Fail to parse content-location {} to URL", - file.content_location - )); + self.error( + &format!( + "Fail to parse content-location {} to URL", + file.content_location + ), + now, + ); return false; } } @@ -317,7 +337,7 @@ impl ObjectReceiver { } #[cfg(feature = "opentelemetry")] - let _span = self.logger.fdt_attached(); + let _span = self.logger.as_mut().map(|l| l.fdt_attached()); if self.enable_md5_check { self.content_md5 = file.content_md5.clone(); @@ -343,13 +363,17 @@ impl ObjectReceiver { false => Some(groups), }, md5: self.content_md5.clone(), + #[cfg(feature = "opentelemetry")] + optel_propagator: self.logger.as_ref().map(|l| l.get_propagator()), + #[cfg(not(feature = "opentelemetry"))] + optel_propagator: None, }); self.init_blocks_partitioning(); self.init_object_writer(now); self.push_from_cache(now); self.write_blocks(0, now) - .unwrap_or_else(|_| self.error("Fail to write blocks to storage")); + .unwrap_or_else(|_| self.error("Fail to write blocks to storage", now)); true } @@ -387,7 +411,7 @@ impl ObjectReceiver { if object_writer.writer.open().is_err() { log::error!("Fail to open destination for {:?}", self.meta); - self.error("Fail to create destination on storage"); + self.error("Fail to create destination on storage", now); return; }; @@ -457,10 +481,13 @@ impl ObjectReceiver { self.content_location ); - self.error(&format!( - "MD5 does not match expects {:?} received {:?}", - self.content_md5, &md5 - )); + self.error( + &format!( + "MD5 does not match expects {:?} received {:?}", + self.content_md5, &md5 + ), + now, + ); } break; } @@ -470,7 +497,7 @@ impl ObjectReceiver { fn complete(&mut self, _now: std::time::SystemTime) { #[cfg(feature = "opentelemetry")] - let _span = self.logger.complete(); + let _span = self.logger.as_mut().map(|l| l.complete()); self.state = State::Completed; @@ -485,9 +512,12 @@ impl ObjectReceiver { self.cache_size = 0; } - fn error(&mut self, _description: &str) { + fn error(&mut self, _description: &str, _now: std::time::SystemTime) { #[cfg(feature = "opentelemetry")] - let _span = self.logger.error(_description); + self.init_logger(None, _now); + + #[cfg(feature = "opentelemetry")] + let _span = self.logger.as_mut().map(|l| l.error(_description)); self.state = State::Error; @@ -509,7 +539,7 @@ impl ObjectReceiver { while let Some(item) = self.cache.pop() { let pkt = item.to_pkt(); if self.push_to_block(&pkt, now).is_err() { - self.error("Fail to push block"); + self.error("Fail to push block", now); break; } } @@ -536,7 +566,7 @@ impl ObjectReceiver { self.fdt_instance_id = pkt.fdt_info.as_ref().map(|info| info.fdt_instance_id); } - fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt) { + fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) { if self.oti.is_some() { return; } @@ -551,7 +581,7 @@ impl ObjectReceiver { if pkt.transfer_length.is_none() { log::warn!("Bug? Pkt contains OTI without transfer length"); - self.error("Bug? Pkt contains OTI without transfer length"); + self.error("Bug? Pkt contains OTI without transfer length", now); return; } @@ -648,7 +678,10 @@ impl Drop for ObjectReceiver { self.endpoint, self.content_location.as_ref().map(|u| u.to_string()) ); - self.error("Drop object in open state, pkt missing ?"); + self.error( + "Drop object in open state, pkt missing ?", + SystemTime::now(), + ); } else if object_writer.state == ObjectWriterSessionState::Error { log::error!( "Drop object received with state {:?} TOI={} Endpoint={:?} Content-Location={:?}", diff --git a/src/receiver/objectreceiverlogger.rs b/src/receiver/objectreceiverlogger.rs index 30ecbf0..bd57589 100644 --- a/src/receiver/objectreceiverlogger.rs +++ b/src/receiver/objectreceiverlogger.rs @@ -1,8 +1,9 @@ -use std::time::SystemTime; +use std::{collections::HashMap, time::SystemTime}; use opentelemetry::{ global::{self, BoxedSpan}, - trace::{Span, Status, TraceContextExt, TraceId, Tracer}, + propagation::{Extractor, Injector}, + trace::{Span, SpanKind, Status, TraceContextExt, TraceId, Tracer}, Context, KeyValue, }; @@ -18,13 +19,36 @@ impl std::fmt::Debug for ObjectReceiverLogger { } } +struct HeaderExtractor<'a>(pub &'a HashMap); +impl Extractor for HeaderExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).map(|s| s.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|s| s.as_str()).collect() + } +} + +struct HeaderInjector<'a>(pub &'a mut HashMap); +impl Injector for HeaderInjector<'_> { + fn set(&mut self, key: &str, value: String) { + self.0.insert(key.to_string(), value); + } +} + impl ObjectReceiverLogger { + fn extract_context_from_propagator(req: &HashMap) -> Context { + global::get_text_map_propagator(|propagator| propagator.extract(&HeaderExtractor(req))) + } + pub fn new( endpoint: &UDPEndpoint, tsi: u64, toi: u128, fdt_instance_id: Option, now: SystemTime, + propagator: Option<&HashMap>, ) -> Self { let tracer = global::tracer("FluteLogger"); let name = match toi { @@ -32,39 +56,70 @@ impl ObjectReceiverLogger { _ => "FLUTEObject", }; - let mut span = tracer - .span_builder(name) - .with_trace_id(TraceId::from(endpoint.trace_id( - tsi, - toi, - fdt_instance_id, - now, - ))) - .start(&tracer); + let mut span; + if let Some(propagator) = propagator { + let parent_cx = Self::extract_context_from_propagator(propagator); + span = tracer + .span_builder(name) + .with_kind(SpanKind::Consumer) + .start_with_context(&tracer, &parent_cx) + } else { + span = tracer + .span_builder(name) + .with_trace_id(TraceId::from(endpoint.trace_id( + tsi, + toi, + fdt_instance_id, + now, + ))) + .with_kind(SpanKind::Consumer) + .start(&tracer); + } + + span.set_attribute(KeyValue::new( + opentelemetry_semantic_conventions::attribute::NETWORK_TRANSPORT, + "flute", + )); + + span.set_attribute(KeyValue::new( + opentelemetry_semantic_conventions::attribute::NETWORK_PEER_ADDRESS, + endpoint.destination_group_address.clone(), + )); + + span.set_attribute(KeyValue::new( + opentelemetry_semantic_conventions::attribute::NETWORK_PEER_PORT, + endpoint.port as i64, + )); - span.set_attribute(KeyValue::new("flute.toi", toi.to_string())); - span.set_attribute(KeyValue::new("flute.tsi", tsi.to_string())); - span.set_attribute(KeyValue::new("flute.port", endpoint.port.to_string())); if let Some(source_address) = endpoint.source_address.as_ref() { span.set_attribute(KeyValue::new( - "flute.source_address", - source_address.to_string(), + opentelemetry_semantic_conventions::attribute::NETWORK_LOCAL_ADDRESS, + source_address.clone(), )); } - span.set_attribute(KeyValue::new( - "flute.destination_group_address", - endpoint.destination_group_address.to_string(), - )); - span.add_event("object", vec![KeyValue::new("start", "")]); - let cx = Context::current_with_span(span); + span.set_attribute(KeyValue::new("flute.toi", toi.to_string())); + span.set_attribute(KeyValue::new("flute.tsi", tsi.to_string())); + + let cx = Context::default().with_span(span); Self { cx } } + pub fn get_propagator(&self) -> HashMap { + let propagator = global::get_text_map_propagator(|propagator| { + let mut headers = HashMap::new(); + propagator.inject_context(&self.cx, &mut HeaderInjector(&mut headers)); + headers + }); + + propagator + } + /* pub fn block_span(&mut self) -> BoxedSpan { let tracer = global::tracer("FluteLogger"); tracer.start_with_context("block", &self.cx) } + */ pub fn fdt_attached(&mut self) -> BoxedSpan { let tracer = global::tracer("FluteLogger"); @@ -88,11 +143,7 @@ impl ObjectReceiverLogger { description: std::borrow::Cow::Owned(description.to_string()), }); - span.add_event( - "error", - vec![KeyValue::new("error_description", description.to_string())], - ); - + span.set_attribute(KeyValue::new("error_description", description.to_string())); tracer.start_with_context("error", &self.cx) } } diff --git a/src/receiver/writer/mod.rs b/src/receiver/writer/mod.rs index daab783..a9105fa 100644 --- a/src/receiver/writer/mod.rs +++ b/src/receiver/writer/mod.rs @@ -10,6 +10,7 @@ //! ``` //! +use std::collections::HashMap; use std::time::Duration; use crate::common::udpendpoint::UDPEndpoint; @@ -34,6 +35,8 @@ pub struct ObjectMetadata { pub groups: Option>, /// Object MD5 pub md5: Option, + /// Opentelemetry propagation context + pub optel_propagator: Option>, } /// diff --git a/src/sender/filedesc.rs b/src/sender/filedesc.rs index 4b0cdd7..be7245c 100644 --- a/src/sender/filedesc.rs +++ b/src/sender/filedesc.rs @@ -1,3 +1,5 @@ +use base64::Engine; + use super::objectdesc::{create_fdt_cache_control, ObjectDesc}; use crate::common::oti::SchemeSpecific; use crate::common::{fdtinstance, oti, partition}; @@ -193,6 +195,11 @@ impl FileDesc { _ => self.object.oti.as_ref().map(|oti| oti.get_attributes()), }; + let optel_propagator = self.object.optel_propagator.as_ref().map(|propagator| { + let s = serde_json::to_string(&propagator).unwrap(); + base64::engine::general_purpose::STANDARD.encode(s) + }); + fdtinstance::File { content_location: self.object.content_location.to_string(), toi: self.toi.to_string(), @@ -234,6 +241,7 @@ impl FileDesc { delimiter: Some(0), delimiter2: Some(0), group: None, + optel_propagator, } } } diff --git a/src/sender/objectdesc.rs b/src/sender/objectdesc.rs index 554bc78..7849f44 100644 --- a/src/sender/objectdesc.rs +++ b/src/sender/objectdesc.rs @@ -6,6 +6,7 @@ use crate::common::{fdtinstance, lct, oti}; use crate::error::FluteError; use crate::tools; use crate::tools::error::Result; +use std::collections::HashMap; use std::ffi::OsStr; use std::io::{BufReader, Read}; use std::time::SystemTime; @@ -89,6 +90,8 @@ pub struct ObjectDesc { pub groups: Option>, /// Assign an optional TOI to this object pub toi: Option>, + /// Optional Opentelemetry propagator + pub optel_propagator: Option>, } impl ObjectDesc { @@ -245,6 +248,7 @@ impl ObjectDesc { cache_control, groups, toi: None, + optel_propagator: None, })) } @@ -294,6 +298,7 @@ impl ObjectDesc { cache_control, groups, toi: None, + optel_propagator: None, })) } diff --git a/src/sender/objectsenderlogger.rs b/src/sender/objectsenderlogger.rs index 79da4d1..c46e98f 100644 --- a/src/sender/objectsenderlogger.rs +++ b/src/sender/objectsenderlogger.rs @@ -1,8 +1,9 @@ -use std::time::SystemTime; +use std::{collections::HashMap, time::SystemTime}; use opentelemetry::{ global::{self}, - trace::{Span, TraceContextExt, TraceId, Tracer}, + propagation::Extractor, + trace::{Span, SpanKind, TraceContextExt, TraceId, Tracer}, Context, KeyValue, }; @@ -18,13 +19,29 @@ impl std::fmt::Debug for ObjectSenderLogger { } } +struct HeaderExtractor<'a>(pub &'a HashMap); +impl Extractor for HeaderExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).map(|s| s.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|s| s.as_str()).collect() + } +} + impl ObjectSenderLogger { + fn extract_context_from_propagator(req: &HashMap) -> Context { + global::get_text_map_propagator(|propagator| propagator.extract(&HeaderExtractor(req))) + } + pub fn new( endpoint: &UDPEndpoint, tsi: u64, toi: u128, fdt_instance_id: Option, now: SystemTime, + propagator: Option<&HashMap>, ) -> Self { let tracer = global::tracer("FluteLogger"); let name = match toi { @@ -32,36 +49,47 @@ impl ObjectSenderLogger { _ => "Object Transfer", }; - let trace_id = endpoint.trace_id(tsi, toi, fdt_instance_id, now); + let mut span; + if let Some(propagator) = propagator { + let parent_cx = Self::extract_context_from_propagator(propagator); + span = tracer + .span_builder(name) + .with_kind(SpanKind::Producer) + .start_with_context(&tracer, &parent_cx) + } else { + let trace_id = endpoint.trace_id(tsi, toi, fdt_instance_id, now); + span = tracer + .span_builder(name) + .with_trace_id(TraceId::from(trace_id)) + .with_kind(SpanKind::Producer) + .start(&tracer); + } - log::info!( - "Create {:?} {} {} {:?} trace_id={:?}", - endpoint, - tsi, - toi, - fdt_instance_id, - trace_id - ); - let mut span = tracer - .span_builder(name) - .with_trace_id(TraceId::from(trace_id)) - .start(&tracer); + span.set_attribute(KeyValue::new( + opentelemetry_semantic_conventions::attribute::NETWORK_TRANSPORT, + "flute", + )); + + span.set_attribute(KeyValue::new( + opentelemetry_semantic_conventions::attribute::NETWORK_PEER_ADDRESS, + endpoint.destination_group_address.clone(), + )); + + span.set_attribute(KeyValue::new( + opentelemetry_semantic_conventions::attribute::NETWORK_PEER_PORT, + endpoint.port as i64, + )); - span.set_attribute(KeyValue::new("flute.toi", toi.to_string())); - span.set_attribute(KeyValue::new("flute.tsi", tsi.to_string())); - span.set_attribute(KeyValue::new("flute.port", endpoint.port.to_string())); if let Some(source_address) = endpoint.source_address.as_ref() { span.set_attribute(KeyValue::new( - "flute.source_address", - source_address.to_string(), + opentelemetry_semantic_conventions::attribute::NETWORK_LOCAL_ADDRESS, + source_address.clone(), )); } - span.set_attribute(KeyValue::new( - "flute.destination_group_address", - endpoint.destination_group_address.to_string(), - )); - span.add_event("object", vec![KeyValue::new("start", "")]); + span.set_attribute(KeyValue::new("flute.toi", toi.to_string())); + span.set_attribute(KeyValue::new("flute.tsi", tsi.to_string())); + let cx = Context::current_with_span(span); Self { _cx: cx } } diff --git a/src/sender/sendersession.rs b/src/sender/sendersession.rs index 4363b8a..3723352 100644 --- a/src/sender/sendersession.rs +++ b/src/sender/sendersession.rs @@ -104,6 +104,7 @@ impl SenderSession { file.toi, file.fdt_id, now, + file.object.optel_propagator.as_ref(), )); } }