Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptive retries #257

Merged
merged 15 commits into from
Dec 31, 2024
132 changes: 132 additions & 0 deletions core/src/main/scala/ox/resilience/AdaptiveRetry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package ox.resilience

import ox.{EitherMode, ErrorMode}
import ox.scheduling.{ScheduledConfig, SleepMode, scheduledWithErrorMode}

import scala.util.Try

/** Provides mechanism of "adaptive" retries. Inspired by `AdaptiveRetryStrategy` from `aws-sdk-java-v2` and the talk "AWS re:Invent 2024 -
* Try again: The tools and techniques behind resilient systems". For every retry we take [[failureCost]] from token bucket and for every
* success we add back to the bucket [[successReward]] tokens. Instance of this class is thread-safe and can be "shared" across multiple
* operations against constrained resource. This allows to retry in case of transient failures and at the same time doesn't produce more
* load on systemic failure of a resource.
*
* @param tokenBucket
* instance of [[TokenBucket]]. Provided instance is thread safe and can be "shared" between different instances of [[AdaptiveRetry]]
* with different [[failureCost]] for example.
* @param failureCost
* Number of tokens to take from [[tokenBucket]] when retrying.
* @param successReward
* Number of tokens to add back to [[tokenBucket]] after successful operation.
*/
case class AdaptiveRetry(
tokenBucket: TokenBucket,
failureCost: Int,
successReward: Int
):
/** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by
* the operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
*
* This is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RetryConfig]] for more details.
*
* @param config
* The retry config - See [[RetryConfig]].
* @param shouldPayPenaltyCost
* Function to decide if returned result [[T]] should be considered failure in terms of paying cost for retry. Penalty is paid only if
* it is decided to retry operation, the penalty will not be paid for successful operation. Defaults to `true`.
* @param errorMode
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param operation
* The operation to retry.
* @tparam E
* type of error.
* @tparam T
* type of result of an operation.
* @tparam F
* the context inside which [[E]] or [[T]] are returned.
* @return
* Either:
* - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode.
* - the error `E` in context `F` as returned by the last attempt if the config decides to stop.
* @see
* [[scheduledWithErrorMode]]
*/
def retryWithErrorMode[E, T, F[_]](
config: RetryConfig[E, T],
shouldPayPenaltyCost: T => Boolean = (_: T) => true,
errorMode: ErrorMode[E, F]
)(operation: => F[T]): F[T] =
val shouldAttempt: Either[E, T] => Boolean = result =>
Kamil-Lontkowski marked this conversation as resolved.
Show resolved Hide resolved
// for result T check if penalty should be paid, in case of E we always pay.
// If shouldPayPenaltyCost return false, we always attempt
if result.map(shouldPayPenaltyCost).getOrElse(true) then tokenBucket.tryAcquire(failureCost)
else true

val afterSuccess: T => Unit = _ => tokenBucket.release(successReward)

val scheduledConfig = ScheduledConfig(
config.schedule,
config.onRetry,
shouldContinueOnError = config.resultPolicy.isWorthRetrying,
shouldContinueOnResult = t => !config.resultPolicy.isSuccess(t),
shouldAttempt = shouldAttempt,
afterSuccess = afterSuccess,
sleepMode = SleepMode.Delay
)

scheduledWithErrorMode(errorMode)(scheduledConfig)(operation)
end retryWithErrorMode

/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions
* thrown by the operation aren't caught and don't cause a retry to happen.
*
* [[retryEither]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See implementations of [[RetryConfig]]
* for more details.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param shouldPayPenaltyCost
* Function to decide if returned result [[T]] should be considered failure in terms of paying cost for retry. Penalty is paid only if
* it is decided to retry operation, the penalty will not be paid for successful operation.
* @param operation
* The operation to retry.
* @tparam E
* type of error.
* @tparam T
* type of result of an operation.
* @return
* A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last
* attempt.
* @see
* [[scheduledEither]]
*/
def retryEither[E, T](config: RetryConfig[E, T], shouldPayPenaltyCost: T => Boolean = (_: T) => false)(
operation: => Either[E, T]
): Either[E, T] =
retryWithErrorMode(config, shouldPayPenaltyCost, EitherMode[E])(operation)

/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
*
* [[retry]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RetryConfig]].
*
* @param config
* The retry config - see [[RetryConfig]].
* @param shouldPayPenaltyCost
* Function to decide if returned result [[T]] should be considered failure in terms of paying cost for retry. Penalty is paid only if
* it is decided to retry operation, the penalty will not be paid for successful operation.
* @param operation
* The operation to retry.
* @return
* The result of the function if it eventually succeeds.
* @throws anything
* The exception thrown by the last attempt if the config decides to stop.
* @see
* [[scheduled]]
*/
def retry[T](config: RetryConfig[Throwable, T], shouldPayPenaltyCost: T => Boolean = (_: T) => false)(operation: => T): T =
retryWithErrorMode(config, shouldPayPenaltyCost, EitherMode[Throwable])(Try(operation).toEither).fold(throw _, identity)

