diff --git a/core/src/main/scala/ox/resilience/AdaptiveRetry.scala b/core/src/main/scala/ox/resilience/AdaptiveRetry.scala index 3ef56963..891cffa1 100644 --- a/core/src/main/scala/ox/resilience/AdaptiveRetry.scala +++ b/core/src/main/scala/ox/resilience/AdaptiveRetry.scala @@ -5,9 +5,11 @@ import ox.scheduling.scheduledWithErrorMode import scala.util.Try -/** Provides mechanism of "adaptive" retries. For every retry we take [[failureCost]] from token bucket and for every success we add back to - * the bucket [[successReward]] tokens. One instance can be "shared" across multiple operations against constrained resource. This allows - * to retry in case of transient failures and at the same time doesn't produce more load on systemic failure of a resource. +/** Provides mechanism of "adaptive" retries. Inspired by `AdaptiveRetryStrategy` from `aws-sdk-java-v2` and the talk "AWS re:Invent 2024 - + * Try again: The tools and techniques behind resilient systems". For every retry we take [[failureCost]] from token bucket and for every + * success we add back to the bucket [[successReward]] tokens. An instance of this class is thread-safe and can be "shared" across multiple + * operations against a constrained resource. This allows to retry in case of transient failures and at the same time doesn't produce more + * load on systemic failure of a resource. * * @param tokenBucket * instance of [[TokenBucket]]. Provided instance is thread safe and can be "shared" between different instances of [[AdaptiveRetry]] @@ -19,8 +21,8 @@ import scala.util.Try */ case class AdaptiveRetry( tokenBucket: TokenBucket, - failureCost: Int = 1, - successReward: Int = 1 + failureCost: Int, + successReward: Int ): /** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by * the operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen. 
@@ -30,7 +32,7 @@ case class AdaptiveRetry( * @param config * The retry config - See [[RetryConfig]]. * @param isFailure - * Function to decide if returned [[E]] should be considered failure. + * Function to decide if returned result [[T]] should be considered failure. * @param errorMode * The error mode to use, which specifies when a result value is considered success, and when a failure. * @param operation @@ -48,20 +50,20 @@ case class AdaptiveRetry( * @see * [[scheduledWithErrorMode]] */ - def apply[E, T, F[_]]( + def retryWithErrorMode[E, T, F[_]]( config: RetryConfig[E, T], - isFailure: E => Boolean = (_: E) => true, + isFailure: T => Boolean = (_: T) => true, errorMode: ErrorMode[E, F] )(operation: => F[T]): F[T] = val isWorthRetrying: E => Boolean = (error: E) => // if we cannot acquire token we short circuit and stop retrying val isWorth = config.resultPolicy.isWorthRetrying(error) - if isWorth && isFailure(error) then tokenBucket.tryAcquire(failureCost) - else isWorth + if isWorth then tokenBucket.tryAcquire(failureCost) + else false val isSuccess: T => Boolean = (result: T) => // if we consider this result as success token are given back to bucket - if config.resultPolicy.isSuccess(result) then + if config.resultPolicy.isSuccess(result) && isFailure(result) then tokenBucket.release(successReward) true else false @@ -69,7 +71,7 @@ case class AdaptiveRetry( val resultPolicy = ResultPolicy(isSuccess, isWorthRetrying) scheduledWithErrorMode(errorMode)(config.copy(resultPolicy = resultPolicy).toScheduledConfig)(operation) - end apply + end retryWithErrorMode /** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions * thrown by the operation aren't caught and don't cause a retry to happen. @@ -80,7 +82,7 @@ case class AdaptiveRetry( * @param config * The retry config - see [[RetryConfig]]. * @param isFailure - * Function to decide if returned [[E]] should be considered failure. 
+ * Function to decide if returned result [[T]] should be considered failure. * @param operation * The operation to retry. * @tparam E @@ -93,8 +95,8 @@ case class AdaptiveRetry( * @see * [[scheduledEither]] */ - def retryEither[E, T](config: RetryConfig[E, T], isFailure: E => Boolean = (_: E) => true)(operation: => Either[E, T]): Either[E, T] = - apply(config, isFailure, EitherMode[E])(operation) + def retryEither[E, T](config: RetryConfig[E, T], isFailure: T => Boolean = (_: T) => true)(operation: => Either[E, T]): Either[E, T] = + retryWithErrorMode(config, isFailure, EitherMode[E])(operation) /** Retries an operation returning a direct result until it succeeds or the config decides to stop. * @@ -103,7 +105,7 @@ case class AdaptiveRetry( * @param config * The retry config - see [[RetryConfig]]. * @param isFailure - * Function to decide if returned [[Throwable]] should be considered failure. + * Function to decide if returned result [[T]] should be considered failure. * @param operation * The operation to retry. 
* @return @@ -113,7 +115,10 @@ case class AdaptiveRetry( * @see * [[scheduled]] */ - def retry[T](config: RetryConfig[Throwable, T], isFailure: Throwable => Boolean = _ => true)(operation: => T): T = - apply(config, isFailure, EitherMode[Throwable])(Try(operation).toEither).fold(throw _, identity) + def retry[T](config: RetryConfig[Throwable, T], isFailure: T => Boolean = (_: T) => true)(operation: => T): T = + retryWithErrorMode(config, isFailure, EitherMode[Throwable])(Try(operation).toEither).fold(throw _, identity) end AdaptiveRetry + +object AdaptiveRetry: + def default: AdaptiveRetry = AdaptiveRetry(TokenBucket(500), 5, 1) diff --git a/core/src/test/scala/ox/resilience/DelayedRetryTest.scala b/core/src/test/scala/ox/resilience/DelayedRetryTest.scala index c0b0b741..278e294b 100644 --- a/core/src/test/scala/ox/resilience/DelayedRetryTest.scala +++ b/core/src/test/scala/ox/resilience/DelayedRetryTest.scala @@ -86,7 +86,7 @@ class DelayedRetryTest extends AnyFlatSpec with Matchers with EitherValues with if counter <= retriesUntilSuccess then throw RuntimeException(errorMessage) else successfulResult // when - val adaptive = AdaptiveRetry(TokenBucket(bucketSize)) + val adaptive = AdaptiveRetry(TokenBucket(bucketSize), 1, 1) val result = the[RuntimeException] thrownBy adaptive.retry(RetryConfig.delayForever(sleep))(f) // then diff --git a/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala b/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala index 8e382b9e..dcd5ccc6 100644 --- a/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala +++ b/core/src/test/scala/ox/resilience/ImmediateRetryTest.scala @@ -174,7 +174,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi if counter <= 2 then Left(errorMessage) else Right("Success") - val adaptive = AdaptiveRetry(TokenBucket(5)) + val adaptive = AdaptiveRetry(TokenBucket(5), 1, 1) // when val result = adaptive.retryEither(RetryConfig.immediate(5))(f) @@ -192,7 +192,7 @@ 
class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi counter += 1 Left(errorMessage) - val adaptive = AdaptiveRetry(TokenBucket(2)) + val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1) // when val result = adaptive.retryEither(RetryConfig.immediate[String, String](5))(f) @@ -211,7 +211,7 @@ class ImmediateRetryTest extends AnyFlatSpec with EitherValues with TryValues wi counter += 1 Left(errorMessage) - val adaptive = AdaptiveRetry(TokenBucket(2)) + val adaptive = AdaptiveRetry(TokenBucket(2), 1, 1) // when val result = adaptive.retryEither[String, String](RetryConfig.immediate(5), _ => false)(f) diff --git a/doc/utils/retries.md b/doc/utils/retries.md index 3839976e..0ae01cf5 100644 --- a/doc/utils/retries.md +++ b/doc/utils/retries.md @@ -127,7 +127,7 @@ retryWithErrorMode(UnionMode[String])(RetryConfig(Schedule.Immediate(3), ResultP See the tests in `ox.resilience.*` for more. # Adaptive retries -This retry mechanism by implementing part of [AdaptiveRetryStrategy](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/AdaptiveRetryStrategy.html) from `aws-sdk-java-v2`. Class `AdaptiveRetry` contains thread safe `TokenBucket` that acts as a circuit breaker for instance of this class. +This retry mechanism is inspired by the talk `AWS re:Invent 2024 - Try again: The tools and techniques behind resilient systems` and [AdaptiveRetryStrategy](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/retries/AdaptiveRetryStrategy.html) from `aws-sdk-java-v2`. The `AdaptiveRetry` class contains a thread-safe `TokenBucket` that acts as a circuit breaker for instances of this class. For every retry, tokens are taken from bucket, if there is not enough we stop retrying. For every successful operations tokens are added back to bucket. 
This allows for "normal" retry mechanism in case of transient failures, but does not allow to generate for example 4 times the load in case of systemic failure (if we retry every operation 3 times). @@ -170,10 +170,10 @@ adaptive.retry(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_.getMe adaptive.retryEither(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")))(eitherOperation) // custom error mode -adaptive(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), errorMode = UnionMode[String])(unionOperation) +adaptive.retryWithErrorMode(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), errorMode = UnionMode[String])(unionOperation) // consider "throttling error" not as a failure that should incur the retry penalty -adaptive(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), isFailure = _ != "throttling error", errorMode = UnionMode[String])(unionOperation) +adaptive.retryWithErrorMode(RetryConfig(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error")), isFailure = _ != "throttling error", errorMode = UnionMode[String])(unionOperation) ``` Instance of `AdaptiveRetry` can be shared for different operation, for example different operations on the same constrained resource.