diff --git a/server/Cargo.lock b/server/Cargo.lock index 3fe57b1f5..9ec51d693 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -264,18 +264,20 @@ dependencies = [ [[package]] name = "axum-otel-metrics" -version = "0.8.1" +version = "0.9.0-alpha.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b5bd67776dca9326650fc2e2ddd15ddaca16a3c8e80a9a874ba111afab82bd" +checksum = "f60a607562cbef52413f4c2d66b6ac786b161920b183007dabbe3ba5c0b416c8" dependencies = [ "axum", "futures-util", "http", "http-body", - "opentelemetry 0.22.0", - "opentelemetry-prometheus 0.15.0", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-otlp", + "opentelemetry-prometheus", "opentelemetry-semantic-conventions", - "opentelemetry_sdk 0.22.1", + "opentelemetry_sdk", "pin-project-lite", "prometheus", "tower 0.4.13", @@ -300,6 +302,24 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-tracing-opentelemetry" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "561a0967337dfeaf3e28700d23e791712cd7d5e97ab335e0f4a0c3ac62e6ece0" +dependencies = [ + "axum", + "futures-core", + "futures-util", + "http", + "opentelemetry", + "pin-project-lite", + "tower 0.5.1", + "tracing", + "tracing-opentelemetry", + "tracing-opentelemetry-instrumentation-sdk", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -364,7 +384,7 @@ dependencies = [ "futures", "metrics", "object_store", - "opentelemetry 0.24.0", + "opentelemetry", "reqwest", "serde", "sha2", @@ -1085,7 +1105,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap", + "indexmap 2.5.0", "slab", "tokio", "tokio-util", @@ -1102,6 +1122,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.14.5" @@ -1218,11 +1244,24 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" -version = "0.1.7" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -1233,7 +1272,6 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower 0.4.13", "tower-service", "tracing", ] @@ -1415,6 +1453,7 @@ dependencies = [ "axum", "axum-otel-metrics", "axum-server", + "axum-tracing-opentelemetry", "blob_store", "bytes", "ciborium", @@ -1429,9 +1468,10 @@ dependencies = [ "metrics", "nanoid", "object_store", - "opentelemetry 0.24.0", - "opentelemetry-prometheus 0.17.0", - "opentelemetry_sdk 0.24.1", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry-prometheus", + "opentelemetry_sdk", "prometheus", "rand", "serde", @@ -1443,6 +1483,7 @@ dependencies = [ "tokio", "tower-http", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "url", "utoipa", @@ -1470,6 +1511,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.5.0" @@ -1477,7 +1528,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.5", "serde", ] @@ -1677,9 +1728,9 @@ dependencies = [ "axum-otel-metrics", "data_model", "once_cell", - "opentelemetry 0.24.0", - "opentelemetry-prometheus 0.17.0", - "opentelemetry_sdk 0.24.1", + "opentelemetry", + "opentelemetry-prometheus", + "opentelemetry_sdk", "pin-project-lite", "prometheus", "serde", @@ -1903,9 +1954,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "opentelemetry" -version = "0.22.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" dependencies = [ "futures-core", "futures-sink", @@ -1913,34 +1964,39 @@ dependencies = [ "once_cell", "pin-project-lite", "thiserror 1.0.63", - "urlencoding", ] [[package]] -name = "opentelemetry" -version = "0.24.0" +name = "opentelemetry-http" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" +checksum = "ad31e9de44ee3538fb9d64fe3376c1362f406162434609e79aea2a41a0af78ab" dependencies = [ - "futures-core", - "futures-sink", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror 1.0.63", + "async-trait", + "bytes", + "http", + "opentelemetry", + "reqwest", ] [[package]] -name = "opentelemetry-prometheus" -version = "0.15.0" +name = "opentelemetry-otlp" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bbcf6341cab7e2193e5843f0ac36c446a5b3fccb28747afaeda17996dcd02e" +checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" dependencies = [ - "once_cell", - "opentelemetry 0.22.0", - "opentelemetry_sdk 0.22.1", - "prometheus", - "protobuf", + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest", + "thiserror 1.0.63", + "tokio", + "tonic", ] [[package]] @@ -1950,39 +2006,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc4191ce34aa274621861a7a9d68dbcf618d5b6c66b10081631b61fd81fbc015" dependencies = [ "once_cell", - "opentelemetry 0.24.0", - "opentelemetry_sdk 0.24.1", + "opentelemetry", + "opentelemetry_sdk", "prometheus", "protobuf", ] [[package]] -name = "opentelemetry-semantic-conventions" -version = "0.14.0" +name = "opentelemetry-proto" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" +checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] [[package]] -name = "opentelemetry_sdk" -version = "0.22.1" +name = "opentelemetry-semantic-conventions" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" -dependencies = [ - "async-trait", - "crossbeam-channel", - "futures-channel", - "futures-executor", - "futures-util", - "glob", - "once_cell", - "opentelemetry 0.22.0", - "ordered-float", - "percent-encoding", - "rand", - "thiserror 1.0.63", - "tokio", - "tokio-stream", -] +checksum = "1cefe0543875379e47eb5f1e68ff83f45cc41366a92dfd0d073d513bf68e9a05" [[package]] name = "opentelemetry_sdk" @@ -1996,20 +2042,13 @@ dependencies = [ "futures-util", "glob", "once_cell", - "opentelemetry 0.24.0", + "opentelemetry", "percent-encoding", "rand", "serde_json", "thiserror 1.0.63", -] - -[[package]] -name = "ordered-float" -version = "4.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83e7ccb95e240b7c9506a3d544f10d935e142cc90b0a1d56954fb44d89ad6b97" -dependencies = [ - "num-traits", + "tokio", + "tokio-stream", ] [[package]] @@ -2166,6 +2205,29 @@ dependencies = [ "thiserror 1.0.63", ] +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -2393,6 +2455,7 @@ checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "base64", "bytes", + "futures-channel", "futures-core", "futures-util", "h2", @@ -2744,7 +2807,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap", + "indexmap 2.5.0", "itoa", "ryu", "serde", @@ -2909,7 +2972,7 @@ dependencies = [ "indexify_utils", "metrics", "object_store", - "opentelemetry 0.24.0", + "opentelemetry", "rocksdb", "serde", "serde_json", @@ -3238,13 +3301,43 @@ version = "0.22.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" dependencies = [ - "indexmap", + "indexmap 2.5.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3253,9 +3346,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3349,6 +3446,36 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-opentelemetry-instrumentation-sdk" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8159fbb3bd93e20342e7e6ef45b96c5d122cd88043f37ad0e4b5bb052f0f4483" +dependencies = [ + "http", + "opentelemetry", + "tracing", + "tracing-opentelemetry", +] + [[package]] name = "tracing-serde" version = "0.2.0" @@ -3457,12 +3584,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "urlencoding" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" - [[package]] name = "utf16_iter" version = "1.0.5" @@ -3487,7 +3608,7 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514a48569e4e21c86d0b84b5612b5e73c0b2cf09db63260134ba426d4e8ea714" dependencies = [ - "indexmap", + "indexmap 2.5.0", "serde", "serde_json", "utoipa-gen", @@ -4048,7 +4169,7 @@ dependencies = [ "crossbeam-utils", "displaydoc", "flate2", - "indexmap", + "indexmap 2.5.0", "memchr", "thiserror 1.0.63", "zopfli", diff --git a/server/Cargo.toml b/server/Cargo.toml index 2177daa16..2c945e2b7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -66,11 +66,13 @@ pin-project = "1.1.7" ciborium = "0.2.2" uuid = { version = "1.11.0", features = ["v4"] } url = "2.5.4" -opentelemetry = { version="0.24.0", features = ["metrics"] } -opentelemetry_sdk = { version = "0.24.0", features = ["metrics"] } -opentelemetry-prometheus = {version = "0.17.0"} +opentelemetry = { version="0.24", features = ["metrics", "trace"] } +opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio", "metrics", "trace"] } +opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "trace"] } +opentelemetry-prometheus = { version = "0.17" } prometheus = {version = "0.13.4"} -axum-otel-metrics = { version = "0.8.1"} +axum-otel-metrics = { version = "0.9.0-alpha.2" } +tracing-opentelemetry = "0.25" [dependencies] async-stream = {workspace = true} @@ -106,15 +108,17 @@ hyper = {workspace=true} url = {workspace=true} opentelemetry = {workspace=true} opentelemetry-prometheus = {workspace=true} -opentelemetry_sdk = { workspace=true } +opentelemetry_sdk.workspace = true prometheus = {workspace=true} axum-otel-metrics = { workspace=true } metrics = {workspace=true} +tracing-opentelemetry = {workspace=true} +opentelemetry-otlp = {workspace=true} +axum-tracing-opentelemetry = { version = "0.19.0", features = ["tracing_level_info"] } [dev-dependencies] tempfile = { workspace = true } - [build-dependencies] # All features enabled vergen = { version = "9.0.2", features = [ diff --git a/server/data_model/src/lib.rs b/server/data_model/src/lib.rs index 7cf069168..abd7d8ee2 100644 --- a/server/data_model/src/lib.rs +++ b/server/data_model/src/lib.rs @@ -1140,7 +1140,7 @@ mod tests { } // Check function pattern - fn check_compute_parent(node: &str, expected_parents: Vec<&str>, configure_graph: F) + fn check_compute_parent(node: &str, mut expected_parents: Vec<&str>, configure_graph: F) where F: FnOnce(&mut ComputeGraph), { @@ -1172,12 +1172,11 @@ mod tests { let mut graph = create_test_graph(); configure_graph(&mut graph); - assert_eq!( - graph.get_compute_parent_nodes(node).sort(), - expected_parents.clone().sort(), - "Failed for node: {}", - node - ); + let mut parent_nodes = graph.get_compute_parent_nodes(node); + parent_nodes.sort(); + expected_parents.sort(); + + assert_eq!(parent_nodes, expected_parents, "Failed for node: {}", node); } #[test] diff --git a/server/metrics/src/lib.rs b/server/metrics/src/lib.rs index 2a0193f88..03418eccc 100644 --- a/server/metrics/src/lib.rs +++ b/server/metrics/src/lib.rs @@ -99,7 +99,11 @@ pub fn init_provider() -> prometheus::Registry { let exporter = opentelemetry_prometheus::exporter() .with_registry(registry.clone()) .build(); - let mut provider = SdkMeterProvider::builder(); + let mut provider = + SdkMeterProvider::builder().with_resource(opentelemetry_sdk::Resource::new(vec![ + opentelemetry::KeyValue::new("service.name", "indexify-server"), + opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), + ])); let low_latency_boundaries = &[ 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, diff --git a/server/src/config.rs b/server/src/config.rs index 4eda5b086..468e36817 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -3,25 +3,29 @@ use std::{env, fmt::Debug, net::SocketAddr}; use anyhow::Result; use blob_store::BlobStorageConfig; use figment::{ - providers::{Format, Yaml}, + providers::{Format, Serialized, Yaml}, Figment, }; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerConfig { + pub dev: bool, pub state_store_path: String, pub listen_addr: String, pub blob_storage: BlobStorageConfig, + pub tracing: TracingConfig, } impl Default for ServerConfig { fn default() -> Self { let state_store_path = env::current_dir().unwrap().join("indexify_storage/state"); ServerConfig { + dev: false, state_store_path: state_store_path.to_str().unwrap().to_string(), listen_addr: "0.0.0.0:8900".to_string(), blob_storage: Default::default(), + tracing: TracingConfig::default(), } } } @@ -29,7 +33,9 @@ impl Default for ServerConfig { impl ServerConfig { pub fn from_path(path: &str) -> Result { let config_str = std::fs::read_to_string(path)?; - let config: ServerConfig = Figment::new().merge(Yaml::string(&config_str)).extract()?; + let config: ServerConfig = Figment::from(Serialized::defaults(ServerConfig::default())) + .merge(Yaml::string(&config_str)) + .extract()?; config.validate()?; Ok(config) } @@ -44,3 +50,12 @@ impl ServerConfig { Ok(()) } } + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct TracingConfig { + // Enable tracing. + pub enabled: bool, + // OpenTelemetry collector grpc endpoint. Defaults to using OTEL_EXPORTER_OTLP_ENDPOINT env var + // or to localhost:4317 if empty. + pub endpoint: Option, +} diff --git a/server/src/main.rs b/server/src/main.rs index 3fac8c9ab..59e8e3455 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,11 @@ -use std::path::PathBuf; +use std::{env, path::PathBuf}; +use anyhow::Result; use clap::Parser; +use config::ServerConfig; +use opentelemetry::{global, trace::TracerProvider}; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::Sampler}; use service::Service; use tracing::error; use tracing_subscriber::{ @@ -9,7 +14,7 @@ use tracing_subscriber::{ format::{Format, JsonFields}, }, layer::SubscriberExt, - util::SubscriberInitExt, + Layer, }; mod config; @@ -30,44 +35,107 @@ struct Cli { config: Option, } -fn setup_tracing(structured_logging: bool) { +fn get_env_filter() -> tracing_subscriber::EnvFilter { // RUST_LOG used to control logging level. - let env_filter_layer = - tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { - tracing_subscriber::EnvFilter::default() - .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) - }); + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + tracing_subscriber::EnvFilter::default() + .add_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + }) +} +fn get_log_layer(structured_logging: bool) -> Box + Send + Sync + 'static> +where + S: for<'a> tracing_subscriber::registry::LookupSpan<'a>, + S: tracing::Subscriber, +{ + // Create an OTLP pipeline exporter for a `trace_demo` service. if structured_logging { - let log_layer = fmt::layer() - .event_format( - Format::default() - .json() - .with_span_list(false) - .flatten_event(true), - ) - .fmt_fields(JsonFields::default()); - tracing_subscriber::registry() - .with(env_filter_layer) - .with(log_layer) - .init(); - return; + return Box::new( + fmt::layer() + .event_format( + Format::default() + .json() + .with_span_list(false) + .flatten_event(true), + ) + .fmt_fields(JsonFields::default()), + ); } - tracing_subscriber::registry() + Box::new(tracing_subscriber::fmt::layer().compact()) +} + +fn setup_tracing(config: ServerConfig) -> Result<()> { + let structured_logging = !config.dev; + let env_filter_layer = get_env_filter(); + let log_layer = get_log_layer(structured_logging); + let subscriber = tracing_subscriber::Registry::default() .with(env_filter_layer) - .with(tracing_subscriber::fmt::layer().compact()) - .init(); + .with(log_layer); + + if !config.tracing.enabled { + if let Err(e) = tracing::subscriber::set_global_default(subscriber) { + error!("logger was already initiated, continuing: {:?}", e); + } + return Ok(()); + } + + let mut span_exporter: Option = None; + // If endpoint is configured use it, otherwise use the otlp defaults. + if let Some(endpoint) = config.tracing.endpoint.clone() { + span_exporter.replace( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint), + ); + } + let span_exporter = span_exporter.unwrap_or(opentelemetry_otlp::new_exporter().tonic()); + + let tracer_provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_trace_config( + opentelemetry_sdk::trace::Config::default() + .with_sampler(Sampler::AlwaysOn) + .with_resource(opentelemetry_sdk::Resource::new(vec![ + opentelemetry::KeyValue::new("service.name", "indexify-server"), + opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), + ])), + ) + .with_exporter(span_exporter) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; + + global::set_tracer_provider(tracer_provider.clone()); + let tracer = tracer_provider.tracer("tracing-otel-subscriber"); + + // Create a layer with the configured tracer + let otel_layer = tracing_opentelemetry::layer() + .with_error_records_to_exceptions(true) + .with_tracer(tracer); + global::set_tracer_provider(tracer_provider.clone()); + + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + tracing::subscriber::set_global_default(subscriber.with(otel_layer))?; + + Ok(()) } #[tokio::main] async fn main() { let cli = Cli::parse(); - let config = match cli.config { + let mut config = match cli.config { Some(path) => config::ServerConfig::from_path(path.to_str().unwrap()).unwrap(), None => config::ServerConfig::default(), }; - setup_tracing(!cli.dev); + + // Override config with cli arguments. + if cli.dev { + config.dev = true; + } + + if let Err(err) = setup_tracing(config.clone()) { + error!("Error setting up tracing: {:?}", err); + return; + } let service = Service::new(config).await; if let Err(err) = service { @@ -77,4 +145,7 @@ async fn main() { if let Err(err) = service.unwrap().start().await { error!("Error starting service: {:?}", err); } + + // export traces before shutdown + opentelemetry::global::shutdown_tracer_provider(); } diff --git a/server/src/routes.rs b/server/src/routes.rs index aabfc9815..d23562ef0 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -3,16 +3,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::{anyhow, Result}; use axum::{ body::Body, - extract::{ - DefaultBodyLimit, - MatchedPath, - Multipart, - Path, - Query, - RawPathParams, - Request, - State, - }, + extract::{DefaultBodyLimit, Multipart, Path, Query, RawPathParams, Request, State}, http::{Method, Response}, middleware::{self, Next}, response::{sse::Event, Html, IntoResponse}, @@ -21,6 +12,10 @@ use axum::{ Router, }; use axum_otel_metrics::HttpMetricsLayerBuilder; +use axum_tracing_opentelemetry::{ + self, + middleware::{OtelAxumLayer, OtelInResponseLayer}, +}; use blob_store::PutResult; use data_model::ExecutorId; use futures::StreamExt; @@ -42,11 +37,8 @@ use state_store::{ }, IndexifyState, }; -use tower_http::{ - cors::{Any, CorsLayer}, - trace::TraceLayer, -}; -use tracing::{error, info, info_span}; +use tower_http::cors::{Any, CorsLayer}; +use tracing::{error, info}; use utoipa::{OpenApi, ToSchema}; use utoipa_swagger_ui::SwaggerUi; @@ -206,20 +198,8 @@ pub fn create_routes(route_state: RouteState) -> Router { "/internal/namespaces/:namespace/compute_graphs/:compute_graph/invocations/:invocation_id/ctx", get(get_ctx_state_key).with_state(route_state.clone()), ) - .layer( - TraceLayer::new_for_http() - .make_span_with(|req: &Request| { - let method = req.method().as_str(); - let uri = req.uri().to_string(); - - let matched_path = req - .extensions() - .get::() - .map(MatchedPath::as_str); - - info_span!("request", method, uri, matched_path) - }) - ) + .layer(OtelInResponseLayer::default()) + .layer(OtelAxumLayer::default()) // No tracing starting here. .merge(axum_metrics.routes()) .route("/ui", get(ui_index_handler)) diff --git a/server/src/scheduler.rs b/server/src/scheduler.rs index 37410dd10..b5e58c43f 100644 --- a/server/src/scheduler.rs +++ b/server/src/scheduler.rs @@ -18,7 +18,7 @@ use task_scheduler::{ TaskScheduler, }; use tokio::{self, sync::watch::Receiver}; -use tracing::{error, info, span}; +use tracing::{error, info, instrument, span}; pub struct Scheduler { indexify_state: Arc, @@ -36,6 +36,7 @@ impl Scheduler { } } + #[instrument(skip(self))] pub async fn run_scheduler(&self) -> Result<()> { let _timer = Timer::start(&self.metrics.scheduler_invocations); let state_changes = self diff --git a/server/src/service.rs b/server/src/service.rs index 7dab69659..4e6dda1e0 100644 --- a/server/src/service.rs +++ b/server/src/service.rs @@ -105,8 +105,8 @@ impl Service { axum_server::bind(addr) .handle(handle) .serve(app.into_make_service()) - .await - .unwrap(); + .await?; + Ok(()) } } diff --git a/server/state_store/src/lib.rs b/server/state_store/src/lib.rs index 577f93a2b..28ae370ca 100644 --- a/server/state_store/src/lib.rs +++ b/server/state_store/src/lib.rs @@ -36,7 +36,7 @@ use tokio::sync::{ watch::{Receiver, Sender}, RwLock, }; -use tracing::{error, info}; +use tracing::{error, info, instrument, span}; pub mod invocation_events; pub mod kv; @@ -168,15 +168,33 @@ impl IndexifyState { self.system_tasks_rx.clone() } + #[tracing::instrument( + skip(self, request), + fields( + request_type = request.payload.as_ref(), + state_change_len = request.state_changes_processed.len(), + otel.name = format!("state_machine.write {}", request.payload.as_ref()) + ) + )] pub async fn write(&self, request: StateMachineUpdateRequest) -> Result<()> { let timer_kv = &[KeyValue::new("request", request.payload.to_string())]; - tracing::info!("writing state machine update request: {}", request.payload,); + tracing::info!( + "writing state machine update request: {}", + request.payload.as_ref(), + ); let _timer = Timer::start_with_labels(&self.metrics.state_write, timer_kv); let mut allocated_tasks_by_executor = Vec::new(); let mut tasks_finalized: HashMap> = HashMap::new(); let txn = self.db.transaction(); let new_state_changes = match &request.payload { requests::RequestPayload::InvokeComputeGraph(invoke_compute_graph_request) => { + let _enter = span!( + tracing::Level::INFO, + "invoke_compute_graph", + namespace = invoke_compute_graph_request.namespace.clone(), + invocation_id = invoke_compute_graph_request.invocation_payload.id.clone(), + compute_graph = invoke_compute_graph_request.compute_graph_name.clone(), + ); let state_changes = self .invoke_compute_graph(&invoke_compute_graph_request) .await?; @@ -484,6 +502,7 @@ impl IndexifyState { Ok(vec![state_change]) } + #[instrument(skip(self, request))] async fn invoke_compute_graph( &self, request: &requests::InvokeComputeGraphRequest, diff --git a/server/state_store/src/requests.rs b/server/state_store/src/requests.rs index 6933e8546..05dcaa07d 100644 --- a/server/state_store/src/requests.rs +++ b/server/state_store/src/requests.rs @@ -11,13 +11,14 @@ use data_model::{ TaskDiagnostics, TaskId, }; +use strum::AsRefStr; pub struct StateMachineUpdateRequest { pub payload: RequestPayload, pub state_changes_processed: Vec, } -#[derive(strum::Display)] +#[derive(AsRefStr, strum::Display)] pub enum RequestPayload { InvokeComputeGraph(InvokeComputeGraphRequest), ReplayComputeGraph(ReplayComputeGraphRequest), diff --git a/server/state_store/src/scanner.rs b/server/state_store/src/scanner.rs index 5f6942a99..b90db7a96 100644 --- a/server/state_store/src/scanner.rs +++ b/server/state_store/src/scanner.rs @@ -21,6 +21,7 @@ use metrics::Timer; use opentelemetry::KeyValue; use rocksdb::{Direction, IteratorMode, ReadOptions, TransactionDB}; use serde::de::DeserializeOwned; +use tracing::instrument; use super::state_machine::IndexifyObjectsColumns; use crate::serializer::{JsonEncode, JsonEncoder}; @@ -418,6 +419,7 @@ impl StateReader { Ok(urls) } + #[instrument(skip(self))] pub fn get_unprocessed_state_changes(&self) -> Result> { let kvs = &[KeyValue::new("op", "get_unprocessed_state_changes")]; let _timer = Timer::start_with_labels(&self.metrics.state_read, kvs); diff --git a/server/state_store/src/state_machine.rs b/server/state_store/src/state_machine.rs index b8ec49b4e..5fb28049e 100644 --- a/server/state_store/src/state_machine.rs +++ b/server/state_store/src/state_machine.rs @@ -32,7 +32,7 @@ use rocksdb::{ TransactionDB, }; use strum::AsRefStr; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, instrument}; use super::serializer::{JsonEncode, JsonEncoder}; use crate::requests::{ @@ -629,6 +629,7 @@ pub(crate) enum InvocationCompletion { } // returns true if system task has finished +#[instrument(skip(db, txn, req, sm_metrics))] pub(crate) fn create_tasks( db: Arc, txn: &Transaction,