diff --git a/Cargo.lock b/Cargo.lock index 6d80bb14d6..ec86a885c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -959,6 +959,26 @@ dependencies = [ "syn 2.0.51", ] +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.4.2", + "cexpr", + "clang-sys", + "itertools 0.11.0", + "lazy_static", + "lazycell", + "proc-macro2 1.0.78", + "quote 1.0.35", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.51", +] + [[package]] name = "bip32" version = "0.5.1" @@ -1280,7 +1300,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.3", + "windows-targets 0.52.5", ] [[package]] @@ -3036,7 +3056,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.52.0", ] [[package]] @@ -3676,6 +3696,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libproc" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae9ea4b75e1a81675429dafe43441df1caea70081e82246a8cccf514884a88bb" +dependencies = [ + "bindgen 0.69.4", + "errno", + "libc", +] + [[package]] name = "libredox" version = "0.0.1" @@ -3704,7 +3735,7 @@ version = "0.11.0+8.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3386f101bcb4bd252d8e9d2fb41ec3b0862a15a62b478c355b2982efa469e3e" dependencies = [ - "bindgen", + "bindgen 0.65.1", "bzip2-sys", "cc", "glob", @@ -3789,6 +3820,15 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b955cdeb2a02b9117f121ce63aa52d08ade45de53e48fe6a38b39c10f6f709" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.0.1" @@ -3870,6 +3910,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "metrics-process" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7d8f5027620bf43b86e2c8144beea1e4323aec39241f5eae59dee54f79c6a29" +dependencies = [ + "libproc", + "mach2", + "metrics", + "once_cell", + "procfs", + "rlimit", + "windows", +] + [[package]] name = "metrics-tracing-context" version = "0.15.0" @@ -4554,6 +4609,7 @@ dependencies = [ "jmt", "metrics", "metrics-exporter-prometheus", + "metrics-process", "metrics-tracing-context", "metrics-util", "mime_guess", @@ -5024,6 +5080,7 @@ dependencies = [ "im", "itertools 0.11.0", "metrics", + "metrics-exporter-prometheus", "once_cell", "parking_lot", "pbjson-types", @@ -5467,6 +5524,7 @@ dependencies = [ "bincode", "blake2b_simd 1.0.2", "bytes", + "chrono", "cnidarium", "cnidarium-component", "decaf377 0.5.0", @@ -5475,6 +5533,7 @@ dependencies = [ "im", "metrics", "once_cell", + "pbjson-types", "penumbra-keys", "penumbra-proto", "penumbra-tct", @@ -6302,6 +6361,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags 2.4.2", + "hex", + "lazy_static", + "procfs-core", + "rustix 0.38.31", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags 2.4.2", + "hex", +] + [[package]] name = "proptest" version = "1.4.0" @@ -6792,6 +6874,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" +[[package]] +name = "rlimit" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3560f70f30a0f16d11d01ed078a07740fe6b489667abc7c7b029155d9f21c3d8" +dependencies = [ + "libc", +] + [[package]] name = "rocksdb" version = "0.21.0" @@ -8928,13 +9019,66 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1de69df01bdf1ead2f4ac895dc77c9351aefff65b2f3db429a343f9cbf05e132" +dependencies = [ + "windows-core 0.56.0", + "windows-targets 0.52.5", +] + [[package]] name = "windows-core" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.3", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-core" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4698e52ed2d08f8658ab0c39512a7c00ee5fe2688c65f8c0a4f06750d729f2a6" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-implement" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6fc35f58ecd95a9b71c4f2329b911016e6bec66b3f2e6a4aad86bd2e99e2f9b" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.51", +] + +[[package]] +name = "windows-interface" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08990546bf4edef8f431fa6326e032865f27138718c587dc21bc0265bbcb57cc" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.51", +] + +[[package]] +name = "windows-result" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e383302e8ec8515204254685643de10811af0ed97ea37210dc26fb0032647f8" +dependencies = [ + "windows-targets 0.52.5", ] [[package]] @@ -8952,7 +9096,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.3", + "windows-targets 0.52.5", ] [[package]] @@ -8972,17 +9116,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.3" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d380ba1dc7187569a8a9e91ed34b8ccfc33123bbacb8c0aed2d1ad7f3ef2dc5f" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm 0.52.3", - "windows_aarch64_msvc 0.52.3", - "windows_i686_gnu 0.52.3", - "windows_i686_msvc 0.52.3", - "windows_x86_64_gnu 0.52.3", - "windows_x86_64_gnullvm 0.52.3", - "windows_x86_64_msvc 0.52.3", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -8993,9 +9138,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.3" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68e5dcfb9413f53afd9c8f86e56a7b4d86d9a2fa26090ea2dc9e40fba56c6ec6" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" [[package]] name = "windows_aarch64_msvc" @@ -9005,9 +9150,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.3" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dab469ebbc45798319e69eebf92308e541ce46760b49b18c6b3fe5e8965b30f" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" [[package]] name = "windows_i686_gnu" @@ -9017,9 +9162,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.3" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a4e9b6a7cac734a8b4138a4e1044eac3404d8326b6c0f939276560687a033fb" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" [[package]] name = "windows_i686_msvc" @@ -9029,9 +9180,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.3" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b0ec9c422ca95ff34a78755cfa6ad4a51371da2a5ace67500cf7ca5f232c58" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" [[package]] name = "windows_x86_64_gnu" @@ -9041,9 +9192,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.3" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "704131571ba93e89d7cd43482277d6632589b18ecf4468f591fbae0a8b101614" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" [[package]] name = "windows_x86_64_gnullvm" @@ -9053,9 +9204,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.3" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42079295511643151e98d61c38c0acc444e52dd42ab456f7ccfd5152e8ecf21c" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" [[package]] name = "windows_x86_64_msvc" @@ -9065,9 +9216,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.3" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winnow" diff --git a/Cargo.toml b/Cargo.toml index 496633d81c..4b73297ef2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,6 +156,7 @@ im = { version = "^15.1.0" } indicatif = { version = "0.16" } jmt = { version = "0.10", features = ["migration"] } metrics = { version = "0.22" } +metrics-exporter-prometheus = { version = "0.13", features = ["http-listener"] } metrics-tracing-context = { version = "0.15" } num-bigint = { version = "0.4" } num-traits = { default-features = false, version = "0.2.15" } diff --git a/crates/bin/pcli/src/command/tx.rs b/crates/bin/pcli/src/command/tx.rs index 7657114771..5eaa0ba983 100644 --- a/crates/bin/pcli/src/command/tx.rs +++ b/crates/bin/pcli/src/command/tx.rs @@ -69,6 +69,11 @@ mod liquidity_position; mod proposal; mod replicate; +/// The planner can fail to build a large transaction, so +/// pcli splits apart the number of positions to close/withdraw +/// in the [`PositionCmd::CloseAll`]/[`PositionCmd::WithdrawAll`] commands. +const POSITION_CHUNK_SIZE: usize = 30; + #[derive(Debug, clap::Subcommand)] pub enum TxCmd { /// Auction related commands. @@ -1112,25 +1117,36 @@ impl TxCmd { return Ok(()); } + println!( + "{} total open positions, closing in {} batches of {}", + owned_position_ids.len(), + owned_position_ids.len() / POSITION_CHUNK_SIZE + 1, + POSITION_CHUNK_SIZE + ); + let mut planner = Planner::new(OsRng); - planner - .set_gas_prices(gas_prices) - .set_fee_tier((*fee_tier).into()); - for position_id in owned_position_ids { - // Close the position - planner.position_close(position_id); - } + // Close 5 positions in a single transaction to avoid planner failures. + for positions_to_close_now in owned_position_ids.chunks(POSITION_CHUNK_SIZE) { + planner + .set_gas_prices(gas_prices) + .set_fee_tier((*fee_tier).into()); - let final_plan = planner - .plan( - app.view - .as_mut() - .context("view service must be initialized")?, - AddressIndex::new(*source), - ) - .await?; - app.build_and_submit_transaction(final_plan).await?; + for position_id in positions_to_close_now { + // Close the position + planner.position_close(*position_id); + } + + let final_plan = planner + .plan( + app.view + .as_mut() + .context("view service must be initialized")?, + AddressIndex::new(*source), + ) + .await?; + app.build_and_submit_transaction(final_plan).await?; + } } TxCmd::Position(PositionCmd::WithdrawAll { source, @@ -1151,53 +1167,64 @@ impl TxCmd { return Ok(()); } - let mut planner = Planner::new(OsRng); - planner - .set_gas_prices(gas_prices) - .set_fee_tier((*fee_tier).into()); + println!( + "{} total closed positions, withdrawing in {} batches of {}", + owned_position_ids.len(), + owned_position_ids.len() / POSITION_CHUNK_SIZE + 1, + POSITION_CHUNK_SIZE, + ); let mut client = DexQueryServiceClient::new(app.pd_channel().await?); - for position_id in owned_position_ids { - // Withdraw the position + let mut planner = Planner::new(OsRng); - // Fetch the information regarding the position from the view service. - let position = client - .liquidity_position_by_id(LiquidityPositionByIdRequest { - position_id: Some(position_id.into()), - }) - .await? - .into_inner(); + // Withdraw 5 positions in a single transaction to avoid planner failures. + for positions_to_withdraw_now in owned_position_ids.chunks(POSITION_CHUNK_SIZE) { + planner + .set_gas_prices(gas_prices) + .set_fee_tier((*fee_tier).into()); - let reserves = position - .data - .clone() - .expect("missing position metadata") - .reserves - .expect("missing position reserves"); - let pair = position - .data - .expect("missing position") - .phi - .expect("missing position trading function") - .pair - .expect("missing trading function pair"); - planner.position_withdraw( - position_id, - reserves.try_into().expect("invalid reserves"), - pair.try_into().expect("invalid pair"), - ); - } + for position_id in positions_to_withdraw_now { + // Withdraw the position - let final_plan = planner - .plan( - app.view - .as_mut() - .context("view service must be initialized")?, - AddressIndex::new(*source), - ) - .await?; - app.build_and_submit_transaction(final_plan).await?; + // Fetch the information regarding the position from the view service. + let position = client + .liquidity_position_by_id(LiquidityPositionByIdRequest { + position_id: Some((*position_id).into()), + }) + .await? + .into_inner(); + + let reserves = position + .data + .clone() + .expect("missing position metadata") + .reserves + .expect("missing position reserves"); + let pair = position + .data + .expect("missing position") + .phi + .expect("missing position trading function") + .pair + .expect("missing trading function pair"); + planner.position_withdraw( + *position_id, + reserves.try_into().expect("invalid reserves"), + pair.try_into().expect("invalid pair"), + ); + } + + let final_plan = planner + .plan( + app.view + .as_mut() + .context("view service must be initialized")?, + AddressIndex::new(*source), + ) + .await?; + app.build_and_submit_transaction(final_plan).await?; + } } TxCmd::Position(PositionCmd::Withdraw { source, diff --git a/crates/bin/pcli/src/opt.rs b/crates/bin/pcli/src/opt.rs index c1ed746db0..c2d3e608b5 100644 --- a/crates/bin/pcli/src/opt.rs +++ b/crates/bin/pcli/src/opt.rs @@ -18,6 +18,7 @@ use penumbra_proto::{ use penumbra_view::ViewServer; use std::io::IsTerminal as _; use tracing_subscriber::EnvFilter; +use url::Url; #[derive(Debug, Parser)] #[clap(name = "pcli", about = "The Penumbra command-line interface.", version)] @@ -27,6 +28,11 @@ pub struct Opt { /// The home directory used to store configuration and data. #[clap(long, default_value_t = default_home(), env = "PENUMBRA_PCLI_HOME")] pub home: Utf8PathBuf, + /// Override the GRPC URL that will be used to connect to a fullnode. + /// + /// By default, this URL is provided by pcli's config. See `pcli init` for more information. + #[clap(long, parse(try_from_str = Url::parse))] + pub grpc_url: Option, } impl Opt { @@ -49,7 +55,11 @@ impl Opt { pub fn load_config(&self) -> Result { let path = self.home.join(crate::CONFIG_FILE_NAME); - PcliConfig::load(path) + let mut config = PcliConfig::load(path)?; + if let Some(grpc_url) = &self.grpc_url { + config.grpc_url = grpc_url.clone(); + } + Ok(config) } pub async fn into_app(self) -> Result<(App, Command)> { diff --git a/crates/bin/pd/Cargo.toml b/crates/bin/pd/Cargo.toml index 6c8e12451f..99db79d89d 100644 --- a/crates/bin/pd/Cargo.toml +++ b/crates/bin/pd/Cargo.toml @@ -59,7 +59,8 @@ ibc-types = { workspace = true, default-features = true } ics23 = { workspace = true } jmt = { workspace = true } metrics = { workspace = true } -metrics-exporter-prometheus = { version = "0.13", features = ["http-listener"] } +metrics-exporter-prometheus = { workspace = true } +metrics-process = "2.0.0" metrics-tracing-context = { workspace = true } metrics-util = "0.16.2" mime_guess = "2" diff --git a/crates/bin/pd/src/lib.rs b/crates/bin/pd/src/lib.rs index ce66736731..7412b17875 100644 --- a/crates/bin/pd/src/lib.rs +++ b/crates/bin/pd/src/lib.rs @@ -5,7 +5,7 @@ // Requires nightly. #![cfg_attr(docsrs, feature(doc_auto_cfg))] -mod metrics; +pub mod metrics; pub mod cli; pub mod migrate; diff --git a/crates/bin/pd/src/main.rs b/crates/bin/pd/src/main.rs index cf954a9a2f..8c37cfe3b3 100644 --- a/crates/bin/pd/src/main.rs +++ b/crates/bin/pd/src/main.rs @@ -166,14 +166,12 @@ async fn main() -> anyhow::Result<()> { }; // Configure a Prometheus recorder and exporter. + use penumbra_dex::component::metrics::PrometheusBuilderExt; let (recorder, exporter) = PrometheusBuilder::new() .with_http_listener(metrics_bind) // Set explicit buckets so that Prometheus endpoint emits true histograms, rather // than the default distribution type summaries, for time-series data. - .set_buckets_for_metric( - metrics_exporter_prometheus::Matcher::Prefix("penumbra_dex_".to_string()), - penumbra_dex::component::metrics::DEX_BUCKETS, - )? + .set_buckets_for_dex_metrics()? .build() .map_err(|e| { let msg = format!( @@ -195,6 +193,8 @@ async fn main() -> anyhow::Result<()> { // register pd's metrics with the exporter. tokio::spawn(exporter); pd::register_metrics(); + tokio::spawn(pd::metrics::sleep_worker::run()); + tokio::spawn(pd::metrics::cpu_worker::run()); // We error out if a service errors, rather than keep running. // A special attempt is made to detect whether binding to target socket failed; diff --git a/crates/bin/pd/src/metrics.rs b/crates/bin/pd/src/metrics.rs index 44c0a4594d..cfd7016a42 100644 --- a/crates/bin/pd/src/metrics.rs +++ b/crates/bin/pd/src/metrics.rs @@ -14,6 +14,9 @@ #[allow(unused_imports)] // It is okay if this reëxport isn't used, see above. pub use metrics::*; +pub mod cpu_worker; +pub mod sleep_worker; + /// Registers all metrics used by this crate. /// /// For this implementation, in the `pd` crate, we also call the `register_metrics()` @@ -21,4 +24,6 @@ pub use metrics::*; pub fn register_metrics() { // This will register metrics for all components. penumbra_app::register_metrics(); + self::sleep_worker::register_metrics(); + self::cpu_worker::register_metrics(); } diff --git a/crates/bin/pd/src/metrics/cpu_worker.rs b/crates/bin/pd/src/metrics/cpu_worker.rs new file mode 100644 index 0000000000..1e5d5af8cf --- /dev/null +++ b/crates/bin/pd/src/metrics/cpu_worker.rs @@ -0,0 +1,34 @@ +//! A metrics-focused worker for gathering CPU load and other system stats. +//! +//! ### Overview +//! +//! This submodule provides a worker that wraps the [metrics-process] logic to export OS-level +//! runtime information about `pd`. + +use std::time::Duration; +use tokio::time::sleep; + +use metrics_process::Collector; + +/// The time to sleep between polling the OS for process info about `pd`. +const SLEEP_DURATION: Duration = Duration::from_secs(2); +/// The string prepended to all metrics emitted by [metrics-process]. +const METRICS_PREFIX: &str = "pd_"; + +pub fn register_metrics() { + // Call `describe()` method to register help string. + let collector = Collector::new(METRICS_PREFIX); + collector.describe(); +} + +/// Run the cpu worker. +/// +/// This function will never return. +pub async fn run() -> std::convert::Infallible { + let collector = Collector::new(METRICS_PREFIX); + loop { + // Periodically call `collect()` method to update information. + collector.collect(); + sleep(SLEEP_DURATION).await; + } +} diff --git a/crates/bin/pd/src/metrics/sleep_worker.rs b/crates/bin/pd/src/metrics/sleep_worker.rs new file mode 100644 index 0000000000..c0834bbd47 --- /dev/null +++ b/crates/bin/pd/src/metrics/sleep_worker.rs @@ -0,0 +1,65 @@ +//! A sleep worker. +//! +//! ### Overview +//! +//! This submodule defines a metric, and an accompanying worker task, for use in measuring +//! scheduler latency in the tokio runtime. This worker will repeatedly sleep for one second, and +//! then observe the amount of time it *actually* spent waiting to be woken up. This is useful for +//! detecting when the asynchronous runtime is being disrupted by blocking I/O, or other expensive +//! non-coöperative computation. +//! +//! Use [`register_metrics()`] to register the [`SLEEP_DRIFT`] metric with an exporter, and spawn +//! the worker onto a runtime by calling [`run()`]. + +use { + super::*, + std::time::{Duration, Instant}, + tokio::time::sleep, +}; + +pub const SLEEP_DRIFT: &str = "pd_async_sleep_drift_microseconds"; + +const ONE_SECOND: Duration = Duration::from_secs(1); +const ONE_SECOND_US: u128 = ONE_SECOND.as_micros(); + +pub fn register_metrics() { + describe_counter!( + SLEEP_DRIFT, + Unit::Microseconds, + "Tracks drift in the async runtime's timer, in microseconds." + ); +} + +/// Run the sleep worker. +/// +/// This function will never return. +pub async fn run() -> std::convert::Infallible { + let counter = counter!(SLEEP_DRIFT); + + loop { + // Ask the async runtime to pause this task for one second, and then observe the amount of + // microseconds we were actually suspended. + let start = Instant::now(); + sleep(ONE_SECOND).await; + let end = Instant::now(); + let actual = end.duration_since(start).as_micros(); + + // Find the difference between the observed sleep duration and our expected duration. + let drift: u64 = actual + .saturating_sub(ONE_SECOND_US) + .try_into() + .unwrap_or_else(|error| { + // In the unlikely event that the number of microseconds we waited can't fit into + // a u64, round down to u64::MAX. This is lossy, but will still indicate that + // there is a severe issue with the runtime. + tracing::error!(?error, %actual, "failed to convert timer drift into a u64"); + u64::MAX + }); + + // If there was scheduler drift, increment the counter. + match drift { + 0 => continue, + n => counter.increment(n), + } + } +} diff --git a/crates/bin/pd/src/migrate.rs b/crates/bin/pd/src/migrate.rs index 862a395e42..de57b970e9 100644 --- a/crates/bin/pd/src/migrate.rs +++ b/crates/bin/pd/src/migrate.rs @@ -147,7 +147,7 @@ pub async fn last_block_timestamp(home: PathBuf) -> anyhow::Result( state: S, ) -> anyhow::Result { - state.get_block_timestamp().await + state.get_current_block_timestamp().await } } diff --git a/crates/core/component/compact-block/src/component/rpc.rs b/crates/core/component/compact-block/src/component/rpc.rs index 13b373bffa..b277e3a6bf 100644 --- a/crates/core/component/compact-block/src/component/rpc.rs +++ b/crates/core/component/compact-block/src/component/rpc.rs @@ -4,8 +4,8 @@ use anyhow::bail; use cnidarium::Storage; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use penumbra_proto::core::component::compact_block::v1::{ - query_service_server::QueryService, CompactBlockRangeRequest, CompactBlockRangeResponse, - CompactBlockRequest, CompactBlockResponse, + query_service_server::QueryService, CompactBlock, CompactBlockRangeRequest, + CompactBlockRangeResponse, CompactBlockRequest, CompactBlockResponse, }; use penumbra_sct::component::clock::EpochRead; use tokio::sync::mpsc; @@ -103,6 +103,11 @@ impl QueryService for Server { let (tx_blocks, rx_blocks) = mpsc::channel(10); let tx_blocks_err = tx_blocks.clone(); + // Wrap the block sender in a guard that ensures we only send the expected next block + let mut tx_blocks = BlockSender { + next_height: start_height, + inner: tx_blocks, + }; tokio::spawn( async move { let _guard = CompactBlockConnectionCounter::new(); @@ -142,7 +147,7 @@ impl QueryService for Server { // Future iterations of this work should start by moving block serialization // outside of the `send_op` future, and investigate if long blocking sends can // happen for benign reasons (i.e not caused by the client). - tx_blocks.send(Ok(compact_block)).await?; + tx_blocks.send(compact_block).await?; metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1); } @@ -171,10 +176,7 @@ impl QueryService for Server { .await .expect("no error fetching block") .expect("compact block for in-range height must be present"); - tx_blocks - .send(Ok(block)) - .await - .map_err(|_| tonic::Status::cancelled("client closed connection"))?; + tx_blocks.send(block).await?; metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1); } @@ -200,10 +202,7 @@ impl QueryService for Server { .await .map_err(|e| tonic::Status::internal(e.to_string()))? .expect("compact block for in-range height must be present"); - tx_blocks - .send(Ok(block)) - .await - .map_err(|_| tonic::Status::cancelled("channel closed"))?; + tx_blocks.send(block).await?; metrics::counter!(metrics::COMPACT_BLOCK_RANGE_SERVED_TOTAL).increment(1); } } @@ -250,3 +249,24 @@ impl Drop for CompactBlockConnectionCounter { metrics::gauge!(metrics::COMPACT_BLOCK_RANGE_ACTIVE_CONNECTIONS).decrement(1.0); } } + +/// Stateful wrapper for a mpsc that tracks the outbound height +struct BlockSender { + next_height: u64, + inner: mpsc::Sender>, +} + +impl BlockSender { + async fn send(&mut self, block: CompactBlock) -> anyhow::Result<()> { + if block.height != self.next_height { + bail!( + "block height mismatch while sending: expected {}, got {}", + self.next_height, + block.height + ); + } + self.inner.send(Ok(block)).await?; + self.next_height += 1; + Ok(()) + } +} diff --git a/crates/core/component/dex/Cargo.toml b/crates/core/component/dex/Cargo.toml index 579582ad21..d4eb923904 100644 --- a/crates/core/component/dex/Cargo.toml +++ b/crates/core/component/dex/Cargo.toml @@ -7,6 +7,7 @@ edition = {workspace = true} component = [ "cnidarium-component", "cnidarium", + "metrics-exporter-prometheus", "penumbra-proto/cnidarium", "penumbra-shielded-pool/component", "penumbra-fee/component", @@ -49,6 +50,7 @@ futures = {workspace = true} hex = {workspace = true} im = {workspace = true} metrics = {workspace = true} +metrics-exporter-prometheus = {workspace = true, optional = true} once_cell = {workspace = true} parking_lot = {workspace = true} pbjson-types = {workspace = true} diff --git a/crates/core/component/dex/src/component/metrics.rs b/crates/core/component/dex/src/component/metrics.rs index d7a7c8b7a0..c894307077 100644 --- a/crates/core/component/dex/src/component/metrics.rs +++ b/crates/core/component/dex/src/component/metrics.rs @@ -30,6 +30,11 @@ pub fn register_metrics() { Unit::Seconds, "The time spent searching for paths while executing trades within the DEX" ); + describe_counter!( + DEX_PATH_SEARCH_RELAX_PATH_DURATION, + Unit::Seconds, + "The time spent relaxing a path while routing trades within the DEX" + ); describe_histogram!( DEX_ROUTE_FILL_DURATION, Unit::Seconds, @@ -45,8 +50,13 @@ pub fn register_metrics() { // We configure buckets for the DEX routing times manually, in order to ensure // Prometheus metrics are structured as a Histogram, rather than as a Summary. // These values may need to be updated over time. -// These values are logarithmically spaced from 5ms to 250ms. -pub const DEX_BUCKETS: &[f64; 16] = &[ +// These values are logarithmically spaced from 5us to 67ms. +const GENERIC_DEX_BUCKETS: &[f64; 16] = &[ + 0.0005, + 0.000792, + 0.001256, + 0.001991, + 0.003155, 0.005, 0.00648985018, 0.00842363108, @@ -58,16 +68,48 @@ pub const DEX_BUCKETS: &[f64; 16] = &[ 0.0402798032, 0.05228197763, 0.06786044041, - 0.08808081833, - 0.11432626298, - 0.14839206374, - 0.1926084524, - 0.250, ]; pub const DEX_PATH_SEARCH_DURATION: &str = "penumbra_dex_path_search_duration_seconds"; +pub const DEX_PATH_SEARCH_RELAX_PATH_DURATION: &str = + "penumbra_dex_path_search_relax_path_duration_seconds"; pub const DEX_ROUTE_FILL_DURATION: &str = "penumbra_dex_route_fill_duration_seconds"; pub const DEX_ARB_DURATION: &str = "penumbra_dex_arb_duration_seconds"; pub const DEX_BATCH_DURATION: &str = "penumbra_dex_batch_duration_seconds"; pub const DEX_RPC_SIMULATE_TRADE_DURATION: &str = "penumbra_dex_rpc_simulate_trade_duration_seconds"; + +/// An extension trait providing DEX-related interfaces for [`PrometheusBuilder`]. +/// +/// [builder]: metrics_exporter_prometheus::PrometheusBuilder +pub trait PrometheusBuilderExt +where + Self: Sized, +{ + /// Configure buckets for histogram metrics. + fn set_buckets_for_dex_metrics(self) -> Result; +} + +impl PrometheusBuilderExt for metrics_exporter_prometheus::PrometheusBuilder { + fn set_buckets_for_dex_metrics(self) -> Result { + use metrics_exporter_prometheus::Matcher::Full; + self.set_buckets_for_metric( + Full(DEX_PATH_SEARCH_DURATION.to_owned()), + GENERIC_DEX_BUCKETS, + )? + .set_buckets_for_metric( + Full(DEX_PATH_SEARCH_RELAX_PATH_DURATION.to_owned()), + GENERIC_DEX_BUCKETS, + )? + .set_buckets_for_metric( + Full(DEX_ROUTE_FILL_DURATION.to_owned()), + GENERIC_DEX_BUCKETS, + )? + .set_buckets_for_metric(Full(DEX_ARB_DURATION.to_owned()), GENERIC_DEX_BUCKETS)? + .set_buckets_for_metric(Full(DEX_BATCH_DURATION.to_owned()), GENERIC_DEX_BUCKETS)? + .set_buckets_for_metric( + Full(DEX_RPC_SIMULATE_TRADE_DURATION.to_owned()), + GENERIC_DEX_BUCKETS, + ) + } +} diff --git a/crates/core/component/dex/src/component/router/path_search.rs b/crates/core/component/dex/src/component/router/path_search.rs index 92e114b198..3fffa26261 100644 --- a/crates/core/component/dex/src/component/router/path_search.rs +++ b/crates/core/component/dex/src/component/router/path_search.rs @@ -6,6 +6,7 @@ use cnidarium::{StateDelta, StateRead}; use futures::StreamExt; use penumbra_asset::asset; use penumbra_num::fixpoint::U128x128; +use tap::Tap; use tokio::task::JoinSet; use tracing::{instrument, Instrument}; @@ -88,7 +89,16 @@ async fn relax_active_paths( "relaxing active paths" ); for path in active_paths { - js.spawn(relax_path(cache.clone(), path, fixed_candidates.clone())); + let candidates = Arc::clone(&fixed_candidates); + let cache = Arc::clone(&cache); + js.spawn(async move { + use crate::component::metrics::DEX_PATH_SEARCH_RELAX_PATH_DURATION; + let metric = metrics::histogram!(DEX_PATH_SEARCH_RELAX_PATH_DURATION); + let start = std::time::Instant::now(); + relax_path(cache, path, candidates) + .await + .tap(|_| metric.record(start.elapsed())) + }); } // Wait for all relaxations to complete. while let Some(task) = js.join_next().await { diff --git a/crates/core/component/ibc/src/component/client.rs b/crates/core/component/ibc/src/component/client.rs index d65347dc57..33240c591b 100644 --- a/crates/core/component/ibc/src/component/client.rs +++ b/crates/core/component/ibc/src/component/client.rs @@ -293,7 +293,7 @@ pub trait StateReadExt: StateRead + penumbra_sct::component::clock::EpochRead { let latest_consensus_state = latest_consensus_state.expect("latest consensus state is Ok"); - let current_block_time = self.get_block_timestamp().await; + let current_block_time = self.get_current_block_timestamp().await; if current_block_time.is_err() { return ClientStatus::Unknown; @@ -490,7 +490,7 @@ mod tests { } async fn get_block_timestamp(state: S) -> Result { - state.get_block_timestamp().await + state.get_current_block_timestamp().await } } @@ -609,7 +609,7 @@ mod tests { // available to the unit test. let timestamp = Time::parse_from_rfc3339("2022-02-11T17:30:50.425417198Z")?; let mut state_tx = state.try_begin_transaction().unwrap(); - state_tx.put_block_timestamp(timestamp); + state_tx.put_block_timestamp(1u64, timestamp); state_tx.put_block_height(1); state_tx.put_ibc_params(crate::params::IBCParameters { ibc_enabled: true, diff --git a/crates/core/component/ibc/src/component/packet.rs b/crates/core/component/ibc/src/component/packet.rs index da3c496c79..71ad23874e 100644 --- a/crates/core/component/ibc/src/component/packet.rs +++ b/crates/core/component/ibc/src/component/packet.rs @@ -134,7 +134,7 @@ pub trait SendPacketRead: StateRead { .get_verified_consensus_state(&client_state.latest_height(), &connection.client_id) .await?; - let current_block_time = self.get_block_timestamp().await?; + let current_block_time = self.get_current_block_timestamp().await?; let time_elapsed = current_block_time.duration_since(latest_consensus_state.timestamp)?; if client_state.expired(time_elapsed) { diff --git a/crates/core/component/sct/Cargo.toml b/crates/core/component/sct/Cargo.toml index 687c387fb6..a29dd5a705 100644 --- a/crates/core/component/sct/Cargo.toml +++ b/crates/core/component/sct/Cargo.toml @@ -33,6 +33,7 @@ hex = {workspace = true} im = {workspace = true} metrics = {workspace = true} once_cell = {workspace = true} +pbjson-types = {workspace = true} penumbra-keys = {workspace = true, default-features = false} penumbra-proto = {workspace = true, default-features = false} penumbra-tct = {workspace = true, default-features = true} @@ -43,3 +44,4 @@ serde = {workspace = true, features = ["derive"]} tendermint = {workspace = true} tonic = {workspace = true, optional = true} tracing = {workspace = true} +chrono = { workspace = true, default-features = false, features = ["serde"] } diff --git a/crates/core/component/sct/src/component/clock.rs b/crates/core/component/sct/src/component/clock.rs index 82ecea9941..e74b1aec84 100644 --- a/crates/core/component/sct/src/component/clock.rs +++ b/crates/core/component/sct/src/component/clock.rs @@ -25,14 +25,35 @@ pub trait EpochRead: StateRead { /// /// # Panic /// Panics if the block timestamp is not a valid RFC3339 time string. - async fn get_block_timestamp(&self) -> Result { + async fn get_current_block_timestamp(&self) -> Result { let timestamp_string: String = self - .get_proto(state_key::block_manager::block_timestamp()) + .get_proto(state_key::block_manager::current_block_timestamp()) .await? - .ok_or_else(|| anyhow!("Missing block_timestamp"))?; + .ok_or_else(|| anyhow!("Missing current_block_timestamp"))?; Ok(tendermint::Time::from_str(×tamp_string) - .context("block_timestamp was an invalid RFC3339 time string")?) + .context("current_block_timestamp was an invalid RFC3339 time string")?) + } + + /// Gets a historic block timestamp from nonverifiable storage. + /// + /// # Errors + /// Returns an error if the block timestamp is missing. + /// + /// # Panic + /// Panics if the block timestamp is not a valid RFC3339 time string. + async fn get_block_timestamp(&self, height: u64) -> Result { + let timestamp_string: String = self + .nonverifiable_get_proto(&state_key::block_manager::block_timestamp(height).as_bytes()) + .await? + .ok_or_else(|| anyhow!("Missing block_timestamp for height {}", height))?; + + Ok( + tendermint::Time::from_str(×tamp_string).context(format!( + "block_timestamp for height {} was an invalid RFC3339 time string", + height + ))?, + ) } /// Get the current application epoch. @@ -72,12 +93,19 @@ impl EpochRead for T {} /// as well as related data like reported timestamps and epoch duration. #[async_trait] pub trait EpochManager: StateWrite { - /// Writes the block timestamp as an RFC3339 string to verifiable storage. - fn put_block_timestamp(&mut self, timestamp: tendermint::Time) { + /// Writes the current block's timestamp as an RFC3339 string to verifiable storage. + /// + /// Also writes the current block's timestamp to the appropriate key in nonverifiable storage. + fn put_block_timestamp(&mut self, height: u64, timestamp: tendermint::Time) { self.put_proto( - state_key::block_manager::block_timestamp().into(), + state_key::block_manager::current_block_timestamp().into(), timestamp.to_rfc3339(), - ) + ); + + self.nonverifiable_put_proto( + state_key::block_manager::block_timestamp(height).into(), + timestamp.to_rfc3339(), + ); } /// Write a value in the end epoch flag in object-storage. diff --git a/crates/core/component/sct/src/component/rpc.rs b/crates/core/component/sct/src/component/rpc.rs index 2cb6b69c6e..7a33abce3e 100644 --- a/crates/core/component/sct/src/component/rpc.rs +++ b/crates/core/component/sct/src/component/rpc.rs @@ -1,7 +1,9 @@ use cnidarium::Storage; +use pbjson_types::Timestamp; use penumbra_proto::core::component::sct::v1::query_service_server::QueryService; use penumbra_proto::core::component::sct::v1::{ AnchorByHeightRequest, AnchorByHeightResponse, EpochByHeightRequest, EpochByHeightResponse, + TimestampByHeightRequest, TimestampByHeightResponse, }; use tonic::Status; use tracing::instrument; @@ -55,4 +57,26 @@ impl QueryService for Server { anchor: anchor.map(Into::into), })) } + + #[instrument(skip(self, request))] + async fn timestamp_by_height( + &self, + request: tonic::Request, + ) -> Result, Status> { + let state = self.storage.latest_snapshot(); + + let height = request.get_ref().height; + let block_time = state.get_block_timestamp(height).await.map_err(|e| { + tonic::Status::unknown(format!("could not get timestamp for height {height}: {e}")) + })?; + let timestamp = chrono::DateTime::parse_from_rfc3339(block_time.to_rfc3339().as_str()) + .expect("timestamp should roundtrip to string"); + + Ok(tonic::Response::new(TimestampByHeightResponse { + timestamp: Some(Timestamp { + seconds: timestamp.timestamp(), + nanos: timestamp.timestamp_subsec_nanos() as i32, + }), + })) + } } diff --git a/crates/core/component/sct/src/component/sct.rs b/crates/core/component/sct/src/component/sct.rs index 74e3ae6c97..a209ea6ad1 100644 --- a/crates/core/component/sct/src/component/sct.rs +++ b/crates/core/component/sct/src/component/sct.rs @@ -53,7 +53,7 @@ impl Component for Sct { ) { let state = Arc::get_mut(state).expect("there's only one reference to the state"); state.put_block_height(begin_block.header.height.into()); - state.put_block_timestamp(begin_block.header.time); + state.put_block_timestamp(begin_block.header.height.into(), begin_block.header.time); } #[instrument(name = "sct_component", skip(_state, _end_block))] diff --git a/crates/core/component/sct/src/state_key.rs b/crates/core/component/sct/src/state_key.rs index 1f28ee2a64..93a32120cb 100644 --- a/crates/core/component/sct/src/state_key.rs +++ b/crates/core/component/sct/src/state_key.rs @@ -9,9 +9,13 @@ pub mod block_manager { "sct/block_manager/block_height" } - pub fn block_timestamp() -> &'static str { + pub fn current_block_timestamp() -> &'static str { "sct/block_manager/block_timestamp" } + + pub fn block_timestamp(height: u64) -> String { + format!("sct/block_manager/historical_block_timestamp/{}", height) + } } pub mod epoch_manager { diff --git a/crates/proto/src/gen/penumbra.core.component.sct.v1.rs b/crates/proto/src/gen/penumbra.core.component.sct.v1.rs index 9e2ba394c9..6f132c5141 100644 --- a/crates/proto/src/gen/penumbra.core.component.sct.v1.rs +++ b/crates/proto/src/gen/penumbra.core.component.sct.v1.rs @@ -327,6 +327,32 @@ impl ::prost::Name for AnchorByHeightResponse { ::prost::alloc::format!("penumbra.core.component.sct.v1.{}", Self::NAME) } } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TimestampByHeightRequest { + #[prost(uint64, tag = "1")] + pub height: u64, +} +impl ::prost::Name for TimestampByHeightRequest { + const NAME: &'static str = "TimestampByHeightRequest"; + const PACKAGE: &'static str = "penumbra.core.component.sct.v1"; + fn full_name() -> ::prost::alloc::string::String { + ::prost::alloc::format!("penumbra.core.component.sct.v1.{}", Self::NAME) + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TimestampByHeightResponse { + #[prost(message, optional, tag = "1")] + pub timestamp: ::core::option::Option<::pbjson_types::Timestamp>, +} +impl ::prost::Name for TimestampByHeightResponse { + const NAME: &'static str = "TimestampByHeightResponse"; + const PACKAGE: &'static str = "penumbra.core.component.sct.v1"; + fn full_name() -> ::prost::alloc::string::String { + ::prost::alloc::format!("penumbra.core.component.sct.v1.{}", Self::NAME) + } +} /// Generated client implementations. #[cfg(feature = "rpc")] pub mod query_service_client { @@ -474,6 +500,36 @@ pub mod query_service_client { ); self.inner.unary(req, path, codec).await } + pub async fn timestamp_by_height( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/penumbra.core.component.sct.v1.QueryService/TimestampByHeight", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "penumbra.core.component.sct.v1.QueryService", + "TimestampByHeight", + ), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -498,6 +554,13 @@ pub mod query_service_server { tonic::Response, tonic::Status, >; + async fn timestamp_by_height( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// Query operations for the SCT component. #[derive(Debug)] @@ -671,6 +734,53 @@ pub mod query_service_server { }; Box::pin(fut) } + "/penumbra.core.component.sct.v1.QueryService/TimestampByHeight" => { + #[allow(non_camel_case_types)] + struct TimestampByHeightSvc(pub Arc); + impl< + T: QueryService, + > tonic::server::UnaryService + for TimestampByHeightSvc { + type Response = super::TimestampByHeightResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::timestamp_by_height(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let inner = inner.0; + let method = TimestampByHeightSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { Ok( diff --git a/crates/proto/src/gen/penumbra.core.component.sct.v1.serde.rs b/crates/proto/src/gen/penumbra.core.component.sct.v1.serde.rs index e8536e4abe..6618d12701 100644 --- a/crates/proto/src/gen/penumbra.core.component.sct.v1.serde.rs +++ b/crates/proto/src/gen/penumbra.core.component.sct.v1.serde.rs @@ -2022,3 +2022,196 @@ impl<'de> serde::Deserialize<'de> for SctParameters { deserializer.deserialize_struct("penumbra.core.component.sct.v1.SctParameters", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for TimestampByHeightRequest { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.height != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("penumbra.core.component.sct.v1.TimestampByHeightRequest", len)?; + if self.height != 0 { + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("height", ToString::to_string(&self.height).as_str())?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for TimestampByHeightRequest { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "height", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Height, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "height" => Ok(GeneratedField::Height), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = TimestampByHeightRequest; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct penumbra.core.component.sct.v1.TimestampByHeightRequest") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut height__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Height => { + if height__.is_some() { + return Err(serde::de::Error::duplicate_field("height")); + } + height__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::__SkipField__ => { + let _ = map_.next_value::()?; + } + } + } + Ok(TimestampByHeightRequest { + height: height__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("penumbra.core.component.sct.v1.TimestampByHeightRequest", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for TimestampByHeightResponse { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.timestamp.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("penumbra.core.component.sct.v1.TimestampByHeightResponse", len)?; + if let Some(v) = self.timestamp.as_ref() { + struct_ser.serialize_field("timestamp", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for TimestampByHeightResponse { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "timestamp", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Timestamp, + __SkipField__, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "timestamp" => Ok(GeneratedField::Timestamp), + _ => Ok(GeneratedField::__SkipField__), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = TimestampByHeightResponse; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct penumbra.core.component.sct.v1.TimestampByHeightResponse") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut timestamp__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::Timestamp => { + if timestamp__.is_some() { + return Err(serde::de::Error::duplicate_field("timestamp")); + } + timestamp__ = map_.next_value()?; + } + GeneratedField::__SkipField__ => { + let _ = map_.next_value::()?; + } + } + } + Ok(TimestampByHeightResponse { + timestamp: timestamp__, + }) + } + } + deserializer.deserialize_struct("penumbra.core.component.sct.v1.TimestampByHeightResponse", FIELDS, GeneratedVisitor) + } +} diff --git a/crates/proto/src/gen/proto_descriptor.bin.no_lfs b/crates/proto/src/gen/proto_descriptor.bin.no_lfs index c2fd4bd1f1..0c71d1eebc 100644 Binary files a/crates/proto/src/gen/proto_descriptor.bin.no_lfs and b/crates/proto/src/gen/proto_descriptor.bin.no_lfs differ diff --git a/crates/view/src/worker.rs b/crates/view/src/worker.rs index 996a6405c9..56bab3e862 100644 --- a/crates/view/src/worker.rs +++ b/crates/view/src/worker.rs @@ -231,10 +231,17 @@ impl Worker { } }); + let mut expected_height = start_height; + while let Some(block) = buffered_stream.recv().await { let block: CompactBlock = block?.try_into()?; let height = block.height; + if height != expected_height { + tracing::warn!("out of order block detected"); + continue; + } + expected_height += 1; // Lock the SCT only while processing this block. let mut sct_guard = self.sct.write().await; diff --git a/deployments/compose/metrics.yml b/deployments/compose/metrics.yml index 5121cb15a7..32343a156c 100644 --- a/deployments/compose/metrics.yml +++ b/deployments/compose/metrics.yml @@ -19,5 +19,4 @@ services: image: "docker.io/prom/prometheus" network_mode: host volumes: - # TODO: this path is not accurate - - /home/conor/src/penumbra/deployments/config/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - "${PWD:?}/../config/prometheus.yml:/etc/prometheus/prometheus.yml:ro" diff --git a/deployments/config/grafana/dashboards/Penumbra.json b/deployments/config/grafana/dashboards/pd-performance.json similarity index 62% rename from deployments/config/grafana/dashboards/Penumbra.json rename to deployments/config/grafana/dashboards/pd-performance.json index fede433063..d6b39324ca 100644 --- a/deployments/config/grafana/dashboards/Penumbra.json +++ b/deployments/config/grafana/dashboards/pd-performance.json @@ -1,4 +1,35 @@ { + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "Prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__elements": {}, + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "10.1.5" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + } + ], "annotations": { "list": [ { @@ -21,9 +52,11 @@ } ] }, + "description": "Metrics specific to how pd performs under load.", "editable": true, "fiscalYearStartMonth": 0, "graphTooltip": 0, + "id": null, "links": [], "liveNow": false, "panels": [ @@ -32,6 +65,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "description": "The amount of CPU seconds used by pd, as rate in 1m intervals. How to represent as percentage of total available compute?", "fieldConfig": { "defaults": { "color": { @@ -51,6 +85,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -80,8 +115,7 @@ "value": 80 } ] - }, - "unit": "s" + } }, "overrides": [] }, @@ -91,7 +125,7 @@ "x": 0, "y": 0 }, - "id": 14, + "id": 17, "options": { "legend": { "calcs": [], @@ -111,13 +145,14 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "rate(cnidarium_get_raw_duration_seconds_sum[5m]) / rate(cnidarium_get_raw_duration_seconds_count[5m])", - "legendFormat": "__auto", + "expr": "rate(pd_process_cpu_seconds_total[$__rate_interval])", + "instant": false, + "legendFormat": "{{instance}}", "range": true, "refId": "A" } ], - "title": "Storage lookups, consensus", + "title": "pd CPU usage", "type": "timeseries" }, { @@ -125,6 +160,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "description": "Tracks drift in the async runtime's timer, in microseconds.", "fieldConfig": { "defaults": { "color": { @@ -144,6 +180,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -174,7 +211,7 @@ } ] }, - "unit": "s" + "unit": "µs" }, "overrides": [] }, @@ -184,7 +221,7 @@ "x": 12, "y": 0 }, - "id": 16, + "id": 19, "options": { "legend": { "calcs": [], @@ -204,13 +241,14 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "expr": "rate(cnidarium_nonverifiable_get_raw_duration_seconds_sum[5m]) / rate(cnidarium_nonverifiable_get_raw_duration_seconds_count[5m])", - "legendFormat": "__auto", + "expr": "rate(pd_async_sleep_drift_microseconds[$__rate_interval])", + "instant": false, + "legendFormat": "{{instance}}", "range": true, "refId": "A" } ], - "title": "Storage lookups, non-consensus", + "title": "pd async sleep drift", "type": "timeseries" }, { @@ -218,6 +256,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "description": "How much RAM pd is using", "fieldConfig": { "defaults": { "color": { @@ -237,6 +276,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -266,7 +306,8 @@ "value": 80 } ] - } + }, + "unit": "bytes" }, "overrides": [] }, @@ -276,7 +317,7 @@ "x": 0, "y": 8 }, - "id": 12, + "id": 18, "options": { "legend": { "calcs": [], @@ -295,11 +336,15 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "expr": "sum(penumbra_stake_missed_blocks) by (identity_key)", + "editorMode": "code", + "expr": "pd_process_resident_memory_bytes", + "instant": false, + "legendFormat": "{{instance}}", + "range": true, "refId": "A" } ], - "title": "Validator Missed Blocks", + "title": "pd memory usage", "type": "timeseries" }, { @@ -307,6 +352,7 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "description": "Tower ABCI uses a lot of filehandles, and defaults of 1024 are often too low. Track actual utilization.", "fieldConfig": { "defaults": { "color": { @@ -326,6 +372,7 @@ "tooltip": false, "viz": false }, + "insertNulls": false, "lineInterpolation": "linear", "lineWidth": 1, "pointSize": 5, @@ -353,203 +400,14 @@ { "color": "red", "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 16 - }, - "id": 10, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "exemplar": true, - "expr": "rate(penumbra_pd_mempool_checktx_total{code=\"0\", kind=\"new\"}[5m])", - "interval": "", - "legendFormat": "", - "refId": "A" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "exemplar": true, - "expr": "rate(penumbra_pd_mempool_checktx_total{code=\"1\", kind=\"new\"}[5m])", - "hide": false, - "interval": "", - "legendFormat": "", - "refId": "B" - } - ], - "title": "Transaction Rates", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null }, { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 24 - }, - "id": 8, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "8.4.7", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "exemplar": true, - "expr": "cnidarium_tct_size_bytes{}", - "interval": "", - "legendFormat": "TCT Size (bytes)", - "refId": "A" - } - ], - "title": "Serialized TCT Size", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null + "color": "#EAB839", + "value": 90 }, { - "color": "red", - "value": 80 + "color": "#6ED0E0", + "value": 1024 } ] } @@ -560,9 +418,9 @@ "h": 8, "w": 12, "x": 12, - "y": 16 + "y": 8 }, - "id": 4, + "id": 20, "options": { "legend": { "calcs": [], @@ -581,19 +439,20 @@ "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, - "exemplar": true, - "expr": "penumbra_pd_mempool_checktx_total", - "interval": "", - "legendFormat": "", + "editorMode": "code", + "expr": "pd_process_open_fds", + "instant": false, + "legendFormat": "{{instance}}", + "range": true, "refId": "A" } ], - "title": "Penumbra CheckTX", + "title": "pd filehandle utilization", "type": "timeseries" } ], - "refresh": false, - "schemaVersion": 37, + "refresh": "", + "schemaVersion": 38, "style": "dark", "tags": [], "templating": { @@ -602,7 +461,7 @@ "current": { "selected": false, "text": "Prometheus", - "value": "Prometheus" + "value": "PBFA97CFB590B2093" }, "hide": 0, "includeAll": false, @@ -624,8 +483,8 @@ }, "timepicker": {}, "timezone": "utc", - "title": "Penumbra", - "uid": "YT0tG3X7z", - "version": 1, + "title": "pd performance", + "uid": "fcdd2dc8-1357-4f8c-b23c-36bcd3914435", + "version": 2, "weekStart": "" } diff --git a/proto/penumbra/penumbra/core/component/sct/v1/sct.proto b/proto/penumbra/penumbra/core/component/sct/v1/sct.proto index c6c85dd299..f18bcbb06e 100644 --- a/proto/penumbra/penumbra/core/component/sct/v1/sct.proto +++ b/proto/penumbra/penumbra/core/component/sct/v1/sct.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package penumbra.core.component.sct.v1; import "penumbra/crypto/tct/v1/tct.proto"; +import "google/protobuf/timestamp.proto"; // Configuration data for the SCT component. message SctParameters { @@ -117,8 +118,17 @@ message AnchorByHeightResponse { crypto.tct.v1.MerkleRoot anchor = 1; } +message TimestampByHeightRequest { + uint64 height = 1; +} + +message TimestampByHeightResponse { + google.protobuf.Timestamp timestamp = 1; +} + // Query operations for the SCT component. service QueryService { rpc AnchorByHeight(AnchorByHeightRequest) returns (AnchorByHeightResponse); rpc EpochByHeight(EpochByHeightRequest) returns (EpochByHeightResponse); + rpc TimestampByHeight(TimestampByHeightRequest) returns (TimestampByHeightResponse); }