Skip to content

Commit

Permalink
feat(tracing): open-telemetry traces (#1039)
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousben authored Dec 7, 2024
1 parent f485d03 commit 7e188a6
Show file tree
Hide file tree
Showing 13 changed files with 378 additions and 160 deletions.
285 changes: 203 additions & 82 deletions server/Cargo.lock

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ pin-project = "1.1.7"
ciborium = "0.2.2"
uuid = { version = "1.11.0", features = ["v4"] }
url = "2.5.4"
opentelemetry = { version="0.24.0", features = ["metrics"] }
opentelemetry_sdk = { version = "0.24.0", features = ["metrics"] }
opentelemetry-prometheus = {version = "0.17.0"}
opentelemetry = { version="0.24", features = ["metrics", "trace"] }
opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio", "metrics", "trace"] }
opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "trace"] }
opentelemetry-prometheus = { version = "0.17" }
prometheus = {version = "0.13.4"}
axum-otel-metrics = { version = "0.8.1"}
axum-otel-metrics = { version = "0.9.0-alpha.2" }
tracing-opentelemetry = "0.25"

[dependencies]
async-stream = {workspace = true}
Expand Down Expand Up @@ -106,15 +108,17 @@ hyper = {workspace=true}
url = {workspace=true}
opentelemetry = {workspace=true}
opentelemetry-prometheus = {workspace=true}
opentelemetry_sdk = { workspace=true }
opentelemetry_sdk.workspace = true
prometheus = {workspace=true}
axum-otel-metrics = { workspace=true }
metrics = {workspace=true}
tracing-opentelemetry = {workspace=true}
opentelemetry-otlp = {workspace=true}
axum-tracing-opentelemetry = { version = "0.19.0", features = ["tracing_level_info"] }

[dev-dependencies]
tempfile = { workspace = true }


[build-dependencies]
# All features enabled
vergen = { version = "9.0.2", features = [
Expand Down
13 changes: 6 additions & 7 deletions server/data_model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ mod tests {
}

// Check function pattern
fn check_compute_parent<F>(node: &str, expected_parents: Vec<&str>, configure_graph: F)
fn check_compute_parent<F>(node: &str, mut expected_parents: Vec<&str>, configure_graph: F)
where
F: FnOnce(&mut ComputeGraph),
{
Expand Down Expand Up @@ -1172,12 +1172,11 @@ mod tests {
let mut graph = create_test_graph();
configure_graph(&mut graph);

assert_eq!(
graph.get_compute_parent_nodes(node).sort(),
expected_parents.clone().sort(),
"Failed for node: {}",
node
);
let mut parent_nodes = graph.get_compute_parent_nodes(node);
parent_nodes.sort();
expected_parents.sort();

assert_eq!(parent_nodes, expected_parents, "Failed for node: {}", node);
}

#[test]
Expand Down
6 changes: 5 additions & 1 deletion server/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ pub fn init_provider() -> prometheus::Registry {
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build();
let mut provider = SdkMeterProvider::builder();
let mut provider =
SdkMeterProvider::builder().with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "indexify-server"),
opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
]));

let low_latency_boundaries = &[
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0,
Expand Down
19 changes: 17 additions & 2 deletions server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,39 @@ use std::{env, fmt::Debug, net::SocketAddr};
use anyhow::Result;
use blob_store::BlobStorageConfig;
use figment::{
providers::{Format, Yaml},
providers::{Format, Serialized, Yaml},
Figment,
};
use serde::{Deserialize, Serialize};

/// Top-level server configuration.
///
/// Built either from `ServerConfig::default()` or from a YAML file via
/// `ServerConfig::from_path`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
/// Development mode flag; defaults to `false`.
pub dev: bool,
/// Location of the state store. Defaults to `indexify_storage/state` under the
/// current working directory.
pub state_store_path: String,
/// Address the server listens on. Defaults to `0.0.0.0:8900`.
pub listen_addr: String,
/// Blob storage settings; defaults to `BlobStorageConfig`'s own default.
pub blob_storage: BlobStorageConfig,
/// OpenTelemetry tracing settings; see `TracingConfig`.
pub tracing: TracingConfig,
}

