From 6ba722783baf43258bde5d5ee87d6306626377be Mon Sep 17 00:00:00 2001 From: David Kazlauskas Date: Wed, 25 Sep 2024 08:34:47 +0300 Subject: [PATCH] Add prometheus instrumentation for coprocessor --- fhevm-engine/Cargo.lock | 399 +++++++++++++++++++- fhevm-engine/Cargo.toml | 1 + fhevm-engine/coprocessor/Cargo.toml | 2 + fhevm-engine/coprocessor/Makefile | 10 +- fhevm-engine/coprocessor/src/cli.rs | 6 +- fhevm-engine/coprocessor/src/main.rs | 6 + fhevm-engine/coprocessor/src/metrics.rs | 27 ++ fhevm-engine/coprocessor/src/server.rs | 88 ++++- fhevm-engine/coprocessor/src/tests/utils.rs | 1 + fhevm-engine/coprocessor/src/tfhe_worker.rs | 23 ++ 10 files changed, 542 insertions(+), 21 deletions(-) create mode 100644 fhevm-engine/coprocessor/src/metrics.rs diff --git a/fhevm-engine/Cargo.lock b/fhevm-engine/Cargo.lock index f3562b44..2b31c711 100644 --- a/fhevm-engine/Cargo.lock +++ b/fhevm-engine/Cargo.lock @@ -2,6 +2,189 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "actix-codec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" +dependencies = [ + "bitflags 2.6.0", + "bytes", + "futures-core", + "futures-sink", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "actix-http" +version = "3.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48f96fc3003717aeb9856ca3d02a8c7de502667ad76eeacd830b48d2e91fac4" +dependencies = [ + "actix-codec", + "actix-rt", + "actix-service", + "actix-utils", + "ahash", + "base64 0.22.1", + "bitflags 2.6.0", + "brotli", + "bytes", + "bytestring", + "derive_more 0.99.18", + "encoding_rs", + "flate2", + "futures-core", + "h2 0.3.26", + "http 0.2.12", + "httparse", + "httpdate", + "itoa", + "language-tags", + "local-channel", + "mime", + "percent-encoding", + "pin-project-lite", + "rand", + "sha1", + "smallvec", + "tokio", + "tokio-util", + "tracing", + "zstd", +] + +[[package]] +name = "actix-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" +dependencies = [ + "quote", + "syn 2.0.75", +] + +[[package]] +name = "actix-router" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d324164c51f63867b57e73ba5936ea151b8a41a1d23d1031eeb9f70d0236f8" +dependencies = [ + "bytestring", + "cfg-if", + "http 0.2.12", + "regex", + "regex-lite", + "serde", + "tracing", +] + +[[package]] +name = "actix-rt" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" +dependencies = [ + "futures-core", + "tokio", +] + +[[package]] +name = "actix-server" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ca2549781d8dd6d75c40cf6b6051260a2cc2f3c62343d761a969a0640646894" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "futures-core", + "futures-util", + "mio", + "socket2", + "tokio", + "tracing", +] + +[[package]] +name = "actix-service" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" +dependencies = [ + "futures-core", + "paste", + "pin-project-lite", +] + +[[package]] +name = "actix-utils" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" +dependencies = [ + "local-waker", + "pin-project-lite", +] + +[[package]] +name = "actix-web" +version = "4.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9180d76e5cc7ccbc4d60a506f2c727730b154010262df5b910eb17dbe4b8cb38" +dependencies = [ + "actix-codec", + "actix-http", + "actix-macros", + "actix-router", + "actix-rt", + "actix-server", + "actix-service", + "actix-utils", + "actix-web-codegen", + "ahash", + "bytes", + "bytestring", + "cfg-if", + "cookie", + "derive_more 0.99.18", + "encoding_rs", + "futures-core", + "futures-util", + "impl-more", + "itoa", + "language-tags", + "log", + "mime", + "once_cell", + "pin-project-lite", + "regex", + "regex-lite", + "serde", + "serde_json", + "serde_urlencoded", + "smallvec", + "socket2", + "time", + "url", +] + +[[package]] +name = "actix-web-codegen" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591380e2e68490b5dfaf1dd1aa0ebe78d84ba7067078512b4ea6e4492d622b8" +dependencies = [ + "actix-router", + "proc-macro2", + "quote", + "syn 2.0.75", +] + [[package]] name = "addr2line" version = "0.22.0" @@ -59,6 +242,21 @@ dependencies = [ "serde", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.18" @@ -133,7 +331,7 @@ dependencies = [ "alloy-sol-type-parser", "alloy-sol-types", "const-hex", - "derive_more", + "derive_more 1.0.0", "itoa", "serde", "serde_json", @@ -174,7 +372,7 @@ dependencies = [ "alloy-rlp", "alloy-serde", "c-kzg", - "derive_more", + "derive_more 1.0.0", "once_cell", "serde", "sha2", @@ -260,7 +458,7 @@ dependencies = [ "bytes", "cfg-if", "const-hex", - "derive_more", + "derive_more 1.0.0", "hex-literal", "itoa", "k256", @@ -1289,6 +1487,27 @@ dependencies = [ "serde_with", ] +[[package]] +name = "brotli" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a45bd2e4095a8b518033b128020dd4a55aab1c0a381ba4404a472630f4bc362" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1332,6 +1551,15 @@ dependencies = [ "either", ] +[[package]] +name = "bytestring" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d80203ea6b29df88012294f62733de21cfeab47f17b41af3a38bc30a03ee72" +dependencies = [ + "bytes", +] + [[package]] name = "c-kzg" version = "1.0.3" @@ -1353,6 +1581,8 @@ version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" dependencies = [ + "jobserver", + "libc", "shlex", ] @@ -1509,10 +1739,28 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + +[[package]] +name = "cookie" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" +dependencies = [ + "percent-encoding", + "time", + "version_check", +] + [[package]] name = "coprocessor" version = "0.1.0" dependencies = [ + "actix-web", "alloy", "bigdecimal", "bincode", @@ -1521,6 +1769,7 @@ dependencies = [ "hex", "lazy_static", "lru", + "prometheus", "prost", "rand", "regex", @@ -1740,6 +1989,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "0.99.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version 0.4.1", + "syn 2.0.75", +] + [[package]] name = "derive_more" version = "1.0.0" @@ -1879,6 +2141,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -2228,6 +2499,25 @@ dependencies = [ "subtle", ] +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.4.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.6" @@ -2424,7 +2714,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -2581,6 +2871,12 @@ dependencies = [ "parity-scale-codec", ] +[[package]] +name = "impl-more" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d" + [[package]] name = "impl-trait-for-tuples" version = "0.2.2" @@ -2677,6 +2973,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.70" @@ -2733,6 +3038,12 @@ dependencies = [ "sha3-asm", ] +[[package]] +name = "language-tags" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" + [[package]] name = "lazy_static" version = "1.5.0" @@ -2793,6 +3104,23 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "local-channel" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8" +dependencies = [ + "futures-core", + "futures-sink", + "local-waker", +] + +[[package]] +name = "local-waker" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487" + [[package]] name = "lock_api" version = "0.4.12" @@ -2888,6 +3216,7 @@ checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ "hermit-abi", "libc", + "log", "wasi", "windows-sys 0.52.0", ] @@ -3394,6 +3723,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + [[package]] name = "proptest" version = "1.5.0" @@ -3467,6 +3811,12 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "pulp" version = "0.18.22" @@ -4210,6 +4560,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -4884,7 +5243,9 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -4975,7 +5336,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", - "h2", + "h2 0.4.6", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -5713,3 +6074,31 @@ dependencies = [ "quote", "syn 2.0.75", ] + +[[package]] +name = "zstd" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54a3ab4db68cea366acc5c897c7b4d4d1b8994a9cd6e6f841f8964566a419059" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.13+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38ff0f21cfee8f97d94cef41359e0c89aa6113028ab0291aa8ca0038995a95aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/fhevm-engine/Cargo.toml b/fhevm-engine/Cargo.toml index 91b47964..c545e78a 100644 --- a/fhevm-engine/Cargo.toml +++ b/fhevm-engine/Cargo.toml @@ -12,6 +12,7 @@ sha3 = "0.10.8" anyhow = "1.0.86" daggy = "0.8.0" serde = "1.0.210" +prometheus = "0.13.4" [profile.dev.package.tfhe] overflow-checks = false diff --git a/fhevm-engine/coprocessor/Cargo.toml b/fhevm-engine/coprocessor/Cargo.toml index a14df9ba..a2fc644c 100644 --- a/fhevm-engine/coprocessor/Cargo.toml +++ b/fhevm-engine/coprocessor/Cargo.toml @@ -31,6 +31,8 @@ fhevm-engine-common = { path = "../fhevm-engine-common" } strum = { version = "0.26", features = ["derive"] } bincode.workspace = true sha3.workspace = true +prometheus.workspace = true +actix-web = "4.9.0" [dev-dependencies] testcontainers = "0.21" diff --git a/fhevm-engine/coprocessor/Makefile b/fhevm-engine/coprocessor/Makefile index 62f9f881..7b1d62fb 100644 --- a/fhevm-engine/coprocessor/Makefile +++ b/fhevm-engine/coprocessor/Makefile @@ -1,4 +1,6 @@ +DB_URL ?= DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/coprocessor + .PHONY: build build: cargo build @@ -11,9 +13,9 @@ cleanup: init_db: docker compose up -d sleep 3 - DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/coprocessor sqlx db create - DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/coprocessor sqlx migrate run - DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/coprocessor cargo test setup_test_user -- --nocapture --ignored + $(DB_URL) sqlx db create + $(DB_URL) sqlx migrate run + $(DB_URL) cargo test setup_test_user -- --nocapture --ignored .PHONY: recreate_db recreate_db: @@ -23,4 +25,4 @@ recreate_db: .PHONY: clean_run clean_run: $(MAKE) recreate_db - RUST_BACKTRACE=1 cargo run -- --run-server --run-bg-worker + RUST_BACKTRACE=1 $(DB_URL) cargo run --release -- --run-server --run-bg-worker diff --git a/fhevm-engine/coprocessor/src/cli.rs b/fhevm-engine/coprocessor/src/cli.rs index 27ff6f1a..74fa069d 100644 --- a/fhevm-engine/coprocessor/src/cli.rs +++ b/fhevm-engine/coprocessor/src/cli.rs @@ -32,7 +32,7 @@ pub struct Args { pub tenant_key_cache_size: i32, /// Maximum compact inputs to upload - #[arg(long, default_value_t = 8)] + #[arg(long, default_value_t = 10)] pub maximimum_compact_inputs_upload: usize, /// Maximum compact inputs to upload @@ -55,6 +55,10 @@ pub struct Args { #[arg(long, default_value = "127.0.0.1:50051")] pub server_addr: String, + /// Prometheus metrics server address + #[arg(long, default_value = "0.0.0.0:9100")] + pub metrics_addr: String, + /// Postgres database url. If unspecified DATABASE_URL environment variable is used #[arg(long)] pub database_url: Option, diff --git a/fhevm-engine/coprocessor/src/main.rs b/fhevm-engine/coprocessor/src/main.rs index d23e4d7d..4b1f718b 100644 --- a/fhevm-engine/coprocessor/src/main.rs +++ b/fhevm-engine/coprocessor/src/main.rs @@ -9,6 +9,7 @@ mod tests; mod tfhe_worker; mod types; mod utils; +mod metrics; fn main() { let args = crate::cli::parse_args(); @@ -70,6 +71,11 @@ async fn async_main( set.spawn(crate::tfhe_worker::run_tfhe_worker(args.clone())); } + if !args.metrics_addr.is_empty() { + println!("Initializing metrics server"); + set.spawn(crate::metrics::run_metrics_server(args.clone())); + } + if set.is_empty() { panic!("No tasks specified to run"); } diff --git a/fhevm-engine/coprocessor/src/metrics.rs b/fhevm-engine/coprocessor/src/metrics.rs new file mode 100644 index 00000000..69d9b917 --- /dev/null +++ b/fhevm-engine/coprocessor/src/metrics.rs @@ -0,0 +1,27 @@ +async fn metrics() -> impl actix_web::Responder { + let encoder = prometheus::TextEncoder::new(); + let metric_families = prometheus::gather(); + encoder.encode_to_string(&metric_families).expect("can't encode metrics") +} + +async fn healthcheck() -> impl actix_web::Responder { + "OK" +} + +pub async fn run_metrics_server( + args: crate::cli::Args, +) -> Result<(), Box> { + println!("metrics server listening at {}", args.metrics_addr); + let _ = actix_web::HttpServer::new(|| { + actix_web::App::new() + .route("/metrics", actix_web::web::to(metrics)) + .route("/health", actix_web::web::to(healthcheck)) + }) + .bind(&args.metrics_addr) + .expect("can't bind to metrics server address") + .workers(1) + .run() + .await?; + + Ok(()) +} \ No newline at end of file diff --git a/fhevm-engine/coprocessor/src/server.rs b/fhevm-engine/coprocessor/src/server.rs index 45b4ac66..eaeef856 100644 --- a/fhevm-engine/coprocessor/src/server.rs +++ b/fhevm-engine/coprocessor/src/server.rs @@ -21,10 +21,12 @@ use fhevm_engine_common::tfhe_ops::{ try_expand_ciphertext_list, validate_fhe_type, }; use fhevm_engine_common::types::{FhevmError, SupportedFheCiphertexts, SupportedFheOperations}; +use prometheus::{register_int_counter, IntCounter}; use sha3::{Digest, Keccak256}; use sqlx::{query, Acquire}; use tokio::task::spawn_blocking; use tonic::transport::Server; +use lazy_static::lazy_static; pub mod common { tonic::include_proto!("fhevm.common"); @@ -34,6 +36,25 @@ pub mod coprocessor { tonic::include_proto!("fhevm.coprocessor"); } +lazy_static! { + static ref UPLOAD_INPUTS_COUNTER: IntCounter = + register_int_counter!("coprocessor_upload_inputs_count", "grpc calls for inputs upload endpoint").unwrap(); + static ref UPLOAD_INPUTS_ERRORS: IntCounter = + register_int_counter!("coprocessor_upload_inputs_errors", "grpc errors while calling upload inputs").unwrap(); + static ref ASYNC_COMPUTE_COUNTER: IntCounter = + register_int_counter!("coprocessor_async_compute_count", "grpc calls for async compute endpoint").unwrap(); + static ref ASYNC_COMPUTE_ERRORS: IntCounter = + register_int_counter!("coprocessor_async_compute_errors", "grpc errors while calling async compute").unwrap(); + static ref TRIVIAL_ENCRYPT_COUNTER: IntCounter = + register_int_counter!("coprocessor_trivial_encrypt_count", "grpc calls for trivial encrypt endpoint").unwrap(); + static ref TRIVIAL_ENCRYPT_ERRORS: IntCounter = + register_int_counter!("coprocessor_trivial_encrypt_errors", "grpc errors while calling trivial encrypt").unwrap(); + static ref GET_CIPHERTEXTS_COUNTER: IntCounter = + register_int_counter!("coprocessor_get_ciphertexts_count", "grpc calls for get ciphertexts endpoint").unwrap(); + static ref GET_CIPHERTEXTS_ERRORS: IntCounter = + register_int_counter!("coprocessor_get_ciphertexts_errors", "grpc errors while calling get ciphertexts").unwrap(); +} + pub struct CoprocessorService { pool: sqlx::Pool, args: crate::cli::Args, @@ -140,6 +161,58 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { + UPLOAD_INPUTS_COUNTER.inc(); + self.upload_inputs_impl(request).await.inspect_err(|_| { + UPLOAD_INPUTS_ERRORS.inc(); + }) + } + + async fn async_compute( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + ASYNC_COMPUTE_COUNTER.inc(); + self.async_compute_impl(request).await.inspect_err(|_| { + ASYNC_COMPUTE_ERRORS.inc(); + }) + } + + async fn wait_computations( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + return Err(tonic::Status::unimplemented("not implemented")); + } + + async fn trivial_encrypt_ciphertexts( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + TRIVIAL_ENCRYPT_COUNTER.inc(); + self.trivial_encrypt_ciphertexts_impl(request).await.inspect_err(|_| { + TRIVIAL_ENCRYPT_ERRORS.inc() + }) + } + + async fn get_ciphertexts( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + GET_CIPHERTEXTS_COUNTER.inc(); + self.get_ciphertexts_impl(request).await.inspect_err(|_| { + GET_CIPHERTEXTS_ERRORS.inc(); + }) + } +} + +impl CoprocessorService { + async fn upload_inputs_impl( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> { + UPLOAD_INPUTS_COUNTER.inc(); + let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?; let req = request.get_ref(); @@ -400,7 +473,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ Ok(tonic::Response::new(response)) } - async fn async_compute( + async fn async_compute_impl( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { @@ -534,14 +607,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ return Ok(tonic::Response::new(GenericResponse { response_code: 0 })); } - async fn wait_computations( - &self, - _request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - return Err(tonic::Status::unimplemented("not implemented")); - } - - async fn trivial_encrypt_ciphertexts( + async fn trivial_encrypt_ciphertexts_impl( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { @@ -616,7 +682,7 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ return Ok(tonic::Response::new(GenericResponse { response_code: 0 })); } - async fn get_ciphertexts( + async fn get_ciphertexts_impl( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> @@ -676,4 +742,4 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ return Ok(tonic::Response::new(result)); } -} +} \ No newline at end of file diff --git a/fhevm-engine/coprocessor/src/tests/utils.rs b/fhevm-engine/coprocessor/src/tests/utils.rs index 4c9ca3b7..b778d923 100644 --- a/fhevm-engine/coprocessor/src/tests/utils.rs +++ b/fhevm-engine/coprocessor/src/tests/utils.rs @@ -95,6 +95,7 @@ async fn start_coprocessor(rx: Receiver, app_port: u16, db_url: &str) { tokio_threads: 2, pg_pool_max_connections: 2, server_addr: format!("127.0.0.1:{app_port}"), + metrics_addr: "".to_string(), database_url: Some(db_url.to_string()), maximimum_compact_inputs_upload: 10, coprocessor_private_key: "./coprocessor.key".to_string(), diff --git a/fhevm-engine/coprocessor/src/tfhe_worker.rs b/fhevm-engine/coprocessor/src/tfhe_worker.rs index b5f8a5b4..ee4f7884 100644 --- a/fhevm-engine/coprocessor/src/tfhe_worker.rs +++ b/fhevm-engine/coprocessor/src/tfhe_worker.rs @@ -5,11 +5,28 @@ use fhevm_engine_common::{ tfhe_ops::{current_ciphertext_version, perform_fhe_operation}, types::SupportedFheOperations, }; +use prometheus::{register_int_counter, IntCounter}; use sqlx::{postgres::PgListener, query, Acquire}; use std::{ collections::{BTreeSet, HashMap}, num::NonZeroUsize, }; +use lazy_static::lazy_static; + +lazy_static! { + static ref WORKER_ERRORS_COUNTER: IntCounter = + register_int_counter!("coprocessor_worker_errors", "worker errors encountered").unwrap(); + static ref WORK_ITEMS_POLL_COUNTER: IntCounter = + register_int_counter!("coprocessor_work_items_polls", "times work items are polled from database").unwrap(); + static ref WORK_ITEMS_NOTIFICATIONS_COUNTER: IntCounter = + register_int_counter!("coprocessor_work_items_notifications", "times instant notifications for work items received from the database").unwrap(); + static ref WORK_ITEMS_FOUND_COUNTER: IntCounter = + register_int_counter!("coprocessor_work_items_found", "work items queried from database").unwrap(); + static ref WORK_ITEMS_ERRORS_COUNTER: IntCounter = + register_int_counter!("coprocessor_work_items_errors", "work items errored out during computation").unwrap(); + static ref WORK_ITEMS_PROCESSED_COUNTER: IntCounter = + register_int_counter!("coprocessor_work_items_processed", "work items successfully processed and stored in the database").unwrap(); +} pub async fn run_tfhe_worker( args: crate::cli::Args, @@ -17,6 +34,7 @@ pub async fn run_tfhe_worker( loop { // here we log the errors and make sure we retry if let Err(cycle_error) = tfhe_worker_cycle(&args).await { + WORKER_ERRORS_COUNTER.inc(); eprintln!( "Error in background worker, retrying shortly: {:?}", cycle_error @@ -49,9 +67,11 @@ async fn tfhe_worker_cycle( if !immedially_poll_more_work { tokio::select! { _ = listener.try_recv() => { + WORK_ITEMS_NOTIFICATIONS_COUNTER.inc(); println!("Received work_available notification from postgres"); }, _ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => { + WORK_ITEMS_POLL_COUNTER.inc(); println!("Polling the database for more work on timer"); }, }; @@ -93,6 +113,7 @@ async fn tfhe_worker_cycle( continue; } + WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64); println!("Processing {} work items", the_work.len()); // make sure we process each tenant sequentially not to @@ -248,8 +269,10 @@ async fn tfhe_worker_cycle( ) .execute(trx.as_mut()) .await?; + WORK_ITEMS_PROCESSED_COUNTER.inc(); } Err((err, tenant_id, output_handle)) => { + WORKER_ERRORS_COUNTER.inc(); let _ = query!( " UPDATE computations