end AdaptiveRetry

object AdaptiveRetry:
def default: AdaptiveRetry = AdaptiveRetry(TokenBucket(500), 5, 1)
Kamil-Lontkowski marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion core/src/main/scala/ox/resilience/ResultPolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object ResultPolicy:

/** A policy that customizes which errors are retried, and considers every non-erroneous result successful
* @param isWorthRetrying
* A predicate that indicates whether an erroneous result should be retried..
* A predicate that indicates whether an erroneous result should be retried.
*/
def retryWhen[E, T](isWorthRetrying: E => Boolean): ResultPolicy[E, T] = ResultPolicy(isWorthRetrying = isWorthRetrying)

Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/ox/resilience/RetryConfig.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ox.resilience

import ox.scheduling.{SleepMode, Jitter, Schedule, ScheduledConfig}

import ox.scheduling.{Jitter, Schedule, ScheduledConfig, SleepMode}
import scala.concurrent.duration.*

/** A config that defines how to retry a failed operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ object StartTimeRateLimiterAlgorithm:
case class LeakyBucket(rate: Int, per: FiniteDuration) extends RateLimiterAlgorithm:
private val refillInterval = per.toNanos
private val lastRefillTime = new AtomicLong(System.nanoTime())
private val semaphore = new Semaphore(1)
private val bucket = TokenBucket(rate, Some(1))

def acquire(permits: Int): Unit =
semaphore.acquire(permits)
bucket.acquire(permits)

def tryAcquire(permits: Int): Boolean =
semaphore.tryAcquire(permits)
bucket.tryAcquire(permits)

def getNextUpdate: Long =
val waitTime = lastRefillTime.get() + refillInterval - System.nanoTime()
Expand All @@ -117,7 +117,7 @@ object StartTimeRateLimiterAlgorithm:
def update(): Unit =
val now = System.nanoTime()
lastRefillTime.set(now)
if semaphore.availablePermits() < rate then semaphore.release()
bucket.release(1)

def runOperation[T](operation: => T, permits: Int): T = operation

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/scala/ox/resilience/TokenBucket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ox.resilience

import java.util.concurrent.Semaphore

case class TokenBucket(bucketSize: Int, initSize: Option[Int] = None):
private val semaphore = Semaphore(initSize.getOrElse(bucketSize))

def tryAcquire(permits: Int): Boolean =
semaphore.tryAcquire(permits)

def acquire(permits: Int): Unit =
semaphore.acquire(permits)

def release(permits: Int): Unit =
val availablePermits = semaphore.availablePermits()
val toRelease = if availablePermits + permits >= bucketSize then bucketSize - availablePermits else permits
semaphore.release(toRelease)

end TokenBucket
4 changes: 2 additions & 2 deletions core/src/main/scala/ox/resilience/retry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import scala.util.Try

/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
*
* [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]] for more details.
* [[retry]] is a special case of [[scheduled]] with a given set of defaults. See [[RetryConfig]].
*
* @param config
* The retry config - see [[RetryConfig]].
Expand Down Expand Up @@ -49,7 +49,7 @@ def retryEither[E, T](config: RetryConfig[E, T])(operation: => Either[E, T]): Ei
* @param em
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param config
* The retry config - see [[RetryConfig]].
* The retry config - See [[RetryConfig]].
* @param operation
* The operation to retry.
* @return
Expand Down
16 changes: 13 additions & 3 deletions core/src/main/scala/ox/scheduling/scheduled.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ end SleepMode
* @param shouldContinueOnResult
* A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last
* invocation. Defaults to [[_ => true]].
*
* @param shouldAttempt
* A function that determines whether to attempt a retry. This function is called after shouldContinueOnError or shouldContinueOnResult
* returns true and the result is considered for retry, it may perform side effects to determine if attempt should be made.
* @param afterSuccess
* A function that is invoked after every successful attempt. Performs side effects.
* @param sleepMode
* The mode that specifies how to interpret the duration provided by the schedule. See [[SleepMode]] for more details.
* @tparam E
Expand All @@ -46,6 +52,8 @@ case class ScheduledConfig[E, T](
onOperationResult: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => (),
shouldContinueOnError: E => Boolean = (_: E) => false,
shouldContinueOnResult: T => Boolean = (_: T) => true,
shouldAttempt: Either[E, T] => Boolean = (_: Either[E, T]) => true,
afterSuccess: T => Unit = (_: T) => (),
sleepMode: SleepMode = SleepMode.Interval
)

Expand Down Expand Up @@ -109,18 +117,20 @@ def scheduledWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: ScheduledCon
val error = em.getError(v)
config.onOperationResult(invocation, Left(error))

if config.shouldContinueOnError(error) && remainingInvocations.forall(_ > 0) then
if config.shouldContinueOnError(error) && remainingInvocations.forall(_ > 0) && config.shouldAttempt(Left(error)) then
val delay = sleepIfNeeded(startTimestamp)
loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay))
else v
case v =>
val result = em.getT(v)
config.onOperationResult(invocation, Right(result))

if config.shouldContinueOnResult(result) && remainingInvocations.forall(_ > 0) then
if config.shouldContinueOnResult(result) && remainingInvocations.forall(_ > 0) && config.shouldAttempt(Right(result)) then
val delay = sleepIfNeeded(startTimestamp)
loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay))
else v
else
config.afterSuccess(result)
v
end match
end loop

Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/ox/resilience/BackoffRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ class BackoffRetryTest extends AnyFlatSpec with Matchers with EitherValues with
if true then throw new RuntimeException("boom")

// when
val (result, elapsedTime) = measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay))(f))
val (result, elapsedTime) =
measure(the[RuntimeException] thrownBy retry(RetryConfig.backoff(maxRetries, initialDelay, maxDelay))(f))

// then
result should have message "boom"
Expand Down
26 changes: 26 additions & 0 deletions core/src/test/scala/ox/resilience/DelayedRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import ox.util.ElapsedTime
import ox.resilience.*

import scala.concurrent.duration.*
import scala.util.Try

class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues with ElapsedTime:

Expand Down Expand Up @@ -68,4 +69,29 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with
elapsedTime.toMillis should be >= maxRetries * sleep.toMillis
counter shouldBe 4
}

behavior of "adaptive retry with delayed config"

it should "retry a failing function forever or until adaptive retry blocks it" in {
// given
var counter = 0
val sleep = 2.millis
val retriesUntilSuccess = 1_000
val successfulResult = 42
val bucketSize = 500
val errorMessage = "boom"

def f =
counter += 1
if counter <= retriesUntilSuccess then throw RuntimeException(errorMessage) else successfulResult

// when
val adaptive = AdaptiveRetry(TokenBucket(bucketSize), 1, 1)
val result = the[RuntimeException] thrownBy adaptive.retry(RetryConfig.delayForever(sleep))(f)

// then
result should have message errorMessage
counter shouldBe bucketSize + 1
}

end DelayedRetryTest
60 changes: 60 additions & 0 deletions core/src/test/scala/ox/resilience/ImmediateRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,64 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
result.left.value shouldBe errorMessage
counter shouldBe 4
}

behavior of "Adaptive retry with immediate config"

it should "retry a failing adaptive" in {
// given
var counter = 0
val errorMessage = "boom"

def f =
counter += 1
if counter <= 2 then Left(errorMessage)
else Right("Success")

val adaptive = AdaptiveRetry(TokenBucket(5), 1, 1)
// when
val result = adaptive.retryEither(RetryConfig.immediate(5))(f)

// then
result.value shouldBe "Success"
counter shouldBe 3
}

it should "stop retrying after emptying bucket" in {
// given
var counter = 0
val errorMessage = "boom"

def f =
counter += 1
Left(errorMessage)

val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1)
// when
val result = adaptive.retryEither(RetryConfig.immediate[String, String](5))(f)

// then
result.left.value shouldBe errorMessage
// One for first try, two for retries with bucket size 2
counter shouldBe 3
}

it should "not pay exceptionCost if result T is going to be retried and shouldPayPenaltyCost returns false" in {
// given
var counter = 0
val message = "success"

def f =
counter += 1
Right(message)

val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1)
val retryConfig = RetryConfig.immediate(5).copy(resultPolicy = ResultPolicy.successfulWhen[String, String](_ => false))
// when
val result = adaptive.retryEither(retryConfig, _ => false)(f)

// then
result.value shouldBe message
counter shouldBe 6
}

end ImmediateRetryTest
Loading