Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule based on cron expressions #263

Merged
merged 5 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ compileDocumentation := {
lazy val rootProject = (project in file("."))
.settings(commonSettings)
.settings(publishArtifact := false, name := "ox")
.aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams)
.aggregate(core, examples, kafka, mdcLogback, flowReactiveStreams, cron)

lazy val core: Project = (project in file("core"))
.settings(commonSettings)
Expand Down Expand Up @@ -94,6 +94,17 @@ lazy val flowReactiveStreams: Project = (project in file("flow-reactive-streams"
)
.dependsOn(core)

lazy val cron: Project = (project in file("cron"))
.settings(commonSettings)
.settings(
name := "cron",
libraryDependencies ++= Seq(
"com.github.alonsodomin.cron4s" %% "cron4s-core" % "0.7.0",
scalaTest
)
)
.dependsOn(core)

lazy val documentation: Project = (project in file("generated-doc")) // important: it must not be doc/
.enablePlugins(MdocPlugin)
.settings(commonSettings)
Expand All @@ -113,5 +124,6 @@ lazy val documentation: Project = (project in file("generated-doc")) // importan
core,
kafka,
mdcLogback,
flowReactiveStreams
flowReactiveStreams,
cron
)
6 changes: 3 additions & 3 deletions core/src/main/scala/ox/scheduling/Schedule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package ox.scheduling
import scala.concurrent.duration.*
import scala.util.Random

sealed trait Schedule:
trait Schedule:
def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration
def initialDelay: FiniteDuration = Duration.Zero

object Schedule:

private[scheduling] sealed trait Finite extends Schedule:
private[scheduling] trait Finite extends Schedule:
Kamil-Lontkowski marked this conversation as resolved.
Show resolved Hide resolved
def maxRepeats: Int
def andThen(nextSchedule: Finite): Finite = FiniteAndFiniteSchedules(this, nextSchedule)
def andThen(nextSchedule: Infinite): Infinite = FiniteAndInfiniteSchedules(this, nextSchedule)

private[scheduling] sealed trait Infinite extends Schedule
private[scheduling] trait Infinite extends Schedule

/** A schedule that represents an initial delay applied before the first invocation of operation being scheduled. Usually used in
* combination with other schedules using [[andThen]]
Expand Down
34 changes: 34 additions & 0 deletions cron/src/main/scala/ox/scheduling/cron/CronSchedule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package ox.scheduling.cron

import cron4s.lib.javatime.*
import cron4s.{Cron, CronExpr, toDateTimeCronOps}
import ox.scheduling.Schedule

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}

case class CronSchedule(cron: CronExpr) extends Schedule.Infinite:
override def initialDelay: FiniteDuration =
val now = LocalDateTime.now()
val next = cron.next(now)
val duration = next.map(n => ChronoUnit.MILLIS.between(now, n))
duration.map(FiniteDuration.apply(_, TimeUnit.MILLISECONDS)).getOrElse(Duration.Zero)
end initialDelay

def nextDuration(invocation: Int, lastDuration: Option[FiniteDuration]): FiniteDuration =
initialDelay
end CronSchedule

object CronSchedule:
/** @param expression
* cron expression to parse
* @return
* [[CronSchedule]] from cron expression
* @throws cron4s.Error
* in case the cron expression is invalid
*/
def unsafeFromString(expression: String): CronSchedule =
CronSchedule(Cron.unsafeParse(expression))
end CronSchedule
72 changes: 72 additions & 0 deletions cron/src/main/scala/ox/scheduling/cron/cron.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package ox.scheduling.cron

import cron4s.CronExpr
import ox.scheduling.*
import ox.{EitherMode, ErrorMode}

import scala.util.Try

/** Repeats an operation returning a direct result based on cron expression until it succeeds we decide to stop.
*
* [[repeat]] is a special case of [[scheduled]] with a given set of defaults.
*
* @param cron
* [[CronExpr]] to create schedule from
* @param shouldContinueOnResult
* A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last
* invocation. Defaults to [[_ => true]].
* @param operation
* The operation to repeat.
* @return
* The result of the last invocation if the config decides to stop.
* @throws anything
* The exception thrown by the last invocation if the config decides to stop.
* @see
* [[scheduled]]
*/
def repeat[T](cron: CronExpr, shouldContinueOnResult: T => Boolean = (_: T) => true)(operation: => T): T =
Kamil-Lontkowski marked this conversation as resolved.
Show resolved Hide resolved
repeatEither[Throwable, T](cron, shouldContinueOnResult)(Try(operation).toEither).fold(throw _, identity)

