From 072f61c957e1e65b30e86d9df536bfd5fc33cf8a Mon Sep 17 00:00:00 2001 From: Mateusz Dybowski <matdybowski@gmail.com> Date: Mon, 25 Mar 2024 11:11:37 +0100 Subject: [PATCH] Add side effect callbacks during retry (#106) Co-authored-by: Adam Warski <adam@warski.org> --- .../src/main/scala/ox/retry/RetryPolicy.scala | 10 +- core/src/main/scala/ox/retry/retry.scala | 13 ++- core/src/main/scala/ox/syntax.scala | 3 +- .../src/test/scala/ox/retry/OnRetryTest.scala | 63 ++++++++++++ doc/retries.md | 96 ++++++++++++++----- generated-doc/out/retries.md | 8 +- 6 files changed, 158 insertions(+), 35 deletions(-) create mode 100644 core/src/test/scala/ox/retry/OnRetryTest.scala diff --git a/core/src/main/scala/ox/retry/RetryPolicy.scala b/core/src/main/scala/ox/retry/RetryPolicy.scala index 228a0b96..82652bdf 100644 --- a/core/src/main/scala/ox/retry/RetryPolicy.scala +++ b/core/src/main/scala/ox/retry/RetryPolicy.scala @@ -10,13 +10,21 @@ import scala.concurrent.duration.* * @param resultPolicy * A policy that allows to customize when a non-erroneous result is considered successful and when an error is worth retrying (which * allows for failing fast on certain errors). See [[ResultPolicy]] for more details. + * @param onRetry + * A function that is invoked after each retry attempt. The callback receives the number of the current retry attempt (starting from 1) + * and the result of the operation that was attempted. The result is either a successful value or an error. The callback can be used to + * log information about the retry attempts, or to perform other side effects. By default, the callback does nothing. * @tparam E * The error type of the operation. For operations returning a `T` or a `Try[T]`, this is fixed to `Throwable`. For operations returning * an `Either[E, T]`, this can be any `E`. * @tparam T * The successful result type for the operation. */ -case class RetryPolicy[E, T](schedule: Schedule, resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T]) +case class RetryPolicy[E, T]( + schedule: Schedule, + resultPolicy: ResultPolicy[E, T] = ResultPolicy.default[E, T], + onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => () +) object RetryPolicy: /** Creates a policy that retries up to a given number of times, with no delay between subsequent attempts, using a default diff --git a/core/src/main/scala/ox/retry/retry.scala b/core/src/main/scala/ox/retry/retry.scala index 073f612d..8d0b01ff 100644 --- a/core/src/main/scala/ox/retry/retry.scala +++ b/core/src/main/scala/ox/retry/retry.scala @@ -30,7 +30,8 @@ def retry[T](operation: => T)(policy: RetryPolicy[Throwable, T]): T = * @return * A [[scala.util.Right]] if the function eventually succeeds, or, otherwise, a [[scala.util.Left]] with the error from the last attempt. */ -def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Either[E, T] = retry(EitherMode[E])(operation)(policy) +def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Either[E, T] = + retryWithErrorMode(EitherMode[E])(operation)(policy) /** Retries an operation using the given error mode until it succeeds or the policy decides to stop. Note that any exceptions thrown by the * operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen. @@ -46,23 +47,27 @@ def retryEither[E, T](operation: => Either[E, T])(policy: RetryPolicy[E, T]): Ei * - the result of the function if it eventually succeeds, in the context of `F`, as dictated by the error mode. * - the error `E` in context `F` as returned by the last attempt if the policy decides to stop. */ -def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPolicy[E, T]): F[T] = +def retryWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPolicy[E, T]): F[T] = @tailrec def loop(attempt: Int, remainingAttempts: Option[Int], lastDelay: Option[FiniteDuration]): F[T] = def sleepIfNeeded = - val delay = policy.schedule.nextDelay(attempt + 1, lastDelay).toMillis + val delay = policy.schedule.nextDelay(attempt, lastDelay).toMillis if (delay > 0) Thread.sleep(delay) delay operation match case v if em.isError(v) => val error = em.getError(v) + policy.onRetry(attempt, Left(error)) + if policy.resultPolicy.isWorthRetrying(error) && remainingAttempts.forall(_ > 0) then val delay = sleepIfNeeded loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis)) else v case v => val result = em.getT(v) + policy.onRetry(attempt, Right(result)) + if !policy.resultPolicy.isSuccess(result) && remainingAttempts.forall(_ > 0) then val delay = sleepIfNeeded loop(attempt + 1, remainingAttempts.map(_ - 1), Some(delay.millis)) @@ -72,4 +77,4 @@ def retry[E, F[_], T](em: ErrorMode[E, F])(operation: => F[T])(policy: RetryPoli case finiteSchedule: Schedule.Finite => Some(finiteSchedule.maxRetries) case _ => None - loop(0, remainingAttempts, None) + loop(1, remainingAttempts, None) diff --git a/core/src/main/scala/ox/syntax.scala b/core/src/main/scala/ox/syntax.scala index 7a3a2f51..4976844d 100644 --- a/core/src/main/scala/ox/syntax.scala +++ b/core/src/main/scala/ox/syntax.scala @@ -25,8 +25,7 @@ object syntax: def race(f2: => T): T = ox.race(f, f2) def raceResultWith(f2: => T): T = ox.raceResult(f, f2) - extension [T <: AutoCloseable](f: => T)(using Ox) - def useInScope: T = ox.useCloseableInScope(f) + extension [T <: AutoCloseable](f: => T)(using Ox) def useInScope: T = ox.useCloseableInScope(f) extension [I, C[E] <: Iterable[E]](f: => C[I]) def mapPar[O](parallelism: Int)(transform: I => O): C[O] = ox.mapPar(parallelism)(f)(transform) diff --git a/core/src/test/scala/ox/retry/OnRetryTest.scala b/core/src/test/scala/ox/retry/OnRetryTest.scala new file mode 100644 index 00000000..18453d49 --- /dev/null +++ b/core/src/test/scala/ox/retry/OnRetryTest.scala @@ -0,0 +1,63 @@ +package ox.retry + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{EitherValues, TryValues} +import ox.retry.* + +class OnRetryTest extends AnyFlatSpec with Matchers with EitherValues with TryValues: + behavior of "RetryPolicy onRetry callback" + + it should "retry a succeeding function with onRetry callback" in { + // given + var onRetryInvocationCount = 0 + + var counter = 0 + val successfulResult = 42 + + def f = + counter += 1 + successfulResult + + var returnedResult: Either[Throwable, Int] = null + def onRetry(attempt: Int, result: Either[Throwable, Int]): Unit = + onRetryInvocationCount += 1 + returnedResult = result + + // when + val result = retry(f)(RetryPolicy(Schedule.Immediate(3), onRetry = onRetry)) + + // then + result shouldBe successfulResult + counter shouldBe 1 + + onRetryInvocationCount shouldBe 1 + returnedResult shouldBe Right(successfulResult) + } + + it should "retry a failing function with onRetry callback" in { + // given + var onRetryInvocationCount = 0 + + var counter = 0 + val failedResult = new RuntimeException("boom") + + def f = + counter += 1 + if true then throw failedResult + + var returnedResult: Either[Throwable, Unit] = null + def onRetry(attempt: Int, result: Either[Throwable, Unit]): Unit = + onRetryInvocationCount += 1 + returnedResult = result + + // when + val result = the[RuntimeException] thrownBy retry(f)(RetryPolicy(Schedule.Immediate(3), onRetry = onRetry)) + + // then + result shouldBe failedResult + counter shouldBe 4 + + onRetryInvocationCount shouldBe 4 + returnedResult shouldBe Left(failedResult) + } diff --git a/doc/retries.md b/doc/retries.md index 1a6a2095..7fecb318 100644 --- a/doc/retries.md +++ b/doc/retries.md @@ -1,8 +1,10 @@ # Retries -The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms delay between attempts). +The retries mechanism allows to retry a failing operation according to a given policy (e.g. retry 3 times with a 100ms +delay between attempts). ## API + The basic syntax for retries is: ```scala @@ -28,52 +30,95 @@ which accepts arbitrary [error modes](error-handling.md), accepting the computat ## Policies -A retry policy consists of two parts: -- a `Schedule`, which indicates how many times and with what delay should we retry the `operation` after an initial failure, +A retry policy consists of three parts: + +- a `Schedule`, which indicates how many times and with what delay should we retry the `operation` after an initial + failure, - a `ResultPolicy`, which indicates whether: - - a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be retried), - - an erroneous outcome of the `operation` should be retried or fail fast. + - a non-erroneous outcome of the `operation` should be considered a success (if not, the `operation` would be + retried), + - an erroneous outcome of the `operation` should be retried or fail fast. +- a `onRetry`, which is a callback function that is invoked after each attempt to execute the operation. It is used to + perform any necessary actions or checks after each attempt, regardless of whether the attempt was successful or not. The available schedules are defined in the `Schedule` object. Each schedule has a finite and an infinite variant. ### Finite schedules -Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times. +Finite schedules have a common `maxRetries: Int` parameter, which determines how many times the `operation` would be +retried after an initial failure. This means that the operation could be executed at most `maxRetries + 1` times. ### Infinite schedules -Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the companion object of the respective finite schedule (see examples below). +Each finite schedule has an infinite variant, whose settings are similar to those of the respective finite schedule, but +without the `maxRetries` setting. Using the infinite variant can lead to a possibly infinite number of retries (unless +the `operation` starts to succeed again at some point). The infinite schedules are created by calling `.forever` on the +companion object of the respective finite schedule (see examples below). ### Schedule types The supported schedules (specifically - their finite variants) are: + - `Immediate(maxRetries: Int)` - retries up to `maxRetries` times without any delay between subsequent attempts. -- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times , sleeping for `delay` between subsequent attempts. -- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up to `maxRetries` times , sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute). +- `Delay(maxRetries: Int, delay: FiniteDuration)` - retries up to `maxRetries` times , sleeping for `delay` between + subsequent attempts. +- `Backoff(maxRetries: Int, initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)` - retries up + to `maxRetries` times , sleeping for `initialDelay` before the first retry, increasing the sleep between subsequent + attempts exponentially (with base `2`) up to an optional `maxDelay` (default: 1 minute). - Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By introducing randomness to the delays, the retries become more evenly distributed over time. + Optionally, a random factor (jitter) can be used when calculating the delay before the next attempt. The purpose of + jitter is to avoid clustering of subsequent retries, i.e. to reduce the number of clients calling a service exactly at + the same time, which can result in subsequent failures, contrary to what you would expect from retrying. By + introducing randomness to the delays, the retries become more evenly distributed over time. - See the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) for a more in-depth explanation. + See + the [AWS Architecture Blog article on backoff and jitter](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) + for a more in-depth explanation. The following jitter strategies are available (defined in the `Jitter` enum): - - `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used, - - `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt, - - `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and adding a random value between `0` and the other half, - - `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between the `initalAttempt` and `3 * lastDelay`. + - `None` - the default one, when no randomness is added, i.e. a pure exponential backoff is used, + - `Full` - picks a random value between `0` and the exponential backoff calculated for the current attempt, + - `Equal` - similar to `Full`, but prevents very short delays by always using a half of the original backoff and + adding a random value between `0` and the other half, + - `Decorrelated` - uses the delay from the previous attempt (`lastDelay`) and picks a random value between + the `initalAttempt` and `3 * lastDelay`. ### Result policies A result policy allows to customize how the results of the `operation` are treated. It consists of two predicates: -- `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying. - With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries` are reached, the result is returned as-is, even though it's considered "unsuccessful", -- `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast with the error. +- `isSuccess: T => Boolean` (default: `true`) - determines whether a non-erroneous result of the `operation` should be + considered a success. When it evaluates to `true` - no further attempts would be made, otherwise - we'd keep retrying. + + With finite schedules (i.e. those with `maxRetries` defined), if `isSuccess` keeps returning `false` when `maxRetries` + are reached, the result is returned as-is, even though it's considered "unsuccessful", +- `isWorthRetrying: E => Boolean` (default: `true`) - determines whether another attempt would be made if + the `operation` results in an error `E`. When it evaluates to `true` - we'd keep retrying, otherwise - we'd fail fast + with the error. + +The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. Note, however, that for the direct +variant `retry`, the error type `E` is fixed to `Throwable`, while for the `Either` and error-mode variants, `E` can ba +an arbitrary type. + +### On retry -The `ResultPolicy[E, T]` is generic both over the error (`E`) and result (`T`) type. Note, however, that for the direct variant `retry`, the error type `E` is fixed to `Throwable`, while for the `Either` and error-mode variants, `E` can ba an arbitrary type. +The callback function has the following signature: + +``` +(Int, Either[E, T]) => Unit +``` + +Where: +- The first parameter, an `Int`, represents the attempt number of the retry operation. +- The second parameter is an `Either[E, T]` type, representing the result of the retry operation. Left represents an + error and Right represents a successful result. ### API shorthands -When you don't need to customize the result policy (i.e. use the default one), you can use one of the following shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually creating the respective `Schedule`): +When you don't need to customize the result policy (i.e. use the default one), you can use one of the following +shorthands to define a retry policy with a given schedule (note that the parameters are the same as when manually +creating the respective `Schedule`): + - `RetryPolicy.immediate(maxRetries: Int)`, - `RetryPolicy.immediateForever`, - `RetryPolicy.delay(maxRetries: Int, delay: FiniteDuration)`, @@ -82,16 +127,19 @@ When you don't need to customize the result policy (i.e. use the default one), y - `RetryPolicy.backoffForever(initialDelay: FiniteDuration, maxDelay: FiniteDuration, jitter: Jitter)`. If you want to customize a part of the result policy, you can use the following shorthands: + - `ResultPolicy.default[E, T]` - uses the default settings, -- `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the provided `isSuccess`, -- `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the provided `isWorthRetrying`, +- `ResultPolicy.successfulWhen[E, T](isSuccess: T => Boolean)` - uses the default `isWorthRetrying` and the + provided `isSuccess`, +- `ResultPolicy.retryWhen[E, T](isWorthRetrying: E => Boolean)` - uses the default `isSuccess` and the + provided `isWorthRetrying`, - `ResultPolicy.neverRetry[E, T]` - uses the default `isSuccess` and fails fast on any error. ## Examples ```scala mdoc:compile-only import ox.UnionMode -import ox.retry.{retry, retryEither} +import ox.retry.{retry, retryEither, retryWithErrorMode} import ox.retry.{Jitter, ResultPolicy, RetryPolicy, Schedule} import scala.concurrent.duration.* @@ -120,7 +168,7 @@ retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen retryEither(eitherOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error"))) // custom error mode -retry(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error"))) +retryWithErrorMode(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error"))) ``` See the tests in `ox.retry.*` for more. diff --git a/generated-doc/out/retries.md b/generated-doc/out/retries.md index d61968b7..b723f5eb 100644 --- a/generated-doc/out/retries.md +++ b/generated-doc/out/retries.md @@ -6,9 +6,9 @@ The retries mechanism allows to retry a failing operation according to a given p The basic syntax for retries is: ```scala -import ox.retry.retry +import ox.retry.retryWithErrorMode -retry(operation)(policy) +retryWithErrorMode(operation)(policy) ``` or, using syntax sugar: @@ -91,7 +91,7 @@ If you want to customize a part of the result policy, you can use the following ```scala import ox.UnionMode -import ox.retry.{retry, retryEither} +import ox.retry.{retryWithErrorMode, retryEither} import ox.retry.{Jitter, ResultPolicy, RetryPolicy, Schedule} import scala.concurrent.duration.* @@ -120,7 +120,7 @@ retry(directOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen retryEither(eitherOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error"))) // custom error mode -retry(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error"))) +retryWithErrorMode(UnionMode[String])(unionOperation)(RetryPolicy(Schedule.Immediate(3), ResultPolicy.retryWhen(_ != "fatal error"))) ``` See the tests in `ox.retry.*` for more.