From d190bfd7be308a901c93537dad9acadd82aab586 Mon Sep 17 00:00:00 2001 From: Adrian Tanase Date: Wed, 22 May 2024 16:16:52 +0300 Subject: [PATCH] tracing poc in core and cli --- Cargo.toml | 1 + datafusion-cli/Cargo.lock | 398 +++++++++++++++++++++++++++- datafusion-cli/Cargo.toml | 8 + datafusion-cli/src/exec.rs | 3 + datafusion-cli/src/main.rs | 51 +++- datafusion/core/Cargo.toml | 1 + datafusion/physical-plan/Cargo.toml | 1 + datafusion/physical-plan/src/lib.rs | 4 + 8 files changed, 459 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 04c5f4755fe40..c277c5f279334 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,6 +108,7 @@ sqlparser = { version = "0.45.0", features = ["visitor"] } tempfile = "3" thiserror = "1.0.44" tokio = { version = "1.36", features = ["macros", "rt", "sync"] } +tracing = "0.1.40" url = "2.2" [profile.release] diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 62ae154a4225b..0a5752cb547d3 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -88,6 +88,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +[[package]] +name = "anyhow" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" + [[package]] name = "apache-avro" version = "0.16.0" @@ -343,7 +349,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.3", ] [[package]] @@ -379,6 +385,28 @@ dependencies = [ "zstd-safe 7.1.0", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -706,6 +734,51 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -814,7 +887,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" dependencies = [ "memchr", - "regex-automata", + "regex-automata 0.4.6", "serde", ] @@ -1049,6 +1122,21 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crunchy" version = "0.2.2" @@ -1161,6 +1249,7 @@ dependencies = [ "tempfile", "tokio", "tokio-util", + "tracing", "url", "uuid", "xz2", @@ -1184,6 +1273,9 @@ dependencies = [ "futures", "mimalloc", "object_store", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "parking_lot", "parquet", "predicates", @@ -1191,6 +1283,9 @@ dependencies = [ "rstest", "rustyline", "tokio", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", "url", ] @@ -1328,7 +1423,7 @@ dependencies = [ "indexmap 2.2.6", "itertools", "log", - "regex-syntax", + "regex-syntax 0.8.3", ] [[package]] @@ -1399,6 +1494,7 @@ dependencies = [ "pin-project-lite", "rand", "tokio", + "tracing", ] [[package]] @@ -1927,6 +2023,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -2205,6 +2313,21 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -2282,6 +2405,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.2" @@ -2425,6 +2558,80 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" + +[[package]] +name = "opentelemetry_sdk" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float 4.2.0", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -2434,6 +2641,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" +dependencies = [ + "num-traits", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -2446,6 +2662,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.2" @@ -2686,6 +2908,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.61", +] + [[package]] name = "quad-rand" version = "0.2.1" @@ -2779,8 +3024,17 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.6", + "regex-syntax 0.8.3", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2791,7 +3045,7 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.3", ] [[package]] @@ -2800,6 +3054,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.3" @@ -3168,6 +3428,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -3436,6 +3705,16 @@ dependencies = [ "syn 2.0.61", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "thrift" version = "0.17.0" @@ -3444,7 +3723,7 @@ checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", "integer-encoding", - "ordered-float", + "ordered-float 2.10.1", ] [[package]] @@ -3520,6 +3799,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -3576,6 +3865,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3584,9 +3900,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3634,6 +3954,54 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3756,6 +4124,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "version_check" version = "0.9.4" @@ -3891,6 +4265,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 4e3d800cfe978..c41252ac743f4 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -55,8 +55,16 @@ parquet = { version = "51.0.0", default-features = false } regex = "1.8" rustyline = "11.0" tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot", "signal"] } +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = "2.2" +# the versions need to be staggered like this, or it won't compile +opentelemetry = "0.22.0" +opentelemetry_sdk = { version = "0.22.0", features = ["rt-tokio"] } +opentelemetry-otlp = "0.15.0" +tracing-opentelemetry = "0.23.0" + [dev-dependencies] assert_cmd = "2.0" ctor = "0.2.0" diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index cfbc97ecbe238..a960dbb2874f2 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -46,14 +46,17 @@ use datafusion::sql::sqlparser; use rustyline::error::ReadlineError; use rustyline::Editor; use tokio::signal; +use tracing::{info, instrument}; /// run and execute SQL statements and commands, against a context with the given print options +#[instrument(skip_all)] pub async fn exec_from_commands( ctx: &mut SessionContext, commands: Vec, print_options: &PrintOptions, ) -> Result<()> { for sql in commands { + info!(?sql, "command"); exec_and_print(ctx, print_options, sql).await?; } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 6f71ccafb729d..c5edc053c71ba 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -38,6 +38,13 @@ use datafusion_cli::{ use clap::Parser; use mimalloc::MiMalloc; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::runtime; +use tracing::info; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -150,12 +157,54 @@ pub async fn main() -> ExitCode { return ExitCode::FAILURE; } + shutdown_tracer_provider(); ExitCode::SUCCESS } +pub fn setup_tracing() { + let fmt_layer = tracing_subscriber::fmt::layer() + // enable everything + // .with_max_level(tracing::Level::INFO) + .compact() + // Display source code file paths + .with_file(true) + // Display source code line numbers + .with_line_number(true) + // Display the thread ID an event was recorded on + .with_thread_ids(true) + // Don't display the event's target (module path) + .with_target(false); + // sets this to be the default, global collector for this application. + // .finish(); + + // log level filtering here + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); + + let registry = tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer); + + let otlp_exporter = opentelemetry_otlp::new_exporter().tonic(); + + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(otlp_exporter) + // .install_simple() + .install_batch(runtime::Tokio) + .unwrap(); + + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + registry.with(otel_layer).init(); + + // registry.init(); +} + /// Main CLI entrypoint async fn main_inner() -> Result<()> { - env_logger::init(); + // env_logger::init(); + setup_tracing(); let args = Args::parse(); if !args.quiet { diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index cef45e41a33ca..509938493b906 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -126,6 +126,7 @@ sqlparser = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } tokio-util = { version = "0.7.4", features = ["io"], optional = true } +tracing = { workspace = true } url = { workspace = true } uuid = { version = "1.7", features = ["v4"] } xz2 = { version = "0.1", optional = true, features = ["static"] } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 3e9c9ee0e6c05..67e7a1f4f44cb 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -64,6 +64,7 @@ parking_lot = { workspace = true } pin-project-lite = "^0.2.7" rand = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } [dev-dependencies] rstest = { workspace = true } diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 8d8a3e71031e5..6a73b944792bc 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -715,10 +715,13 @@ pub async fn collect( /// /// Dropping the stream will abort the execution of the query, and free up /// any allocated resources +#[instrument(skip_all)] pub fn execute_stream( plan: Arc, context: Arc, ) -> Result { + let name = plan.schema(); + info!("running plan: {:?}", name); match plan.output_partitioning().partition_count() { 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))), 1 => plan.execute(0, context), @@ -801,6 +804,7 @@ pub fn get_plan_string(plan: &Arc) -> Vec { #[cfg(test)] #[allow(clippy::single_component_path_imports)] use rstest_reuse; +use tracing::{info, instrument}; #[cfg(test)] mod tests {