From 6dbc51d16b9c11042f6c8ed4a6a1993252638eee Mon Sep 17 00:00:00 2001 From: Felipe Gonzalez Date: Tue, 7 May 2024 11:16:36 -0300 Subject: [PATCH] fix: Limit users by bytes consumed (#35) --- bootstrap/proxy/config.tf | 16 ++++++++-------- proxy/src/proxy.rs | 11 ++++++++--- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/bootstrap/proxy/config.tf b/bootstrap/proxy/config.tf index 459ec18..39d9156 100644 --- a/bootstrap/proxy/config.tf +++ b/bootstrap/proxy/config.tf @@ -8,11 +8,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(1 * 1024 * 60 / var.replicas) + "limit" = floor(1 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(1 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(1 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] }, @@ -21,11 +21,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(5 * 1024 * 60 / var.replicas) + "limit" = floor(5 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(5 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(5 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] }, @@ -34,11 +34,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(50 * 1024 * 60 / var.replicas) + "limit" = floor(50 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(50 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(50 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] }, @@ -47,11 +47,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(100 * 1024 * 60 / var.replicas) + "limit" = floor(100 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(100 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(100 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cda5e28..06cfe03 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -112,7 +112,7 @@ impl ProxyApp { let _ = io_instance.flush().await; } DuplexEvent::InstanceRead(bytes) => { - self.limiter(&ctx.consumer).await?; + self.limiter(&ctx.consumer, bytes).await?; state.metrics.count_total_packages_bytes( &ctx.consumer, @@ -155,7 +155,7 @@ impl ProxyApp { .insert(consumer.key.clone(), rates); } - async fn limiter(&self, consumer: &Consumer) -> Result<()> { + async fn limiter(&self, consumer: &Consumer, amount_of_bytes: usize) -> Result<()> { let tiers = self.state.tiers.read().await.clone(); let tier = tiers.get(&consumer.tier); if tier.is_none() { @@ -170,7 +170,12 @@ impl ProxyApp { let rate_limiter_map = self.state.limiter.read().await.clone(); let rates = rate_limiter_map.get(&consumer.key).unwrap(); - join_all(rates.iter().map(|r| async { r.acquire_one().await })).await; + join_all( + rates + .iter() + .map(|r| async { r.acquire(amount_of_bytes).await }), + ) + .await; Ok(()) }