diff --git a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/NettySyncServer.scala b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/NettySyncServer.scala index ea08ed5835..5901be3fa8 100644 --- a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/NettySyncServer.scala +++ b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/NettySyncServer.scala @@ -68,7 +68,7 @@ case class NettySyncServer( * server binding, to be used to control stopping of the server or obtaining metadata like port. */ def start()(using Ox): NettySyncServerBinding = - startUsingSocketOverride[InetSocketAddress](None, new OxDispatcher()) match + startUsingSocketOverride[InetSocketAddress](None, OxDispatcher.create) match case (socket, stop) => NettySyncServerBinding(socket, stop) @@ -87,7 +87,7 @@ case class NettySyncServer( NettySyncServerBinding(socket, stop) def startUsingDomainSocket(path: Path)(using Ox): NettySyncDomainSocketBinding = - startUsingSocketOverride(Some(new DomainSocketAddress(path.toFile)), new OxDispatcher()) match + startUsingSocketOverride(Some(new DomainSocketAddress(path.toFile)), OxDispatcher.create) match case (socket, stop) => NettySyncDomainSocketBinding(socket, stop) diff --git a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ox/OxDispatcher.scala b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ox/OxDispatcher.scala index 9a9ad8207b..5762a0644d 100644 --- a/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ox/OxDispatcher.scala +++ b/server/netty-server/sync/src/main/scala/sttp/tapir/server/netty/sync/internal/ox/OxDispatcher.scala @@ -1,15 +1,15 @@ package sttp.tapir.server.netty.sync.internal.ox import ox.* -import ox.channels.Actor +import ox.channels.{Actor, ActorRef} import scala.util.control.NonFatal import scala.concurrent.Future import scala.concurrent.Promise -/** A dispatcher that can start arbitrary forks. Useful when one needs to start an asynchronous task from a thread outside of an Ox scope. - * Normally Ox doesn't allow to start forks from other threads, for example in callbacks of external libraries. If you create an - * OxDispatcher inside a scope and pass it for potential handling on another thread, that thread can call +/** A dispatcher that can start forks, within some "global" scope. Useful when one needs to start an asynchronous task from a thread outside + * of an Ox scope. Normally Ox doesn't allow to start forks from arbitrary threads, for example in callbacks of external libraries. If you + * create an `OxDispatcher` inside a scope and pass it for potential handling on another thread, that thread can call * {{{ * dispatcher.runAsync { * // code to be executed in a fork @@ -19,22 +19,29 @@ import scala.concurrent.Promise * }}} * WARNING! Dispatchers should only be used in special cases, where the proper structure of concurrency scopes cannot be preserved. One * such example is integration with callback-based systems like Netty, which runs handler methods on its event loop thread. - * @param ox - * concurrency scope where a fork will be run, using a nested scope to isolate failures. */ -private[sync] class OxDispatcher()(using ox: Ox): - private class Runner: - def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit = - forkPromise - .success(forkCancellable { - try supervised(thunk) - catch case NonFatal(e) => onError(e) - }) - .discard - - private val actor = Actor.create(new Runner) - +private[sync] class OxDispatcher private (actor: ActorRef[OxDispatcherRunner]): def runAsync(thunk: Ox ?=> Unit)(onError: Throwable => Unit): Future[CancellableFork[Unit]] = val forkPromise = Promise[CancellableFork[Unit]]() actor.tell(_.runAsync(thunk, onError, forkPromise)) forkPromise.future + +private trait OxDispatcherRunner: + def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit + +object OxDispatcher: + /** @param ox + * concurrency scope where forks will be run, using a nested scope to isolate failures. The dispatcher will only be usable as long as + * this scope doesn't complete. + */ + def create(using ox: Ox): OxDispatcher = + val actor = Actor.create { + new OxDispatcherRunner: + def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit, forkPromise: Promise[CancellableFork[Unit]]): Unit = + val fork = forkCancellable { + try supervised(thunk) + catch case NonFatal(e) => onError(e) + } + forkPromise.success(fork).discard + } + new OxDispatcher(actor) diff --git a/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncTestServerInterpreter.scala b/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncTestServerInterpreter.scala index 5bbdc005fe..7144039ab6 100644 --- a/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncTestServerInterpreter.scala +++ b/server/netty-server/sync/src/test/scala/sttp/tapir/server/netty/sync/NettySyncTestServerInterpreter.scala @@ -18,14 +18,14 @@ class NettySyncTestServerInterpreter(eventLoopGroup: NioEventLoopGroup) override def route(es: List[ServerEndpoint[OxStreams with WebSockets, Id]], interceptors: Interceptors): IdRoute = { val serverOptions: NettySyncServerOptions = interceptors(NettySyncServerOptions.customiseInterceptors).options supervised { // not a correct way, but this method is only used in a few tests which don't test anything related to scopes - NettySyncServerInterpreter(serverOptions).toRoute(es, new OxDispatcher()) + NettySyncServerInterpreter(serverOptions).toRoute(es, OxDispatcher.create) } } def route(es: List[ServerEndpoint[OxStreams with WebSockets, Id]], interceptors: Interceptors)(using Ox): IdRoute = { val serverOptions: NettySyncServerOptions = interceptors(NettySyncServerOptions.customiseInterceptors).options supervised { // not a correct way, but this method is only used in a few tests which don't test anything related to scopes - NettySyncServerInterpreter(serverOptions).toRoute(es, new OxDispatcher()) + NettySyncServerInterpreter(serverOptions).toRoute(es, OxDispatcher.create) } }