Skip to content

Commit

Permalink
Cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
dvgica committed Oct 31, 2023
1 parent 01306dd commit 7c66db7
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 49 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ All library functionality is based on implementations of `Periodic`. Therefore a

#### JDK Implementation

`JdkPeriodic` is the default implementation provided in `periodic-core`. It is suitable for most usages, although users with many `AutoUpdatingVar`s or `Runner`s may wish to provide a shared `ScheduledExecutorService` to them, to avoid starting many threads. The number of threads in this shared `ScheduledExecutorService` will need to be tuned based on workload. Threads in the `ScheduledExecutorService` will be blocked.
`JdkPeriodic` is the default implementation provided in `periodic-core`, which is is suitable for many use cases. Usages of the `jdk` and `jdkFuture` methods on the `AutoUpdatingVar` and `FnRunner` companion objects create a new, non-shared `JdkPeriodic` (and thus a new thread) for each invocation. This will work well as long as the number of threads is not problematic for your application.

Users with many `AutoUpdatingVar`s or `FnRunner`s may wish to share a `JdkPeriodic` between them to decrease the total number of threads used. In this case, the shared `JdkPeriodic` may need to be tuned based on workload. Specifically, users may need to provide a `ScheduledExecutorService` to the shared `JdkPeriodic` with an increased thread count (the default number of threads used by a `JdkPeriodic` is one). Threads in the `ScheduledExecutorService` will be blocked.

The JDK implementation works out of the box with sync (`Identity`) or async (`scala.concurrent.Future`) update code. If usage with another effect is desired, provide a typeclass implementation of `ca.dvgi.periodic.jdk.Eval`.

#### Pekko Streams Implementation

The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s and `Runner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency.
The Pekko Streams implementation is completely non-blocking and does not need additional resources besides an `ActorSystem`. A single `PekkoStreamsPeriodic` can be shared by many `AutoUpdatingVar`s and `FnRunner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency.

The Pekko Streams implementation only works with `scala.concurrent.Future`.

Expand Down Expand Up @@ -129,7 +131,7 @@ def updateData(): Future[String] = Future.successful(Instant.now.toString)
implicit val actorSystem = ActorSystem() // generally you should have an ActorSystem in your process already

