diff --git a/bootstrap/proxy/config.tf b/bootstrap/proxy/config.tf index 459ec18..39d9156 100644 --- a/bootstrap/proxy/config.tf +++ b/bootstrap/proxy/config.tf @@ -8,11 +8,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(1 * 1024 * 60 / var.replicas) + "limit" = floor(1 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(1 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(1 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] }, @@ -21,11 +21,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(5 * 1024 * 60 / var.replicas) + "limit" = floor(5 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(5 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(5 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] }, @@ -34,11 +34,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(50 * 1024 * 60 / var.replicas) + "limit" = floor(50 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(50 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(50 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] }, @@ -47,11 +47,11 @@ locals { "rates" = [ { "interval" = "1m", - "limit" = floor(100 * 1024 * 60 / var.replicas) + "limit" = floor(100 * 1024 * 1024 * 60 / var.replicas) }, { "interval" = "1d", - "limit" = floor(100 * 1024 * 60 * 60 * 24 / var.replicas) + "limit" = floor(100 * 1024 * 1024 * 60 * 60 * 24 / var.replicas) } ] } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index cda5e28..06cfe03 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -112,7 +112,7 @@ impl ProxyApp { let _ = io_instance.flush().await; } DuplexEvent::InstanceRead(bytes) => { - self.limiter(&ctx.consumer).await?; + self.limiter(&ctx.consumer, bytes).await?; state.metrics.count_total_packages_bytes( &ctx.consumer, @@ -155,7 +155,7 @@ impl ProxyApp { .insert(consumer.key.clone(), rates); } - async fn limiter(&self, consumer: &Consumer) -> Result<()> { + async fn limiter(&self, consumer: &Consumer, amount_of_bytes: usize) -> Result<()> { let tiers = self.state.tiers.read().await.clone(); let tier = tiers.get(&consumer.tier); if tier.is_none() { @@ -170,7 +170,12 @@ impl ProxyApp { let rate_limiter_map = self.state.limiter.read().await.clone(); let rates = rate_limiter_map.get(&consumer.key).unwrap(); - join_all(rates.iter().map(|r| async { r.acquire_one().await })).await; + join_all( + rates + .iter() + .map(|r| async { r.acquire(amount_of_bytes).await }), + ) + .await; Ok(()) }