Skip to content

Commit

Permalink
Reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Sep 13, 2022
1 parent 5a69230 commit 239b330
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 223 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ val commonJsSettings = commonSettings ++ Seq(
lazy val rootProject = (project in file("."))
.settings(commonSettings: _*)
.settings(publish / skip := true, name := "odelay", scalaVersion := scala2_13)
.aggregate(core.projectRefs ++ testing.projectRefs ++ coreTests.projectRefs ++ netty3.projectRefs ++ netty.projectRefs ++twitter.projectRefs: _*)
.aggregate(core.projectRefs ++ testing.projectRefs ++ coreTests.projectRefs ++ netty3.projectRefs ++ netty.projectRefs ++ twitter.projectRefs: _*)

lazy val core = (projectMatrix in file("odelay-core"))
.settings(
Expand Down
89 changes: 42 additions & 47 deletions odelay-core/src/main/scala/Delay.scala
Original file line number Diff line number Diff line change
@@ -1,77 +1,72 @@
package odelay

import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.{Duration, FiniteDuration}
import java.util.concurrent.CancellationException

import scala.util.control.NonFatal

/**
* Provides an interface for producing Delays. Use
* requires an implicit [[odelay.Timer]] to be in implicit scope.
* {{{
* val delay = odelay.Delay(2.seconds) {
* todo
* }
* }}}
*/
/** Provides an interface for producing Delays. Use requires an implicit [[odelay.Timer]] to be in implicit scope.
* {{{
* val delay = odelay.Delay(2.seconds) {
* todo
* }
* }}}
*/
object Delay {

/** @return a one-off Delay which may be cancelled */
def apply[T]
(delay: FiniteDuration)
(todo: => T)
(implicit timer: Timer): Delay[T] =
timer(delay, todo)
def apply[T](delay: FiniteDuration)(todo: => T)(implicit timer: Timer): Delay[T] =
timer(delay, todo)

/** @return a periodic Delay which may be cancelled */
def every[T]
(every: FiniteDuration)
(delay: FiniteDuration = Duration.Zero)
(todo: => T)
(implicit timer: Timer): PeriodicDelay[T] =
timer(delay, every, todo)

private [odelay] def cancel[T](p: Promise[T]) =
def every[T](every: FiniteDuration)(delay: FiniteDuration = Duration.Zero)(todo: => T)(implicit
timer: Timer
): PeriodicDelay[T] =
timer(delay, every, todo)

private[odelay] def cancel[T](p: Promise[T]) =
if (!p.isCompleted) p.failure(new CancellationException)
}

/** A Delay is the default of a deferred operation */
trait Delay[T] {
/** @return a Future represent the execution of the Delays operation. Delays
* to be repeated expose a future that will never complete until cancelled */

/** @return
* a Future represent the execution of the Delays operation. Delays to be repeated expose a future that will never
* complete until cancelled
*/
def future: Future[T]

/** Cancels the execution of the delayed operation. Once a Delay
* is canceled, if additional attempts to cancel will result in undefined
* behavior */
/** Cancels the execution of the delayed operation. Once a Delay is canceled, if additional attempts to cancel will
* result in undefined behavior
*/
def cancel(): Unit
}

trait PeriodicDelay[T] extends Delay[T] {
def period: FiniteDuration
}

/** If calling cancel on a Delay's implemention has no other effect
* than cancelling the underlying promise. Use this as a mix in.
* {{{
* val timer = new Timer {
* def apply(delay: FiniteDuration, op: => T) = new PromisingDelay[T] with SelfCancelation[T] {
* schedule(delay, completePromise(op))
* }
* ...
* }
* }}}
*/
/** If calling cancel on a Delay's implemention has no other effect than cancelling the underlying promise. Use this as
* a mix in.
* {{{
* val timer = new Timer {
* def apply(delay: FiniteDuration, op: => T) = new PromisingDelay[T] with SelfCancelation[T] {
* schedule(delay, completePromise(op))
* }
* ...
* }
* }}}
*/
trait SelfCancelation[T] { self: PromisingDelay[T] =>
def cancel() = cancelPromise()
}

/**
* A building block for writing your own [[odelay.Timer]].
* Call `completePromise(_)` with the value of the result
* of the operation. Call `cancelPromise()` to cancel it.
* To query the current state of the promise, use `promiseIncomplete`
*/
/** A building block for writing your own [[odelay.Timer]]. Call `completePromise(_)` with the value of the result of
* the operation. Call `cancelPromise()` to cancel it. To query the current state of the promise, use
* `promiseIncomplete`
*/
abstract class PromisingDelay[T] extends Delay[T] {
private val promise = Promise[T]()

Expand All @@ -95,4 +90,4 @@ abstract class PromisingDelay[T] extends Delay[T] {
def future: Future[T] = promise.future
}

abstract class PeriodicPromisingDelay[T](val period: FiniteDuration) extends PromisingDelay[T] with PeriodicDelay[T]
abstract class PeriodicPromisingDelay[T](val period: FiniteDuration) extends PromisingDelay[T] with PeriodicDelay[T]
28 changes: 12 additions & 16 deletions odelay-core/src/main/scala/package.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
/** Odelay defines a set of primitives for delaying
* the execution of operations.
*
* This is differs from scala.concurrent.Futures in that
* the execution of an operation will not occur until a provided delay, specified
* as a scala.concurrent.duration.Duration. The delay of a
* task may also may be canceled. Operations may also be executed after a series
* of delays, also represented by scala.concurrent.duration.Durations.
*
* These primitives can be used to complement the usage of scala.concurrent.Futures by defining
* a deterministic delay for the future operation as well as a way
* to cancel the future operation.
*
* An odelay.Delay represents a delayed operation and defines a future method which may be used to trigger dependent actions
* and delay cancellations.
*/
/** Odelay defines a set of primitives for delaying the execution of operations.
*
* This is differs from scala.concurrent.Futures in that the execution of an operation will not occur until a provided
* delay, specified as a scala.concurrent.duration.Duration. The delay of a task may also may be canceled. Operations
* may also be executed after a series of delays, also represented by scala.concurrent.duration.Durations.
*
* These primitives can be used to complement the usage of scala.concurrent.Futures by defining a deterministic delay
* for the future operation as well as a way to cancel the future operation.
*
* An odelay.Delay represents a delayed operation and defines a future method which may be used to trigger dependent
* actions and delay cancellations.
*/
package object odelay
11 changes: 7 additions & 4 deletions odelay-core/src/main/scala/timer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import scala.concurrent.duration.FiniteDuration
import scala.annotation.implicitNotFound

/** The deferrer of some arbitrary operation */
@implicitNotFound(
"Cannot find an implicit odelay.Timer, either define one yourself or import odelay.Timer.default")
@implicitNotFound("Cannot find an implicit odelay.Timer, either define one yourself or import odelay.Timer.default")
trait Timer {

/** Delays the execution of an operation until the provided duration */
def apply[T](delay: FiniteDuration, op: => T): Delay[T]
/** Delays the execution of an operation until the provided deplay and then after, repeats the operation at the every duration after.
* Timeouts returned by this expose a Future that will never complete until cancelled */

/** Delays the execution of an operation until the provided deplay and then after, repeats the operation at the every
* duration after. Timeouts returned by this expose a Future that will never complete until cancelled
*/
def apply[T](delay: FiniteDuration, every: FiniteDuration, todo: => T): PeriodicDelay[T]

/** Stops the timer and releases any retained resources. Once a Timer is stoped, it's behavior is undefined. */
def stop(): Unit
}
Expand Down
17 changes: 9 additions & 8 deletions odelay-core/src/main/scalajs/JsTimer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,33 @@ import scalajs.js.timers._
import scala.concurrent.duration.FiniteDuration

class JsTimer() extends Timer {

/** Delays the execution of an operation until the provided duration */
def apply[T](delay: FiniteDuration, op: => T): Delay[T] = {
new PromisingDelay[T] {
val clearable: SetTimeoutHandle = setTimeout(delay){
val clearable: SetTimeoutHandle = setTimeout(delay) {
completePromise(op)
}

def cancel(): Unit = { clearTimeout(clearable); cancelPromise() }
}
}

/** Delays the execution of an operation until the provided deplay and then after, repeats the operation at the every duration after.
* Timeouts returned by this expose a Future that will never complete until cancelled */
/** Delays the execution of an operation until the provided deplay and then after, repeats the operation at the every
* duration after. Timeouts returned by this expose a Future that will never complete until cancelled
*/
def apply[T](delay: FiniteDuration, every: FiniteDuration, todo: => T): PeriodicDelay[T] = {
new PeriodicPromisingDelay[T](every) {
var clearable: SetIntervalHandle = null
val initclearable: SetTimeoutHandle = setTimeout(delay){
val initclearable: SetTimeoutHandle = setTimeout(delay) {
clearTimeout(initclearable);
clearable = setInterval(every){
if(promiseIncomplete) todo
clearable = setInterval(every) {
if (promiseIncomplete) todo
}
}

def cancel(): Unit = {
if(clearable != null) clearInterval(clearable)
if (clearable != null) clearInterval(clearable)
cancelPromise()
}
}
Expand All @@ -39,7 +41,6 @@ class JsTimer() extends Timer {
def stop(): Unit = ()
}


object JsTimer {
def newTimer: Timer = new JsTimer()
}
94 changes: 55 additions & 39 deletions odelay-core/src/main/scalajvm/jdk/JdkTimer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,52 @@ package odelay.jdk

import java.util.concurrent.{
Future => JFuture,
RejectedExecutionHandler, ScheduledExecutorService,
ScheduledThreadPoolExecutor, ThreadFactory }
RejectedExecutionHandler,
ScheduledExecutorService,
ScheduledThreadPoolExecutor,
ThreadFactory
}
import java.util.concurrent.atomic.AtomicInteger
import odelay.{ Delay, PeriodicDelay, PeriodicPromisingDelay, PromisingDelay, Timer }
import odelay.{Delay, PeriodicDelay, PeriodicPromisingDelay, PromisingDelay, Timer}
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

/** A Timer implemented in terms of a jdk ScheduledThreadPoolExecutor */
class JdkTimer(
underlying: ScheduledExecutorService,
interruptOnCancel: Boolean)
extends Timer {
class JdkTimer(underlying: ScheduledExecutorService, interruptOnCancel: Boolean) extends Timer {

/** customizing constructor */
def this(
poolSize: Int = JdkTimer.poolSize,
threads: ThreadFactory = JdkTimer.threadFactory,
handler: Option[RejectedExecutionHandler] = JdkTimer.rejectionHandler,
interruptOnCancel: Boolean = JdkTimer.interruptOnCancel) =
this(handler.map( rejections => new ScheduledThreadPoolExecutor(poolSize, threads, rejections))
.getOrElse(new ScheduledThreadPoolExecutor(poolSize, threads)),
interruptOnCancel)
poolSize: Int = JdkTimer.poolSize,
threads: ThreadFactory = JdkTimer.threadFactory,
handler: Option[RejectedExecutionHandler] = JdkTimer.rejectionHandler,
interruptOnCancel: Boolean = JdkTimer.interruptOnCancel
) =
this(
handler
.map(rejections => new ScheduledThreadPoolExecutor(poolSize, threads, rejections))
.getOrElse(new ScheduledThreadPoolExecutor(poolSize, threads)),
interruptOnCancel
)

def apply[T](delay: FiniteDuration, op: => T): Delay[T] =
new PromisingDelay[T] {
val jfuture: Option[JFuture[_]] = try {
Some(underlying.schedule(new Runnable {
def run() = completePromise(op)
}, delay.length, delay.unit))
} catch {
case NonFatal(e) =>
failPromise(e)
None
}
val jfuture: Option[JFuture[_]] =
try {
Some(
underlying.schedule(
new Runnable {
def run() = completePromise(op)
},
delay.length,
delay.unit
)
)
} catch {
case NonFatal(e) =>
failPromise(e)
None
}

def cancel() = jfuture.filterNot(_.isCancelled).foreach { f =>
f.cancel(interruptOnCancel)
Expand All @@ -46,15 +57,23 @@ class JdkTimer(

def apply[T](delay: FiniteDuration, every: FiniteDuration, op: => T): PeriodicDelay[T] =
new PeriodicPromisingDelay[T](every) {
val jfuture: Option[JFuture[_]] = try {
Some(underlying.scheduleWithFixedDelay(new Runnable {
def run = if (promiseIncomplete) op
}, delay.toUnit(every.unit).toLong, every.length, every.unit))
} catch {
case NonFatal(e) =>
failPromise(e)
None
}
val jfuture: Option[JFuture[_]] =
try {
Some(
underlying.scheduleWithFixedDelay(
new Runnable {
def run = if (promiseIncomplete) op
},
delay.toUnit(every.unit).toLong,
every.length,
every.unit
)
)
} catch {
case NonFatal(e) =>
failPromise(e)
None
}

def cancel() = jfuture.filterNot(_.isCancelled).foreach { f =>
f.cancel(interruptOnCancel)
Expand All @@ -68,15 +87,13 @@ class JdkTimer(
/** defaults for jdk timers */
object JdkTimer {
lazy val poolSize = Runtime.getRuntime().availableProcessors()

/** @return a new ThreadFactory with that produces new threads named odelay-{threadNum} */
def threadFactory: ThreadFactory = new ThreadFactory {
val grp = new ThreadGroup(
Thread.currentThread().getThreadGroup(), "odelay")
val grp = new ThreadGroup(Thread.currentThread().getThreadGroup(), "odelay")
val threads = new AtomicInteger(1)
def newThread(runs: Runnable) =
new Thread(
grp, runs,
"odelay-%s" format threads.getAndIncrement()) {
new Thread(grp, runs, "odelay-%s" format threads.getAndIncrement()) {
setDaemon(true)
}
}
Expand All @@ -86,6 +103,5 @@ object JdkTimer {
val interruptOnCancel = true

/** @return a _new_ Timer. when used clients should be sure to call stop() on all instances for a clean shutdown */
def newTimer: Timer = new JdkTimer(
poolSize, threadFactory, rejectionHandler, interruptOnCancel)
def newTimer: Timer = new JdkTimer(poolSize, threadFactory, rejectionHandler, interruptOnCancel)
}
Loading

0 comments on commit 239b330

Please sign in to comment.