Skip to content

Commit

Permalink
Improve Opentelemetry support
Browse files Browse the repository at this point in the history
  • Loading branch information
ypo committed Sep 20, 2024
1 parent e3c9544 commit 63abf5b
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 121 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ path = "src/lib.rs"
log = "0.4"
chrono = "0.4.35"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
quick-xml = { version = "0.36.0", features = ["serialize"] }
base64 = "0.22"
url = "2.5.0"
Expand All @@ -32,7 +33,8 @@ pyo3 = { version = "0.20", features = ["extension-module"], optional = true }
pyo3-log = { version = "0.9", optional = true }
raptorq = "2.0"
raptor-code = "1.0.6"
opentelemetry = { version = "0.22", optional = true }
opentelemetry = { version = "0.25", optional = true }
opentelemetry-semantic-conventions = { version = "0.25" , optional = true }
rand = "0.8"

[dev-dependencies]
Expand All @@ -41,4 +43,4 @@ tempfile = "3.10.1"

[features]
python = ["pyo3", "pyo3-log"]
optel = ["opentelemetry"]
optel = ["opentelemetry", "opentelemetry-semantic-conventions"]
19 changes: 18 additions & 1 deletion src/common/fdtinstance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::tools::{
self,
error::{FluteError, Result},
};

use quick_xml::de::from_reader;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -391,6 +392,9 @@ pub struct File {
)]
#[serde(alias = "@IndependentUnitPositions")]
pub independent_unit_positions: Option<String>,

#[serde(alias = "@X-Optel-Propagator", skip_serializing_if = "Option::is_none")]
pub optel_propagator: Option<String>,
}

fn reed_solomon_scheme_specific(
Expand Down Expand Up @@ -433,7 +437,7 @@ impl FdtInstance {
let tracer = opentelemetry::global::tracer("FdtInstance");
let mut span = tracer.start("FdtInstance");
let str = String::from_utf8_lossy(buffer);
span.add_event("fdt", vec![KeyValue::new("content", str.to_string())]);
span.set_attribute(KeyValue::new("content", str.to_string()));
span
}

Expand Down Expand Up @@ -599,4 +603,17 @@ impl File {
inband_fti: false,
})
}

#[cfg(feature = "opentelemetry")]
pub fn get_optel_propagator(&self) -> Option<std::collections::HashMap<String, String>> {
use base64::Engine;

self.optel_propagator.as_ref().and_then(|propagator| {
let decoded = base64::engine::general_purpose::STANDARD
.decode(propagator)
.ok()?;
let decoded = String::from_utf8_lossy(&decoded);
serde_json::from_str(&decoded).ok()
})
}
}
34 changes: 0 additions & 34 deletions src/receiver/blockdecoder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
#[cfg(feature = "opentelemetry")]
use opentelemetry::{
trace::{Span, TraceContextExt},
KeyValue,
};

