diff --git a/.gitignore b/.gitignore index e6aabb4..fd06a2d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,8 @@ /target -proxy.socket \ No newline at end of file +proxy.socket +iognode +cert +docker-compose.yml +.env* +env-crd.yaml +tiers.toml \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 3d6e3dd..2629e46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -550,6 +550,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -712,6 +721,18 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "filetime" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "windows-sys 0.52.0", +] + [[package]] name = "flate2" version = "1.0.28" @@ -753,6 +774,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.30" @@ -1135,6 +1165,26 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.12" @@ -1204,6 +1254,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kube" version = "0.87.2" @@ -1322,6 +1392,17 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "leaky-bucket" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eb491abd89e9794d50f93c8db610a29509123e3fbbc9c8c67a528e9391cd853" +dependencies = [ + "parking_lot", + "tokio", + "tracing", +] + [[package]] name = "libc" version = "0.2.153" @@ -1446,6 +1527,25 @@ dependencies = [ "memoffset", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.4.2", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1796,6 +1896,15 @@ dependencies = [ "pingora-error", ] +[[package]] +name = "pingora-limits" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43e5e9ff9d54e951dfb0296946e63369b0778cb8534f0a7d1e9079001f1fe66d" +dependencies = [ + "ahash", +] + [[package]] name = "pingora-openssl" version = "0.1.0" @@ -1930,11 +2039,17 @@ dependencies = [ "async-trait", "dotenv", "futures-util", + "leaky-bucket", + "notify", "operator", "pingora", + "pingora-limits", "prometheus", "regex", + "serde", + "serde_json", "tokio", + "toml", "tracing", "tracing-subscriber", ] @@ -2146,6 +2261,15 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.23" @@ -2366,6 +2490,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1" +dependencies = [ + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2762,6 +2895,40 @@ dependencies = [ "tracing", ] +[[package]] +name = "toml" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c1b5fd4128cc8d3e0cb74d4ed9a9cc7c7284becd4df68f5f940e1ad123606f6" +dependencies = [ + "indexmap 2.2.3", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tower" version = "0.4.13" @@ -3006,6 +3173,16 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3119,6 +3296,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -3266,6 +3452,15 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "winnow" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" diff --git a/bootstrap/crds/main.tf b/bootstrap/crds/main.tf index a3364f1..b074372 100644 --- a/bootstrap/crds/main.tf +++ b/bootstrap/crds/main.tf @@ -28,6 +28,11 @@ resource "kubernetes_manifest" "customresourcedefinition_cardanonodeports_demete "name" = "Version" "type" = "string" }, + { + "jsonPath" = ".spec.throughputTier" + "name" = "Throughput Tier" + "type" = "string" + }, { "jsonPath" = ".status.authenticatedEndpoint" "name" = "Authenticated Endpoint" @@ -38,11 +43,6 @@ resource "kubernetes_manifest" "customresourcedefinition_cardanonodeports_demete "name" = "Auth Token" "type" = "string" }, - { - "jsonPath" = ".status.throughputTier" - "name" = "Throughput Tier" - "type" = "string" - }, ] "name" = "v1alpha1" "schema" = { diff --git a/proxy/examples/manifest.yaml b/examples/manifest.yaml similarity index 92% rename from proxy/examples/manifest.yaml rename to examples/manifest.yaml index 69c1d67..39ec49a 100644 --- a/proxy/examples/manifest.yaml +++ b/examples/manifest.yaml @@ -10,17 +10,12 @@ apiVersion: rbac.authorization.k8s.io/v1 metadata: name: kube-rs rules: - - apiGroups: ["demeter.run"] - resources: - [ - "cardanonodeports", - "cardanonodeports/status", - "cardanonodeports/finalizers", - ] - verbs: ["get", "list", "watch", "patch", "update"] - - apiGroups: ["events.k8s.io"] - resources: ["events"] - verbs: ["create"] + - apiGroups: ["*"] + resources: ["*"] + verbs: ["*"] + - apiGroups: ["*"] + resources: ["*"] + verbs: ["*"] --- # Scoped service account apiVersion: v1 @@ -203,9 +198,21 @@ data: 1+pvxRA1z1SjK0UtkVc280pRwNZjue/GtNnc0KgkqN3guJ5WeDumBxmStJ5KGNYG T4jj2oxkaijVhOEOzUkq4w== -----END PRIVATE KEY----- + tiers.toml: | + [[tiers]] + name = "0" + [[tiers.rates]] + interval = "1s" + limit = 10 + + [[tiers]] + name = "1" + [[tiers.rates]] + interval = "1s" + limit = 100 kind: ConfigMap metadata: - name: proxy-vol + name: proxy-config namespace: prj-mainnet-test --- apiVersion: apps/v1 @@ -233,24 +240,28 @@ spec: env: - name: PROXY_ADDR value: "0.0.0.0:80" + - name: PROXY_NAMESPACE + value: "prj-mainnet-test" - name: PROMETHEUS_ADDR value: "0.0.0.0:9187" + - name: PROXY_TIERS_PATH + value: "/tcp-proxy/tiers.toml" - name: NODE_PORT value: "80" - name: NODE_DNS value: "prj-mainnet-test.svc.cluster.local" - name: SSL_CRT_PATH - value: "/certs/localhost.crt" + value: "/tcp-proxy/localhost.crt" - name: SSL_KEY_PATH - value: "/certs/localhost.key" + value: "/tcp-proxy/localhost.key" volumeMounts: - - name: certs - mountPath: /certs + - name: proxy-vol + mountPath: /tcp-proxy volumes: - - name: certs + - name: proxy-vol configMap: - name: proxy-vol + name: proxy-config --- apiVersion: v1 kind: Service diff --git a/proxy/examples/setup b/examples/setup similarity index 61% rename from proxy/examples/setup rename to examples/setup index 580f634..26c5508 100755 --- a/proxy/examples/setup +++ b/examples/setup @@ -3,15 +3,15 @@ ~/go/bin/kind create cluster echo "Building operator CRD" -cargo run --bin=crdgen > env-crd.yaml --manifest-path ../../operator/Cargo.toml +cargo run --bin=crdgen > env-crd.yaml --manifest-path ../operator/Cargo.toml kubectl apply -f env-crd.yaml echo "Building proxy image" -docker build -t proxy:1.0 -f ../../docker/dockerfile.proxy ../../ +docker build -t proxy:1.0 -f ../docker/dockerfile.proxy ../ echo "Building operator image" -docker build -t operator:1.0 -f ../../docker/dockerfile.operator ../../ +docker build -t operator:1.0 -f ../docker/dockerfile.operator ../ echo "Loading proxy image" ~/go/bin/kind load docker-image proxy:1.0 diff --git a/operator/.gitignore b/operator/.gitignore deleted file mode 100644 index ea8c4bf..0000000 --- a/operator/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target diff --git a/operator/src/controller.rs b/operator/src/controller.rs index 91f3cb5..c32dc5a 100644 --- a/operator/src/controller.rs +++ b/operator/src/controller.rs @@ -24,9 +24,9 @@ pub static CARDANO_NODE_PORT_FINALIZER: &str = "cardanonodeports.demeter.run"; #[kube(printcolumn = r#" {"name": "Network", "jsonPath": ".spec.network", "type": "string"}, {"name": "Version", "jsonPath": ".spec.version", "type": "string"}, + {"name": "Throughput Tier", "jsonPath": ".spec.throughputTier", "type": "string"}, {"name": "Authenticated Endpoint", "jsonPath": ".status.authenticatedEndpoint", "type": "string"}, - {"name": "Auth Token", "jsonPath": ".status.authToken", "type": "string"}, - {"name": "Throughput Tier", "jsonPath": ".status.throughputTier", "type": "string"} + {"name": "Auth Token", "jsonPath": ".status.authToken", "type": "string"} "#)] #[serde(rename_all = "camelCase")] pub struct CardanoNodePortSpec { diff --git a/operator/src/lib.rs b/operator/src/lib.rs index c8c6905..8946b5a 100644 --- a/operator/src/lib.rs +++ b/operator/src/lib.rs @@ -90,6 +90,7 @@ impl Display for Network { } pub use kube; +pub use k8s_openapi; pub type Result = std::result::Result; diff --git a/operator/yaml/crd.yaml b/operator/yaml/crd.yaml index 6479990..5afa16e 100644 --- a/operator/yaml/crd.yaml +++ b/operator/yaml/crd.yaml @@ -19,15 +19,15 @@ spec: - jsonPath: .spec.version name: Version type: string + - jsonPath: .spec.throughputTier + name: Throughput Tier + type: string - jsonPath: .status.authenticatedEndpoint name: Authenticated Endpoint type: string - jsonPath: .status.authToken name: Auth Token type: string - - jsonPath: .status.throughputTier - name: Throughput Tier - type: string name: v1alpha1 schema: openAPIV3Schema: diff --git a/operator/yaml/port.yaml b/operator/yaml/port.yaml index 32a8f1b..ffc646c 100644 --- a/operator/yaml/port.yaml +++ b/operator/yaml/port.yaml @@ -11,4 +11,14 @@ metadata: spec: network: "preview" version: "v1" - throughputTier: "1" + throughputTier: "0" +--- +apiVersion: demeter.run/v1alpha1 +kind: CardanoNodePort +metadata: + name: mainnet-user-2 + namespace: prj-mainnet-test +spec: + network: "preview" + version: "v1" + throughputTier: "1" \ No newline at end of file diff --git a/proxy/.gitignore b/proxy/.gitignore deleted file mode 100644 index 99a1553..0000000 --- a/proxy/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -/target -iognode -cert -docker-compose.yml -.env* -env-crd.yaml \ No newline at end of file diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index fbfe0e6..88595c6 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -14,5 +14,11 @@ tracing = "0.1.40" tracing-subscriber = "0.3.18" futures-util = "0.3.30" pingora = "0.1.0" +pingora-limits = "0.1.0" prometheus = "0.13.3" async-trait = "0.1.77" +leaky-bucket = "1.0.1" +serde = { version = "1.0.197", features = ["derive"] } +serde_json = "1.0.114" +toml = "0.8.10" +notify = "6.1.1" diff --git a/proxy/README.md b/proxy/README.md new file mode 100644 index 0000000..ca04153 --- /dev/null +++ b/proxy/README.md @@ -0,0 +1,67 @@ +# Node Proxy + +This proxy will allow Node to be accessed externally. + +## Environment + +| Key | Value | +| ---------------- | ----------------------- | +| PROXY_ADDR | 0.0.0.0:5000 | +| PROXY_NAMESPACE | | +| PROMETHEUS_ADDR | 0.0.0.0:9090 | +| SSL_CRT_PATH | /localhost.crt | +| SSL_KEY_PATH | /localhost.key | +| NODE_PORT | | +| NODE_DNS | internal k8s dns | +| PROXY_TIERS_PATH | path of tiers toml file | + +## Rate limit +To define rate limits, it's necessary to create a file with the limiters available that the ports can use. The limit of each tier can be configured using `s = second`, `m = minute`, `h = hour` and `d = day` eg: `5s` bucket of 5 seconds. The limiter will limit packages to 1024 bytes each. + +```toml +[[tiers]] +name = "tier0" +[[tiers.rates]] +interval = "1s" +limit = 1 +[[tiers.rates]] +interval = "1m" +limit = 10 +[[tiers.rates]] +interval = "1h" +limit = 100 +[[tiers.rates]] +interval = "1d" +limit = 1000 + +[[tiers]] +name = "tier1" +[[tiers.rates]] +interval = "5s" +limit = 10 +``` + +after configuring, the file path must be set at the env `PROXY_TIERS_PATH`. + + +## Commands + +To generate the CRD will need to execute `crdgen` + +```bash +cargo run --bin=crdgen +``` + +and execute the operator + +```bash +cargo run +``` + +## Metrics + +to collect metrics for Prometheus, an HTTP API will enable the route /metrics. + +``` +/metrics +``` diff --git a/proxy/src/auth.rs b/proxy/src/auth.rs index ae518e4..b37230b 100644 --- a/proxy/src/auth.rs +++ b/proxy/src/auth.rs @@ -14,17 +14,16 @@ use operator::{ CardanoNodePort, }; use pingora::{server::ShutdownWatch, services::background::BackgroundService}; -use tokio::{pin, sync::RwLock}; +use tokio::pin; use tracing::error; use crate::{Consumer, State}; -#[derive(Debug)] pub struct AuthBackgroundService { - state: Arc>, + state: Arc, } impl AuthBackgroundService { - pub fn new(state: Arc>) -> Self { + pub fn new(state: Arc) -> Self { Self { state } } @@ -43,17 +42,19 @@ impl AuthBackgroundService { if crd.status.is_some() { let network = crd.spec.network.to_string(); let version = crd.spec.version.clone(); - let auth_token = crd.status.as_ref().unwrap().auth_token.clone(); + let tier = crd.spec.throughput_tier.to_string(); + let key = crd.status.as_ref().unwrap().auth_token.clone(); let namespace = crd.metadata.namespace.as_ref().unwrap().clone(); let port_name = crd.name_any(); - let hash_key = format!("{}.{}.{}", network, version, auth_token); - let consumer = Consumer::new(namespace, port_name); + let hash_key = format!("{}.{}.{}", network, version, key); + let consumer = Consumer::new(namespace, port_name, tier, key); consumers.insert(hash_key, consumer); } } - self.state.write().await.consumers = consumers; + + *self.state.consumers.write().await = consumers; } } diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 5d774db..1a3a73c 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,9 +1,10 @@ -use std::env; +use std::{env, path::PathBuf}; #[derive(Debug, Clone)] pub struct Config { pub proxy_addr: String, pub proxy_namespace: String, + pub proxy_tiers_path: PathBuf, pub prometheus_addr: String, pub ssl_crt_path: String, pub ssl_key_path: String, @@ -15,6 +16,9 @@ impl Config { Self { proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"), proxy_namespace: env::var("PROXY_NAMESPACE").expect("PROXY_NAMESPACE must be set"), + proxy_tiers_path: env::var("PROXY_TIERS_PATH") + .map(|v| v.into()) + .expect("PROXY_TIERS_PATH must be set"), prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"), ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"), ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"), diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 46b5bed..ae23feb 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -1,14 +1,18 @@ -use std::{collections::HashMap, fmt::Display, sync::Arc}; +use std::{collections::HashMap, fmt::Display, sync::Arc, time::Duration}; use auth::AuthBackgroundService; use dotenv::dotenv; +use leaky_bucket::RateLimiter; use pingora::{ listeners::Listeners, server::{configuration::Opt, Server}, services::{background::background_service, listening::Service}, }; -use prometheus::{opts, register_int_counter_vec}; +use prometheus::{opts, register_int_counter_vec, register_int_gauge_vec}; use proxy::ProxyApp; +use regex::Regex; +use serde::{Deserialize, Deserializer}; +use tiers::TierBackgroundService; use tokio::sync::RwLock; use tracing::Level; @@ -17,6 +21,7 @@ use crate::config::Config; mod auth; mod config; mod proxy; +mod tiers; fn main() { dotenv().ok(); @@ -24,7 +29,7 @@ fn main() { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); let config: Arc = Arc::default(); - let state: Arc> = Arc::default(); + let state: Arc = Arc::default(); let opt = Opt::default(); let mut server = Server::new(Some(opt)).unwrap(); @@ -36,6 +41,12 @@ fn main() { ); server.add_service(auth_background_service); + let tier_background_service = background_service( + "K8S Tier Service", + TierBackgroundService::new(state.clone(), config.clone()), + ); + server.add_service(tier_background_service); + let tls_proxy_service = Service::with_listeners( "TLS Proxy Service".to_string(), Listeners::tls( @@ -56,22 +67,22 @@ fn main() { server.run_forever(); } -#[derive(Debug, Clone, Default)] +#[derive(Default)] pub struct State { metrics: Metrics, - consumers: HashMap, + consumers: RwLock>, + limiter: RwLock>>>, + tiers: RwLock>, } impl State { pub fn new() -> Self { - let metrics = Metrics::new(); - let consumers = HashMap::new(); - - Self { metrics, consumers } + Self::default() } - pub fn get_consumer(&self, network: &str, version: &str, token: &str) -> Option { - let hash_key = format!("{}.{}.{}", network, version, token); - self.consumers.get(&hash_key).cloned() + pub async fn get_consumer(&self, network: &str, version: &str, key: &str) -> Option { + let consumers = self.consumers.read().await.clone(); + let hash_key = format!("{}.{}.{}", network, version, key); + consumers.get(&hash_key).cloned() } } @@ -79,12 +90,16 @@ impl State { pub struct Consumer { namespace: String, port_name: String, + tier: String, + key: String, } impl Consumer { - pub fn new(namespace: String, port_name: String) -> Self { + pub fn new(namespace: String, port_name: String, tier: String, key: String) -> Self { Self { namespace, port_name, + tier, + key, } } } @@ -94,12 +109,57 @@ impl Display for Consumer { } } +#[derive(Debug, Clone, Deserialize)] +pub struct Tier { + name: String, + rates: Vec, +} +#[derive(Debug, Clone, Deserialize)] +pub struct TierRate { + limit: usize, + #[serde(deserialize_with = "deserialize_duration")] + interval: Duration, +} +pub fn deserialize_duration<'de, D: Deserializer<'de>>( + deserializer: D, +) -> Result { + let value: String = Deserialize::deserialize(deserializer)?; + let regex = Regex::new(r"([\d]+)([\w])").unwrap(); + let captures = regex.captures(&value); + if captures.is_none() { + return Err(::custom( + "Invalid tier interval format", + )); + } + + let captures = captures.unwrap(); + let number = captures.get(1).unwrap().as_str().parse::().unwrap(); + let symbol = captures.get(2).unwrap().as_str(); + + match symbol { + "s" => Ok(Duration::from_secs(number)), + "m" => Ok(Duration::from_secs(number * 60)), + "h" => Ok(Duration::from_secs(number * 60 * 60)), + "d" => Ok(Duration::from_secs(number * 60 * 60 * 24)), + _ => Err(::custom( + "Invalid symbol tier interval", + )), + } +} + #[derive(Debug, Clone)] pub struct Metrics { total_packages_bytes: prometheus::IntCounterVec, + total_connections: prometheus::IntGaugeVec, } impl Metrics { pub fn new() -> Self { + let total_connections = register_int_gauge_vec!( + opts!("node_proxy_total_connections", "Total connections",), + &["consumer", "namespace", "instance"] + ) + .unwrap(); + let total_packages_bytes = register_int_counter_vec!( opts!("node_proxy_total_packages_bytes", "Total bytes transferred",), &["consumer", "namespace", "instance"] @@ -108,6 +168,7 @@ impl Metrics { Self { total_packages_bytes, + total_connections, } } @@ -124,6 +185,22 @@ impl Metrics { .with_label_values(&[consumer, namespace, instance]) .inc_by(value as u64) } + + pub fn inc_total_connections(&self, consumer: &Consumer, namespace: &str, instance: &str) { + let consumer = &consumer.to_string(); + + self.total_connections + .with_label_values(&[consumer, namespace, instance]) + .inc() + } + + pub fn dec_total_connections(&self, consumer: &Consumer, namespace: &str, instance: &str) { + let consumer = &consumer.to_string(); + + self.total_connections + .with_label_values(&[consumer, namespace, instance]) + .dec() + } } impl Default for Metrics { fn default() -> Self { diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 7eb5045..fa0bb11 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -1,35 +1,49 @@ -use std::{net::SocketAddr, sync::Arc}; - use async_trait::async_trait; +use futures_util::future::join_all; +use leaky_bucket::RateLimiter; use pingora::{ apps::ServerApp, connectors::TransportConnector, protocols::Stream, server::ShutdownWatch, - tls::ssl::NameType, upstreams::peer::BasicPeer, + tls::ssl::NameType, upstreams::peer::BasicPeer, Error, Result, }; use regex::Regex; +use std::{net::SocketAddr, sync::Arc}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::lookup_host, select, - sync::RwLock, }; -use tracing::error; +use tracing::{error, info}; -use crate::{config::Config, State}; +use crate::{config::Config, Consumer, State, Tier}; -pub struct ProxyApp { - client_connector: TransportConnector, - host_regex: Regex, - state: Arc>, - config: Arc, +struct Context { + consumer: Consumer, + namespace: String, + instance: String, +} +impl Context { + pub fn new(consumer: &Consumer, instance: &str, namespace: &str) -> Self { + Self { + consumer: consumer.clone(), + namespace: namespace.into(), + instance: instance.into(), + } + } } enum DuplexEvent { - DownstreamRead(usize), - UpstreamRead(usize), + ClientRead(usize), + InstanceRead(usize), } +pub struct ProxyApp { + client_connector: TransportConnector, + host_regex: Regex, + state: Arc, + config: Arc, +} impl ProxyApp { - pub fn new(config: Arc, state: Arc>) -> Self { + pub fn new(config: Arc, state: Arc) -> Self { ProxyApp { client_connector: TransportConnector::new(None), host_regex: Regex::new(r"(dmtr_[\w\d-]+)\.([\w]+)-([\w\d]+).+").unwrap(), @@ -37,18 +51,138 @@ impl ProxyApp { state, } } + + async fn duplex( + &self, + mut io_client: Stream, + mut io_instance: Stream, + state: Arc, + ctx: Context, + ) -> Result<()> { + state + .metrics + .inc_total_connections(&ctx.consumer, &ctx.namespace, &ctx.instance); + + let mut io_client_buf = [0; 1024]; + let mut io_instance_buf = [0; 1024]; + + loop { + let event: DuplexEvent; + + select! { + n = io_client.read(&mut io_client_buf) => { + match n { + Ok(b) => event = DuplexEvent::ClientRead(b), + Err(err) => { + error!(error = err.to_string(), "client read error"); + event = DuplexEvent::ClientRead(0); + }, + } + }, + n = io_instance.read(&mut io_instance_buf) => { + match n { + Ok(b) => event = DuplexEvent::InstanceRead(b), + Err(err) => { + error!(error = err.to_string(), "instance read error"); + event = DuplexEvent::InstanceRead(0); + }, + } + }, + } + + match event { + DuplexEvent::ClientRead(0) | DuplexEvent::InstanceRead(0) => { + info!("client disconnected"); + state.metrics.dec_total_connections( + &ctx.consumer, + &ctx.namespace, + &ctx.instance, + ); + return Ok(()); + } + DuplexEvent::ClientRead(bytes) => { + state.metrics.count_total_packages_bytes( + &ctx.consumer, + &ctx.namespace, + &ctx.instance, + bytes, + ); + + let _ = io_instance.write_all(&io_client_buf[0..bytes]).await; + let _ = io_instance.flush().await; + } + DuplexEvent::InstanceRead(bytes) => { + self.limiter(&ctx.consumer).await?; + + state.metrics.count_total_packages_bytes( + &ctx.consumer, + &ctx.namespace, + &ctx.instance, + bytes, + ); + + let _ = io_client.write_all(&io_instance_buf[0..bytes]).await; + let _ = io_client.flush().await; + } + } + } + } + + async fn has_limiter(&self, consumer: &Consumer) -> bool { + let rate_limiter_map = self.state.limiter.read().await; + rate_limiter_map.get(&consumer.key).is_some() + } + + async fn add_limiter(&self, consumer: &Consumer, tier: &Tier) { + let rates = tier + .rates + .iter() + .map(|r| { + Arc::new( + RateLimiter::builder() + .initial(r.limit) + .interval(r.interval) + .build(), + ) + }) + .collect(); + + self.state + .limiter + .write() + .await + .insert(consumer.key.clone(), rates); + } + + async fn limiter(&self, consumer: &Consumer) -> Result<()> { + let tiers = self.state.tiers.read().await.clone(); + let tier = tiers.get(&consumer.tier); + if tier.is_none() { + return Err(Error::new(pingora::ErrorType::AcceptError)); + } + let tier = tier.unwrap(); + + if !self.has_limiter(consumer).await { + self.add_limiter(consumer, tier).await; + } + + let rate_limiter_map = self.state.limiter.read().await.clone(); + let rates = rate_limiter_map.get(&consumer.key).unwrap(); + + join_all(rates.iter().map(|r| async { r.acquire_one().await })).await; + + Ok(()) + } } #[async_trait] impl ServerApp for ProxyApp { async fn process_new( self: &Arc, - mut io_server: Stream, + io_client: Stream, _shutdown: &ShutdownWatch, ) -> Option { - let state = self.state.read().await.clone(); - - let hostname = io_server.get_ssl()?.servername(NameType::HOST_NAME); + let hostname = io_client.get_ssl()?.servername(NameType::HOST_NAME); if hostname.is_none() { error!("hostname is not present in the certificate"); return None; @@ -62,17 +196,20 @@ impl ServerApp for ProxyApp { let captures = captures_result?; let token = captures.get(1)?.as_str().to_string(); + let network = captures.get(2)?.as_str().to_string(); let version = captures.get(3)?.as_str().to_string(); let namespace = self.config.proxy_namespace.clone(); - - let consumer = state.get_consumer(&network, &version, &token)?; + + let consumer = self.state.get_consumer(&network, &version, &token).await?; let instance = format!( "node-{network}-{version}.{}:{}", self.config.node_dns, self.config.node_port ); + let context = Context::new(&consumer, &instance, &namespace); + let lookup_result = lookup_host(&instance).await; if let Err(err) = lookup_result { error!(error = err.to_string(), "fail to lookup ip"); @@ -83,63 +220,21 @@ impl ServerApp for ProxyApp { let proxy_to = BasicPeer::new(&node_addr.to_string()); - let io_client = self.client_connector.new_stream(&proxy_to).await; - - match io_client { - Ok(mut io_client) => { - let mut upstream_buf = [0; 1024]; - let mut downstream_buf = [0; 1024]; - - loop { - let downstream_read = io_server.read(&mut upstream_buf); - let upstream_read = io_client.read(&mut downstream_buf); - let event: DuplexEvent; - - select! { - n = downstream_read => { - if let Err(err) = &n { - error!(error=err.to_string(), "Downstream error"); - return None; - } - event = DuplexEvent::DownstreamRead(n.unwrap()) - }, - n = upstream_read => { - if let Err(err) = &n { - error!(error=err.to_string(), "Upstream error"); - return None; - } - event = DuplexEvent::UpstreamRead(n.unwrap()) - }, - } + let io_instance = self.client_connector.new_stream(&proxy_to).await; - match event { - DuplexEvent::DownstreamRead(0) => { - return None; - } - DuplexEvent::UpstreamRead(0) => { - return None; - } - DuplexEvent::DownstreamRead(n) => { - state - .metrics - .count_total_packages_bytes(&consumer, &namespace, &instance, n); - - io_client.write_all(&upstream_buf[0..n]).await.unwrap(); - io_client.flush().await.unwrap(); - } - DuplexEvent::UpstreamRead(n) => { - state - .metrics - .count_total_packages_bytes(&consumer, &namespace, &instance, n); - - io_server.write_all(&downstream_buf[0..n]).await.unwrap(); - io_server.flush().await.unwrap(); - } - } + match io_instance { + Ok(io_instance) => { + if let Err(err) = self + .duplex(io_client, io_instance, self.state.clone(), context) + .await + { + error!(error = err.to_string(), "proxy duplex error"); } + + None } Err(e) => { - error!("failed to create client session: {}", e); + error!("failed to create instance session: {}", e); None } } diff --git a/proxy/src/tiers.rs b/proxy/src/tiers.rs new file mode 100644 index 0000000..23d865b --- /dev/null +++ b/proxy/src/tiers.rs @@ -0,0 +1,99 @@ +use std::error::Error; +use std::{fs, sync::Arc}; + +use async_trait::async_trait; +use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; +use pingora::{server::ShutdownWatch, services::background::BackgroundService}; +use serde_json::Value; +use tokio::runtime::{Handle, Runtime}; +use tracing::{error, info, warn}; + +use crate::{config::Config, State, Tier}; + +pub struct TierBackgroundService { + state: Arc, + config: Arc, +} +impl TierBackgroundService { + pub fn new(state: Arc, config: Arc) -> Self { + Self { state, config } + } + + async fn update_tiers(&self) -> Result<(), Box> { + let contents = fs::read_to_string(&self.config.proxy_tiers_path)?; + + let value: Value = toml::from_str(&contents)?; + let tiers_value: Option<&Value> = value.get("tiers"); + if tiers_value.is_none() { + warn!("tiers not configured on toml"); + return Ok(()); + } + + let tiers = serde_json::from_value::>(tiers_value.unwrap().to_owned())?; + + *self.state.tiers.write().await = tiers + .into_iter() + .map(|tier| (tier.name.clone(), tier)) + .collect(); + + self.state.limiter.write().await.clear(); + + Ok(()) + } +} + +fn runtime_handle() -> Handle { + match Handle::try_current() { + Ok(h) => h, + Err(_) => { + let rt = Runtime::new().unwrap(); + rt.handle().clone() + } + } +} + +#[async_trait] +impl BackgroundService for TierBackgroundService { + async fn start(&self, mut _shutdown: ShutdownWatch) { + if let Err(err) = self.update_tiers().await { + error!(error = err.to_string(), "error to update tiers"); + return; + } + + let (tx, mut rx) = tokio::sync::mpsc::channel::(1); + + let watcher_result = RecommendedWatcher::new( + move |result: Result| { + let event = result.unwrap(); + if event.kind.is_modify() { + runtime_handle().block_on(async { + tx.send(event).await.unwrap(); + }); + } + }, + notify::Config::default(), + ); + if let Err(err) = watcher_result { + error!(error = err.to_string(), "error to watcher tier"); + return; + } + + let mut watcher = watcher_result.unwrap(); + let watcher_result = watcher.watch(&self.config.proxy_tiers_path, RecursiveMode::Recursive); + if let Err(err) = watcher_result { + error!(error = err.to_string(), "error to watcher tier"); + return; + } + + loop { + if rx.recv().await.is_some() { + if let Err(err) = self.update_tiers().await { + error!(error = err.to_string(), "error to update tiers"); + continue; + } + + info!("tiers modified"); + } + } + } +}