From aff063fc1f056e5cd4b0f7aa6f29d8402a7bb7c8 Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Thu, 21 Mar 2024 18:23:04 -0300 Subject: [PATCH] DCU compute metrics (#20) --- Cargo.lock | 432 ++++++++++++++++------------------------ examples/manifest.yaml | 11 +- examples/setup | 6 +- operator/Cargo.toml | 7 +- operator/src/config.rs | 24 ++- operator/src/lib.rs | 15 +- operator/src/main.rs | 38 +--- operator/src/metrics.rs | 259 +++++++++++++++++++++++- proxy/README.md | 2 +- 9 files changed, 477 insertions(+), 317 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2629e46..fe286f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,185 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "actix-codec" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f7b0a21988c1bf877cf4759ef5ddaac04c1c9fe808c9142ecb78ba97d97a28a" -dependencies = [ - "bitflags 2.4.2", - "bytes", - "futures-core", - "futures-sink", - "memchr", - "pin-project-lite", - "tokio", - "tokio-util", - "tracing", -] - -[[package]] -name = "actix-http" -version = "3.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d223b13fd481fc0d1f83bb12659ae774d9e3601814c68a0bc539731698cca743" -dependencies = [ - "actix-codec", - "actix-rt", - "actix-service", - "actix-utils", - "ahash", - "base64", - "bitflags 2.4.2", - "brotli", - "bytes", - "bytestring", - "derive_more", - "encoding_rs", - "flate2", - "futures-core", - "h2 0.3.24", - "http 0.2.11", - "httparse", - "httpdate", - "itoa", - "language-tags", - "local-channel", - "mime", - "percent-encoding", - "pin-project-lite", - "rand", - "sha1", - "smallvec", - "tokio", - "tokio-util", - "tracing", - "zstd", -] - -[[package]] -name = "actix-macros" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" -dependencies = [ - "quote", - "syn 2.0.50", -] - -[[package]] -name = "actix-router" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d22475596539443685426b6bdadb926ad0ecaefdfc5fb05e5e3441f15463c511" -dependencies = [ - "bytestring", - "http 0.2.11", - "regex", - "serde", - "tracing", -] - -[[package]] -name = "actix-rt" -version = "2.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28f32d40287d3f402ae0028a9d54bef51af15c8769492826a69d28f81893151d" -dependencies = [ - "futures-core", - "tokio", -] - -[[package]] -name = "actix-server" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3eb13e7eef0423ea6eab0e59f6c72e7cb46d33691ad56a726b3cd07ddec2c2d4" -dependencies = [ - "actix-rt", - "actix-service", - "actix-utils", - "futures-core", - "futures-util", - "mio", - "socket2", - "tokio", - "tracing", -] - -[[package]] -name = "actix-service" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b894941f818cfdc7ccc4b9e60fa7e53b5042a2e8567270f9147d5591893373a" -dependencies = [ - "futures-core", - "paste", - "pin-project-lite", -] - -[[package]] -name = "actix-utils" -version = "3.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88a1dcdff1466e3c2488e1cb5c36a71822750ad43839937f85d2f4d9f8b705d8" -dependencies = [ - "local-waker", - "pin-project-lite", -] - -[[package]] -name = "actix-web" -version = "4.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a6556ddebb638c2358714d853257ed226ece6023ef9364f23f0c70737ea984" -dependencies = [ - "actix-codec", - "actix-http", - "actix-macros", - "actix-router", - "actix-rt", - "actix-server", - "actix-service", - "actix-utils", - "actix-web-codegen", - "ahash", - "bytes", - "bytestring", - "cfg-if", - "cookie", - "derive_more", - "encoding_rs", - "futures-core", - "futures-util", - "itoa", - "language-tags", - "log", - "mime", - "once_cell", - "pin-project-lite", - "regex", - "serde", - "serde_json", - "serde_urlencoded", - "smallvec", - "socket2", - "time", - "url", -] - -[[package]] -name = "actix-web-codegen" -version = "4.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1f50ebbb30eca122b188319a4398b3f7bb4a8cdf50ecfb73bfc6a3c3ce54f5" -dependencies = [ - "actix-router", - "proc-macro2", - "quote", - "syn 2.0.50", -] - [[package]] name = "addr2line" version = "0.21.0" @@ -438,15 +259,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" -[[package]] -name = "bytestring" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74d80203ea6b29df88012294f62733de21cfeab47f17b41af3a38bc30a03ee72" -dependencies = [ - "bytes", -] - [[package]] name = "cc" version = "1.0.86" @@ -470,8 +282,10 @@ checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-targets 0.52.0", ] @@ -499,23 +313,6 @@ dependencies = [ "cc", ] -[[package]] -name = "convert_case" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" - -[[package]] -name = "cookie" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" -dependencies = [ - "percent-encoding", - "time", - "version_check", -] - [[package]] name = "core-foundation" version = "0.9.4" @@ -664,19 +461,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "derive_more" -version = "0.99.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" -dependencies = [ - "convert_case", - "proc-macro2", - "quote", - "rustc_version", - "syn 1.0.109", -] - [[package]] name = "digest" version = "0.10.7" @@ -721,6 +505,22 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + [[package]] name = "filetime" version = "0.2.23" @@ -1036,6 +836,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +dependencies = [ + "bytes", + "futures-core", + "http 1.0.0", + "http-body 1.0.0", + "pin-project-lite", +] + [[package]] name = "http-range-header" version = "0.3.1" @@ -1066,7 +889,7 @@ dependencies = [ "futures-util", "h2 0.3.24", "http 0.2.11", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1078,6 +901,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -1086,7 +930,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.11", - "hyper", + "hyper 0.14.28", "log", "rustls", "rustls-native-certs", @@ -1100,12 +944,45 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.28", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.2.0", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -1300,8 +1177,8 @@ dependencies = [ "futures", "home", "http 0.2.11", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", "hyper-timeout", "jsonpath-rust", @@ -1380,12 +1257,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "language-tags" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" - [[package]] name = "lazy_static" version = "1.4.0" @@ -1426,21 +1297,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" [[package]] -name = "local-channel" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6cbc85e69b8df4b8bb8b89ec634e7189099cea8927a276b7384ce5488e53ec8" -dependencies = [ - "futures-core", - "futures-sink", - "local-waker", -] - -[[package]] -name = "local-waker" -version = "0.1.4" +name = "linux-raw-sys" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d873d7c67ce09b42110d801813efbc9364414e356be9935700d368351657487" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" @@ -1515,6 +1375,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nix" version = "0.24.3" @@ -1654,15 +1532,20 @@ dependencies = [ name = "operator" version = "0.1.0" dependencies = [ - "actix-web", "argon2", "bech32", + "chrono", "dotenv", "futures", + "http-body-util", + "hyper 1.2.0", + "hyper-util", "k8s-openapi", "kube", "lazy_static", "prometheus", + "regex", + "reqwest", "schemars", "serde", "serde_json", @@ -1722,12 +1605,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "paste" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" - [[package]] name = "pem" version = "3.0.3" @@ -2144,13 +2021,15 @@ dependencies = [ "futures-util", "h2 0.3.24", "http 0.2.11", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -2162,6 +2041,7 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", "tower-service", "url", @@ -2212,6 +2092,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" +dependencies = [ + "bitflags 2.4.2", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "rustls" version = "0.21.10" @@ -2547,17 +2440,6 @@ dependencies = [ "rust_decimal", ] -[[package]] -name = "sha1" -version = "0.10.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest", -] - [[package]] name = "sha2" version = "0.10.8" @@ -2709,6 +2591,18 @@ dependencies = [ "libc", ] +[[package]] +name = "tempfile" +version = "3.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +dependencies = [ + "cfg-if", + "fastrand", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -2834,6 +2728,16 @@ dependencies = [ "syn 2.0.50", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-openssl" version = "0.6.4" @@ -2958,7 +2862,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.11", - "http-body", + "http-body 0.4.6", "http-range-header", "mime", "pin-project-lite", diff --git a/examples/manifest.yaml b/examples/manifest.yaml index 39ec49a..b7aa0e6 100644 --- a/examples/manifest.yaml +++ b/examples/manifest.yaml @@ -92,6 +92,9 @@ data: - job_name: proxy static_configs: - targets: ["proxy:9187"] + - job_name: operator + static_configs: + - targets: ["operator:9187"] kind: ConfigMap metadata: name: prometheus-vol @@ -310,6 +313,12 @@ spec: env: - name: ADDR value: "0.0.0.0:9187" + - name: METRICS_DELAY + value: "60" + - name: PROMETHEUS_URL + value: "http://prometheus/api/v1" + - name: DCU_PER_PACKAGE + value: "preview=5,preprod=5,mainnet=5" --- apiVersion: v1 kind: Service @@ -324,7 +333,7 @@ spec: type: ClusterIP ports: - name: operator - port: 80 + port: 9187 targetPort: 9187 protocol: TCP --- diff --git a/examples/setup b/examples/setup index 26c5508..964fb44 100755 --- a/examples/setup +++ b/examples/setup @@ -1,6 +1,6 @@ #!/bin/bash -~/go/bin/kind create cluster +kind create cluster echo "Building operator CRD" cargo run --bin=crdgen > env-crd.yaml --manifest-path ../operator/Cargo.toml @@ -14,9 +14,9 @@ echo "Building operator image" docker build -t operator:1.0 -f ../docker/dockerfile.operator ../ echo "Loading proxy image" -~/go/bin/kind load docker-image proxy:1.0 +kind load docker-image proxy:1.0 echo "Loading operator image" -~/go/bin/kind load docker-image operator:1.0 +kind load docker-image operator:1.0 kubectl apply -f manifest.yaml \ No newline at end of file diff --git a/operator/Cargo.toml b/operator/Cargo.toml index deedbe3..c7de677 100644 --- a/operator/Cargo.toml +++ b/operator/Cargo.toml @@ -7,7 +7,6 @@ default-run = "controller" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -actix-web = "4.4.1" argon2 = "0.5.3" bech32 = "0.11.0" dotenv = "0.15.0" @@ -24,6 +23,12 @@ thiserror = "1.0.52" tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" +http-body-util = "0.1.0" +hyper = { version = "1.1.0", features = ["full"] } +hyper-util = { version = "0.1.3", features = ["full"] } +reqwest = { version = "0.11.23", features = ["json"] } +regex = "1.10.2" +chrono = "0.4.31" [[bin]] name="controller" diff --git a/operator/src/config.rs b/operator/src/config.rs index d520b19..8d95385 100644 --- a/operator/src/config.rs +++ b/operator/src/config.rs @@ -1,5 +1,5 @@ use lazy_static::lazy_static; -use std::env; +use std::{collections::HashMap, env, time::Duration}; lazy_static! { static ref CONTROLLER_CONFIG: Config = Config::from_env(); @@ -14,6 +14,9 @@ pub struct Config { pub dns_zone: String, pub extension_name: String, pub api_key_salt: String, + pub metrics_delay: Duration, + pub prometheus_url: String, + pub dcu_per_package: HashMap, } impl Config { @@ -22,6 +25,25 @@ impl Config { dns_zone: env::var("DNS_ZONE").unwrap_or("demeter.run".into()), extension_name: env::var("EXTENSION_NAME").unwrap_or("node-m1".into()), api_key_salt: env::var("API_KEY_SALT").unwrap_or("cardano-node-salt".into()), + metrics_delay: Duration::from_secs( + std::env::var("METRICS_DELAY") + .expect("METRICS_DELAY must be set") + .parse::() + .expect("METRICS_DELAY must be a number"), + ), + prometheus_url: env::var("PROMETHEUS_URL").expect("PROMETHEUS_URL must be set"), + dcu_per_package: env::var("DCU_PER_PACKAGE") + .expect("DCU_PER_PACKAGE must be set") + .split(',') + .map(|pair| { + let parts: Vec<&str> = pair.split('=').collect(); + let dcu = parts[1] + .parse::() + .expect("DCU_PER_PACKAGE must be NETWORK=NUMBER"); + + (parts[0].into(), dcu) + }) + .collect(), } } } diff --git a/operator/src/lib.rs b/operator/src/lib.rs index 8946b5a..4d9a482 100644 --- a/operator/src/lib.rs +++ b/operator/src/lib.rs @@ -18,6 +18,12 @@ pub enum Error { #[error("Bech32 Error: {0}")] Bech32Error(String), + + #[error("Http Request error: {0}")] + HttpError(String), + + #[error("Config Error: {0}")] + ConfigError(String), } impl Error { pub fn metric_label(&self) -> String { @@ -50,7 +56,7 @@ impl From for Error { } } -#[derive(Clone, Default)] +#[derive(Clone)] pub struct State { registry: Registry, pub metrics: Metrics, @@ -66,6 +72,11 @@ impl State { self.registry.gather() } } +impl Default for State { + fn default() -> Self { + Self::new() + } +} #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] pub enum Network { @@ -89,8 +100,8 @@ impl Display for Network { } } -pub use kube; pub use k8s_openapi; +pub use kube; pub type Result = std::result::Result; diff --git a/operator/src/main.rs b/operator/src/main.rs index b6b6c5d..37f8c94 100644 --- a/operator/src/main.rs +++ b/operator/src/main.rs @@ -1,27 +1,9 @@ -use actix_web::{ - get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder, -}; use dotenv::dotenv; -use prometheus::{Encoder, TextEncoder}; use std::{io, sync::Arc}; -use tracing::{info, Level}; +use tracing::Level; use operator::{controller, metrics as metrics_collector, State}; -#[get("/metrics")] -async fn metrics(c: Data>, _req: HttpRequest) -> impl Responder { - let metrics = c.metrics_collected(); - let encoder = TextEncoder::new(); - let mut buffer = vec![]; - encoder.encode(&metrics, &mut buffer).unwrap(); - HttpResponse::Ok().body(buffer) -} - -#[get("/health")] -async fn health(_: HttpRequest) -> impl Responder { - HttpResponse::Ok().json("healthy") -} - #[tokio::main] async fn main() -> io::Result<()> { dotenv().ok(); @@ -30,22 +12,10 @@ async fn main() -> io::Result<()> { let state = Arc::new(State::default()); - let controller = controller::run(state.clone()); - let metrics_collector = metrics_collector::run_metrics_collector(state.clone()); - - let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into()); - - let server = HttpServer::new(move || { - App::new() - .app_data(Data::new(state.clone())) - .wrap(middleware::Logger::default()) - .service(health) - .service(metrics) - }) - .bind(&addr)?; - info!({ addr }, "metrics server running"); + metrics_collector::run_metrics_collector(state.clone()); + metrics_collector::run_metrics_server(state.clone()); - tokio::join!(server.run(), controller, metrics_collector).0?; + controller::run(state.clone()).await; Ok(()) } diff --git a/operator/src/metrics.rs b/operator/src/metrics.rs index 5066946..806228e 100644 --- a/operator/src/metrics.rs +++ b/operator/src/metrics.rs @@ -1,34 +1,64 @@ -use std::{sync::Arc, time::Duration}; +use std::{net::SocketAddr, str::FromStr, sync::Arc}; -use kube::ResourceExt; -use prometheus::{opts, IntCounterVec, Registry}; -use tracing::info; +use chrono::Utc; +use http_body_util::{combinators::BoxBody, BodyExt, Full}; +use hyper::{body::Bytes, server::conn::http1, service::service_fn, Response}; +use hyper_util::rt::TokioIo; +use kube::{Resource, ResourceExt}; +use prometheus::{opts, Encoder, IntCounterVec, Registry, TextEncoder}; +use regex::Regex; +use serde::{Deserialize, Deserializer}; +use tokio::net::TcpListener; +use tracing::{error, info, instrument, warn}; -use crate::{CardanoNodePort, Error, State}; +use crate::{get_config, CardanoNodePort, Error, State}; #[derive(Clone)] pub struct Metrics { + pub dcu: IntCounterVec, pub reconcile_failures: IntCounterVec, + pub metrics_failures: IntCounterVec, } impl Default for Metrics { fn default() -> Self { + let dcu = IntCounterVec::new( + opts!("dmtr_consumed_dcus", "quantity of dcu consumed",), + &["project", "service", "service_type", "tenancy"], + ) + .unwrap(); + let reconcile_failures = IntCounterVec::new( opts!( - "crd_controller_reconciliation_errors_total", + "node_operator_crd_reconciliation_errors_total", "reconciliation errors", ), &["instance", "error"], ) .unwrap(); - Metrics { reconcile_failures } + let metrics_failures = IntCounterVec::new( + opts!( + "node_operator_metrics_errors_total", + "errors to calculation metrics", + ), + &["error"], + ) + .unwrap(); + + Metrics { + dcu, + reconcile_failures, + metrics_failures, + } } } impl Metrics { pub fn register(self, registry: &Registry) -> Result { registry.register(Box::new(self.reconcile_failures.clone()))?; + registry.register(Box::new(self.metrics_failures.clone()))?; + registry.register(Box::new(self.dcu.clone()))?; Ok(self) } @@ -38,13 +68,222 @@ impl Metrics { .with_label_values(&[crd.name_any().as_ref(), e.metric_label().as_ref()]) .inc() } + + pub fn metrics_failure(&self, e: &Error) { + self.metrics_failures + .with_label_values(&[e.metric_label().as_ref()]) + .inc() + } + + pub fn count_dcu_consumed(&self, project: &str, network: &str, dcu: f64) { + let service = format!("{}-{}", CardanoNodePort::kind(&()), network); + let service_type = format!( + "{}.{}", + CardanoNodePort::plural(&()), + CardanoNodePort::group(&()) + ); + let tenancy = "proxy"; + + let dcu: u64 = dcu.ceil() as u64; + + self.dcu + .with_label_values(&[project, &service, &service_type, tenancy]) + .inc_by(dcu); + } +} + +async fn api_get_metrics( + state: Arc, +) -> Result>, hyper::Error> { + let metrics = state.metrics_collected(); + + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + encoder.encode(&metrics, &mut buffer).unwrap(); + + let res = Response::builder() + .body( + Full::new(buffer.into()) + .map_err(|never| match never {}) + .boxed(), + ) + .unwrap(); + Ok(res) +} + +#[instrument("metrics collector server", skip_all)] +pub fn run_metrics_server(state: Arc) { + tokio::spawn(async move { + let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into()); + let addr_result = SocketAddr::from_str(&addr); + if let Err(err) = addr_result { + error!(error = err.to_string(), "invalid prometheus addr"); + std::process::exit(1); + } + let addr = addr_result.unwrap(); + + let listener_result = TcpListener::bind(addr).await; + if let Err(err) = listener_result { + error!( + error = err.to_string(), + "fail to bind tcp prometheus server listener" + ); + std::process::exit(1); + } + let listener = listener_result.unwrap(); + + info!(addr = addr.to_string(), "metrics listening"); + + loop { + let state = state.clone(); + + let accept_result = listener.accept().await; + if let Err(err) = accept_result { + error!(error = err.to_string(), "accept client prometheus server"); + continue; + } + let (stream, _) = accept_result.unwrap(); + + let io = TokioIo::new(stream); + + tokio::task::spawn(async move { + let service = service_fn(move |_| api_get_metrics(state.clone())); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + error!(error = err.to_string(), "failed metrics server connection"); + } + }); + } + }); } -pub async fn run_metrics_collector(_state: Arc) { - tokio::spawn(async { +#[instrument("metrics collector run", skip_all)] +pub fn run_metrics_collector(state: Arc) { + tokio::spawn(async move { info!("collecting metrics running"); + + let config = get_config(); + let client = reqwest::Client::builder().build().unwrap(); + let project_regex = Regex::new(r"prj-(.+)\..+").unwrap(); + let network_regex = Regex::new(r"node-([\w]+)-.+").unwrap(); + let mut last_execution = Utc::now(); + loop { - tokio::time::sleep(Duration::from_secs(6)).await; + tokio::time::sleep(config.metrics_delay).await; + + let end = Utc::now(); + let start = (end - last_execution).num_seconds(); + + last_execution = end; + + let query = format!( + "sum by (consumer, exported_instance) (increase(node_proxy_total_packages_bytes[{start}s] @ {}))", + end.timestamp_millis() / 1000 + ); + + let result = client + .get(format!("{}/query?query={query}", config.prometheus_url)) + .send() + .await; + + if let Err(err) = result { + error!(error = err.to_string(), "error to make prometheus request"); + state + .metrics + .metrics_failure(&Error::HttpError(err.to_string())); + continue; + } + + let response = result.unwrap(); + let status = response.status(); + if status.is_client_error() || status.is_server_error() { + error!(status = status.to_string(), "request status code fail"); + state.metrics.metrics_failure(&Error::HttpError(format!( + "Prometheus request error. Status: {} Query: {}", + status, query + ))); + continue; + } + + let response = response.json::().await.unwrap(); + + for result in response.data.result { + if result.value == 0.0 + || result.metric.consumer.is_none() + || result.metric.exported_instance.is_none() + { + continue; + } + + let consumer = result.metric.consumer.unwrap(); + let project_captures = project_regex.captures(&consumer); + if project_captures.is_none() { + warn!(consumer, "invalid project to the regex"); + continue; + } + let project_captures = project_captures.unwrap(); + let project = project_captures.get(1).unwrap().as_str(); + + let instance = result.metric.exported_instance.unwrap(); + let network_captures = network_regex.captures(&instance); + if network_captures.is_none() { + warn!(instance, "invalid network to the regex"); + continue; + } + let network_captures = network_captures.unwrap(); + let network = network_captures.get(1).unwrap().as_str(); + + let dcu_per_package = config.dcu_per_package.get(network); + if dcu_per_package.is_none() { + let error = Error::ConfigError(format!( + "dcu_per_package not configured to {} network", + network + )); + error!(error = error.to_string()); + state.metrics.metrics_failure(&error); + continue; + } + let dcu_per_package = dcu_per_package.unwrap(); + + let dcu = result.value * dcu_per_package; + state.metrics.count_dcu_consumed(project, network, dcu); + } } }); } + +#[derive(Debug, Deserialize)] +struct PrometheusDataResultMetric { + consumer: Option, + exported_instance: Option, +} + +#[derive(Debug, Deserialize)] +struct PrometheusDataResult { + metric: PrometheusDataResultMetric, + #[serde(deserialize_with = "deserialize_value")] + value: f64, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct PrometheusData { + result: Vec, +} + +#[derive(Debug, Deserialize)] +struct PrometheusResponse { + data: PrometheusData, +} + +fn deserialize_value<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let value: Vec = Deserialize::deserialize(deserializer)?; + Ok(value.into_iter().as_slice()[1] + .as_str() + .unwrap() + .parse::() + .unwrap()) +} diff --git a/proxy/README.md b/proxy/README.md index ca04153..03246a4 100644 --- a/proxy/README.md +++ b/proxy/README.md @@ -16,7 +16,7 @@ This proxy will allow Node to be accessed externally. | PROXY_TIERS_PATH | path of tiers toml file | ## Rate limit -To define rate limits, it's necessary to create a file with the limiters available that the ports can use. The limit of each tier can be configured using `s = second`, `m = minute`, `h = hour` and `d = day` eg: `5s` bucket of 5 seconds. The limiter will limit packages to 1024 bytes each. +To define rate limits, it's necessary to create a file with the limiters available that the ports can use. The limit of each tier can be configured using `s = second`, `m = minute`, `h = hour` and `d = day` eg: `5s` bucket of 5 seconds. The limiter will limit packages to `1024` bytes each. ```toml [[tiers]]