use crate::common::{
alc,
oti::{self, SchemeSpecific},
Expand All @@ -21,8 +15,6 @@ pub struct BlockDecoder {
pub initialized: bool,
pub block_size: usize,
decoder: Option<Box<dyn FecDecoder>>,
#[cfg(feature = "opentelemetry")]
telemetry_ctx: Option<opentelemetry::Context>,
}

impl BlockDecoder {
Expand All @@ -32,30 +24,9 @@ impl BlockDecoder {
initialized: false,
decoder: None,
block_size: 0,
#[cfg(feature = "opentelemetry")]
telemetry_ctx: None,
}
}

#[cfg(feature = "opentelemetry")]
pub fn op_init(
&mut self,
mut telemetry_span: opentelemetry::global::BoxedSpan,
nb_source_symbols: u32,
block_size: usize,
sbn: u32,
) {
telemetry_span.add_event(
"block",
vec![
KeyValue::new("sbn", sbn as i64),
KeyValue::new("block_size", block_size as i64),
KeyValue::new("nb_source_symbols", nb_source_symbols as i64),
],
);
self.telemetry_ctx = Some(opentelemetry::Context::current_with_span(telemetry_span));
}

pub fn init(
&mut self,
oti: &oti::Oti,
Expand Down Expand Up @@ -139,11 +110,6 @@ impl BlockDecoder {
return;
}

/*
let tracer = opentelemetry::global::tracer("ObjectReceiverLogger");
let _pkt_span = tracer.start_with_context("pkt", self.telemetry_ctx.as_ref().unwrap());
*/

let payload = &pkt.data[pkt.data_payload_offset..];
let decoder = self.decoder.as_mut().unwrap();
decoder.push_symbol(payload, payload_id.esi);
Expand Down
97 changes: 65 additions & 32 deletions src/receiver/objectreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ pub struct ObjectReceiver {
pub content_location: Option<url::Url>,
nb_allocated_blocks: usize,
total_allocated_blocks_size: usize,

#[cfg(feature = "opentelemetry")]
logger: ObjectReceiverLogger,
logger: Option<ObjectReceiverLogger>,
}

impl ObjectReceiver {
Expand Down Expand Up @@ -108,7 +107,7 @@ impl ObjectReceiver {
nb_allocated_blocks: 0,
total_allocated_blocks_size: 0,
#[cfg(feature = "opentelemetry")]
logger: ObjectReceiverLogger::new(endpoint, tsi, toi.clone(), _fdt_instance_id, _now),
logger: None,
}
}

Expand All @@ -132,20 +131,20 @@ impl ObjectReceiver {
self.last_activity = Instant::now();
self.set_fdt_id_from_pkt(pkt);
self.set_cenc_from_pkt(pkt);
self.set_oti_from_pkt(pkt);
self.set_oti_from_pkt(pkt, now);

self.init_blocks_partitioning();
self.init_object_writer(now);
self.push_from_cache(now);

if self.oti.is_none() {
self.cache(pkt)
.unwrap_or_else(|_| self.error("Fail to push pkt to cache"));
.unwrap_or_else(|_| self.error("Fail to push pkt to cache", now));
return;
}

self.push_to_block(pkt, now)
.unwrap_or_else(|_| self.error("Fail to push pkt to block"));
.unwrap_or_else(|_| self.error("Fail to push pkt to block", now));
}

fn push_to_block(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> {
Expand Down Expand Up @@ -234,14 +233,6 @@ impl ObjectReceiver {
}
self.nb_allocated_blocks += 1;
self.total_allocated_blocks_size += block_length;

#[cfg(feature = "opentelemetry")]
block.op_init(
self.logger.block_span(),
source_block_length,
block_length,
payload_id.sbn,
);
}

block.push(pkt, &payload_id);
Expand All @@ -253,6 +244,26 @@ impl ObjectReceiver {
Ok(())
}

#[cfg(feature = "opentelemetry")]
fn init_logger(
&mut self,
propagator: Option<&std::collections::HashMap<String, String>>,
now: std::time::SystemTime,
) {
if self.logger.is_some() {
return;
}

self.logger = Some(ObjectReceiverLogger::new(
&self.endpoint,
self.tsi,
self.toi,
self.fdt_instance_id,
now,
propagator,
))
}

pub fn attach_fdt(
&mut self,
fdt_instance_id: u32,
Expand All @@ -270,6 +281,12 @@ impl ObjectReceiver {
None => return false,
};

#[cfg(feature = "opentelemetry")]
if self.logger.is_none() {
let propagator = file.get_optel_propagator();
self.init_logger(propagator.as_ref(), now);
}

if self.cenc.is_none() {
self.cenc = match &file.content_encoding {
Some(str) => Some(str.as_str().try_into().unwrap_or(lct::Cenc::Null)),
Expand Down Expand Up @@ -297,10 +314,13 @@ impl ObjectReceiver {
"Fail to parse content-location {} to URL",
file.content_location
);
self.error(&format!(
"Fail to parse content-location {} to URL",
file.content_location
));
self.error(
&format!(
"Fail to parse content-location {} to URL",
file.content_location
),
now,
);
return false;
}
}
Expand All @@ -317,7 +337,7 @@ impl ObjectReceiver {
}

#[cfg(feature = "opentelemetry")]
let _span = self.logger.fdt_attached();
let _span = self.logger.as_mut().map(|l| l.fdt_attached());

if self.enable_md5_check {
self.content_md5 = file.content_md5.clone();
Expand All @@ -343,13 +363,17 @@ impl ObjectReceiver {
false => Some(groups),
},
md5: self.content_md5.clone(),
#[cfg(feature = "opentelemetry")]
optel_propagator: self.logger.as_ref().map(|l| l.get_propagator()),
#[cfg(not(feature = "opentelemetry"))]
optel_propagator: None,
});

self.init_blocks_partitioning();
self.init_object_writer(now);
self.push_from_cache(now);
self.write_blocks(0, now)
.unwrap_or_else(|_| self.error("Fail to write blocks to storage"));
.unwrap_or_else(|_| self.error("Fail to write blocks to storage", now));
true
}

Expand Down Expand Up @@ -387,7 +411,7 @@ impl ObjectReceiver {

if object_writer.writer.open().is_err() {
log::error!("Fail to open destination for {:?}", self.meta);
self.error("Fail to create destination on storage");
self.error("Fail to create destination on storage", now);
return;
};

Expand Down Expand Up @@ -457,10 +481,13 @@ impl ObjectReceiver {
self.content_location
);

self.error(&format!(
"MD5 does not match expects {:?} received {:?}",
self.content_md5, &md5
));
self.error(
&format!(
"MD5 does not match expects {:?} received {:?}",
self.content_md5, &md5
),
now,
);
}
break;
}
Expand All @@ -470,7 +497,7 @@ impl ObjectReceiver {

fn complete(&mut self, _now: std::time::SystemTime) {
#[cfg(feature = "opentelemetry")]
let _span = self.logger.complete();
let _span = self.logger.as_mut().map(|l| l.complete());

self.state = State::Completed;

Expand All @@ -485,9 +512,12 @@ impl ObjectReceiver {
self.cache_size = 0;
}

fn error(&mut self, _description: &str) {
fn error(&mut self, _description: &str, _now: std::time::SystemTime) {
#[cfg(feature = "opentelemetry")]
let _span = self.logger.error(_description);
self.init_logger(None, _now);

#[cfg(feature = "opentelemetry")]
let _span = self.logger.as_mut().map(|l| l.error(_description));

self.state = State::Error;

Expand All @@ -509,7 +539,7 @@ impl ObjectReceiver {
while let Some(item) = self.cache.pop() {
let pkt = item.to_pkt();
if self.push_to_block(&pkt, now).is_err() {
self.error("Fail to push block");
self.error("Fail to push block", now);
break;
}
}
Expand All @@ -536,7 +566,7 @@ impl ObjectReceiver {
self.fdt_instance_id = pkt.fdt_info.as_ref().map(|info| info.fdt_instance_id);
}

fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt) {
fn set_oti_from_pkt(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) {
if self.oti.is_some() {
return;
}
Expand All @@ -551,7 +581,7 @@ impl ObjectReceiver {

if pkt.transfer_length.is_none() {
log::warn!("Bug? Pkt contains OTI without transfer length");
self.error("Bug? Pkt contains OTI without transfer length");
self.error("Bug? Pkt contains OTI without transfer length", now);
return;
}

Expand Down Expand Up @@ -648,7 +678,10 @@ impl Drop for ObjectReceiver {
self.endpoint,
self.content_location.as_ref().map(|u| u.to_string())
);
self.error("Drop object in open state, pkt missing ?");
self.error(
"Drop object in open state, pkt missing ?",
SystemTime::now(),
);
} else if object_writer.state == ObjectWriterSessionState::Error {
log::error!(
"Drop object received with state {:?} TOI={} Endpoint={:?} Content-Location={:?}",
Expand Down
Loading

0 comments on commit 63abf5b

Please sign in to comment.