Skip to content

Commit

Permalink
Separate creation & usage of OxDispatcher (#3759)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored May 10, 2024
1 parent 5632d8b commit 2aa60e0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ case class NettySyncServer(
* server binding, to be used to control stopping of the server or obtaining metadata like port.
*/
def start()(using Ox): NettySyncServerBinding =
startUsingSocketOverride[InetSocketAddress](None, new OxDispatcher()) match
startUsingSocketOverride[InetSocketAddress](None, OxDispatcher.create) match
case (socket, stop) =>
NettySyncServerBinding(socket, stop)

Expand All @@ -87,7 +87,7 @@ case class NettySyncServer(
NettySyncServerBinding(socket, stop)

def startUsingDomainSocket(path: Path)(using Ox): NettySyncDomainSocketBinding =
startUsingSocketOverride(Some(new DomainSocketAddress(path.toFile)), new OxDispatcher()) match
startUsingSocketOverride(Some(new DomainSocketAddress(path.toFile)), OxDispatcher.create) match
case (socket, stop) =>
NettySyncDomainSocketBinding(socket, stop)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package sttp.tapir.server.netty.sync.internal.ox

import ox.*
import ox.channels.Actor
import ox.channels.{Actor, ActorRef}

import scala.util.control.NonFatal
import scala.concurrent.Future
import scala.concurrent.Promise

/** A dispatcher that can start arbitrary forks. Useful when one needs to start an asynchronous task from a thread outside of an Ox scope.
* Normally Ox doesn't allow to start forks from other threads, for example in callbacks of external libraries. If you create an
* OxDispatcher inside a scope and pass it for potential handling on another thread, that thread can call
/** A dispatcher that can start forks, within some "global" scope. Useful when one needs to start an asynchronous task from a thread outside
* of an Ox scope. Normally Ox doesn't allow to start forks from arbitrary threads, for example in callbacks of external libraries. If you
* create an `OxDispatcher` inside a scope and pass it for potential handling on another thread, that thread can call
* {{{
* dispatcher.runAsync {
* // code to be executed in a fork
Expand All @@ -19,22 +19,29 @@ import scala.concurrent.Promise
* }}}
* WARNING! Dispatchers should only be used in special cases, where the proper structure of concurrency scopes cannot be preserved. One
* such example is integration with callback-based systems like Netty, which runs handler methods on its event loop thread.
* @param ox
* concurrency scope where a fork will be run, using a nested scope to isolate failures.
*/
private[sync] class OxDispatcher()(using ox: Ox):
private class Runner:
def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit =
forkPromise
.success(forkCancellable {
try supervised(thunk)
catch case NonFatal(e) => onError(e)
})
.discard

private val actor = Actor.create(new Runner)

private[sync] class OxDispatcher private (actor: ActorRef[OxDispatcherRunner]):
def runAsync(thunk: Ox ?=> Unit)(onError: Throwable => Unit): Future[CancellableFork[Unit]] =
val forkPromise = Promise[CancellableFork[Unit]]()
actor.tell(_.runAsync(thunk, onError, forkPromise))
forkPromise.future

private trait OxDispatcherRunner:
def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit

object OxDispatcher:
/** @param ox
* concurrency scope where forks will be run, using a nested scope to isolate failures. The dispatcher will only be usable as long as
* this scope doesn't complete.
*/
def create(using ox: Ox): OxDispatcher =
val actor = Actor.create {
new OxDispatcherRunner:
def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit =
val fork = forkCancellable {
try supervised(thunk)
catch case NonFatal(e) => onError(e)
}
forkPromise.success(fork).discard
}
new OxDispatcher(actor)
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ class NettySyncTestServerInterpreter(eventLoopGroup: NioEventLoopGroup)
override def route(es: List[ServerEndpoint[OxStreams with WebSockets, Id]], interceptors: Interceptors): IdRoute = {
val serverOptions: NettySyncServerOptions = interceptors(NettySyncServerOptions.customiseInterceptors).options
supervised { // not a correct way, but this method is only used in a few tests which don't test anything related to scopes
NettySyncServerInterpreter(serverOptions).toRoute(es, new OxDispatcher())
NettySyncServerInterpreter(serverOptions).toRoute(es, OxDispatcher.create)
}
}

def route(es: List[ServerEndpoint[OxStreams with WebSockets, Id]], interceptors: Interceptors)(using Ox): IdRoute = {
val serverOptions: NettySyncServerOptions = interceptors(NettySyncServerOptions.customiseInterceptors).options
supervised { // not a correct way, but this method is only used in a few tests which don't test anything related to scopes
NettySyncServerInterpreter(serverOptions).toRoute(es, new OxDispatcher())
NettySyncServerInterpreter(serverOptions).toRoute(es, OxDispatcher.create)
}
}

Expand Down

0 comments on commit 2aa60e0

Please sign in to comment.