Skip to content

Commit

Permalink
Pingora tcp proxy (#12)
Browse files Browse the repository at this point in the history
* feat(proxy): added pingora tcp proxy

* chore(proxy): adjusted to use prometheus addr

* chore(proxy): added metrics to get how many bytes a consumer has been used

* chore(proxy): adjusted lint
  • Loading branch information
paulobressan authored Mar 1, 2024
1 parent 1c79691 commit 4aadb37
Show file tree
Hide file tree
Showing 10 changed files with 1,072 additions and 331 deletions.
956 changes: 802 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dockerfile.operator
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.74-slim-buster as build
WORKDIR /app

RUN apt update
RUN apt install -y pkg-config libssl-dev
RUN apt install -y build-essential pkg-config libssl-dev cmake

COPY ./Cargo.toml ./Cargo.toml
COPY ./operator ./operator
Expand Down
2 changes: 1 addition & 1 deletion docker/dockerfile.proxy
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM rust:1.74-slim-buster as build
WORKDIR /app

RUN apt update
RUN apt install -y pkg-config libssl-dev
RUN apt install -y build-essential pkg-config libssl-dev cmake

COPY ./Cargo.toml ./Cargo.toml
COPY ./operator ./operator
Expand Down
4 changes: 2 additions & 2 deletions operator/yaml/port.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ metadata:
name: mainnet-user
namespace: prj-mainnet-test
spec:
network: "mainnet"
version: "stable"
network: "preview"
version: "v1"
throughputTier: "1"
8 changes: 3 additions & 5 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ edition = "2021"

[dependencies]
operator = { path = "../operator" }
futures = "0.3.30"
native-tls = "0.2.11"
rustls = "0.22.2"
rustls-pemfile = "2.1.0"
tokio = { version = "1.36.0", features = ["full"] }
tokio-rustls = "0.25.0"
regex = "1.10.3"
dotenv = "0.15.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
futures-util = "0.3.30"
pingora = "0.1.0"
prometheus = "0.13.3"
async-trait = "0.1.77"
4 changes: 4 additions & 0 deletions proxy/examples/manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ spec:
env:
- name: PROXY_ADDR
value: "0.0.0.0:80"
- name: PROMETHEUS_ADDR
value: "0.0.0.0:9187"
- name: NODE_PORT
value: "80"
- name: NODE_DNS
Expand Down Expand Up @@ -365,6 +367,7 @@ metadata:
spec:
network: "mainnet"
version: "stable"
throughputTier: "1"
---
# Cardano Node Port 2
apiVersion: demeter.run/v1alpha1
Expand All @@ -375,3 +378,4 @@ metadata:
spec:
network: "mainnet"
version: "stable"
throughputTier: "1"
87 changes: 51 additions & 36 deletions proxy/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use futures_util::TryStreamExt;
use operator::{
kube::{
Expand All @@ -12,57 +13,71 @@ use operator::{
},
CardanoNodePort,
};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use tokio::{pin, sync::RwLock};
use tracing::error;

use crate::{Consumer, State};

pub async fn start(state: Arc<RwLock<State>>) {
let client = Client::try_default()
.await
.expect("failed to create kube client");

let api = Api::<CardanoNodePort>::all(client.clone());
update_auth(state.clone(), api.clone()).await;

let stream = watcher::watcher(api.clone(), Config::default()).touched_objects();
pin!(stream);
#[derive(Debug)]
pub struct AuthBackgroundService {
state: Arc<RwLock<State>>,
}
impl AuthBackgroundService {
pub fn new(state: Arc<RwLock<State>>) -> Self {
Self { state }
}

loop {
let result = stream.try_next().await;
async fn update_auth(&self, api: Api<CardanoNodePort>) {
let result = api.list(&ListParams::default()).await;
if let Err(err) = result {
error!(error = err.to_string(), "fail crd auth watcher");
continue;
error!(
error = err.to_string(),
"error to get crds while updating auth keys"
);
return;
}

update_auth(state.clone(), api.clone()).await;
let mut consumers = HashMap::new();
for crd in result.unwrap().items.iter() {
if crd.status.is_some() {
let network = crd.spec.network.to_string();
let version = crd.spec.version.clone();
let auth_token = crd.status.as_ref().unwrap().auth_token.clone();
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
let port_name = crd.name_any();

let hash_key = format!("{}.{}.{}", network, version, auth_token);
let consumer = Consumer::new(namespace, port_name);

consumers.insert(hash_key, consumer);
}
}
self.state.write().await.consumers = consumers;
}
}

async fn update_auth(state: Arc<RwLock<State>>, api: Api<CardanoNodePort>) {
let result = api.list(&ListParams::default()).await;
if let Err(err) = result {
error!(
error = err.to_string(),
"error to get crds while updating auth keys"
);
return;
}
#[async_trait]
impl BackgroundService for AuthBackgroundService {
async fn start(&self, mut _shutdown: ShutdownWatch) {
let client = Client::try_default()
.await
.expect("failed to create kube client");

let api = Api::<CardanoNodePort>::all(client.clone());
self.update_auth(api.clone()).await;

let mut consumers = HashMap::new();
for crd in result.unwrap().items.iter() {
if crd.status.is_some() {
let network = crd.spec.network.to_string();
let version = crd.spec.version.clone();
let auth_token = crd.status.as_ref().unwrap().auth_token.clone();
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
let port_name = crd.name_any();
let stream = watcher::watcher(api.clone(), Config::default()).touched_objects();
pin!(stream);

let hash_key = format!("{}.{}.{}", network, version, auth_token);
let consumer = Consumer::new(namespace, port_name);
loop {
let result = stream.try_next().await;
if let Err(err) = result {
error!(error = err.to_string(), "fail crd auth watcher");
continue;
}

consumers.insert(hash_key, consumer);
self.update_auth(api.clone()).await;
}
}
state.write().await.consumers = consumers;
}
22 changes: 12 additions & 10 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
use std::{env, path::PathBuf};
use std::env;

#[derive(Debug, Clone)]
pub struct Config {
pub proxy_addr: String,
pub ssl_crt_path: PathBuf,
pub ssl_key_path: PathBuf,
pub prometheus_addr: String,
pub ssl_crt_path: String,
pub ssl_key_path: String,
pub node_port: u16,
pub node_dns: String,
}

impl Config {
pub fn new() -> Self {
Self {
proxy_addr: env::var("PROXY_ADDR").expect("PROXY_ADDR must be set"),
ssl_crt_path: env::var("SSL_CRT_PATH")
.map(|e| e.into())
.expect("SSL_CRT_PATH must be set"),
ssl_key_path: env::var("SSL_KEY_PATH")
.map(|e| e.into())
.expect("SSL_KEY_PATH must be set"),
prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"),
ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"),
ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"),
node_port: env::var("NODE_PORT")
.expect("NODE_PORT must be set")
.parse()
Expand All @@ -27,3 +24,8 @@ impl Config {
}
}
}
impl Default for Config {
fn default() -> Self {
Self::new()
}
}
98 changes: 76 additions & 22 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use std::{collections::HashMap, fmt::Display, sync::Arc};

use auth::AuthBackgroundService;
use dotenv::dotenv;
use regex::Regex;
use std::{collections::HashMap, error::Error, fmt::Display, sync::Arc};
use pingora::{
listeners::Listeners,
server::{configuration::Opt, Server},
services::{background::background_service, listening::Service},
};
use prometheus::{opts, register_int_counter_vec};
use proxy::ProxyApp;
use tokio::sync::RwLock;
use tracing::Level;

Expand All @@ -10,43 +18,60 @@ mod auth;
mod config;
mod proxy;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() {
dotenv().ok();

tracing_subscriber::fmt().with_max_level(Level::INFO).init();
let state = Arc::new(RwLock::new(State::try_new()?));

let auth = auth::start(state.clone());
let proxy_server = proxy::start(state.clone());
let config: Arc<Config> = Arc::default();
let state: Arc<RwLock<State>> = Arc::default();

let opt = Opt::default();
let mut server = Server::new(Some(opt)).unwrap();
server.bootstrap();

let auth_background_service = background_service(
"K8S Auth Service",
AuthBackgroundService::new(state.clone()),
);
server.add_service(auth_background_service);

tokio::join!(auth, proxy_server);
let tls_proxy_service = Service::with_listeners(
"TLS Proxy Service".to_string(),
Listeners::tls(
&config.proxy_addr,
&config.ssl_crt_path,
&config.ssl_key_path,
)
.unwrap(),
Arc::new(ProxyApp::new(config.clone(), state)),
);
server.add_service(tls_proxy_service);

Ok(())
let mut prometheus_service_http =
pingora::services::listening::Service::prometheus_http_service();
prometheus_service_http.add_tcp(&config.prometheus_addr);
server.add_service(prometheus_service_http);

server.run_forever();
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct State {
config: Config,
host_regex: Regex,
metrics: Metrics,
consumers: HashMap<String, Consumer>,
}
impl State {
pub fn try_new() -> Result<Self, Box<dyn Error>> {
let config = Config::new();
let host_regex = Regex::new(r"(dmtr_[\w\d-]+)\.([\w]+)-([\w\d]+).+")?;
pub fn new() -> Self {
let metrics = Metrics::new();
let consumers = HashMap::new();

Ok(Self {
config,
host_regex,
consumers,
})
Self { metrics, consumers }
}

pub fn is_authenticated(&self, network: &str, version: &str, token: &str) -> bool {
pub fn get_consumer(&self, network: &str, version: &str, token: &str) -> Option<Consumer> {
let hash_key = format!("{}.{}.{}", network, version, token);
self.consumers.get(&hash_key).is_some()
self.consumers.get(&hash_key).cloned()
}
}

Expand All @@ -68,3 +93,32 @@ impl Display for Consumer {
write!(f, "{}.{}", self.namespace, self.port_name)
}
}

#[derive(Debug, Clone)]
pub struct Metrics {
total_packages_bytes: prometheus::IntCounterVec,
}
impl Metrics {
pub fn new() -> Self {
let total_packages_bytes = register_int_counter_vec!(
opts!("node_proxy_total_packages_bytes", "Total bytes transferred",),
&["consumer"]
)
.unwrap();

Self {
total_packages_bytes,
}
}

pub fn count_total_packages_bytes(&self, consumer: &Consumer, value: usize) {
self.total_packages_bytes
.with_label_values(&[&consumer.to_string()])
.inc_by(value as u64)
}
}
impl Default for Metrics {
fn default() -> Self {
Self::new()
}
}
Loading

0 comments on commit 4aadb37

Please sign in to comment.