val data = AutoUpdatingVar(
PekkoStreamsPeriodic[String]() // T must be explicitly provided, it can't be inferred
PekkoStreamsPeriodic() // can also be shared by many AutoUpdatingVars or FnRunners
)(
updateData(),
UpdateInterval.Static(1.second),
Expand All @@ -150,7 +152,7 @@ def doSomething(): FiniteDuration = {
10.seconds
}

// alternately use FnRunner.jdkFuture or FnRunner.apply
// alternately use FnRunner.jdkFuture or FnRunner.apply(somePeriodic)
val runner = FnRunner.jdk(doSomething, AttemptStrategy.Infinite(1.second), "time printer")
```
## Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import java.util.concurrent.ScheduledExecutorService
* A name for this variable, used in logging. If unspecified, the simple class name of T will be
* used.
*/
class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])(
class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R])(
updateVar: => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: AttemptStrategy,
Expand All @@ -61,13 +61,13 @@ class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])(

@volatile private var variable: Option[T] = None

private val _ready = periodic.scheduleNow(
private val _ready = periodic.scheduleNow[T](
log,
"initialize var",
() => updateVar,
newV => {
variable = Some(newV)
periodic.scheduleRecurring(
periodic.scheduleRecurring[T](
log,
"update var",
updateInterval.duration(newV),
Expand Down Expand Up @@ -109,7 +109,7 @@ object AutoUpdatingVar {
/** @see
* [[ca.dvgi.periodic.AutoUpdatingVar]]
*/
def apply[U[_], R[_], T](periodic: Periodic[U, R, T])(
def apply[U[_], R[_], T](periodic: Periodic[U, R])(
updateVar: => U[T],
updateInterval: UpdateInterval[T],
updateAttemptStrategy: AttemptStrategy,
Expand Down Expand Up @@ -144,7 +144,7 @@ object AutoUpdatingVar {
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = {
new AutoUpdatingVar(
new JdkPeriodic[Identity, T](executorOverride)
new JdkPeriodic[Identity](executorOverride)
)(
updateVar,
updateInterval,
Expand Down Expand Up @@ -172,7 +172,7 @@ object AutoUpdatingVar {
executorOverride: Option[ScheduledExecutorService] = None
)(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = {
new AutoUpdatingVar(
new JdkPeriodic[Future, T](executorOverride)
new JdkPeriodic[Future](executorOverride)
)(
updateVar,
updateInterval,
Expand Down
10 changes: 5 additions & 5 deletions periodic-core/src/main/scala/ca/dvgi/periodic/FnRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.concurrent.Future
* @param initialDelay
* If specified, the first run of the function will be delayed this much
*/
class FnRunner[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])(
class FnRunner[F[_], R[_]](periodic: Periodic[F, R])(
fn: => F[FiniteDuration],
fnAttemptStrategy: AttemptStrategy,
fnName: String,
Expand All @@ -33,7 +33,7 @@ class FnRunner[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])(

log.info(s"Starting. ${fnAttemptStrategy.description}")

periodic.scheduleRecurring(
periodic.scheduleRecurring[FiniteDuration](
log,
fnName,
initialDelay,
Expand All @@ -50,7 +50,7 @@ class FnRunner[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])(
}

object FnRunner {
def apply[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])(
def apply[F[_], R[_]](periodic: Periodic[F, R])(
fn: => F[FiniteDuration],
fnAttemptStrategy: AttemptStrategy,
fnName: String,
Expand All @@ -63,7 +63,7 @@ object FnRunner {
fnName: String,
initialDelay: FiniteDuration = 0.seconds
): FnRunner[Identity, Future] =
new FnRunner(JdkPeriodic[Identity, FiniteDuration]())(
new FnRunner(JdkPeriodic[Identity]())(
fn,
fnAttemptStrategy,
fnName,
Expand All @@ -76,7 +76,7 @@ object FnRunner {
fnName: String,
initialDelay: FiniteDuration = 0.seconds
): FnRunner[Future, Future] =
new FnRunner(JdkPeriodic[Future, FiniteDuration]())(
new FnRunner(JdkPeriodic[Future]())(
fn,
fnAttemptStrategy,
fnName,
Expand Down
6 changes: 3 additions & 3 deletions periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import scala.concurrent.duration.FiniteDuration
import org.slf4j.Logger
import scala.concurrent.duration.Duration

trait Periodic[F[_], R[_], T] extends AutoCloseable {
def scheduleNow(
trait Periodic[F[_], R[_]] extends AutoCloseable {
def scheduleNow[T](
log: Logger,
operationName: String,
fn: () => F[T],
Expand All @@ -14,7 +14,7 @@ trait Periodic[F[_], R[_], T] extends AutoCloseable {
blockUntilCompleteTimeout: Option[Duration] = None
): R[Unit]

def scheduleRecurring(
def scheduleRecurring[T](
log: Logger,
operationName: String,
initialDelay: FiniteDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.Await

class JdkPeriodic[F[_], T](
class JdkPeriodic[F[_]](
executorOverride: Option[ScheduledExecutorService] = None
)(implicit evalF: Eval[F])
extends Periodic[F, Future, T] {
extends Periodic[F, Future] {

private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1))

Expand All @@ -32,7 +32,7 @@ class JdkPeriodic[F[_], T](

@volatile private var recurringTask: Option[ScheduledFuture[_]] = None

override def scheduleNow(
override def scheduleNow[T](
log: Logger,
operationName: String,
fn: () => F[T],
Expand Down Expand Up @@ -92,7 +92,7 @@ class JdkPeriodic[F[_], T](
}
}

override def scheduleRecurring(
override def scheduleRecurring[T](
log: Logger,
operationName: String,
initialDelay: FiniteDuration,
Expand All @@ -116,7 +116,7 @@ class JdkPeriodic[F[_], T](
()
}

private def scheduleNext(delay: FiniteDuration)(implicit
private def scheduleNext[T](delay: FiniteDuration)(implicit
log: Logger,
operationName: String,
fn: () => F[T],
Expand All @@ -140,7 +140,7 @@ class JdkPeriodic[F[_], T](
()
}

private class FnRunnable(attempt: Int)(implicit
private class FnRunnable[T](attempt: Int)(implicit
log: Logger,
operationName: String,
fn: () => F[T],
Expand Down Expand Up @@ -200,9 +200,9 @@ class JdkPeriodic[F[_], T](
}

object JdkPeriodic {
def apply[F[_], T](
def apply[F[_]](
executorOverride: Option[ScheduledExecutorService] = None
)(implicit evalF: Eval[F]): JdkPeriodic[F, T] = {
new JdkPeriodic[F, T](executorOverride)
)(implicit evalF: Eval[F]): JdkPeriodic[F] = {
new JdkPeriodic[F](executorOverride)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {

def evalU[T](ut: U[T]): T

def testAll(periodic: () => Periodic[U, Future, Int])(implicit
def testAll(periodic: () => Periodic[U, Future])(implicit
loc: munit.Location
): Unit = {
implicit val per = periodic
Expand All @@ -55,7 +55,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testBasicsWithBlocking(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {
FunFixture(
_ => {
Expand Down Expand Up @@ -97,7 +97,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testAdjustsUpdateInterval(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand Down Expand Up @@ -138,7 +138,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testReturnsFailedReady(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand All @@ -158,7 +158,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testThrowsFromLatest(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand All @@ -181,7 +181,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testThrowsFromConstructor(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

test(
Expand All @@ -201,7 +201,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testHandlesInititializationErrors(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand All @@ -227,7 +227,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testInfiniteReattempts(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

FunFixture(
Expand Down Expand Up @@ -265,7 +265,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite {
def testFiniteReattempts(
)(implicit
loc: munit.Location,
periodic: () => Periodic[U, Future, Int]
periodic: () => Periodic[U, Future]
): Unit = {

var terminated = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class FutureJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] {

def pureU(thunk: => Int): Future[Int] = Future(thunk)

def periodicBuilder() = new JdkPeriodic[Future, Int]()
def periodicBuilder() = new JdkPeriodic[Future]()

testAll(periodicBuilder)

Expand All @@ -24,7 +24,7 @@ class FutureJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] {
val ses = Executors.newScheduledThreadPool(1)
val v =
new AutoUpdatingVar(
JdkPeriodic[Future, Int](Some(ses))
JdkPeriodic[Future](Some(ses))
)(
holder.get,
UpdateInterval.Static(2.seconds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class IdentityJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Identity

def pureU(thunk: => Int): Identity[Int] = thunk

def periodicBuilder() = new JdkPeriodic[Identity, Int]
def periodicBuilder() = new JdkPeriodic[Identity]

testAll(periodicBuilder)

Expand All @@ -21,7 +21,7 @@ class IdentityJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Identity
val ses = Executors.newScheduledThreadPool(1)
val v =
AutoUpdatingVar(
JdkPeriodic[Identity, Int](Some(ses))
JdkPeriodic[Identity](Some(ses))
)(
holder.get,
UpdateInterval.Static(2.seconds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ import scala.util.Try
* @param actorSystem
* An ActorSystem used to execute periodic actions.
*/
class PekkoStreamsPeriodic[T](implicit
class PekkoStreamsPeriodic(implicit
actorSystem: ActorSystem
) extends Periodic[Future, Future, T] {
) extends Periodic[Future, Future] {
import PekkoStreamsPeriodic._

implicit private val ec: ExecutionContext = actorSystem.dispatcher

private val killSwitch = KillSwitches.shared("close")

override def scheduleNow(
override def scheduleNow[T](
log: Logger,
operationName: String,
fn: () => Future[T],
Expand Down Expand Up @@ -66,7 +66,7 @@ class PekkoStreamsPeriodic[T](implicit
}
}

override def scheduleRecurring(
override def scheduleRecurring[T](
log: Logger,
operationName: String,
initialDelay: FiniteDuration,
Expand All @@ -82,7 +82,7 @@ class PekkoStreamsPeriodic[T](implicit
killSwitch.shutdown()
}

private def scheduleNext(delay: FiniteDuration)(implicit
private def scheduleNext[T](delay: FiniteDuration)(implicit
log: Logger,
operationName: String,
fn: () => Future[T],
Expand Down Expand Up @@ -141,7 +141,7 @@ class PekkoStreamsPeriodic[T](implicit
}
}

private def buildRunFnSource(delay: FiniteDuration)(implicit
private def buildRunFnSource[T](delay: FiniteDuration)(implicit
log: Logger,
operationName: String,
fn: () => Future[T]
Expand All @@ -165,7 +165,7 @@ class PekkoStreamsPeriodic[T](implicit
object PekkoStreamsPeriodic {
def apply[T]()(implicit
actorSystem: ActorSystem
): PekkoStreamsPeriodic[T] = new PekkoStreamsPeriodic[T]
): PekkoStreamsPeriodic = new PekkoStreamsPeriodic

private case class RunFnException(cause: Throwable) extends RuntimeException
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ class PekkoStreamsAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future]

def pureU(thunk: => Int): Future[Int] = Future(thunk)

def periodicBuilder(): () => Periodic[Future, Future, Int] =
() => PekkoStreamsPeriodic[Int]()
def periodicBuilder(): () => Periodic[Future, Future] =
() => PekkoStreamsPeriodic()

testAll(periodicBuilder())
}

0 comments on commit 7c66db7

Please sign in to comment.