/** Repeats an operation based on cron expression returning an [[scala.util.Either]] until we decide to stop. Note that any exceptions
* thrown by the operation aren't caught and effectively interrupt the repeat loop.
*
* [[repeatEither]] is a special case of [[scheduledEither]] with a given set of defaults.
*
* @param cron
* [[CronExpr]] to create schedule from
* @param shouldContinueOnResult
* A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last
* invocation. Defaults to [[_ => true]].
* @param operation
* The operation to repeat.
* @return
* The result of the last invocation if the config decides to stop.
* @see
* [[scheduledEither]]
*/
def repeatEither[E, T](cron: CronExpr, shouldContinueOnResult: T => Boolean = (_: T) => true)(operation: => Either[E, T]): Either[E, T] =
repeatWithErrorMode(EitherMode[E])(cron, shouldContinueOnResult)(operation)

/** Repeats an operation based on cron expression using the given error mode until we decide to stop. Note that any exceptions thrown by the
* operation aren't caught and effectively interrupt the repeat loop.
*
* [[repeatWithErrorMode]] is a special case of [[scheduledWithErrorMode]] with a given set of defaults.
*
* @param em
* The error mode to use, which specifies when a result value is considered success, and when a failure.
* @param cron
* [[CronExpr]] to create schedule from
* @param shouldContinueOnResult
* A function that determines whether to continue the loop after a success. The function receives the value that was emitted by the last
* invocation. Defaults to [[_ => true]].
* @param operation
* The operation to repeat.
* @return
* The result of the last invocation if the config decides to stop.
* @see
* [[scheduledWithErrorMode]]
*/
def repeatWithErrorMode[E, F[_], T](em: ErrorMode[E, F])(cron: CronExpr, shouldContinueOnResult: T => Boolean = (_: T) => true)(
operation: => F[T]
): F[T] =
scheduledWithErrorMode[E, F, T](em)(RepeatConfig(CronSchedule(cron), shouldContinueOnResult).toScheduledConfig)(operation)
77 changes: 77 additions & 0 deletions doc/integrations/cron4s.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Cron scheduler

Dependency:

```scala
"com.softwaremill.ox" %% "cron" % "@VERSION@"
```

This module allows to run schedules based on cron expressions from [cron4s](https://github.com/alonsodomin/cron4s).

`CronSchedule` can be used in all places that requires `Schedule` especially in repeat scenarios.

For defining `CronExpr` see [cron4s documentation](https://www.alonsodomin.me/cron4s/userguide/index.html).

## Api

The basic syntax for `cron.repeat` is similar to `repeat`:

```scala
import ox.scheduling.cron.repeat

repeat(cronExpr)(operation)
```

The `repeat` API uses `CronSchedule` underneath, but since it does not provide any configuration beyond `CronExpr` there is no need to provide instance of `CronSchedule` directly.

## Operation definition

Similarly to the `repeat` API, the `operation` can be defined:
* directly using a by-name parameter, i.e. `f: => T`
* using a by-name `Either[E, T]`
* or using an arbitrary [error mode](../basics/error-handling.md), accepting the computation in an `F` context: `f: => F[T]`.


## Configuration

The `cron.repeat` requires a `CronExpr`, which defines cron expression on which the schedule will run.

In addition, it is possible to define a custom `shouldContinueOnResult` strategy for deciding if the operation
should continue to be repeated after a successful result returned by the previous operation (defaults to `_: T => true`).

If an operation returns an error, the repeat loop will always be stopped. If an error handling within the operation
is needed, you can use a `retry` inside it (see an example below) or use `scheduled` with `CronSchedule` instead of `cron.repeat`, which allows
full customization.


## Examples

```scala mdoc:compile-only
import ox.UnionMode
import ox.scheduling.cron.{repeat, repeatEither, repeatWithErrorMode}
import scala.concurrent.duration.*
import ox.resilience.{RetryConfig, retry}
import cron4s.*

def directOperation: Int = ???
def eitherOperation: Either[String, Int] = ???
def unionOperation: String | Int = ???

val cronExpr: CronExpr = Cron.unsafeParse("10-35 2,4,6 * ? * *")

// various operation definitions - same syntax
repeat(cronExpr)(directOperation)
repeatEither(cronExpr)(eitherOperation)

// infinite repeats with a custom strategy
def customStopStrategy: Int => Boolean = ???
repeat(cronExpr, shouldContinueOnResult = customStopStrategy)(directOperation)

// custom error mode
repeatWithErrorMode(UnionMode[String])(cronExpr)(unionOperation)

// repeat with retry inside
repeat(cronExpr) {
retry(RetryConfig.backoff(3, 100.millis))(directOperation)
}
```
2 changes: 1 addition & 1 deletion doc/utils/retries.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Instance with default configuration can be obtained with `AdaptiveRetry.default`

`retry` will attempt to retry an operation if it throws an exception; `retryEither` will additionally retry, if the result is a `Left`. Finally `retryWithErrorMode` is the most flexible, and allows retrying operations using custom failure modes (such as union types).

The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `T` should be considered failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation.
The methods have an additional parameter, `shouldPayPenaltyCost`, which determines if result `Either[E, T]` should be considered as a failure in terms of paying cost for retry. Penalty is paid only if it is decided to retry operation, the penalty will not be paid for successful operation.

### Examples

Expand Down