impl Default for ServerConfig {
fn default() -> Self {
let state_store_path = env::current_dir().unwrap().join("indexify_storage/state");
ServerConfig {
dev: false,
state_store_path: state_store_path.to_str().unwrap().to_string(),
listen_addr: "0.0.0.0:8900".to_string(),
blob_storage: Default::default(),
tracing: TracingConfig::default(),
}
}
}

impl ServerConfig {
pub fn from_path(path: &str) -> Result<ServerConfig> {
let config_str = std::fs::read_to_string(path)?;
let config: ServerConfig = Figment::new().merge(Yaml::string(&config_str)).extract()?;
let config: ServerConfig = Figment::from(Serialized::defaults(ServerConfig::default()))
.merge(Yaml::string(&config_str))
.extract()?;
config.validate()?;
Ok(config)
}
Expand All @@ -44,3 +50,12 @@ impl ServerConfig {
Ok(())
}
}

/// OpenTelemetry tracing settings (the `tracing` field of `ServerConfig`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TracingConfig {
/// Enable tracing. The derived `Default` leaves this `false`.
pub enabled: bool,
/// OpenTelemetry collector gRPC endpoint. When unset, the exporter's own defaults
/// apply: the OTEL_EXPORTER_OTLP_ENDPOINT env var, or localhost:4317.
pub endpoint: Option<String>,
}
123 changes: 97 additions & 26 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use std::path::PathBuf;
use std::{env, path::PathBuf};

use anyhow::Result;
use clap::Parser;
use config::ServerConfig;
use opentelemetry::{global, trace::TracerProvider};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Sampler};
use service::Service;
use tracing::error;
use tracing_subscriber::{
Expand All @@ -9,7 +14,7 @@ use tracing_subscriber::{
format::{Format, JsonFields},
},
layer::SubscriberExt,
util::SubscriberInitExt,
Layer,
};

mod config;
Expand All @@ -30,44 +35,107 @@ struct Cli {
config: Option<PathBuf>,
}

fn setup_tracing(structured_logging: bool) {
fn get_env_filter() -> tracing_subscriber::EnvFilter {
// RUST_LOG used to control logging level.
let env_filter_layer =
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
tracing_subscriber::EnvFilter::default()
.add_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
});
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
tracing_subscriber::EnvFilter::default()
.add_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
})
}

/// Builds the formatting layer that writes log events.
///
/// When `structured_logging` is `true`, events are emitted as flattened JSON without
/// the span list; otherwise the compact human-readable format is used. The layer is
/// boxed so that both variants share a single return type.
fn get_log_layer<S>(structured_logging: bool) -> Box<dyn Layer<S> + Send + Sync + 'static>
where
    S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
    if structured_logging {
        Box::new(
            fmt::layer()
                .event_format(
                    Format::default()
                        .json()
                        .with_span_list(false)
                        .flatten_event(true),
                )
                .fmt_fields(JsonFields::default()),
        )
    } else {
        Box::new(tracing_subscriber::fmt::layer().compact())
    }
}

fn setup_tracing(config: ServerConfig) -> Result<()> {
let structured_logging = !config.dev;
let env_filter_layer = get_env_filter();
let log_layer = get_log_layer(structured_logging);
let subscriber = tracing_subscriber::Registry::default()
.with(env_filter_layer)
.with(tracing_subscriber::fmt::layer().compact())
.init();
.with(log_layer);

if !config.tracing.enabled {
if let Err(e) = tracing::subscriber::set_global_default(subscriber) {
error!("logger was already initiated, continuing: {:?}", e);
}
return Ok(());
}

let mut span_exporter: Option<opentelemetry_otlp::TonicExporterBuilder> = None;
// If endpoint is configured use it, otherwise use the otlp defaults.
if let Some(endpoint) = config.tracing.endpoint.clone() {
span_exporter.replace(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint),
);
}
let span_exporter = span_exporter.unwrap_or(opentelemetry_otlp::new_exporter().tonic());

let tracer_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::Config::default()
.with_sampler(Sampler::AlwaysOn)
.with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", "indexify-server"),
opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
])),
)
.with_exporter(span_exporter)
.install_batch(opentelemetry_sdk::runtime::Tokio)?;

