Skip to content

Commit

Permalink
feat(proxy): added watcher to tiers on config map
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Mar 8, 2024
1 parent 39877dc commit 1e2c263
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 1 deletion.
55 changes: 55 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ pingora-limits = "0.1.0"
prometheus = "0.13.3"
async-trait = "0.1.77"
leaky-bucket = "1.0.1"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
toml = "0.8.10"
4 changes: 4 additions & 0 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::env;
pub struct Config {
pub proxy_addr: String,
pub proxy_namespace: String,
pub proxy_tiers_name: String,
pub proxy_tiers_key: String,
pub prometheus_addr: String,
pub ssl_crt_path: String,
pub ssl_key_path: String,
Expand All @@ -15,6 +17,8 @@ impl Config {
Self {
proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"),
proxy_namespace: env::var("PROXY_NAMESPACE").expect("PROXY_NAMESPACE must be set"),
proxy_tiers_name: env::var("PROXY_TIERS_NAME").expect("PROXY_TIERS_NAME must be set"),
proxy_tiers_key: env::var("PROXY_TIERS_KEY").expect("PROXY_TIERS_KEY must be set"),
prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"),
ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"),
ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"),
Expand Down
21 changes: 20 additions & 1 deletion proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use pingora::{
};
use prometheus::{opts, register_int_counter_vec, register_int_gauge_vec};
use proxy::ProxyApp;
use serde::Deserialize;
use tiers::TierBackgroundService;
use tokio::sync::{Mutex, RwLock};
use tracing::Level;

Expand All @@ -18,6 +20,7 @@ use crate::config::Config;
mod auth;
mod config;
mod proxy;
mod tiers;

fn main() {
dotenv().ok();
Expand All @@ -37,6 +40,12 @@ fn main() {
);
server.add_service(auth_background_service);

let tier_background_service = background_service(
"K8S Tier Service",
TierBackgroundService::new(state.clone(), config.clone()),
);
server.add_service(tier_background_service);

let tls_proxy_service = Service::with_listeners(
"TLS Proxy Service".to_string(),
Listeners::tls(
Expand All @@ -61,18 +70,21 @@ fn main() {
pub struct State {
metrics: Metrics,
consumers: HashMap<String, Consumer>,
limiter: Arc<Mutex<HashMap<String, RateLimiter>>>
limiter: Arc<Mutex<HashMap<String, RateLimiter>>>,
tiers: HashMap<String, Tier>,
}
impl State {
pub fn new() -> Self {
let metrics = Metrics::new();
let consumers = HashMap::new();
let limiter = Default::default();
let tiers = HashMap::new();

Self {
metrics,
consumers,
limiter,
tiers,
}
}

Expand Down Expand Up @@ -103,6 +115,13 @@ impl Display for Consumer {
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct Tier {
name: String,
max_connections: u32,
max_bytes_per_minute: u32,
}

#[derive(Debug, Clone)]
pub struct Metrics {
total_packages_bytes: prometheus::IntCounterVec,
Expand Down
76 changes: 76 additions & 0 deletions proxy/src/tiers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::sync::Arc;

use async_trait::async_trait;
use futures_util::TryStreamExt;
use operator::{
k8s_openapi::api::core::v1::ConfigMap,
kube::{
runtime::watcher::{self},
Api, Client,
},
};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use serde_json::Value;
use tokio::{pin, sync::RwLock};
use tracing::{error, warn};

use crate::{config::Config, State, Tier};

pub struct TierBackgroundService {
state: Arc<RwLock<State>>,
config: Arc<Config>,
}
impl TierBackgroundService {
pub fn new(state: Arc<RwLock<State>>, config: Arc<Config>) -> Self {
Self { state, config }
}
}

#[async_trait]
impl BackgroundService for TierBackgroundService {
async fn start(&self, mut _shutdown: ShutdownWatch) {
let client = Client::try_default()
.await
.expect("failed to create kube client");

let api = Api::<ConfigMap>::namespaced(client.clone(), &self.config.proxy_namespace);
let stream = watcher::watch_object(api, &self.config.proxy_tiers_name);
pin!(stream);

loop {
let stream_result = stream.try_next().await;
if let Err(err) = stream_result {
error!(error = err.to_string(), "error to update tier");
continue;
}

if let Some(config_map) = stream_result.unwrap().flatten() {
if let Some(data) = config_map.data {
if let Some(toml_data) = data.get(&self.config.proxy_tiers_key) {
let value_result: Result<Value, _> = toml::from_str(toml_data);
if let Err(err) = value_result {
error!(error = err.to_string(), "error to deserialize toml");
continue;
}

let tiers_value: Option<&Value> =
value_result.as_ref().unwrap().get("tiers");
if tiers_value.is_none() {
warn!("tiers not configured on toml");
continue;
}

let tiers =
serde_json::from_value::<Vec<Tier>>(tiers_value.unwrap().to_owned())
.unwrap();

self.state.write().await.tiers = tiers
.into_iter()
.map(|tier| (tier.name.clone(), tier))
.collect();
}
}
}
}
}
}

0 comments on commit 1e2c263

Please sign in to comment.