Skip to content

Commit

Permalink
AdaptiveRetryConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil-Lontkowski committed Dec 19, 2024
1 parent 480928d commit 726dc1a
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 167 deletions.
80 changes: 80 additions & 0 deletions core/src/main/scala/ox/resilience/AdaptiveRetryConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ox.resilience

import ox.scheduling.{Schedule, ScheduledConfig, SleepMode}

/** A config that defines how to retry a failed operation, with the retries additionally limited by a token bucket.
  *
  * It is a special case of [[ScheduledConfig]] with [[ScheduledConfig.sleepMode]] always set to [[SleepMode.Delay]]. A token bucket
  * determines whether the operation may be retried: `onFailureCost` tokens are taken for every failure and the same number of tokens is
  * returned for every successful result, so that in case of a system failure the client does not flood the service with retry requests.
  *
  * The bucket is resolved once per config instance: either the one passed as `tokenBucket`, or a new one holding `bucketSize` tokens. Every
  * [[ScheduledConfig]] obtained from the same instance through [[toScheduledConfig]] therefore draws from the same bucket. Note that
  * `copy` creates a new instance, and hence a new bucket, unless `tokenBucket` is provided.
  *
  * @param schedule
  *   The retry schedule which determines the maximum number of retries and the delay between subsequent attempts to execute the operation.
  *   See [[Schedule]] for more details.
  * @param resultPolicy
  *   A policy that allows to customize when a non-erroneous result is considered successful and when an error is worth retrying (which
  *   allows for failing fast on certain errors). See [[ResultPolicy]] for more details.
  * @param onRetry
  *   A function that is invoked after each retry attempt. The callback receives the number of the current retry attempt (starting from 1)
  *   and the result of the operation that was attempted. The result is either a successful value or an error. The callback can be used to
  *   log information about the retry attempts, or to perform other side effects. By default, the callback does nothing.
  * @param tokenBucket
  *   The token bucket that backs the adaptive retries. If the bucket does not hold enough tokens, there will be no more retries. A bucket
  *   can be provided by the user in order to share it between different [[AdaptiveRetryConfig]] instances.
  * @param bucketSize
  *   The size of the [[TokenBucket]] created when `tokenBucket` is not provided. Ignored if `tokenBucket` is provided.
  * @param onFailureCost
  *   The number of tokens taken from the bucket for every failure. It is also the number of tokens returned to the bucket for every
  *   successful result.
  * @tparam E
  *   The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
  *   an `Either[E, T]`, this can be any `E`.
  * @tparam T
  *   The successful result type for the operation.
  */
case class AdaptiveRetryConfig[E, T](
    schedule: Schedule,
    resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T],
    onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => (),
    tokenBucket: Option[TokenBucket] = None,
    bucketSize: Int = 100,
    onFailureCost: Int = 1
) extends RetryConfig[E, T]:

  // Resolved once per config instance rather than once per `toScheduledConfig` call: otherwise every call would start with a brand new,
  // full bucket and the token state would never be carried over between operations retried with this config.
  private val bucket: TokenBucket = tokenBucket.getOrElse(TokenBucket(bucketSize))

  /** Converts this config to a [[ScheduledConfig]] whose continuation predicates are guarded by the token bucket of this instance. */
  def toScheduledConfig: ScheduledConfig[E, T] =
    def shouldContinueOnError(e: E): Boolean =
      // the tokens are taken first, for every error; if they cannot be acquired we short circuit and stop retrying
      bucket.tryAcquire(onFailureCost) && resultPolicy.isWorthRetrying(e)

    def shouldContinueOnResult(result: T): Boolean =
      // if we consider this result a success, the tokens are given back to the bucket and there is nothing more to retry
      if resultPolicy.isSuccess(result) then
        bucket.release(onFailureCost)
        false
      else true

    ScheduledConfig(
      schedule,
      onRetry,
      shouldContinueOnError = shouldContinueOnError,
      shouldContinueOnResult = shouldContinueOnResult,
      sleepMode = SleepMode.Delay
    )
  end toScheduledConfig
end AdaptiveRetryConfig

object AdaptiveRetryConfig:

  /** Creates a config that retries up to a given number of times, as long as there are enough tokens in the bucket, with no delay between
    * subsequent attempts, using a default [[ResultPolicy]].
    *
    * This is a shorthand for {{{AdaptiveRetryConfig(Schedule.Immediate(maxRetries), bucketSize = bucketSize)}}}
    *
    * The concrete [[AdaptiveRetryConfig]] type is returned (it is still a [[RetryConfig]]), in line with the factories of
    * [[StandardRetryConfig]], so that the result can be further customized with `copy`.
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param bucketSize
    *   The size of the token bucket created for the config. Defaults to 100.
    */
  def immediate[E, T](maxRetries: Int, bucketSize: Int = 100): AdaptiveRetryConfig[E, T] =
    AdaptiveRetryConfig(
      Schedule.Immediate(maxRetries),
      bucketSize = bucketSize
    )
