Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty (fix): Fix async REST response handling (Scala Future, error Rx) #3351

Merged
merged 7 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class FinagleRouter(session: Session, private[finagle] val config: FinagleServer
config.controllerProvider,
FinagleBackend,
config.responseHandler,
MessageCodecFactory.defaultFactory.orElse(MessageCodecFactory.newFactory(config.customCodec))
MessageCodecFactory.defaultFactory.orElse(MessageCodecFactory.newFactory(config.customCodec)),
config.executionContext
)

override def apply(request: Request, service: Service[Request, Response]): Future[Response] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.twitter.util.{Await, Future}
import javax.annotation.PostConstruct
import wvlet.airframe.*
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.control.MultipleExceptions
import wvlet.airframe.control.{MultipleExceptions, ThreadUtil}
import wvlet.airframe.http.finagle.FinagleServer.FinagleService
import wvlet.airframe.http.finagle.filter.HttpAccessLogFilter
import wvlet.airframe.http.{
Expand All @@ -40,9 +40,10 @@ import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport
import wvlet.log.io.IOUtil

import java.util.concurrent.Executors
import scala.annotation.tailrec
import scala.collection.parallel.immutable.ParVector
import scala.concurrent.ExecutionException
import scala.concurrent.{ExecutionContext, ExecutionException}
import scala.util.Try
import scala.util.control.NonFatal

Expand All @@ -62,7 +63,15 @@ case class FinagleServerConfig(
// A top-level filter applied before routing requests
beforeRoutingFilter: Filter[Request, Response, Request, Response] = Filter.identity,
// Service called when no matching route is found
fallbackService: Service[Request, Response] = FinagleServer.notFound
fallbackService: Service[Request, Response] = FinagleServer.notFound,
// Thread manager for handling Future[_] responses
executionContext: ExecutionContext = {
// Using the global thread pool causes an issue in sbt's layered class loader #918
// So need to use the local daemon thread pool
ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(ThreadUtil.newDaemonThreadFactory("airframe-finagle"))
)
}
) {
// Lazily acquire an unused port to avoid conflicts between multiple servers
lazy val port = serverPort.getOrElse(IOUtil.unusedPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import wvlet.airframe.rx.Rx
import wvlet.log.LogSupport

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

object NettyBackend extends HttpBackend[Request, Response, Rx] with LogSupport { self =>
private val rxBackend = new RxNettyBackend
Expand All @@ -37,17 +38,18 @@ object NettyBackend extends HttpBackend[Request, Response, Rx] with LogSupport {
Rx.single(a)
}

override def toFuture[A](a: Future[A], e: ExecutionContext): Rx[A] = {
Rx.future(a)(e)
override def toFuture[A](a: Future[A], ex: ExecutionContext): Rx[A] = {
val v = Await.result(a, scala.concurrent.duration.Duration.Inf)
Rx.single(v)
}

override def toScalaFuture[A](a: Rx[A]): Future[A] = {
val promise: Promise[A] = Promise()
a.toRx
.map { x =>
promise.success(x)
}
.recover { case e: Throwable => promise.failure(e) }
val rx = a.transform {
case Success(x) => promise.success(x)
case Failure(ex) => promise.failure(ex)
}
rx.run { effect => }
promise.future
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ import wvlet.airframe.{Design, Session}
import wvlet.log.LogSupport
import wvlet.log.io.IOUtil

import java.util.concurrent.TimeUnit
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.PostConstruct
import scala.collection.immutable.ListMap
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

case class NettyServerConfig(
Expand All @@ -52,7 +53,15 @@ case class NettyServerConfig(
new LogRotationHttpLogger(config)
},
loggingFilter: HttpLogger => RxHttpFilter = { new RPCLoggingFilter(_) },
customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty
customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty,
// Thread manager for handling Future[_] responses
executionContext: ExecutionContext = {
// Using the global thread pool causes an issue in sbt's layered class loader #918
// So need to use the local daemon thread pool
ExecutionContext.fromExecutorService(
Executors.newCachedThreadPool(ThreadUtil.newDaemonThreadFactory("airframe-netty"))
)
}
) {
lazy val port = serverPort.getOrElse(IOUtil.unusedPort)

Expand Down Expand Up @@ -149,7 +158,6 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe
new NioEventLoopGroup(numWorkers, tf)
}
}

private var channelFuture: Option[Channel] = None

override def localAddress: String = s"localhost:${config.port}"
Expand Down Expand Up @@ -238,7 +246,8 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe
NettyBackend,
new NettyResponseHandler,
// Set a custom codec and use JSON map output
MessageCodecFactory.defaultFactoryForJSON.withCodecs(config.customCodec)
MessageCodecFactory.defaultFactoryForJSON.withCodecs(config.customCodec),
config.executionContext
)
)
}
Expand Down
Loading
Loading