Skip to content

Commit

Permalink
changes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil-Lontkowski committed Dec 27, 2024
1 parent e2616e6 commit 367fc4c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 25 deletions.
41 changes: 23 additions & 18 deletions core/src/main/scala/ox/resilience/AdaptiveRetry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import ox.scheduling.scheduledWithErrorMode

import scala.util.Try

/** Provides mechanism of "adaptive" retries. For every retry we take [[failureCost]] from token bucket and for every success we add back to
* the bucket [[successReward]] tokens. One instance can be "shared" across multiple operations against constrained resource. This allows
* to retry in case of transient failures and at the same time doesn't produce more load on systemic failure of a resource.
/** Provides a mechanism for "adaptive" retries. Inspired by `AdaptiveRetryStrategy` from `aws-sdk-java-v2` and the talk "AWS re:Invent 2024 -
* Try again: The tools and techniques behind resilient systems". For every retry we take [[failureCost]] tokens from the token bucket, and
* for every success we add [[successReward]] tokens back to the bucket. An instance of this class is thread-safe and can be "shared" across
* multiple operations against a constrained resource. This allows retrying in case of transient failures, while not producing additional
* load in case of a systemic failure of the resource.
*
* @param tokenBucket
* instance of [[TokenBucket]]. Provided instance is thread safe and can be "shared" between different instances of [[AdaptiveRetry]]
Expand All @@ -19,8 +21,8 @@ import scala.util.Try
*/
case class AdaptiveRetry(
tokenBucket: TokenBucket,
failureCost: Int = 1,
successReward: Int = 1
failureCost: Int,
successReward: Int
):
/** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by
* the operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
Expand All @@ -30,7 +32,7 @@ case class AdaptiveRetry(
* @param config
* The retry config - See [[RetryConfig]].
* @param isFailure
* Function to decide if returned [[E]] should be considered failure.
* Function deciding whether a returned result of type [[T]] should be considered a failure.
* @param errorMode
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param operation
Expand All @@ -48,28 +50,28 @@ case class AdaptiveRetry(
* @see
* [[scheduledWithErrorMode]]
*/
def apply[E, T, F[_]](
def retryWithErrorMode[E, T, F[_]](
config: RetryConfig[E, T],
isFailure: E => Boolean = (_: E) => true,
isFailure: T => Boolean = (_: T) => true,
errorMode: ErrorMode[E, F]
)(operation: => F[T]): F[T] =
val isWorthRetrying: E => Boolean = (error: E) =>
// if we cannot acquire a token, we short-circuit and stop retrying
val isWorth = config.resultPolicy.isWorthRetrying(error)
if isWorth && isFailure(error) then tokenBucket.tryAcquire(failureCost)
else isWorth
if isWorth then tokenBucket.tryAcquire(failureCost)
else false

val isSuccess: T => Boolean = (result: T) =>
// if we consider this result a success, tokens are given back to the bucket
if config.resultPolicy.isSuccess(result) then
if config.resultPolicy.isSuccess(result) && isFailure(result) then
tokenBucket.release(successReward)
true
else false
end isSuccess

val resultPolicy = ResultPolicy(isSuccess, isWorthRetrying)
scheduledWithErrorMode(errorMode)(config.copy(resultPolicy = resultPolicy).toScheduledConfig)(operation)
end apply
end retryWithErrorMode

/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions
* thrown by the operation aren't caught and don't cause a retry to happen.
Expand All @@ -80,7 +82,7 @@ case class AdaptiveRetry(
* @param config
* The retry config - see [[RetryConfig]].
* @param isFailure
* Function to decide if returned [[E]] should be considered failure.
* Function deciding whether a returned result of type [[T]] should be considered a failure.
* @param operation
* The operation to retry.
* @tparam E
Expand All @@ -93,8 +95,8 @@ case class AdaptiveRetry(
* @see
* [[scheduledEither]]
*/
def retryEither[E, T](config: RetryConfig[E, T], isFailure: E => Boolean = (_: E) => true)(operation: => Either[E, T]): Either[E, T] =
apply(config, isFailure, EitherMode[E])(operation)
def retryEither[E, T](config: RetryConfig[E, T], isFailure: T => Boolean = (_: T) => true)(operation: => Either[E, T]): Either[E, T] =
retryWithErrorMode(config, isFailure, EitherMode[E])(operation)

/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
*
Expand All @@ -103,7 +105,7 @@ case class AdaptiveRetry(
* @param config
* The retry config - see [[RetryConfig]].
* @param isFailure
* Function to decide if returned [[Throwable]] should be considered failure.
* Function deciding whether a returned result of type [[T]] should be considered a failure.
* @param operation
* The operation to retry.
* @return
Expand All @@ -113,7 +115,10 @@ case class AdaptiveRetry(
* @see
* [[scheduled]]
*/
def retry[T](config: RetryConfig[Throwable, T], isFailure: Throwable => Boolean = _ => true)(operation: => T): T =
apply(config, isFailure, EitherMode[Throwable])(Try(operation).toEither).fold(throw _, identity)
def retry[T](config: RetryConfig[Throwable, T], isFailure: T => Boolean = (_: T) => true)(operation: => T): T =
retryWithErrorMode(config, isFailure, EitherMode[Throwable])(Try(operation).toEither).fold(throw _, identity)

end AdaptiveRetry

object AdaptiveRetry:
  /** Builds an [[AdaptiveRetry]] with the library's default settings: a `TokenBucket(500)`, a failure cost of 5 tokens and a success
    * reward of 1 token. Being a `def`, every call creates a new instance with its own bucket.
    */
  def default: AdaptiveRetry =
    AdaptiveRetry(tokenBucket = TokenBucket(500), failureCost = 5, successReward = 1)
2 changes: 1 addition & 1 deletion core/src/test/scala/ox/resilience/DelayedRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with
if counter <= retriesUntilSuccess then throw RuntimeException(errorMessage) else successfulResult

// when
val adaptive = AdaptiveRetry(TokenBucket(bucketSize))
val adaptive = AdaptiveRetry(TokenBucket(bucketSize), 1, 1)
val result = the[RuntimeException] thrownBy adaptive.retry(RetryConfig.delayForever(sleep))(f)

// then
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/scala/ox/resilience/ImmediateRetryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
if counter <= 2 then Left(errorMessage)
else Right("Success")

val adaptive = AdaptiveRetry(TokenBucket(5))
val adaptive = AdaptiveRetry(TokenBucket(5), 1, 1)
// when
val result = adaptive.retryEither(RetryConfig.immediate(5))(f)

Expand All @@ -192,7 +192,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
counter += 1
Left(errorMessage)

val adaptive = AdaptiveRetry(TokenBucket(2))
val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1)
// when
val result = adaptive.retryEither(RetryConfig.immediate[String, String](5))(f)

Expand All @@ -211,7 +211,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi
counter += 1
Left(errorMessage)

val adaptive = AdaptiveRetry(TokenBucket(2))
val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1)
// when
val result = adaptive.retryEither[String, String](RetryConfig.immediate(5), _ => false)(f)

Expand Down
6 changes: 3 additions & 3 deletions doc/utils/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ retryWithErrorMode(UnionMode[String])(RetryConfig(Schedule.Immediate(3), ResultP
See the tests in `ox.resilience.*` for more.

# Adaptive retries
This retry mechanism by implementing part of [AdaptiveRetryStrategy](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/AdaptiveRetryStrategy.html) from `aws-sdk-java-v2`. Class `AdaptiveRetry` contains thread safe `TokenBucket` that acts as a circuit breaker for instance of this class.
This retry mechanism is inspired by the talk `AWS re:Invent 2024 - Try again: The tools and techniques behind resilient systems` and by [AdaptiveRetryStrategy](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/AdaptiveRetryStrategy.html) from `aws-sdk-java-v2`. The `AdaptiveRetry` class contains a thread-safe `TokenBucket` that acts as a circuit breaker for that instance.
For every retry, tokens are taken from the bucket; if there are not enough tokens, we stop retrying. For every successful operation, tokens are added back to the bucket.
This allows a "normal" retry mechanism in case of transient failures, but prevents generating, for example, 4 times the load in case of a systemic failure (if we retry every operation 3 times).

Expand Down Expand Up @@ -170,10 +170,10 @@ adaptive.retry(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMe
adaptive.retryEither(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation)

// custom error mode
adaptive(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), errorMode = UnionMode[String])(unionOperation)
adaptive.retryWithErrorMode(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), errorMode = UnionMode[String])(unionOperation)

// consider "throttling error" not as a failure that should incur the retry penalty
adaptive(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), isFailure = _ != "throttling error", errorMode = UnionMode[String])(unionOperation)
adaptive.retryWithErrorMode(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), isFailure = _ != "throttling error", errorMode = UnionMode[String])(unionOperation)
```

An instance of `AdaptiveRetry` can be shared between different operations, for example different operations on the same constrained resource.
Expand Down

0 comments on commit 367fc4c

Please sign in to comment.