end AdaptiveRetryConfig
2 changes: 1 addition & 1 deletion core/src/main/scala/ox/resilience/ResultPolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object ResultPolicy:

/** A policy that customizes which errors are retried, and considers every non-erroneous result successful
* @param isWorthRetrying
* A predicate that indicates whether an erroneous result should be retried..
* A predicate that indicates whether an erroneous result should be retried.
*/
def retryWhen[E, T](isWorthRetrying: E => Boolean): ResultPolicy[E, T] =
  // only the error predicate is supplied here; every other setting of the policy is left at its default
  ResultPolicy[E, T](isWorthRetrying = isWorthRetrying)

Expand Down
127 changes: 3 additions & 124 deletions core/src/main/scala/ox/resilience/RetryConfig.scala
Original file line number Diff line number Diff line change
@@ -1,127 +1,6 @@
package ox.resilience

import ox.scheduling.{SleepMode, Jitter, Schedule, ScheduledConfig}
import ox.scheduling.ScheduledConfig

import scala.concurrent.duration.*

/** Describes how a failed operation should be retried.
  *
  * The config is translated by [[toScheduledConfig]] into a [[ScheduledConfig]] whose [[ScheduledConfig.sleepMode]] is always
  * [[SleepMode.Delay]].
  *
  * @param schedule
  *   The retry schedule: it determines the maximum number of retries and the delay between subsequent attempts. See [[Schedule]] for more
  *   details.
  * @param resultPolicy
  *   Decides when a non-erroneous result counts as successful and when an error is worth retrying (which allows failing fast on certain
  *   errors). See [[ResultPolicy]] for more details.
  * @param onRetry
  *   A callback invoked after each retry attempt with the number of the current attempt (starting from 1) and the outcome of that attempt,
  *   either an error or a value. Useful for logging or other side effects. Does nothing by default.
  * @tparam E
  *   The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
  *   an `Either[E, T]`, this can be any `E`.
  * @tparam T
  *   The successful result type for the operation.
  */
case class RetryConfig[E, T](
    schedule: Schedule,
    resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T],
    onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
):
  /** Builds the [[ScheduledConfig]] equivalent of this retry config. */
  def toScheduledConfig: ScheduledConfig[E, T] =
    // an error leads to another attempt only when the policy deems it worth retrying
    def retryAfterError(error: E): Boolean = resultPolicy.isWorthRetrying(error)

    // a value leads to another attempt only when the policy does not accept it as a success
    def retryAfterResult(value: T): Boolean = !resultPolicy.isSuccess(value)

    ScheduledConfig(
      schedule,
      onRetry,
      shouldContinueOnError = retryAfterError,
      shouldContinueOnResult = retryAfterResult,
      sleepMode = SleepMode.Delay
    )
  end toScheduledConfig
end RetryConfig

object RetryConfig:
  /** A config retrying at most `maxRetries` times, without any delay between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{RetryConfig(Schedule.Immediate(maxRetries))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    */
  def immediate[E, T](maxRetries: Int): RetryConfig[E, T] =
    val schedule = Schedule.Immediate(maxRetries)
    RetryConfig(schedule)

  /** A config retrying indefinitely, without any delay between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{RetryConfig(Schedule.Immediate.forever)}}}
    */
  def immediateForever[E, T]: RetryConfig[E, T] =
    val schedule = Schedule.Immediate.forever
    RetryConfig(schedule)

  /** A config retrying at most `maxRetries` times, waiting a fixed `delay` between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{RetryConfig(Schedule.Fixed(maxRetries, delay))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param delay
    *   The delay between subsequent attempts.
    */
  def delay[E, T](maxRetries: Int, delay: FiniteDuration): RetryConfig[E, T] =
    val schedule = Schedule.Fixed(maxRetries, delay)
    RetryConfig(schedule)

  /** A config retrying indefinitely, waiting a fixed `delay` between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{RetryConfig(Schedule.Fixed.forever(delay))}}}
    *
    * @param delay
    *   The delay between subsequent attempts.
    */
  def delayForever[E, T](delay: FiniteDuration): RetryConfig[E, T] =
    val schedule = Schedule.Fixed.forever(delay)
    RetryConfig(schedule)

  /** A config retrying at most `maxRetries` times, with an increasing delay (backoff) between the attempts, with the default
    * [[ResultPolicy]].
    *
    * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
    * and capped at the given maximum delay.
    *
    * Equivalent to {{{RetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param initialDelay
    *   The delay before the first retry.
    * @param maxDelay
    *   The maximum delay between subsequent retries. Defaults to 1 minute.
    * @param jitter
    *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
    *   i.e. an exponential backoff with no adjustments.
    */
  def backoff[E, T](
      maxRetries: Int,
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration = 1.minute,
      jitter: Jitter = Jitter.None
  ): RetryConfig[E, T] =
    val schedule = Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter)
    RetryConfig(schedule)

  /** A config retrying indefinitely, with an increasing delay (backoff) between the attempts, with the default [[ResultPolicy]].
    *
    * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
    * and capped at the given maximum delay.
    *
    * Equivalent to {{{RetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
    *
    * @param initialDelay
    *   The delay before the first retry.
    * @param maxDelay
    *   The maximum delay between subsequent retries. Defaults to 1 minute.
    * @param jitter
    *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
    *   i.e. an exponential backoff with no adjustments.
    */
  def backoffForever[E, T](
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration = 1.minute,
      jitter: Jitter = Jitter.None
  ): RetryConfig[E, T] =
    val schedule = Schedule.Backoff.forever(initialDelay, maxDelay, jitter)
    RetryConfig(schedule)
end RetryConfig
/** Common interface of the retry configs: anything that can be expressed as a [[ScheduledConfig]].
  *
  * @tparam E
  *   The error type of the retried operation.
  * @tparam T
  *   The successful result type of the retried operation.
  */
trait RetryConfig[E, T]:
  /** The [[ScheduledConfig]] that this retry config translates to. */
  def toScheduledConfig: ScheduledConfig[E, T]
end RetryConfig
128 changes: 128 additions & 0 deletions core/src/main/scala/ox/resilience/StandardRetryConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package ox.resilience

import ox.scheduling.{Jitter, Schedule, ScheduledConfig, SleepMode}
import scala.concurrent.duration.*

/** Describes how a failed operation should be retried, based solely on a schedule and a result policy.
  *
  * The config is translated by [[toScheduledConfig]] into a [[ScheduledConfig]] whose [[ScheduledConfig.sleepMode]] is always
  * [[SleepMode.Delay]].
  *
  * @param schedule
  *   The retry schedule: it determines the maximum number of retries and the delay between subsequent attempts. See [[Schedule]] for more
  *   details.
  * @param resultPolicy
  *   Decides when a non-erroneous result counts as successful and when an error is worth retrying (which allows failing fast on certain
  *   errors). See [[ResultPolicy]] for more details.
  * @param onRetry
  *   A callback invoked after each retry attempt with the number of the current attempt (starting from 1) and the outcome of that attempt,
  *   either an error or a value. Useful for logging or other side effects. Does nothing by default.
  * @tparam E
  *   The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning
  *   an `Either[E, T]`, this can be any `E`.
  * @tparam T
  *   The successful result type for the operation.
  */
case class StandardRetryConfig[E, T](
    schedule: Schedule,
    resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T],
    onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
) extends RetryConfig[E, T]:
  /** Builds the [[ScheduledConfig]] equivalent of this retry config. */
  def toScheduledConfig: ScheduledConfig[E, T] =
    // an error leads to another attempt only when the policy deems it worth retrying
    def retryAfterError(error: E): Boolean = resultPolicy.isWorthRetrying(error)

    // a value leads to another attempt only when the policy does not accept it as a success
    def retryAfterResult(value: T): Boolean = !resultPolicy.isSuccess(value)

    ScheduledConfig(
      schedule,
      onRetry,
      shouldContinueOnError = retryAfterError,
      shouldContinueOnResult = retryAfterResult,
      sleepMode = SleepMode.Delay
    )
  end toScheduledConfig
end StandardRetryConfig

