diff --git a/core/src/main/scala/ox/resilience/DurationRateLimiter.scala b/core/src/main/scala/ox/resilience/DurationRateLimiter.scala deleted file mode 100644 index be13c9d9..00000000 --- a/core/src/main/scala/ox/resilience/DurationRateLimiter.scala +++ /dev/null @@ -1,88 +0,0 @@ -package ox.resilience - -import ox.{Ox, forkDiscard} -import scala.annotation.tailrec -import scala.concurrent.duration.FiniteDuration - -/** Rate limiter with a customizable algorithm. Operations can be blocked or dropped, when the rate limit is reached. operationMode decides - * if whole time of execution should be considered or just the start. - */ -class DurationRateLimiter private (algorithm: DurationRateLimiterAlgorithm): - /** Runs the operation, blocking if the rate limit is reached, until the rate limiter is replenished. */ - def runBlocking[T](operation: => T): T = - algorithm.acquire() - algorithm.startOperation() - val result = operation - algorithm.endOperation() - result - - /** Runs or drops the operation, if the rate limit is reached. - * - * @return - * `Some` if the operation has been allowed to run, `None` if the operation has been dropped. - */ - def runOrDrop[T](operation: => T): Option[T] = - if algorithm.tryAcquire() then - algorithm.startOperation() - val result = operation - algorithm.endOperation() - Some(result) - else None - -end DurationRateLimiter - -object DurationRateLimiter: - def apply(algorithm: DurationRateLimiterAlgorithm)(using Ox): DurationRateLimiter = - @tailrec - def update(): Unit = - val waitTime = algorithm.getNextUpdate - val millis = waitTime / 1000000 - val nanos = waitTime % 1000000 - Thread.sleep(millis, nanos.toInt) - algorithm.update() - update() - end update - - forkDiscard(update()) - new DurationRateLimiter(algorithm) - end apply - - /** Creates a rate limiter using a fixed window algorithm. - * - * Must be run within an [[Ox]] concurrency scope, as a background fork is created, to replenish the rate limiter. - * - * @param maxOperations - * Maximum number of operations that are allowed to **start** within a time [[window]]. - * @param window - * Interval of time between replenishing the rate limiter. The rate limiter is replenished to allow up to [[maxOperations]] in the next - * time window. - */ - def fixedWindow(maxOperations: Int, window: FiniteDuration)(using - Ox - ): DurationRateLimiter = - apply(DurationRateLimiterAlgorithm.FixedWindow(maxOperations, window)) - - /** Creates a rate limiter using a sliding window algorithm. - * - * Must be run within an [[Ox]] concurrency scope, as a background fork is created, to replenish the rate limiter. - * - * @param maxOperations - * Maximum number of operations that are allowed to **start** within any [[window]] of time. - * @param window - * Length of the window. - */ - def slidingWindow(maxOperations: Int, window: FiniteDuration)(using Ox): DurationRateLimiter = - apply(DurationRateLimiterAlgorithm.SlidingWindow(maxOperations, window)) - - /** Creates a rate limiter with token/leaky bucket algorithm. - * - * Must be run within an [[Ox]] concurrency scope, as a background fork is created, to replenish the rate limiter. - * - * @param maxTokens - * Max capacity of tokens in the algorithm, limiting the operations that are allowed to **start** concurrently. - * @param refillInterval - * Interval of time between adding a single token to the bucket. - */ - def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)(using Ox): DurationRateLimiter = - apply(DurationRateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval)) -end DurationRateLimiter diff --git a/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala b/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala index bb419e86..6cafa84d 100644 --- a/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala +++ b/core/src/main/scala/ox/resilience/DurationRateLimiterAlgorithm.scala @@ -8,20 +8,9 @@ import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.duration.FiniteDuration -trait DurationRateLimiterAlgorithm extends RateLimiterAlgorithm: - - def startOperation(permits: Int): Unit - - def endOperation(permits: Int): Unit - - final def startOperation(): Unit = startOperation(1) - - final def endOperation(): Unit = endOperation(1) - -end DurationRateLimiterAlgorithm - object DurationRateLimiterAlgorithm: - case class FixedWindow(rate: Int, per: FiniteDuration) extends DurationRateLimiterAlgorithm: + /** Fixed window algorithm: allows to run at most `rate` operations in consecutively segments of duration `per`. */ + case class FixedWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm: private val lastUpdate = new AtomicLong(System.nanoTime()) private val semaphore = new Semaphore(rate) private val runningOperations = new AtomicInteger(0) @@ -42,32 +31,25 @@ object DurationRateLimiterAlgorithm: semaphore.release(rate - semaphore.availablePermits() - runningOperations.get()) end update - def startOperation(permits: Int): Unit = + def runOperation[T](operation: => T, permits: Int): T = runningOperations.updateAndGet(_ + permits) - () - - def endOperation(permits: Int): Unit = + val result = operation runningOperations.updateAndGet(current => (current - permits).max(0)) - () + result end FixedWindow - /** Sliding window algorithm: allows to start at most `rate` operations in the lapse of `per` before current time. */ - case class SlidingWindow(rate: Int, per: FiniteDuration) extends DurationRateLimiterAlgorithm: - // stores the timestamp and the number of permits acquired after calling acquire or tryAcquire successfully + /** Sliding window algorithm: allows to run at most `rate` operations in the lapse of `per` before current time. */ + case class SlidingWindow(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm: + // stores the timestamp and the number of permits acquired after finishing running operation private val log = new AtomicReference[Queue[(Long, Int)]](Queue[(Long, Int)]()) private val semaphore = new Semaphore(rate) - private val runningOperations = new AtomicInteger(0) def acquire(permits: Int): Unit = semaphore.acquire(permits) - addTimestampToLog(permits) def tryAcquire(permits: Int): Boolean = - if semaphore.tryAcquire(permits) then - addTimestampToLog(permits) - true - else false + semaphore.tryAcquire(permits) private def addTimestampToLog(permits: Int): Unit = val now = System.nanoTime() @@ -87,14 +69,11 @@ object DurationRateLimiterAlgorithm: if waitTime > 0 then waitTime else 0L end getNextUpdate - def startOperation(permits: Int): Unit = - runningOperations.updateAndGet(_ + permits) - () - - def endOperation(permits: Int): Unit = - runningOperations.updateAndGet(current => (current - permits).max(0)) + def runOperation[T](operation: => T, permits: Int): T = + val result = operation + // Consider end of operation as a point to release permit after `per` passes addTimestampToLog(permits) - () + result def update(): Unit = val now = System.nanoTime() @@ -103,11 +82,12 @@ object DurationRateLimiterAlgorithm: // remove records older than window size val qUpdated = removeRecords(q, now) // merge old records with the ones concurrently added - val _ = log.updateAndGet(qNew => + log.updateAndGet(qNew => qNew.foldLeft(qUpdated) { case (queue, record) => queue.enqueue(record) } ) + () end update @tailrec @@ -117,42 +97,12 @@ object DurationRateLimiterAlgorithm: case Some((head, tail)) => if head._1 + per.toNanos < now then val (_, permits) = head - semaphore.release(0.max(permits - runningOperations.get())) + semaphore.release(permits) removeRecords(tail, now) else q + end match + end removeRecords end SlidingWindow - /** Token/leaky bucket algorithm It adds a token to start a new operation each `per` with a maximum number of tokens of `rate`. */ - case class LeakyBucket(rate: Int, per: FiniteDuration) extends DurationRateLimiterAlgorithm: - private val refillInterval = per.toNanos - private val lastRefillTime = new AtomicLong(System.nanoTime()) - private val semaphore = new Semaphore(1) - private val runningOperations = AtomicInteger(0) - - def acquire(permits: Int): Unit = - semaphore.acquire(permits) - - def tryAcquire(permits: Int): Boolean = - semaphore.tryAcquire(permits) - - def getNextUpdate: Long = - val waitTime = lastRefillTime.get() + refillInterval - System.nanoTime() - if waitTime > 0 then waitTime else 0L - - def update(): Unit = - val now = System.nanoTime() - lastRefillTime.set(now) - if (semaphore.availablePermits() + runningOperations.get()) < rate then semaphore.release() - - def startOperation(permits: Int): Unit = - runningOperations.updateAndGet(_ + permits) - () - - def endOperation(permits: Int): Unit = - runningOperations.updateAndGet(current => (current - permits).max(0)) - () - - end LeakyBucket - end DurationRateLimiterAlgorithm diff --git a/core/src/main/scala/ox/resilience/RateLimiter.scala b/core/src/main/scala/ox/resilience/RateLimiter.scala index d1894eaf..4f86618d 100644 --- a/core/src/main/scala/ox/resilience/RateLimiter.scala +++ b/core/src/main/scala/ox/resilience/RateLimiter.scala @@ -12,16 +12,8 @@ import scala.annotation.tailrec class RateLimiter private (algorithm: RateLimiterAlgorithm): /** Runs the operation, blocking if the rate limit is reached, until the rate limiter is replenished. */ def runBlocking[T](operation: => T): T = - algorithm match - case alg: DurationRateLimiterAlgorithm => - alg.acquire() - alg.startOperation() - val result = operation - alg.endOperation() - result - case alg: RateLimiterAlgorithm => - alg.acquire() - operation + algorithm.acquire() + algorithm.runOperation(operation) /** Runs or drops the operation, if the rate limit is reached. * @@ -29,17 +21,8 @@ class RateLimiter private (algorithm: RateLimiterAlgorithm): * `Some` if the operation has been allowed to run, `None` if the operation has been dropped. */ def runOrDrop[T](operation: => T): Option[T] = - algorithm match - case alg: DurationRateLimiterAlgorithm => - if alg.tryAcquire() then - alg.startOperation() - val result = operation - alg.endOperation() - Some(result) - else None - case alg: RateLimiterAlgorithm => - if alg.tryAcquire() then Some(operation) - else None + if algorithm.tryAcquire() then Some(algorithm.runOperation(operation)) + else None end RateLimiter @@ -101,12 +84,10 @@ object RateLimiter: * @param refillInterval * Interval of time between adding a single token to the bucket. */ - def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)(using + def leakyBucket(maxTokens: Int, refillInterval: FiniteDuration)(using Ox ): RateLimiter = - operationMode match - case RateLimiterMode.OperationStart => apply(RateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval)) - case RateLimiterMode.OperationDuration => apply(DurationRateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval)) + apply(RateLimiterAlgorithm.LeakyBucket(maxTokens, refillInterval)) end RateLimiter diff --git a/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala b/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala index 1ab997e3..3a898b91 100644 --- a/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala +++ b/core/src/main/scala/ox/resilience/RateLimiterAlgorithm.scala @@ -30,7 +30,11 @@ trait RateLimiterAlgorithm: /** Returns the time in nanoseconds that needs to elapse until the next update. It should not modify internal state. */ def getNextUpdate: Long - def rate: Int + /** Runs operation. For cases where execution time is not needed it just returns result */ + final def runOperation[T](operation: => T): T = runOperation(operation, 1) + + /** Runs operation. For cases where execution time is not needed it just returns result */ + def runOperation[T](operation: => T, permits: Int): T end RateLimiterAlgorithm @@ -56,6 +60,8 @@ object RateLimiterAlgorithm: semaphore.release(rate - semaphore.availablePermits()) end update + def runOperation[T](operation: => T, permits: Int): T = operation + end FixedWindow /** Sliding window algorithm: allows to start at most `rate` operations in the lapse of `per` before current time. */ @@ -99,11 +105,12 @@ object RateLimiterAlgorithm: // remove records older than window size val qUpdated = removeRecords(q, now) // merge old records with the ones concurrently added - val _ = log.updateAndGet(qNew => + log.updateAndGet(qNew => qNew.foldLeft(qUpdated) { case (queue, record) => queue.enqueue(record) } ) + () end update @tailrec @@ -117,6 +124,8 @@ object RateLimiterAlgorithm: removeRecords(tail, now) else q + def runOperation[T](operation: => T, permits: Int): T = operation + end SlidingWindow /** Token/leaky bucket algorithm It adds a token to start a new operation each `per` with a maximum number of tokens of `rate`. */ @@ -140,5 +149,7 @@ object RateLimiterAlgorithm: lastRefillTime.set(now) if semaphore.availablePermits() < rate then semaphore.release() + def runOperation[T](operation: => T, permits: Int): T = operation + end LeakyBucket end RateLimiterAlgorithm diff --git a/core/src/test/scala/ox/resilience/RateLimiterTest.scala b/core/src/test/scala/ox/resilience/RateLimiterTest.scala index 6c29b491..3be9bec1 100644 --- a/core/src/test/scala/ox/resilience/RateLimiterTest.scala +++ b/core/src/test/scala/ox/resilience/RateLimiterTest.scala @@ -390,7 +390,7 @@ class RateLimiterTest extends AnyFlatSpec with Matchers with EitherValues with T } } - it should "not allow to run more long running operations spanning more than window than max rate when considering operation time" in { + it should "not allow to run more operations when operations are still running when considering operation time" in { supervised: val rateLimiter = RateLimiter.slidingWindow(2, FiniteDuration(1, "second"), OperationDuration) @@ -414,10 +414,10 @@ class RateLimiterTest extends AnyFlatSpec with Matchers with EitherValues with T sleep(1500.millis) result3 = rateLimiter.runOrDrop(operation) forkUserDiscard: - sleep(3500.millis) + sleep(4200.millis) result4 = rateLimiter.runOrDrop(operation) forkUserDiscard: - sleep(3500.millis) + sleep(4200.millis) result5 = rateLimiter.runOrDrop(operation) result1 shouldBe Some(0) @@ -427,6 +427,48 @@ class RateLimiterTest extends AnyFlatSpec with Matchers with EitherValues with T result5 shouldBe Some(0) } + it should "not allow to run more operations when operations are still running in window span when considering operation time" in { + supervised: + val rateLimiter = RateLimiter.slidingWindow(3, FiniteDuration(1, "second"), OperationDuration) + + def longOperation = + sleep(3.seconds) + 0 + + def shortOperation = + sleep(300.millis) + 0 + + def instantOperation = 0 + + var result1: Option[Int] = Some(-1) + var result2: Option[Int] = Some(-1) + var result3: Option[Int] = Some(-1) + var result4: Option[Int] = Some(-1) + var result5: Option[Int] = Some(-1) + var result6: Option[Int] = Some(-1) + + supervised: + forkUserDiscard: + result1 = rateLimiter.runOrDrop(shortOperation) + forkUserDiscard: + result2 = rateLimiter.runOrDrop(shortOperation) + forkUserDiscard: + result3 = rateLimiter.runOrDrop(longOperation) + forkUserDiscard: + sleep(1500.millis) + result4 = rateLimiter.runOrDrop(instantOperation) + result5 = rateLimiter.runOrDrop(instantOperation) + result6 = rateLimiter.runOrDrop(instantOperation) + + result1 shouldBe Some(0) + result2 shouldBe Some(0) + result3 shouldBe Some(0) + result4 shouldBe Some(0) + result5 shouldBe Some(0) + result6 shouldBe None + } + behavior of "bucket RateLimiter" it should "drop operation when rate limit is exceeded" in { diff --git a/doc/utils/rate-limiter.md b/doc/utils/rate-limiter.md index 27894d1e..b24537e3 100644 --- a/doc/utils/rate-limiter.md +++ b/doc/utils/rate-limiter.md @@ -1,6 +1,6 @@ # Rate limiter -The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that you can choose if mechanism takes into account only the start of execution or the whole execution of an operation. +The rate limiter mechanism allows controlling the rate at which operations are executed. It ensures that at most a certain number of operations are run concurrently within a specified time frame, preventing system overload and ensuring fair resource usage. Note that you can choose if algorithm takes into account only the start of execution or the whole execution of an operation. ## API @@ -33,10 +33,12 @@ The `operation` can be provided directly using a by-name parameter, i.e. `f: => ## Configuration -The configuration of a `RateLimiter` depends on mode whether to consider execution time and on an underlying algorithm that controls whether an operation can be executed or not. The following algorithms are available: +The configuration of a `RateLimiter` depends on an underlying algorithm that controls whether an operation can be executed or not. The following algorithms are available: - `RateLimiterAlgorithm.FixedWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in fixed windows of `dur` duration. - `RateLimiterAlgorithm.SlidingWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations to be executed in any window of time of duration `dur`. - `RateLimiterAlgorithm.Bucket(maximum: Int, dur: FiniteDuration)` - where `maximum` is the maximum capacity of tokens available in the token bucket algorithm and one token is added each `dur`. It can represent both the leaky bucket algorithm or the token bucket algorithm. +- `DurationRateLimiterAlgorithm.FixedWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans fixed windows of `dur` duration. +- `DurationRateLimiterAlgorithm.SlidingWindow(rate: Int, dur: FiniteDuration)` - where `rate` is the maximum number of operations which execution spans any window of time of duration `dur`. ### API shorthands @@ -44,14 +46,14 @@ You can use one of the following shorthands to define a Rate Limiter with the co - `RateLimiter.fixedWindow(rate: Int, dur: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)`, - `RateLimiter.slidingWindow(rate: Int, dur: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)`, -- `RateLimiter.leakyBucket(maximum: Int, dur: FiniteDuration, operationMode: RateLimiterMode = RateLimiterMode.OperationStart)`, +- `RateLimiter.leakyBucket(maximum: Int, dur: FiniteDuration)` These shorthands also allow to define if the whole execution time of an operation should be considered. See the tests in `ox.resilience.*` for more. ## Custom rate limiter algorithms -The `RateLimiterAlgorithm` employed by `RateLimiter` can be extended to implement new algorithms or modify existing ones. Its interface is modelled like that of a `Semaphore` although the underlying implementation could be different. For best compatibility with the existing interface of `RateLimiter`, methods `acquire` and `tryAcquire` should offer the same guaranties as Java's `Semaphores`. +The `RateLimiterAlgorithm` employed by `RateLimiter` can be extended to implement new algorithms or modify existing ones. Its interface is modelled like that of a `Semaphore` although the underlying implementation could be different. For best compatibility with the existing interface of `RateLimiter`, methods `acquire` and `tryAcquire` should offer the same guaranties as Java's `Semaphores`. There is also method `def runOperation[T](operation: => T, permits: Int): T` for cases where considering span of execution may be necessary. Additionally, there are two methods employed by the `GenericRateLimiter` for updating its internal state automatically: - `def update(): Unit`: Updates the internal state of the rate limiter to reflect its current situation. Invoked in a background fork repeatedly, when a rate limiter is created.