Skip to content

Commit

Permalink
fix: Limit users by bytes consumed (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
gonzalezzfelipe authored May 7, 2024
1 parent 56e6c82 commit 6dbc51d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
16 changes: 8 additions & 8 deletions bootstrap/proxy/config.tf
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ locals {
"rates" = [
{
"interval" = "1m",
"limit" = floor(1 * 1024 * 60 / var.replicas)
"limit" = floor(1 * 1024 * 1024 * 60 / var.replicas)
},
{
"interval" = "1d",
"limit" = floor(1 * 1024 * 60 * 60 * 24 / var.replicas)
"limit" = floor(1 * 1024 * 1024 * 60 * 60 * 24 / var.replicas)
}
]
},
Expand All @@ -21,11 +21,11 @@ locals {
"rates" = [
{
"interval" = "1m",
"limit" = floor(5 * 1024 * 60 / var.replicas)
"limit" = floor(5 * 1024 * 1024 * 60 / var.replicas)
},
{
"interval" = "1d",
"limit" = floor(5 * 1024 * 60 * 60 * 24 / var.replicas)
"limit" = floor(5 * 1024 * 1024 * 60 * 60 * 24 / var.replicas)
}
]
},
Expand All @@ -34,11 +34,11 @@ locals {
"rates" = [
{
"interval" = "1m",
"limit" = floor(50 * 1024 * 60 / var.replicas)
"limit" = floor(50 * 1024 * 1024 * 60 / var.replicas)
},
{
"interval" = "1d",
"limit" = floor(50 * 1024 * 60 * 60 * 24 / var.replicas)
"limit" = floor(50 * 1024 * 1024 * 60 * 60 * 24 / var.replicas)
}
]
},
Expand All @@ -47,11 +47,11 @@ locals {
"rates" = [
{
"interval" = "1m",
"limit" = floor(100 * 1024 * 60 / var.replicas)
"limit" = floor(100 * 1024 * 1024 * 60 / var.replicas)
},
{
"interval" = "1d",
"limit" = floor(100 * 1024 * 60 * 60 * 24 / var.replicas)
"limit" = floor(100 * 1024 * 1024 * 60 * 60 * 24 / var.replicas)
}
]
}
Expand Down
11 changes: 8 additions & 3 deletions proxy/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl ProxyApp {
let _ = io_instance.flush().await;
}
DuplexEvent::InstanceRead(bytes) => {
self.limiter(&ctx.consumer).await?;
self.limiter(&ctx.consumer, bytes).await?;

state.metrics.count_total_packages_bytes(
&ctx.consumer,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl ProxyApp {
.insert(consumer.key.clone(), rates);
}

async fn limiter(&self, consumer: &Consumer) -> Result<()> {
async fn limiter(&self, consumer: &Consumer, amount_of_bytes: usize) -> Result<()> {
let tiers = self.state.tiers.read().await.clone();
let tier = tiers.get(&consumer.tier);
if tier.is_none() {
Expand All @@ -170,7 +170,12 @@ impl ProxyApp {
let rate_limiter_map = self.state.limiter.read().await.clone();
let rates = rate_limiter_map.get(&consumer.key).unwrap();

join_all(rates.iter().map(|r| async { r.acquire_one().await })).await;
join_all(
rates
.iter()
.map(|r| async { r.acquire(amount_of_bytes).await }),
)
.await;

Ok(())
}
Expand Down

0 comments on commit 6dbc51d

Please sign in to comment.