object StandardRetryConfig:
  /** A config retrying at most `maxRetries` times, without any delay between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{StandardRetryConfig(Schedule.Immediate(maxRetries))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    */
  def immediate[E, T](maxRetries: Int): StandardRetryConfig[E, T] =
    val schedule = Schedule.Immediate(maxRetries)
    StandardRetryConfig(schedule)

  /** A config retrying indefinitely, without any delay between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{StandardRetryConfig(Schedule.Immediate.forever)}}}
    */
  def immediateForever[E, T]: StandardRetryConfig[E, T] =
    val schedule = Schedule.Immediate.forever
    StandardRetryConfig(schedule)

  /** A config retrying at most `maxRetries` times, waiting a fixed `delay` between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{StandardRetryConfig(Schedule.Fixed(maxRetries, delay))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param delay
    *   The delay between subsequent attempts.
    */
  def delay[E, T](maxRetries: Int, delay: FiniteDuration): StandardRetryConfig[E, T] =
    val schedule = Schedule.Fixed(maxRetries, delay)
    StandardRetryConfig(schedule)

  /** A config retrying indefinitely, waiting a fixed `delay` between the attempts, with the default [[ResultPolicy]].
    *
    * Equivalent to {{{StandardRetryConfig(Schedule.Fixed.forever(delay))}}}
    *
    * @param delay
    *   The delay between subsequent attempts.
    */
  def delayForever[E, T](delay: FiniteDuration): StandardRetryConfig[E, T] =
    val schedule = Schedule.Fixed.forever(delay)
    StandardRetryConfig(schedule)

  /** A config retrying at most `maxRetries` times, with an increasing delay (backoff) between the attempts, with the default
    * [[ResultPolicy]].
    *
    * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
    * and capped at the given maximum delay.
    *
    * Equivalent to {{{StandardRetryConfig(Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter))}}}
    *
    * @param maxRetries
    *   The maximum number of retries.
    * @param initialDelay
    *   The delay before the first retry.
    * @param maxDelay
    *   The maximum delay between subsequent retries. Defaults to 1 minute.
    * @param jitter
    *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
    *   i.e. an exponential backoff with no adjustments.
    */
  def backoff[E, T](
      maxRetries: Int,
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration = 1.minute,
      jitter: Jitter = Jitter.None
  ): StandardRetryConfig[E, T] =
    val schedule = Schedule.Backoff(maxRetries, initialDelay, maxDelay, jitter)
    StandardRetryConfig(schedule)

  /** A config retrying indefinitely, with an increasing delay (backoff) between the attempts, with the default [[ResultPolicy]].
    *
    * The backoff is exponential with base 2 (i.e. the next delay is twice as long as the previous one), starting at the given initial delay
    * and capped at the given maximum delay.
    *
    * Equivalent to {{{StandardRetryConfig(Schedule.Backoff.forever(initialDelay, maxDelay, jitter))}}}
    *
    * @param initialDelay
    *   The delay before the first retry.
    * @param maxDelay
    *   The maximum delay between subsequent retries. Defaults to 1 minute.
    * @param jitter
    *   A random factor used for calculating the delay between subsequent retries. See [[Jitter]] for more details. Defaults to no jitter,
    *   i.e. an exponential backoff with no adjustments.
    */
  def backoffForever[E, T](
      initialDelay: FiniteDuration,
      maxDelay: FiniteDuration = 1.minute,
      jitter: Jitter = Jitter.None
  ): StandardRetryConfig[E, T] =
    val schedule = Schedule.Backoff.forever(initialDelay, maxDelay, jitter)
    StandardRetryConfig(schedule)
end StandardRetryConfig
16 changes: 16 additions & 0 deletions core/src/main/scala/ox/resilience/TokenBucket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ox.resilience

import java.util.concurrent.Semaphore

/** A bucket of tokens with a fixed capacity, backed by a [[java.util.concurrent.Semaphore]].
  *
  * The bucket starts full. Tokens are taken with [[tryAcquire]] and given back with [[release]]; giving tokens back never makes the bucket
  * hold more than `bucketSize` tokens.
  *
  * @param bucketSize
  *   The capacity of the bucket, which is also the initial number of available tokens.
  */
case class TokenBucket(bucketSize: Int):
  private val semaphore = Semaphore(bucketSize)

  /** Takes `permits` tokens from the bucket, without blocking, if that many are available at the time of the call.
    *
    * @param permits
    *   The number of tokens to take.
    * @return
    *   `true` if the tokens were taken, `false` if the bucket did not hold enough of them (in which case nothing is taken).
    */
  def tryAcquire(permits: Int): Boolean =
    semaphore.tryAcquire(permits)

  /** Gives `permits` tokens back to the bucket; the surplus over the capacity of the bucket is dropped.
    *
    * @param permits
    *   The number of tokens to give back.
    */
  def release(permits: Int): Unit =
    // Reading the number of available tokens and releasing is a check-then-act sequence, so it has to be atomic with respect to other
    // releases: otherwise two concurrent calls could both see free space and together overfill the bucket. A concurrent `tryAcquire` can
    // only lower the number of available tokens, hence it does not need to take part in the locking.
    semaphore.synchronized {
      val availablePermits = semaphore.availablePermits()
      val toRelease = math.min(permits, bucketSize - availablePermits)
      semaphore.release(toRelease)
    }

end TokenBucket
Loading

0 comments on commit 726dc1a

Please sign in to comment.