-
Hello there, I've a shared library that is used by all the workers in the system I'm developing. And I want to implement opentelemetry for this system. here is the library code: // telemetry.rs
use opentelemetry::KeyValue;
use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::resource::Resource;
use opentelemetry_sdk::runtime;
use std::time::Duration;
use tracing::level_filters::LevelFilter;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{EnvFilter, Layer, Registry};
fn resources(service_namespace: &str, service_name: &str, service_version: &str) -> Resource {
Resource::new(vec![
KeyValue::new("key", "value"),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name.to_string(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE,
service_namespace.to_string(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
service_version.to_string(),
),
// KeyValue::new(
// "env",
// "production",
// ),
])
}
fn exporter_builder() -> TonicExporterBuilder {
let endpoint = std::env::var("otel_export_endpoint").unwrap_or_else(|e| {
eprintln!("Failed to get otel_export_endpoint: {e:?}");
"http://localhost:4317".to_string()
});
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint)
// .with_metadata() can be used to pass org id to tempo maybe???
.with_timeout(Duration::from_secs(3))
}
fn init_tracer(
resource: Resource,
) -> anyhow::Result<(OpenTelemetryLayer<Registry, opentelemetry_sdk::trace::Tracer>)> {
opentelemetry::global::set_text_map_propagator(
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
);
let config = opentelemetry_sdk::trace::config().with_resource(resource);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(config)
.with_exporter(exporter_builder())
.install_batch(runtime::Tokio)?;
// Create a tracing layer with the configured tracer
let layer: OpenTelemetryLayer<Registry, _> = tracing_opentelemetry::layer().with_tracer(tracer);
Ok(layer)
}
fn init_metrics(resource: Resource) -> crate::anyhow::Result<SdkMeterProvider> {
let provider = opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.with_exporter(exporter_builder())
.with_resource(resource)
.build()?;
// let provider = meter.map_err(|e| anyhow!("{e:?}"))?;
// opentelemetry::global::set_meter_provider(provider);
Ok(provider)
}
fn filter(crate_name: &str) -> crate::anyhow::Result<EnvFilter> {
Ok(EnvFilter::builder()
// .with_default_directive(LevelFilter::DEBUG.into())
.with_default_directive(LevelFilter::TRACE.into())
.from_env_lossy()
.add_directive("h2=info".parse()?)
.add_directive("tower=info".parse()?)
.add_directive("tokio=info".parse()?)
.add_directive("tokio_util=info".parse()?)
.add_directive("lapin=info".parse()?)
.add_directive("hyper=info".parse()?)
.add_directive(format!("{crate_name}=trace").parse()?))
}
pub fn init(
service_namespace: &str,
service_name: &str,
service_version: &str,
) -> anyhow::Result<(SdkMeterProvider)> {
opentelemetry::global::set_error_handler(|e| eprintln!("ERROR IN OPEN TELEMETRY: {e:?}"))?;
let tracing_layer = init_tracer(resources(service_namespace, service_name, service_version))?;
let meter_provider = init_metrics(resources(service_namespace, service_name, service_version))?;
// Tracing library
let subscriber = Registry::default()
.with(tracing_layer.with_filter(filter(env!("CARGO_PKG_NAME"))?))
.with(tracing_subscriber::fmt::layer().with_filter(filter(env!("CARGO_PKG_NAME"))?));
tracing::subscriber::set_global_default(subscriber)?;
Ok(meter_provider)
}
pub fn shut_down() -> anyhow::Result<()>{
opentelemetry::global::shutdown_tracer_provider();
opentelemetry::global::shutdown_logger_provider();
Ok(())
} Here is an example of me calling the code: use shared::{tracing, anyhow, tokio};
use tracing::{error, info, Level, span, warn};
mod pipedrive;
fn main() {
println!("Hello, world!");
if let Err(e) = run() {
error!("Failed to run program: {e}")
}
}
#[tokio::main]
async fn run() -> anyhow::Result<()> {
shared::utils::setup_env()?;
let _meter =
shared::telemetry::init("workers", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))?;
info!("Running CRM worker v{}", env!("CARGO_PKG_VERSION"));
let span = span!(
Level::INFO,
"crm_worker_startup",
);
let _guard = span.enter();
info!("Working inside the span");
warn!("Working inside the span");
drop(_guard);
shared::telemetry::shut_down()?;
Ok(())
} However no traces are pushed anywhere and there is no logs from the h2 library indicating work being done by opentelemetry Versions of the libraries: opentelemetry = { version = "0.22.0", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.22.0", features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.15.0", features = ["metrics", "trace", "logs"] }
opentelemetry-semantic-conventions = "0.14.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = [
"chrono",
"env-filter",
"parking_lot",
] }
tracing-opentelemetry = { version = "0.23.0" }
opentelemetry-appender-tracing = "0.3.0" |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 1 reply
-
If I pass the tracer provider around it kind of works... // telemetry.rs
use opentelemetry::KeyValue;
use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::resource::Resource;
use opentelemetry_sdk::runtime;
use std::time::Duration;
use opentelemetry::trace::TracerProvider;
use tracing::level_filters::LevelFilter;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{EnvFilter, Layer, Registry};
fn resources(service_namespace: &str, service_name: &str, service_version: &str) -> Resource {
Resource::new(vec![
KeyValue::new("key", "value"),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name.to_string(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE,
service_namespace.to_string(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
service_version.to_string(),
),
// KeyValue::new(
// "env",
// "production",
// ),
])
}
fn exporter_builder() -> TonicExporterBuilder {
let endpoint = std::env::var("otel_export_endpoint").unwrap_or_else(|e| {
eprintln!("Failed to get otel_export_endpoint: {e:?}");
"http://localhost:4317".to_string()
});
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint)
// .with_metadata() can be used to pass org id to tempo maybe???
.with_timeout(Duration::from_secs(3))
}
fn init_tracer(
resource: Resource,
// ) -> anyhow::Result<(OpenTelemetryLayer<Registry, opentelemetry_sdk::trace::Tracer>)> {
) -> anyhow::Result<( OpenTelemetryLayer<Registry, opentelemetry_sdk::trace::Tracer>, opentelemetry_sdk::trace::TracerProvider)> {
opentelemetry::global::set_text_map_propagator(
opentelemetry_sdk::propagation::TraceContextPropagator::new(),
);
let config = opentelemetry_sdk::trace::config().with_resource(resource);
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(config)
.with_exporter(exporter_builder())
.install_batch(runtime::Tokio)?;
let provider = tracer.provider().unwrap();
// opentelemetry::global::set_tracer_provider(provider);
// Create a tracing layer with the configured tracer
let layer: OpenTelemetryLayer<Registry, _> = tracing_opentelemetry::layer().with_tracer(tracer);
Ok((layer, provider))
}
fn init_metrics(resource: Resource) -> crate::anyhow::Result<SdkMeterProvider> {
let provider = opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.with_exporter(exporter_builder())
.with_resource(resource)
.build()?;
// let provider = meter.map_err(|e| anyhow!("{e:?}"))?;
// opentelemetry::global::set_meter_provider(provider);
Ok(provider)
}
fn filter(crate_name: &str) -> crate::anyhow::Result<EnvFilter> {
Ok(EnvFilter::builder()
// .with_default_directive(LevelFilter::DEBUG.into())
.with_default_directive(LevelFilter::TRACE.into())
.from_env_lossy()
.add_directive("h2=info".parse()?)
.add_directive("tower=info".parse()?)
.add_directive("tokio=info".parse()?)
.add_directive("tokio_util=info".parse()?)
.add_directive("lapin=info".parse()?)
.add_directive("hyper=info".parse()?)
.add_directive(format!("{crate_name}=trace").parse()?))
}
pub fn init(
service_namespace: &str,
service_name: &str,
service_version: &str,
) -> anyhow::Result<(SdkMeterProvider, opentelemetry_sdk::trace::TracerProvider)> {
opentelemetry::global::set_error_handler(|e| eprintln!("ERROR IN OPEN TELEMETRY: {e:?}"))?;
let (tracing_layer, trace_provider) = init_tracer(resources(service_namespace, service_name, service_version))?;
let meter_provider = init_metrics(resources(service_namespace, service_name, service_version))?;
// Tracing library
let subscriber = Registry::default()
.with(tracing_layer.with_filter(filter(env!("CARGO_PKG_NAME"))?))
.with(tracing_subscriber::fmt::layer().with_filter(filter(env!("CARGO_PKG_NAME"))?));
tracing::subscriber::set_global_default(subscriber)?;
Ok((meter_provider, trace_provider))
}
pub fn shut_down() -> anyhow::Result<()>{
opentelemetry::global::shutdown_tracer_provider();
opentelemetry::global::shutdown_logger_provider();
Ok(())
} here is an example of calling it: use shared::{tracing, anyhow, tokio};
use tracing::{error, info, Level, span, warn};
mod pipedrive;
fn main() {
println!("Hello, world!");
if let Err(e) = run() {
error!("Failed to run program: {e}")
}
}
#[tokio::main]
async fn run() -> anyhow::Result<()> {
shared::utils::setup_env()?;
let (_meter, _tracer) =
shared::telemetry::init("workers", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))?;
info!("Running CRM worker v{}", env!("CARGO_PKG_VERSION"));
let span = span!(
Level::INFO,
"crm_worker_startup",
);
let _guard = span.enter();
info!("Working inside the span");
warn!("Working inside the span");
drop(_guard);
shared::telemetry::shut_down()?;
info!("Done");
Ok(())
} |
Beta Was this translation helpful? Give feedback.
-
I also noticed that the traces are never pushed until the shutdown function is called. :| For more information. I have a function code that looks like this and uses lapin to connect to rabbitmq for queue consumption, however the span crm_worker_startup only gets pushed to otel collector when the program reaches the my shutdown function. On top of that, I tried the install_simple for the tracer pipeline without sucess as it also only pushes traces when the program exits. Is this intended behaviour? this is some code to help visualize it: use shared::{anyhow, tokio, tracing};
use tracing::{error, info, span, warn, Level};
use shared::anyhow::Context;
use shared::futures::StreamExt;
use protos::workers::v1::crm::WorkerInput;
mod pipedrive;
mod protos;
fn main() {
println!("Hello, world!");
if let Err(e) = run() {
error!("Failed to run program: {e}")
}
}
#[tokio::main]
async fn run() -> anyhow::Result<()> {
shared::utils::setup_env()?;
let (_meter, _tracer) =
shared::telemetry::init("workers", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))?;
info!("Running CRM worker v{}", env!("CARGO_PKG_VERSION"));
let span = span!(Level::INFO, "crm_worker_startup");
let _guard = span.enter();
info!("Working inside the span");
warn!("Working inside the span");
let worker_name = "crm";
let (ch, mut con) = shared::queue::consume(
&format!("actor-{worker_name}"),
worker_name,
1,
&format!("consumer-{worker_name}"),
)
.await?;
drop(_guard);
_tracer.force_flush();
while let Some(delivery) = con.next().await {
let mut delivery = delivery.context("Failed to get delivery")?;
let msg: WorkerInput = shared::decoder::decode_complex(&mut delivery)?;
for msg in msg.activity_methods {
}
// todo: remove this
break;
}
info!("Done");
shared::telemetry::shut_down().expect("Failed to shutdown tracing");
Ok(())
} |
Beta Was this translation helpful? Give feedback.
-
For anybody suffering from the same problem I was (I'll admit, I'm working on this since Friday, and I've expressed my frustration on this post). here is my final config that works as expected (only for traces): use anyhow::Context;
use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use opentelemetry_otlp::{Protocol, TonicExporterBuilder, WithExportConfig};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::resource::Resource;
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::trace::{BatchConfig, RandomIdGenerator, Sampler};
use std::time::Duration;
use tracing::instrument::WithSubscriber;
use tracing::level_filters::LevelFilter;
use tracing::Level;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{layer, EnvFilter, Layer, Registry};
fn resources(service_namespace: &str, service_name: &str, service_version: &str) -> Resource {
Resource::new(vec![
KeyValue::new("key", "value"),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name.to_string(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE,
service_namespace.to_string(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
service_version.to_string(),
),
// KeyValue::new(
// "env",
// "production",
// ),
])
}
fn exporter_builder() -> TonicExporterBuilder {
let endpoint = std::env::var("otel_export_endpoint").unwrap_or_else(|e| {
eprintln!("Failed to get otel_export_endpoint: {e:?}");
"http://localhost:4317".to_string()
});
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint)
}
fn init_tracer(resource: Resource) -> anyhow::Result<opentelemetry_sdk::trace::Tracer> {
Ok(opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::Config::default()
.with_resource(resource),
)
.with_batch_config(BatchConfig::default())
.with_exporter(exporter_builder())
.install_batch(runtime::Tokio)?)
}
pub fn init(
service_namespace: &str,
service_name: &str,
service_version: &str,
) -> anyhow::Result<()> {
opentelemetry::global::set_error_handler(|e| eprintln!("ERROR IN OPEN TELEMETRY: {e:?}"))?;
let tracer = init_tracer(resources(service_namespace, service_name, service_version))?;
tracing_subscriber::registry()
.with(LevelFilter::from_level(Level::DEBUG))
.with(tracing_subscriber::fmt::layer())
// .with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.init();
Ok(())
}
pub fn shut_down() {
opentelemetry::global::shutdown_tracer_provider();
} then just call init and shutdown and use tracing as normal. |
Beta Was this translation helpful? Give feedback.
For anybody suffering from the same problem I was (I'll admit, I'm working on this since Friday, and I've expressed my frustration on this post).
here is my final config that works as expected (only for traces):
telemetry.rs: