Skip to content

Commit

Permalink
Change ScheduleContinue to ScheduleStop
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Dec 31, 2024
1 parent 05136b1 commit 70e44ec
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 36 deletions.
21 changes: 7 additions & 14 deletions core/src/main/scala/ox/resilience/AdaptiveRetry.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ox.resilience

import ox.scheduling.{ScheduleContinue, ScheduledConfig, SleepMode, scheduledWithErrorMode}
import ox.scheduling.{ScheduleStop, ScheduledConfig, SleepMode, scheduledWithErrorMode}
import ox.*

import scala.util.Try
Expand Down Expand Up @@ -39,8 +39,6 @@ case class AdaptiveRetry(
/** Retries an operation using the given error mode until it succeeds or the config decides to stop. Note that any exceptions thrown by
* the operation aren't caught (unless the operation catches them as part of its implementation) and don't cause a retry to happen.
*
* This is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RetryConfig]] for more details.
*
* @param config
* The retry config - See [[RetryConfig]].
* @param shouldPayPenaltyCost
Expand Down Expand Up @@ -68,21 +66,21 @@ case class AdaptiveRetry(
shouldPayPenaltyCost: T => Boolean = (_: T) => true
)(operation: => F[T]): F[T] =

val afterAttempt: (Int, Either[E, T]) => ScheduleContinue = (attemptNum, attempt) =>
val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (attemptNum, attempt) =>
config.onRetry(attemptNum, attempt)
attempt match
case Left(value) =>
// If we want to retry we try to acquire tokens from bucket
if config.resultPolicy.isWorthRetrying(value) then ScheduleContinue(tokenBucket.tryAcquire(failureCost))
else ScheduleContinue.No
if config.resultPolicy.isWorthRetrying(value) then ScheduleStop(!tokenBucket.tryAcquire(failureCost))
else ScheduleStop.Yes
case Right(value) =>
// If we are successful, we release tokens to bucket and end schedule
if config.resultPolicy.isSuccess(value) then
tokenBucket.release(successReward)
ScheduleContinue.No
ScheduleStop.Yes
// If it is not success we check if we need to acquire tokens, then we check bucket, otherwise we continue
else if shouldPayPenaltyCost(value) then ScheduleContinue(tokenBucket.tryAcquire(failureCost))
else ScheduleContinue.Yes
else if shouldPayPenaltyCost(value) then ScheduleStop(!tokenBucket.tryAcquire(failureCost))
else ScheduleStop.No
end match
end afterAttempt

Expand All @@ -98,9 +96,6 @@ case class AdaptiveRetry(
/** Retries an operation returning an [[scala.util.Either]] until it succeeds or the config decides to stop. Note that any exceptions
* thrown by the operation aren't caught and don't cause a retry to happen.
*
* [[retryEither]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See implementations of [[RetryConfig]]
* for more details.
*
* @param config
* The retry config - see [[RetryConfig]].
* @param shouldPayPenaltyCost
Expand All @@ -124,8 +119,6 @@ case class AdaptiveRetry(
retryWithErrorMode(EitherMode[E])(config, shouldPayPenaltyCost)(operation)

/** Retries an operation returning a direct result until it succeeds or the config decides to stop.
*
* [[retry]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults. See [[RetryConfig]].
*
* @param config
* The retry config - see [[RetryConfig]].
Expand Down
9 changes: 4 additions & 5 deletions core/src/main/scala/ox/resilience/RetryConfig.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ox.resilience

import ox.scheduling.{Jitter, Schedule, ScheduleContinue, ScheduledConfig, SleepMode}
import ox.scheduling.{Jitter, Schedule, ScheduleStop, ScheduledConfig, SleepMode}

import scala.concurrent.duration.*

Expand Down Expand Up @@ -30,12 +30,11 @@ case class RetryConfig[E, T](
onRetry: (Int, Either[E, T]) => Unit = (_: Int, _: Either[E, T]) => ()
):
def toScheduledConfig: ScheduledConfig[E, T] =
val afterAttempt: (Int, Either[E, T]) => ScheduleContinue = (attemptNum, attempt) =>
val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (attemptNum, attempt) =>
onRetry(attemptNum, attempt)
attempt match
case Left(value) => ScheduleContinue(resultPolicy.isWorthRetrying(value))
case Right(value) => ScheduleContinue(!resultPolicy.isSuccess(value))
end afterAttempt
case Left(value) => ScheduleStop(!resultPolicy.isWorthRetrying(value))
case Right(value) => ScheduleStop(resultPolicy.isSuccess(value))

ScheduledConfig(schedule, afterAttempt, SleepMode.Delay)
end toScheduledConfig
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/scala/ox/scheduling/RepeatConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ case class RepeatConfig[E, T](
shouldContinueOnResult: T => Boolean = (_: T) => true
):
def toScheduledConfig: ScheduledConfig[E, T] =
val afterAttempt: (Int, Either[E, T]) => ScheduleContinue = (_, attempt) =>
val afterAttempt: (Int, Either[E, T]) => ScheduleStop = (_, attempt) =>
attempt match
case Left(_) => ScheduleContinue.No
case Right(value) => ScheduleContinue(shouldContinueOnResult(value))
end afterAttempt
case Left(_) => ScheduleStop.Yes
case Right(value) => ScheduleStop(!shouldContinueOnResult(value))

ScheduledConfig(schedule, afterAttempt, SleepMode.Interval)
end toScheduledConfig
Expand Down
26 changes: 13 additions & 13 deletions core/src/main/scala/ox/scheduling/scheduled.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@ enum SleepMode:
end SleepMode

/** @see [[ScheduleConfig.afterAttempt]] */
enum ScheduleContinue(val continue: Boolean):
case Yes extends ScheduleContinue(true)
case No extends ScheduleContinue(false)
enum ScheduleStop(val stop: Boolean):
case Yes extends ScheduleStop(true)
case No extends ScheduleStop(false)

object ScheduleContinue:
def apply(predicate: Boolean): ScheduleContinue = if predicate then Yes else No
object ScheduleStop:
def apply(stop: Boolean): ScheduleStop = if stop then Yes else No

/** A config that defines how to schedule an operation.
*
* @param schedule
* The schedule which determines the maximum number of invocations and the duration between subsequent invocations. See [[Schedule]] for
* more details.
* @param afterAttempt
* A function that determines if schedule should continue. It is invoked after every attempt with current invocation number (starting
* from 1) and the result of the operation.
* A callback invoked after every attempt, with the current invocation number (starting from 1) and the result of the operation. Might
* decide to short-curcuit further attempts, and stop the schedule. Schedule configuration (e.g. max number of attempts) takes
* precedence.
* @param sleepMode
* The mode that specifies how to interpret the duration provided by the schedule. See [[SleepMode]] for more details.
* @tparam E
Expand All @@ -44,8 +45,7 @@ object ScheduleContinue:
*/
case class ScheduledConfig[E, T](
schedule: Schedule,
afterAttempt: (Int, Either[E, T]) => ScheduleContinue = (_, attempt: Either[E, T]) =>
attempt.map(_ => ScheduleContinue.Yes).getOrElse(ScheduleContinue.No),
afterAttempt: (Int, Either[E, T]) => ScheduleStop = (_, _: Either[E, T]) => ScheduleStop.No,
sleepMode: SleepMode = SleepMode.Interval
)

Expand Down Expand Up @@ -107,17 +107,17 @@ def scheduledWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(config: ScheduledCon
operation match
case v if em.isError(v) =>
val error = em.getError(v)
val shouldContinue = config.afterAttempt(invocation, Left(error))
val shouldStop = config.afterAttempt(invocation, Left(error))

if remainingInvocations.forall(_ > 0) && shouldContinue.continue then
if remainingInvocations.forall(_ > 0) && !shouldStop.stop then
val delay = sleepIfNeeded(startTimestamp)
loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay))
else v
case v =>
val result = em.getT(v)
val shouldContinue = config.afterAttempt(invocation, Right(result))
val shouldStop = config.afterAttempt(invocation, Right(result))

if remainingInvocations.forall(_ > 0) && shouldContinue.continue then
if remainingInvocations.forall(_ > 0) && !shouldStop.stop then
val delay = sleepIfNeeded(startTimestamp)
loop(invocation + 1, remainingInvocations.map(_ - 1), Some(delay))
else v
Expand Down

0 comments on commit 70e44ec

Please sign in to comment.