-
Notifications
You must be signed in to change notification settings - Fork 51
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- A single sleepAsync per bucket - Manual replenish with async - Cancellation of consume Co-authored-by: Jacek Sieka <[email protected]>
- Loading branch information
1 parent
266e2c0
commit 2414646
Showing
3 changed files
with
263 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
# Chronos Rate Limiter | ||
# (c) Copyright 2022-Present | ||
# Status Research & Development GmbH | ||
# | ||
# Licensed under either of | ||
# Apache License, version 2.0, (LICENSE-APACHEv2) | ||
# MIT license (LICENSE-MIT) | ||
|
||
{.push raises: [Defect].} | ||
|
||
import ../chronos | ||
import timer | ||
|
||
export timer | ||
|
||
type | ||
BucketWaiter = object | ||
future: Future[void] | ||
value: int | ||
alreadyConsumed: int | ||
|
||
TokenBucket* = ref object | ||
budget: int | ||
budgetCap: int | ||
lastUpdate: Moment | ||
fillDuration: Duration | ||
workFuture: Future[void] | ||
pendingRequests: seq[BucketWaiter] | ||
manuallyReplenished: AsyncEvent | ||
|
||
proc update(bucket: TokenBucket) = | ||
if bucket.fillDuration == default(Duration): | ||
bucket.budget = min(bucket.budgetCap, bucket.budget) | ||
return | ||
|
||
let | ||
currentTime = Moment.now() | ||
timeDelta = currentTime - bucket.lastUpdate | ||
fillPercent = timeDelta.milliseconds.float / bucket.fillDuration.milliseconds.float | ||
replenished = | ||
int(bucket.budgetCap.float * fillPercent) | ||
deltaFromReplenished = | ||
int(bucket.fillDuration.milliseconds.float * | ||
replenished.float / bucket.budgetCap.float) | ||
|
||
bucket.lastUpdate += milliseconds(deltaFromReplenished) | ||
bucket.budget = min(bucket.budgetCap, bucket.budget + replenished) | ||
|
||
proc tryConsume*(bucket: TokenBucket, tokens: int): bool = | ||
## If `tokens` are available, consume them, | ||
## Otherwhise, return false. | ||
|
||
if bucket.budget >= tokens: | ||
bucket.budget -= tokens | ||
return true | ||
|
||
bucket.update() | ||
|
||
if bucket.budget >= tokens: | ||
bucket.budget -= tokens | ||
true | ||
else: | ||
false | ||
|
||
proc worker(bucket: TokenBucket) {.async.} = | ||
while bucket.pendingRequests.len > 0: | ||
bucket.manuallyReplenished.clear() | ||
template waiter: untyped = bucket.pendingRequests[0] | ||
|
||
if bucket.tryConsume(waiter.value): | ||
waiter.future.complete() | ||
bucket.pendingRequests.delete(0) | ||
else: | ||
waiter.value -= bucket.budget | ||
waiter.alreadyConsumed += bucket.budget | ||
bucket.budget = 0 | ||
|
||
let eventWaiter = bucket.manuallyReplenished.wait() | ||
if bucket.fillDuration.milliseconds > 0: | ||
let | ||
nextCycleValue = float(min(waiter.value, bucket.budgetCap)) | ||
budgetRatio = nextCycleValue.float / bucket.budgetCap.float | ||
timeToTarget = int(budgetRatio * bucket.fillDuration.milliseconds.float) + 1 | ||
#TODO this will create a timer for each blocked bucket, | ||
#which may cause performance issue when creating many | ||
#buckets | ||
sleeper = sleepAsync(milliseconds(timeToTarget)) | ||
await sleeper or eventWaiter | ||
sleeper.cancel() | ||
eventWaiter.cancel() | ||
else: | ||
await eventWaiter | ||
|
||
bucket.workFuture = nil | ||
|
||
proc consume*(bucket: TokenBucket, tokens: int): Future[void] = | ||
## Wait for `tokens` to be available, and consume them. | ||
|
||
let retFuture = newFuture[void]("TokenBucket.consume") | ||
if isNil(bucket.workFuture) or bucket.workFuture.finished(): | ||
if bucket.tryConsume(tokens): | ||
retFuture.complete() | ||
return retFuture | ||
|
||
bucket.pendingRequests.add(BucketWaiter(future: retFuture, value: tokens)) | ||
if isNil(bucket.workFuture) or bucket.workFuture.finished(): | ||
bucket.workFuture = worker(bucket) | ||
|
||
proc cancellation(udata: pointer) = | ||
for index in 0..<bucket.pendingRequests.len: | ||
if bucket.pendingRequests[index].future == retFuture: | ||
bucket.budget += bucket.pendingRequests[index].alreadyConsumed | ||
bucket.pendingRequests.delete(index) | ||
if index == 0: | ||
bucket.manuallyReplenished.fire() | ||
break | ||
retFuture.cancelCallback = cancellation | ||
return retFuture | ||
|
||
proc replenish*(bucket: TokenBucket, tokens: int) = | ||
## Add `tokens` to the budget (capped to the bucket capacity) | ||
bucket.budget += tokens | ||
bucket.update() | ||
bucket.manuallyReplenished.fire() | ||
|
||
proc new*( | ||
T: type[TokenBucket], | ||
budgetCap: int, | ||
fillDuration: Duration = 1.seconds): T = | ||
|
||
## Create a TokenBucket | ||
T( | ||
budget: budgetCap, | ||
budgetCap: budgetCap, | ||
fillDuration: fillDuration, | ||
lastUpdate: Moment.now(), | ||
manuallyReplenished: newAsyncEvent() | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
# Chronos Test Suite | ||
# (c) Copyright 2022-Present | ||
# Status Research & Development GmbH | ||
# | ||
# Licensed under either of | ||
# Apache License, version 2.0, (LICENSE-APACHEv2) | ||
# MIT license (LICENSE-MIT) | ||
|
||
import unittest | ||
import ../chronos | ||
import ../chronos/ratelimit | ||
|
||
suite "Token Bucket": | ||
test "Sync test": | ||
var bucket = TokenBucket.new(1000, 1.milliseconds) | ||
check: | ||
bucket.tryConsume(800) == true | ||
bucket.tryConsume(200) == true | ||
|
||
# Out of budget | ||
bucket.tryConsume(100) == false | ||
waitFor(sleepAsync(10.milliseconds)) | ||
check: | ||
bucket.tryConsume(800) == true | ||
bucket.tryConsume(200) == true | ||
|
||
# Out of budget | ||
bucket.tryConsume(100) == false | ||
|
||
test "Async test": | ||
var bucket = TokenBucket.new(1000, 1.seconds) | ||
check: bucket.tryConsume(1000) == true | ||
|
||
var toWait = newSeq[Future[void]]() | ||
for _ in 0..<150: | ||
toWait.add(bucket.consume(10)) | ||
|
||
let start = Moment.now() | ||
waitFor(allFutures(toWait)) | ||
let duration = Moment.now() - start | ||
|
||
check: duration in 1400.milliseconds .. 1600.milliseconds | ||
|
||
test "Over budget async": | ||
var bucket = TokenBucket.new(100, 10.milliseconds) | ||
# Consume 10* the budget cap | ||
let beforeStart = Moment.now() | ||
waitFor(bucket.consume(1000).wait(1.seconds)) | ||
check Moment.now() - beforeStart in 80.milliseconds .. 120.milliseconds | ||
|
||
test "Sync manual replenish": | ||
var bucket = TokenBucket.new(1000, 0.seconds) | ||
check: | ||
bucket.tryConsume(1000) == true | ||
bucket.tryConsume(1000) == false | ||
bucket.replenish(2000) | ||
check: | ||
bucket.tryConsume(1000) == true | ||
# replenish is capped to the bucket max | ||
bucket.tryConsume(1000) == false | ||
|
||
test "Async manual replenish": | ||
var bucket = TokenBucket.new(10 * 150, 0.seconds) | ||
check: | ||
bucket.tryConsume(10 * 150) == true | ||
bucket.tryConsume(1000) == false | ||
var toWait = newSeq[Future[void]]() | ||
for _ in 0..<150: | ||
toWait.add(bucket.consume(10)) | ||
|
||
let lastOne = bucket.consume(10) | ||
|
||
# Test cap as well | ||
bucket.replenish(1000000) | ||
waitFor(allFutures(toWait).wait(10.milliseconds)) | ||
|
||
check: not lastOne.finished() | ||
|
||
bucket.replenish(10) | ||
waitFor(lastOne.wait(10.milliseconds)) | ||
|
||
test "Async cancellation": | ||
var bucket = TokenBucket.new(100, 0.seconds) | ||
let | ||
fut1 = bucket.consume(20) | ||
futBlocker = bucket.consume(1000) | ||
fut2 = bucket.consume(50) | ||
|
||
waitFor(fut1.wait(10.milliseconds)) | ||
waitFor(sleepAsync(10.milliseconds)) | ||
check: | ||
futBlocker.finished == false | ||
fut2.finished == false | ||
|
||
futBlocker.cancel() | ||
waitFor(fut2.wait(10.milliseconds)) | ||
|
||
test "Very long replenish": | ||
var bucket = TokenBucket.new(7000, 1.hours) | ||
check bucket.tryConsume(7000) | ||
check bucket.tryConsume(1) == false | ||
|
||
# With this setting, it takes 514 milliseconds | ||
# to tick one. Check that we can eventually | ||
# consume, even if we update multiple time | ||
# before that | ||
waitFor(sleepAsync(200.milliseconds)) | ||
check bucket.tryConsume(1) == false | ||
waitFor(sleepAsync(200.milliseconds)) | ||
check bucket.tryConsume(1) == false | ||
waitFor(sleepAsync(200.milliseconds)) | ||
check bucket.tryConsume(1) == true | ||
check bucket.tryConsume(1) == false | ||
|
||
test "Short replenish": | ||
var bucket = TokenBucket.new(15000, 1.milliseconds) | ||
check bucket.tryConsume(15000) | ||
check bucket.tryConsume(1) == false | ||
|
||
waitFor(sleepAsync(1.milliseconds)) | ||
check bucket.tryConsume(15000) == true |