From 7c66db7300520db55a10bd3cb65834a337474bb2 Mon Sep 17 00:00:00 2001 From: David van Geest Date: Mon, 30 Oct 2023 23:24:56 -0400 Subject: [PATCH] Cleanup. --- README.md | 10 ++++++---- .../ca/dvgi/periodic/AutoUpdatingVar.scala | 12 ++++++------ .../main/scala/ca/dvgi/periodic/FnRunner.scala | 10 +++++----- .../main/scala/ca/dvgi/periodic/Periodic.scala | 6 +++--- .../ca/dvgi/periodic/jdk/JdkPeriodic.scala | 18 +++++++++--------- .../dvgi/periodic/AutoUpdaterTestsFuture.scala | 18 +++++++++--------- .../jdk/FutureJdkAutoUpdatingVarTest.scala | 4 ++-- .../jdk/IdentityJdkAutoUpdatingVarTest.scala | 4 ++-- .../pekko/stream/PekkoStreamsPeriodic.scala | 14 +++++++------- .../PekkoStreamsAutoUpdatingVarTest.scala | 4 ++-- 10 files changed, 51 insertions(+), 49 deletions(-) diff --git a/README.md b/README.md index 98d6ff7..ff8315b 100644 --- a/README.md +++ b/README.md @@ -59,13 +59,15 @@ All library functionality is based on implementations of `Periodic`. Therefore a #### JDK Implementation -`JdkPeriodic` is the default implementation provided in `periodic-core`. It is suitable for most usages, although users with many `AutoUpdatingVar`s or `Runner`s may wish to provide a shared `ScheduledExecutorService` to them, to avoid starting many threads. The number of threads in this shared `ScheduledExecutorService` will need to be tuned based on workload. Threads in the `ScheduledExecutorService` will be blocked. +`JdkPeriodic` is the default implementation provided in `periodic-core`, which is is suitable for many use cases. Usages of the `jdk` and `jdkFuture` methods on the `AutoUpdatingVar` and `FnRunner` companion objects create a new, non-shared `JdkPeriodic` (and thus a new thread) for each invocation. This will work well as long as the number of threads is not problematic for your application. + +Users with many `AutoUpdatingVar`s or `FnRunner`s may wish to share a `JdkPeriodic` between them to decrease the total number of threads used. In this case, the shared `JdkPeriodic` may need to be tuned based on workload. Specifically, users may need to provide a `ScheduledExecutorService` to the shared `JdkPeriodic` with an increased thread count (the default number of threads used by a `JdkPeriodic` is one). Threads in the `ScheduledExecutorService` will be blocked. The JDK implementation works out of the box with sync (`Identity`) or async (`scala.concurrent.Future`) update code. If usage with another effect is desired, provide a typeclass implementation of `ca.dvgi.periodic.jdk.Eval`. #### Pekko Streams Implementation -The Pekko Streams implementation is completely non-blocking, does not need additional resources besides an `ActorSystem`, and will scale to many `AutoUpdatingVar`s and `Runner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency. +The Pekko Streams implementation is completely non-blocking and does not need additional resources besides an `ActorSystem`. A single `PekkoStreamsPeriodic` can be shared by many `AutoUpdatingVar`s and `FnRunner`s without requiring tuning. It is recommended if you are already using Pekko or don't mind the extra dependency. The Pekko Streams implementation only works with `scala.concurrent.Future`. @@ -129,7 +131,7 @@ def updateData(): Future[String] = Future.successful(Instant.now.toString) implicit val actorSystem = ActorSystem() // generally you should have an ActorSystem in your process already val data = AutoUpdatingVar( - PekkoStreamsPeriodic[String]() // T must be explicitly provided, it can't be inferred + PekkoStreamsPeriodic() // can also be shared by many AutoUpdatingVars or FnRunners )( updateData(), UpdateInterval.Static(1.second), @@ -150,7 +152,7 @@ def doSomething(): FiniteDuration = { 10.seconds } -// alternately use FnRunner.jdkFuture or FnRunner.apply +// alternately use FnRunner.jdkFuture or FnRunner.apply(somePeriodic) val runner = FnRunner.jdk(doSomething, AttemptStrategy.Infinite(1.second), "time printer") ``` ## Contributing diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala index 37a20c8..d98485f 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/AutoUpdatingVar.scala @@ -40,7 +40,7 @@ import java.util.concurrent.ScheduledExecutorService * A name for this variable, used in logging. If unspecified, the simple class name of T will be * used. */ -class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])( +class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R])( updateVar: => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: AttemptStrategy, @@ -61,13 +61,13 @@ class AutoUpdatingVar[U[_], R[_], T](periodic: Periodic[U, R, T])( @volatile private var variable: Option[T] = None - private val _ready = periodic.scheduleNow( + private val _ready = periodic.scheduleNow[T]( log, "initialize var", () => updateVar, newV => { variable = Some(newV) - periodic.scheduleRecurring( + periodic.scheduleRecurring[T]( log, "update var", updateInterval.duration(newV), @@ -109,7 +109,7 @@ object AutoUpdatingVar { /** @see * [[ca.dvgi.periodic.AutoUpdatingVar]] */ - def apply[U[_], R[_], T](periodic: Periodic[U, R, T])( + def apply[U[_], R[_], T](periodic: Periodic[U, R])( updateVar: => U[T], updateInterval: UpdateInterval[T], updateAttemptStrategy: AttemptStrategy, @@ -144,7 +144,7 @@ object AutoUpdatingVar { executorOverride: Option[ScheduledExecutorService] = None )(implicit ct: ClassTag[T]): AutoUpdatingVar[Identity, Future, T] = { new AutoUpdatingVar( - new JdkPeriodic[Identity, T](executorOverride) + new JdkPeriodic[Identity](executorOverride) )( updateVar, updateInterval, @@ -172,7 +172,7 @@ object AutoUpdatingVar { executorOverride: Option[ScheduledExecutorService] = None )(implicit ct: ClassTag[T]): AutoUpdatingVar[Future, Future, T] = { new AutoUpdatingVar( - new JdkPeriodic[Future, T](executorOverride) + new JdkPeriodic[Future](executorOverride) )( updateVar, updateInterval, diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/FnRunner.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/FnRunner.scala index 911e0c6..3d7d298 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/FnRunner.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/FnRunner.scala @@ -22,7 +22,7 @@ import scala.concurrent.Future * @param initialDelay * If specified, the first run of the function will be delayed this much */ -class FnRunner[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])( +class FnRunner[F[_], R[_]](periodic: Periodic[F, R])( fn: => F[FiniteDuration], fnAttemptStrategy: AttemptStrategy, fnName: String, @@ -33,7 +33,7 @@ class FnRunner[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])( log.info(s"Starting. ${fnAttemptStrategy.description}") - periodic.scheduleRecurring( + periodic.scheduleRecurring[FiniteDuration]( log, fnName, initialDelay, @@ -50,7 +50,7 @@ class FnRunner[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])( } object FnRunner { - def apply[F[_], R[_]](periodic: Periodic[F, R, FiniteDuration])( + def apply[F[_], R[_]](periodic: Periodic[F, R])( fn: => F[FiniteDuration], fnAttemptStrategy: AttemptStrategy, fnName: String, @@ -63,7 +63,7 @@ object FnRunner { fnName: String, initialDelay: FiniteDuration = 0.seconds ): FnRunner[Identity, Future] = - new FnRunner(JdkPeriodic[Identity, FiniteDuration]())( + new FnRunner(JdkPeriodic[Identity]())( fn, fnAttemptStrategy, fnName, @@ -76,7 +76,7 @@ object FnRunner { fnName: String, initialDelay: FiniteDuration = 0.seconds ): FnRunner[Future, Future] = - new FnRunner(JdkPeriodic[Future, FiniteDuration]())( + new FnRunner(JdkPeriodic[Future]())( fn, fnAttemptStrategy, fnName, diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala index df0460b..41b70bd 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/Periodic.scala @@ -4,8 +4,8 @@ import scala.concurrent.duration.FiniteDuration import org.slf4j.Logger import scala.concurrent.duration.Duration -trait Periodic[F[_], R[_], T] extends AutoCloseable { - def scheduleNow( +trait Periodic[F[_], R[_]] extends AutoCloseable { + def scheduleNow[T]( log: Logger, operationName: String, fn: () => F[T], @@ -14,7 +14,7 @@ trait Periodic[F[_], R[_], T] extends AutoCloseable { blockUntilCompleteTimeout: Option[Duration] = None ): R[Unit] - def scheduleRecurring( + def scheduleRecurring[T]( log: Logger, operationName: String, initialDelay: FiniteDuration, diff --git a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala index 45bcfd1..cd44f3f 100644 --- a/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala +++ b/periodic-core/src/main/scala/ca/dvgi/periodic/jdk/JdkPeriodic.scala @@ -17,10 +17,10 @@ import java.util.concurrent.Executors import scala.concurrent.duration.Duration import scala.concurrent.Await -class JdkPeriodic[F[_], T]( +class JdkPeriodic[F[_]]( executorOverride: Option[ScheduledExecutorService] = None )(implicit evalF: Eval[F]) - extends Periodic[F, Future, T] { + extends Periodic[F, Future] { private val executor = executorOverride.getOrElse(Executors.newScheduledThreadPool(1)) @@ -32,7 +32,7 @@ class JdkPeriodic[F[_], T]( @volatile private var recurringTask: Option[ScheduledFuture[_]] = None - override def scheduleNow( + override def scheduleNow[T]( log: Logger, operationName: String, fn: () => F[T], @@ -92,7 +92,7 @@ class JdkPeriodic[F[_], T]( } } - override def scheduleRecurring( + override def scheduleRecurring[T]( log: Logger, operationName: String, initialDelay: FiniteDuration, @@ -116,7 +116,7 @@ class JdkPeriodic[F[_], T]( () } - private def scheduleNext(delay: FiniteDuration)(implicit + private def scheduleNext[T](delay: FiniteDuration)(implicit log: Logger, operationName: String, fn: () => F[T], @@ -140,7 +140,7 @@ class JdkPeriodic[F[_], T]( () } - private class FnRunnable(attempt: Int)(implicit + private class FnRunnable[T](attempt: Int)(implicit log: Logger, operationName: String, fn: () => F[T], @@ -200,9 +200,9 @@ class JdkPeriodic[F[_], T]( } object JdkPeriodic { - def apply[F[_], T]( + def apply[F[_]]( executorOverride: Option[ScheduledExecutorService] = None - )(implicit evalF: Eval[F]): JdkPeriodic[F, T] = { - new JdkPeriodic[F, T](executorOverride) + )(implicit evalF: Eval[F]): JdkPeriodic[F] = { + new JdkPeriodic[F](executorOverride) } } diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala index a0984c3..7d2a1ae 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/AutoUpdaterTestsFuture.scala @@ -31,7 +31,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def evalU[T](ut: U[T]): T - def testAll(periodic: () => Periodic[U, Future, Int])(implicit + def testAll(periodic: () => Periodic[U, Future])(implicit loc: munit.Location ): Unit = { implicit val per = periodic @@ -55,7 +55,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testBasicsWithBlocking( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { FunFixture( _ => { @@ -97,7 +97,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testAdjustsUpdateInterval( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { FunFixture( @@ -138,7 +138,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testReturnsFailedReady( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { FunFixture( @@ -158,7 +158,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testThrowsFromLatest( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { FunFixture( @@ -181,7 +181,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testThrowsFromConstructor( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { test( @@ -201,7 +201,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testHandlesInititializationErrors( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { FunFixture( @@ -227,7 +227,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testInfiniteReattempts( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { FunFixture( @@ -265,7 +265,7 @@ trait AutoUpdatingVarTestsFuture[U[_]] extends FunSuite { def testFiniteReattempts( )(implicit loc: munit.Location, - periodic: () => Periodic[U, Future, Int] + periodic: () => Periodic[U, Future] ): Unit = { var terminated = false diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala index f3a68e3..741eeb5 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/FutureJdkAutoUpdatingVarTest.scala @@ -14,7 +14,7 @@ class FutureJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] { def pureU(thunk: => Int): Future[Int] = Future(thunk) - def periodicBuilder() = new JdkPeriodic[Future, Int]() + def periodicBuilder() = new JdkPeriodic[Future]() testAll(periodicBuilder) @@ -24,7 +24,7 @@ class FutureJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] { val ses = Executors.newScheduledThreadPool(1) val v = new AutoUpdatingVar( - JdkPeriodic[Future, Int](Some(ses)) + JdkPeriodic[Future](Some(ses)) )( holder.get, UpdateInterval.Static(2.seconds), diff --git a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala index aed35b6..1c36557 100644 --- a/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala +++ b/periodic-core/src/test/scala/ca/dvgi/periodic/jdk/IdentityJdkAutoUpdatingVarTest.scala @@ -11,7 +11,7 @@ class IdentityJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Identity def pureU(thunk: => Int): Identity[Int] = thunk - def periodicBuilder() = new JdkPeriodic[Identity, Int] + def periodicBuilder() = new JdkPeriodic[Identity] testAll(periodicBuilder) @@ -21,7 +21,7 @@ class IdentityJdkAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Identity val ses = Executors.newScheduledThreadPool(1) val v = AutoUpdatingVar( - JdkPeriodic[Identity, Int](Some(ses)) + JdkPeriodic[Identity](Some(ses)) )( holder.get, UpdateInterval.Static(2.seconds), diff --git a/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala b/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala index 71f4e04..baf6153 100644 --- a/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala +++ b/periodic-pekko-stream/src/main/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsPeriodic.scala @@ -23,16 +23,16 @@ import scala.util.Try * @param actorSystem * An ActorSystem used to execute periodic actions. */ -class PekkoStreamsPeriodic[T](implicit +class PekkoStreamsPeriodic(implicit actorSystem: ActorSystem -) extends Periodic[Future, Future, T] { +) extends Periodic[Future, Future] { import PekkoStreamsPeriodic._ implicit private val ec: ExecutionContext = actorSystem.dispatcher private val killSwitch = KillSwitches.shared("close") - override def scheduleNow( + override def scheduleNow[T]( log: Logger, operationName: String, fn: () => Future[T], @@ -66,7 +66,7 @@ class PekkoStreamsPeriodic[T](implicit } } - override def scheduleRecurring( + override def scheduleRecurring[T]( log: Logger, operationName: String, initialDelay: FiniteDuration, @@ -82,7 +82,7 @@ class PekkoStreamsPeriodic[T](implicit killSwitch.shutdown() } - private def scheduleNext(delay: FiniteDuration)(implicit + private def scheduleNext[T](delay: FiniteDuration)(implicit log: Logger, operationName: String, fn: () => Future[T], @@ -141,7 +141,7 @@ class PekkoStreamsPeriodic[T](implicit } } - private def buildRunFnSource(delay: FiniteDuration)(implicit + private def buildRunFnSource[T](delay: FiniteDuration)(implicit log: Logger, operationName: String, fn: () => Future[T] @@ -165,7 +165,7 @@ class PekkoStreamsPeriodic[T](implicit object PekkoStreamsPeriodic { def apply[T]()(implicit actorSystem: ActorSystem - ): PekkoStreamsPeriodic[T] = new PekkoStreamsPeriodic[T] + ): PekkoStreamsPeriodic = new PekkoStreamsPeriodic private case class RunFnException(cause: Throwable) extends RuntimeException } diff --git a/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdatingVarTest.scala b/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdatingVarTest.scala index 0c43c3d..9466057 100644 --- a/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdatingVarTest.scala +++ b/periodic-pekko-stream/src/test/scala/ca/dvgi/periodic/pekko/stream/PekkoStreamsAutoUpdatingVarTest.scala @@ -25,8 +25,8 @@ class PekkoStreamsAutoUpdatingVarTest extends AutoUpdatingVarTestsFuture[Future] def pureU(thunk: => Int): Future[Int] = Future(thunk) - def periodicBuilder(): () => Periodic[Future, Future, Int] = - () => PekkoStreamsPeriodic[Int]() + def periodicBuilder(): () => Periodic[Future, Future] = + () => PekkoStreamsPeriodic() testAll(periodicBuilder()) }