Skip to content

Commit

Permalink
Cardano node proxy connection tier improvements (#42)
Browse files Browse the repository at this point in the history
* chore(proxy): added more logs when the user is limited

* chore(proxy): changed disconnect log to debug mode

* feat(proxy): load log level from env

* feat(proxy): load log level from env
  • Loading branch information
paulobressan authored May 29, 2024
1 parent d8e7409 commit 13529ab
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 12 deletions.
34 changes: 31 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ tokio = { version = "1.36.0", features = ["full"] }
regex = "1.10.3"
dotenv = "0.15.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tracing-subscriber = {version = "0.3.18", features = ["env-filter"]}
futures-util = "0.3.30"
pingora = "0.1.0"
pingora-limits = "0.1.0"
Expand Down
7 changes: 5 additions & 2 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use regex::Regex;
use serde::{Deserialize, Deserializer};
use tiers::TierBackgroundService;
use tokio::sync::RwLock;
use tracing::Level;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

use crate::config::Config;

Expand All @@ -27,7 +27,10 @@ mod tiers;
fn main() {
dotenv().ok();

tracing_subscriber::fmt().with_max_level(Level::INFO).init();
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();

let config: Arc<Config> = Arc::default();
let state: Arc<State> = Arc::default();
Expand Down
39 changes: 33 additions & 6 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
net::lookup_host,
select,
};
use tracing::{error, info};
use tracing::{debug, error};

use crate::{config::Config, Consumer, State, Tier};

Expand Down Expand Up @@ -94,7 +94,12 @@ impl ProxyApp {

match event {
DuplexEvent::ClientRead(0) | DuplexEvent::InstanceRead(0) => {
info!("client disconnected");
debug!(
consumer = ctx.consumer.to_string(),
active_connections = ctx.consumer.active_connections,
"client disconnected"
);

ctx.consumer.dec_connections(self.state.clone()).await;
state.metrics.dec_total_connections(
&ctx.consumer,
Expand Down Expand Up @@ -190,13 +195,18 @@ impl ProxyApp {
Ok(())
}

async fn limiter_connection(&self, consumer: &Consumer) -> Result<()> {
async fn get_tier(&self, tier: &str) -> Result<Tier> {
let tiers = self.state.tiers.read().await.clone();
let tier = tiers.get(&consumer.tier);
let tier = tiers.get(tier);
if tier.is_none() {
return Err(Error::new(pingora::ErrorType::AcceptError));
}
let tier = tier.unwrap();
let tier = tier.unwrap().clone();
Ok(tier)
}

async fn limiter_connection(&self, consumer: &Consumer) -> Result<()> {
let tier = self.get_tier(&consumer.tier).await?;

if consumer.active_connections >= tier.max_connections {
return Err(Error::new(pingora::ErrorType::Custom(
Expand Down Expand Up @@ -238,11 +248,28 @@ impl ServerApp for ProxyApp {

let namespace = self.config.proxy_namespace.clone();
if let Err(err) = self.limiter_connection(&consumer).await {
error!(error = err.to_string(), consumer = consumer.to_string());
self.state
.metrics
.count_total_connections_denied(&consumer, &namespace, &instance);

let tier_result = self.get_tier(&consumer.tier).await;
if let Err(err) = tier_result {
error!(
error = err.to_string(),
consumer = consumer.to_string(),
"Error to get the tier"
);
return None;
}

let tier = tier_result.unwrap();
error!(
error = err.to_string(),
consumer = consumer.to_string(),
active_connections = consumer.active_connections,
max_connections = tier.max_connections
);

return None;
}

Expand Down

0 comments on commit 13529ab

Please sign in to comment.