global::set_tracer_provider(tracer_provider.clone());
let tracer = tracer_provider.tracer("tracing-otel-subscriber");

// Create a layer with the configured tracer
let otel_layer = tracing_opentelemetry::layer()
.with_error_records_to_exceptions(true)
.with_tracer(tracer);
global::set_tracer_provider(tracer_provider.clone());

opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
tracing::subscriber::set_global_default(subscriber.with(otel_layer))?;

Ok(())
}

#[tokio::main]
async fn main() {
let cli = Cli::parse();
let config = match cli.config {
let mut config = match cli.config {
Some(path) => config::ServerConfig::from_path(path.to_str().unwrap()).unwrap(),
None => config::ServerConfig::default(),
};
setup_tracing(!cli.dev);

// Override config with cli arguments.
if cli.dev {
config.dev = true;
}

if let Err(err) = setup_tracing(config.clone()) {
error!("Error setting up tracing: {:?}", err);
return;
}

let service = Service::new(config).await;
if let Err(err) = service {
Expand All @@ -77,4 +145,7 @@ async fn main() {
if let Err(err) = service.unwrap().start().await {
error!("Error starting service: {:?}", err);
}

// export traces before shutdown
opentelemetry::global::shutdown_tracer_provider();
}
38 changes: 9 additions & 29 deletions server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,7 @@ use std::{sync::Arc, time::Duration};
use anyhow::{anyhow, Result};
use axum::{
body::Body,
extract::{
DefaultBodyLimit,
MatchedPath,
Multipart,
Path,
Query,
RawPathParams,
Request,
State,
},
extract::{DefaultBodyLimit, Multipart, Path, Query, RawPathParams, Request, State},
http::{Method, Response},
middleware::{self, Next},
response::{sse::Event, Html, IntoResponse},
Expand All @@ -21,6 +12,10 @@ use axum::{
Router,
};
use axum_otel_metrics::HttpMetricsLayerBuilder;
use axum_tracing_opentelemetry::{
self,
middleware::{OtelAxumLayer, OtelInResponseLayer},
};
use blob_store::PutResult;
use data_model::ExecutorId;
use futures::StreamExt;
Expand All @@ -42,11 +37,8 @@ use state_store::{
},
IndexifyState,
};
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use tracing::{error, info, info_span};
use tower_http::cors::{Any, CorsLayer};
use tracing::{error, info};
use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi;

Expand Down Expand Up @@ -206,20 +198,8 @@ pub fn create_routes(route_state: RouteState) -> Router {
"/internal/namespaces/:namespace/compute_graphs/:compute_graph/invocations/:invocation_id/ctx",
get(get_ctx_state_key).with_state(route_state.clone()),
)
.layer(
TraceLayer::new_for_http()
.make_span_with(|req: &Request| {
let method = req.method().as_str();
let uri = req.uri().to_string();

let matched_path = req
.extensions()
.get::<MatchedPath>()
.map(MatchedPath::as_str);

info_span!("request", method, uri, matched_path)
})
)
.layer(OtelInResponseLayer::default())
.layer(OtelAxumLayer::default())
// No tracing starting here.
.merge(axum_metrics.routes())
.route("/ui", get(ui_index_handler))
Expand Down
3 changes: 2 additions & 1 deletion server/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use task_scheduler::{
TaskScheduler,
};
use tokio::{self, sync::watch::Receiver};
use tracing::{error, info, span};
use tracing::{error, info, instrument, span};

pub struct Scheduler {
indexify_state: Arc<IndexifyState>,
Expand All @@ -36,6 +36,7 @@ impl Scheduler {
}
}

#[instrument(skip(self))]
pub async fn run_scheduler(&self) -> Result<()> {
let _timer = Timer::start(&self.metrics.scheduler_invocations);
let state_changes = self
Expand Down
4 changes: 2 additions & 2 deletions server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl Service {
axum_server::bind(addr)
.handle(handle)
.serve(app.into_make_service())
.await
.unwrap();
.await?;

Ok(())
}
}
Expand Down
Loading

0 comments on commit 7e188a6

Please sign in to comment.