diff --git a/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleFilter.scala b/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleFilter.scala index d66ed85681..1617a893a7 100644 --- a/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleFilter.scala +++ b/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleFilter.scala @@ -37,7 +37,8 @@ class FinagleRouter(session: Session, private[finagle] val config: FinagleServer config.controllerProvider, FinagleBackend, config.responseHandler, - MessageCodecFactory.defaultFactory.orElse(MessageCodecFactory.newFactory(config.customCodec)) + MessageCodecFactory.defaultFactory.orElse(MessageCodecFactory.newFactory(config.customCodec)), + config.executionContext ) override def apply(request: Request, service: Service[Request, Response]): Future[Response] = { diff --git a/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleServer.scala b/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleServer.scala index dbcbcca656..9ab91bfe13 100644 --- a/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleServer.scala +++ b/airframe-http-finagle/src/main/scala/wvlet/airframe/http/finagle/FinagleServer.scala @@ -22,7 +22,7 @@ import com.twitter.util.{Await, Future} import javax.annotation.PostConstruct import wvlet.airframe.* import wvlet.airframe.codec.MessageCodec -import wvlet.airframe.control.MultipleExceptions +import wvlet.airframe.control.{MultipleExceptions, ThreadUtil} import wvlet.airframe.http.finagle.FinagleServer.FinagleService import wvlet.airframe.http.finagle.filter.HttpAccessLogFilter import wvlet.airframe.http.{ @@ -40,9 +40,10 @@ import wvlet.airframe.surface.Surface import wvlet.log.LogSupport import wvlet.log.io.IOUtil +import java.util.concurrent.Executors import scala.annotation.tailrec import scala.collection.parallel.immutable.ParVector -import scala.concurrent.ExecutionException +import scala.concurrent.{ExecutionContext, ExecutionException} import scala.util.Try import scala.util.control.NonFatal @@ -62,7 +63,15 @@ case class FinagleServerConfig( // A top-level filter applied before routing requests beforeRoutingFilter: Filter[Request, Response, Request, Response] = Filter.identity, // Service called when no matching route is found - fallbackService: Service[Request, Response] = FinagleServer.notFound + fallbackService: Service[Request, Response] = FinagleServer.notFound, + // Thread manager for handling Future[_] responses + executionContext: ExecutionContext = { + // Using the global thread pool causes an issue in sbt's layered class loader #918 + // So need to use the local daemon thread pool + ExecutionContext.fromExecutorService( + Executors.newCachedThreadPool(ThreadUtil.newDaemonThreadFactory("airframe-finagle")) + ) + } ) { // Lazily acquire an unused port to avoid conflicts between multiple servers lazy val port = serverPort.getOrElse(IOUtil.unusedPort) diff --git a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyBackend.scala b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyBackend.scala index 9da3d9c9a4..a741a89416 100644 --- a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyBackend.scala +++ b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyBackend.scala @@ -19,7 +19,8 @@ import wvlet.airframe.rx.Rx import wvlet.log.LogSupport import scala.collection.mutable -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{Await, ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} object NettyBackend extends HttpBackend[Request, Response, Rx] with LogSupport { self => private val rxBackend = new RxNettyBackend @@ -37,17 +38,18 @@ object NettyBackend extends HttpBackend[Request, Response, Rx] with LogSupport { Rx.single(a) } - override def toFuture[A](a: Future[A], e: ExecutionContext): Rx[A] = { - Rx.future(a)(e) + override def toFuture[A](a: Future[A], ex: ExecutionContext): Rx[A] = { + val v = Await.result(a, scala.concurrent.duration.Duration.Inf) + Rx.single(v) } override def toScalaFuture[A](a: Rx[A]): Future[A] = { val promise: Promise[A] = Promise() - a.toRx - .map { x => - promise.success(x) - } - .recover { case e: Throwable => promise.failure(e) } + val rx = a.transform { + case Success(x) => promise.success(x) + case Failure(ex) => promise.failure(ex) + } + rx.run { effect => } promise.future } diff --git a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala index e951ecdda2..a4b6aa18e6 100644 --- a/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala +++ b/airframe-http-netty/src/main/scala/wvlet/airframe/http/netty/NettyServer.scala @@ -35,10 +35,11 @@ import wvlet.airframe.{Design, Session} import wvlet.log.LogSupport import wvlet.log.io.IOUtil -import java.util.concurrent.TimeUnit +import java.util.concurrent.{Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.PostConstruct import scala.collection.immutable.ListMap +import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} case class NettyServerConfig( @@ -52,7 +53,15 @@ case class NettyServerConfig( new LogRotationHttpLogger(config) }, loggingFilter: HttpLogger => RxHttpFilter = { new RPCLoggingFilter(_) }, - customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty + customCodec: PartialFunction[Surface, MessageCodec[_]] = PartialFunction.empty, + // Thread manager for handling Future[_] responses + executionContext: ExecutionContext = { + // Using the global thread pool causes an issue in sbt's layered class loader #918 + // So need to use the local daemon thread pool + ExecutionContext.fromExecutorService( + Executors.newCachedThreadPool(ThreadUtil.newDaemonThreadFactory("airframe-netty")) + ) + } ) { lazy val port = serverPort.getOrElse(IOUtil.unusedPort) @@ -149,7 +158,6 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe new NioEventLoopGroup(numWorkers, tf) } } - private var channelFuture: Option[Channel] = None override def localAddress: String = s"localhost:${config.port}" @@ -238,7 +246,8 @@ class NettyServer(config: NettyServerConfig, session: Session) extends HttpServe NettyBackend, new NettyResponseHandler, // Set a custom codec and use JSON map output - MessageCodecFactory.defaultFactoryForJSON.withCodecs(config.customCodec) + MessageCodecFactory.defaultFactoryForJSON.withCodecs(config.customCodec), + config.executionContext ) ) } diff --git a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRESTServerTest.scala b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRESTServerTest.scala index 4520c338eb..be364fb3ca 100644 --- a/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRESTServerTest.scala +++ b/airframe-http-netty/src/test/scala/wvlet/airframe/http/netty/NettyRESTServerTest.scala @@ -18,6 +18,7 @@ import wvlet.airframe.Design import wvlet.airframe.codec.{JSONCodec, MessageCodec} import wvlet.airframe.control.Control import wvlet.airframe.http.* +import wvlet.airframe.http.HttpHeader.MediaType import wvlet.airframe.http.client.{AsyncClient, SyncClient} import wvlet.airframe.msgpack.spi.MessagePack import wvlet.airspec.AirSpec @@ -86,26 +87,12 @@ class MyApi extends LogSupport { throw new InvocationTargetException(new IllegalArgumentException("test error")) } -// @Endpoint(path = "/v1/reader") -// def reader: Reader[Buf] = { -// val json = MessageCodec.of[RichInfo].toJson(getRichInfo) -// Reader.fromBuf(Buf.Utf8(json)) -// } -// -// @Endpoint(path = "/v1/reader-seq") -// def readerSeq: Reader[RichInfo] = { -// val r1 = Reader.fromSeq(Seq(getRichInfo)) -// val r2 = Reader.fromSeq(Seq(getRichInfo)) -// val stream = AsyncStream.fromSeq(Seq(r1, r2)) -// Reader.concat(stream) -// } - @Endpoint(path = "/v1/delete", method = HttpMethod.DELETE) def emptyResponse: Unit = {} @Endpoint(path = "/v1/scala-future", method = HttpMethod.GET) def scalaFutureResponse: scala.concurrent.Future[String] = { - scala.concurrent.Future.successful("Hello Scala Future") + Future.successful("Hello Scala Future") } @Endpoint(path = "/v1/scala-future2", method = HttpMethod.GET) @@ -138,7 +125,7 @@ class NettyRESTServerTest extends AirSpec { } test("support production mode") { (server: HttpServer) => - // #432: Just need to check the startup of finagle without MISSING_DEPENDENCY error + // #432: Just need to check the startup of NettyServer (HttpServer interface) without causing MISSING_DEPENDENCY error } test("async responses") { (client: AsyncClient) => @@ -163,206 +150,147 @@ class NettyRESTServerTest extends AirSpec { } } -// test("test various responses") { (client: FinagleClient) => -// test("support JSON response") { -// // JSON response -// val json = Await.result(client.send(Request("/v1/rich_info_future")).map { response => response.contentString }) -// -// json shouldBe """{"version":"0.1","name":"MyApi","details":{"serverType":"test-server"}}""" -// } -// -// test("support JSON POST request") { -// val request = Request("/v1/json_api") -// request.method = Method.Post -// request.contentString = """{"id":10, "name":"leo"}""" -// val ret = Await.result(client.send(request).map(_.contentString)) -// ret shouldBe """RichRequest(10,leo)""" -// } -// -// test("return a response header except for Content-Type") { -// val request = Request("/v1/http_header_test") -// val ret = Await.result(client.send(request)) -// -// ret.headerMap.getOrElse("Server", "") shouldBe "Airframe" -// ret.contentString shouldBe """Hello""" -// } -// -// test("JSON POST request with explicit JSON content type") { -// val request = Request("/v1/json_api") -// request.method = Method.Post -// request.contentString = """{"id":10, "name":"leo"}""" -// request.setContentTypeJson() -// val ret = Await.result(client.send(request).map(_.contentString)) -// ret shouldBe """RichRequest(10,leo)""" -// } -// -// test("test parameter mappings") { -// // Use the default argument -// { -// val request = Request("/v1/json_api_default") -// request.method = Method.Post -// val ret = Await.result(client.send(request).map(_.contentString)) -// ret shouldBe """RichRequest(100,dummy)""" -// } -// -// // GET requests with query parameters -// { -// val request = Request("/v1/json_api?id=10&name=leo") -// request.method = Method.Get -// val ret = Await.result(client.send(request).map(_.contentString)) -// ret shouldBe """RichRequest(10,leo)""" -// } -// -// // JSON requests with POST -// { -// val request = Request("/v1/json_api") -// request.method = Method.Post -// request.contentString = """{"id":10, "name":"leo"}""" -// val ret = Await.result(client.send(request).map(_.contentString)) -// ret shouldBe """RichRequest(10,leo)""" -// } -// } -// -// test("test error response") { -// warn("Exception response test") -// val l = Logger.of[FinagleServer] -// val lv = l.getLogLevel -// l.setLogLevel(LogLevel.ERROR) -// try { -// val request = Request("/v1/error") -// val ret = Await.result(client.sendSafe(request)) // Receive the raw error response -// ret.statusCode shouldBe 500 -// } finally { -// l.setLogLevel(lv) -// } -// } -// -// test("MsgPack response") { -// // MessagePack request -// { -// val request = Request("/v1/json_api") -// request.method = Method.Post -// val msgpack = JSONCodec.toMsgPack("""{"id":10, "name":"leo"}""") -// request.content = ByteArray.Owned(msgpack) -// request.contentType = "application/x-msgpack" -// val ret = Await.result(client.send(request).map(_.contentString)) -// ret shouldBe """RichRequest(10,leo)""" -// } -// -// // Receive MessagePack -// { -// val request = Request("/v1/raw_string_arg") -// request.method = Method.Post -// request.contentType = "application/x-msgpack" -// val msgpack = MessagePack.newBufferPacker.packString("1.0").toByteArray -// request.content = ByteArray.Owned(msgpack) -// val response = Await.result(client.send(request)) -// response.contentString shouldBe "1.0" -// response.statusCode shouldBe HttpStatus.Ok_200.code -// } -// } -// -// test("Raw string request") { -// // Raw string arg -// val request = Request("/v1/raw_string_arg") -// request.method = Method.Post -// request.contentString = "1.0" -// Await.result(client.send(request).map(_.contentString)) shouldBe "1.0" -// } -// -// test("Finagle Reader[Buf] response") { -// val request = Request("/v1/reader") -// request.method = Method.Get -// val json = Await.result(client.send(request).map(_.contentString)) -// val codec = MessageCodec.of[RichInfo] -// codec.unpackJson(json) shouldBe Some(RichInfo("0.1", "MyApi", RichNestedInfo("test-server"))) -// } -// -// val richInfo = RichInfo("0.1", "MyApi", RichNestedInfo("test-server")) -// -// test("convert Reader[X] response to JSON stream") { -// val request = Request("/v1/reader-seq") -// request.method = Method.Get -// val json = Await.result(client.send(request).map(_.contentString)) -// debug(json) -// val codec = MessageCodec.of[Seq[RichInfo]] -// codec.fromJson(json) shouldBe Seq(richInfo, richInfo) -// } -// -// test("Convert Reader[X] response to MsgPack stream") { -// val request = Request("/v1/reader-seq") -// request.method = Method.Get -// request.accept = "application/x-msgpack" -// val msgpack = Await.result { -// client.send(request).map { resp => -// val c = resp.content -// val msgpack = new Array[Byte](c.length) -// c.write(msgpack, 0) -// msgpack -// } -// } -// val codec = MessageCodec.of[RichInfo] -// -// Control.withResource(MessagePack.newUnpacker(msgpack)) { unpacker => -// while (unpacker.hasNext) { -// val v = unpacker.unpackValue -// codec.fromMsgPack(v.toMsgpack) shouldBe richInfo -// } -// } -// } -// -// test("return 204 for Unit response") { -// val result = Await.result(client.send(Request(Method.Delete, "/v1/delete"))) -// result.statusCode shouldBe HttpStatus.NoContent_204.code -// } -// -// test("support scala.concurrent.Future[X]") { -// val result = Await.result(client.send(Request(Method.Get, "/v1/scala-future"))) -// result.statusCode shouldBe HttpStatus.Ok_200.code -// result.contentString shouldBe "Hello Scala Future" -// } -// -// test("support scala.concurrent.Future[Response]") { -// val result = Await.result(client.send(Request(Method.Get, "/v1/scala-future2"))) -// result.statusCode shouldBe HttpStatus.Ok_200.code -// result.contentString shouldBe "Hello Scala Future" -// } -// -// test("support query parameter mapping") { -// val result = Await.result(client.send(Request(Method.Get, "/v1/user/1/profile?session_id=xyz"))) -// result.statusCode shouldBe HttpStatus.Ok_200.code -// result.contentString shouldBe "1:xyz" -// } -// -// test("support missing query parameter mapping") { -// val result = Await.result(client.send(Request(Method.Get, "/v1/user/1/profile"))) -// result.statusCode shouldBe HttpStatus.Ok_200.code -// result.contentString shouldBe "1:unknown" -// } -// -// test("support query parameter mapping for POST") { -// val r = Request(Method.Post, "/v1/user/1/profile?session_id=xyz") -// r.contentString = "hello" -// val result = Await.result(client.send(r)) -// result.statusCode shouldBe HttpStatus.Ok_200.code -// result.contentString shouldBe "1:xyz" -// } -// -// test("support option parameter mapping for POST") { -// val r = Request(Method.Post, "/v1/user/1/profile") -// r.contentString = "hello" -// val result = Await.result(client.send(r)) -// result.statusCode shouldBe HttpStatus.Ok_200.code -// result.contentString shouldBe "1:hello" -// } -// -// test("skip content body mapping for application/octet-stream requests") { -// val r = Request(Method.Post, "/v1/user/1/profile") -// r.contentString = "hello" // This content should not be used for RPC binding -// r.contentType = MediaType.OctetStream -// val result = Await.result(client.send(r)) -// result.statusCode shouldBe HttpStatus.Ok_200.code -// result.contentString shouldBe "1:unknown" -// } -// } + test("test various responses") { (client: AsyncClient) => + test("support JSON response") { + // JSON response + client.send(Http.GET("/v1/rich_info_future")).map { response => + response.contentString shouldBe """{"version":"0.1","name":"MyApi","details":{"serverType":"test-server"}}""" + } + } + + test("support JSON POST request") { + val request = Http.POST("/v1/json_api").withJson("""{"id":10, "name":"leo"}""") + client.send(request).map { + _.contentString shouldBe """RichRequest(10,leo)""" + } + } + + test("return a response header except for Content-Type") { + val request = Http.GET("/v1/http_header_test") + client.send(request).map { ret => + ret.header.getOrElse("Server", "") shouldBe "Airframe" + ret.contentString shouldBe """Hello""" + } + } + + test("JSON POST request with explicit JSON content type") { + val request = Http.POST("/v1/json_api").withJson("""{"id":10, "name":"leo"}""") + client.send(request).map { + _.contentString shouldBe """RichRequest(10,leo)""" + } + } + + test("test parameter mappings") { + test("Use the default argument") { + client.send(Http.POST("/v1/json_api_default")).map { + _.contentString shouldBe """RichRequest(100,dummy)""" + } + } + + test("GET request with query parameters") { + client.send(Http.GET("/v1/json_api?id=10&name=leo")).map { + _.contentString shouldBe """RichRequest(10,leo)""" + } + } + + // JSON requests with POST + test("JSON POST request with explicit JSON content type") { + val request = Http.POST("/v1/json_api").withJson("""{"id":10, "name":"leo"}""") + client.send(request).map(_.contentString shouldBe """RichRequest(10,leo)""") + } + } + + test("test error response") { + warn("Exception response test") + // Receive the raw error response + client.withConfig(_.noRetry).sendSafe(Http.GET("/v1/error")).map { ret => + ret.statusCode shouldBe 500 + ret.header.get(HttpHeader.xAirframeRPCStatus) shouldBe defined + } + } + + test("MsgPack response") { + test("MessagePack request") { + val msgpack = JSONCodec.toMsgPack("""{"id":10, "name":"leo"}""") + val request = Http.POST("/v1/json_api").withMsgPack(msgpack) + client.send(request).map(_.contentString shouldBe """RichRequest(10,leo)""") + } + + test("Receive MessagePack") { + val msgpack = MessagePack.newBufferPacker.packString("1.0").toByteArray + val request = Http.POST("/v1/raw_string_arg").withMsgPack(msgpack) + client.send(request).map { response => + response.contentString shouldBe "1.0" + response.statusCode shouldBe HttpStatus.Ok_200.code + } + } + } + + test("Raw string request") { + // Raw string arg + val request = Http.POST("/v1/raw_string_arg").withContent("1.0") + client.send(request).map(_.contentString shouldBe "1.0") + } + + test("return 204 for Unit response") { + client.send(Http.DELETE("/v1/delete")).map { response => + response.status shouldBe HttpStatus.NoContent_204 + } + } + + test("support scala.concurrent.Future[X]") { + client.send(Http.GET("/v1/scala-future")).map { response => + response.status shouldBe HttpStatus.Ok_200 + response.contentString shouldBe "Hello Scala Future" + } + } + + test("support scala.concurrent.Future[Response]") { + client.send(Http.GET("/v1/scala-future2")).map { response => + response.status shouldBe HttpStatus.Ok_200 + response.contentString shouldBe "Hello Scala Future" + } + } + + test("support query parameter mapping") { + client.send(Http.GET("/v1/user/1/profile?session_id=xyz")).map { result => + result.status shouldBe HttpStatus.Ok_200 + result.contentString shouldBe "1:xyz" + } + } + + test("support missing query parameter mapping") { + client.send(Http.GET("/v1/user/1/profile")).map { result => + result.status shouldBe HttpStatus.Ok_200 + result.contentString shouldBe "1:unknown" + } + } + + test("support query parameter mapping for POST") { + val r = Http.POST("/v1/user/1/profile?session_id=xyz") + client.send(r).map { result => + result.status shouldBe HttpStatus.Ok_200 + result.contentString shouldBe "1:xyz" + } + } + + test("support option parameter mapping for POST") { + val r = Http.POST("/v1/user/1/profile") + client.send(r).map { result => + result.status shouldBe HttpStatus.Ok_200 + result.contentString shouldBe "1:unknown" + } + } + + test("skip content body mapping for application/octet-stream requests") { + val r = Http + .POST("/v1/user/1/profile") + .withContent("hello") // This content should not be used for RPC binding + .withContentType(MediaType.OctetStream) + client.send(r).map { result => + result.status shouldBe HttpStatus.Ok_200 + result.contentString shouldBe "1:unknown" + } + } + } } diff --git a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala index a1a0db80d3..e6aabb8029 100644 --- a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala +++ b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/client/JavaHttpClientChannel.scala @@ -18,6 +18,7 @@ import wvlet.airframe.control.IO import wvlet.airframe.http.HttpMessage.{Request, Response} import wvlet.airframe.http.* import wvlet.airframe.rx.Rx +import wvlet.log.LogSupport import java.io.InputStream import java.net.URI @@ -35,7 +36,8 @@ import scala.jdk.CollectionConverters.* * @param config */ class JavaHttpClientChannel(serverAddress: ServerAddress, private[http] val config: HttpClientConfig) - extends HttpChannel { + extends HttpChannel + with LogSupport { private val javaHttpClient: HttpClient = initClient(config) private def initClient(config: HttpClientConfig): HttpClient = { diff --git a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpEndpointExecutionContext.scala b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpEndpointExecutionContext.scala index fd31ccfcb3..da9bbd9b2d 100644 --- a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpEndpointExecutionContext.scala +++ b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpEndpointExecutionContext.scala @@ -14,11 +14,12 @@ package wvlet.airframe.http.router import java.lang.reflect.InvocationTargetException - import wvlet.airframe.codec.{MessageCodec, MessageCodecFactory} +import wvlet.airframe.control.ThreadUtil import wvlet.airframe.http.{HttpBackend, HttpContext, HttpRequestAdapter} import wvlet.log.LogSupport +import java.util.concurrent.Executors import scala.concurrent.ExecutionContext import scala.language.higherKinds @@ -32,7 +33,8 @@ class HttpEndpointExecutionContext[Req: HttpRequestAdapter, Resp, F[_]]( routeMatch: RouteMatch, responseHandler: ResponseHandler[Req, Resp], controller: Any, - codecFactory: MessageCodecFactory + codecFactory: MessageCodecFactory, + executionContext: ExecutionContext ) extends HttpContext[Req, Resp, F] with LogSupport { @@ -70,19 +72,17 @@ class HttpEndpointExecutionContext[Req: HttpRequestAdapter, Resp, F[_]]( // Check the type of X val futureValueSurface = route.returnTypeSurface.typeArgs(0) - // TODO: Is using global execution a right choice? - val ex = ExecutionContext.global futureValueSurface.rawType match { // If X is the backend Response type, return as is: case valueCls if backend.isRawResponseType(valueCls) => - // Convert Scala Future to Finagle Future - backend.toFuture(result.asInstanceOf[scala.concurrent.Future[Resp]], ex) + // Convert Scala Future to the backend-specific Future + backend.toFuture(result.asInstanceOf[scala.concurrent.Future[Resp]], executionContext) case other => // If X is other type, convert X into an HttpResponse val scalaFuture = result .asInstanceOf[scala.concurrent.Future[_]] - .map { x => responseHandler.toHttpResponse(route, request, futureValueSurface, x) }(ex) - backend.toFuture(scalaFuture, ex) + .map { x => responseHandler.toHttpResponse(route, request, futureValueSurface, x) }(executionContext) + backend.toFuture(scalaFuture, executionContext) } case _ => // If the route returns non future value, convert it into Future response diff --git a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpRequestDispatcher.scala b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpRequestDispatcher.scala index 1cd5957731..4d878c92cb 100644 --- a/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpRequestDispatcher.scala +++ b/airframe-http/.jvm/src/main/scala/wvlet/airframe/http/router/HttpRequestDispatcher.scala @@ -18,6 +18,7 @@ import wvlet.airframe.codec.MessageCodecFactory import wvlet.airframe.http.* import wvlet.log.LogSupport +import scala.concurrent.ExecutionContext import scala.language.higherKinds /** @@ -42,7 +43,8 @@ object HttpRequestDispatcher extends LogSupport { controllerProvider: ControllerProvider, backend: HttpBackend[Req, Resp, F], responseHandler: ResponseHandler[Req, Resp], - codecFactory: MessageCodecFactory + codecFactory: MessageCodecFactory, + executionContext: ExecutionContext ): HttpFilter[Req, Resp, F] = { // Generate a table for Route -> matching HttpFilter val routingTable = buildRoutingTable(backend, session, router, backend.defaultFilter, controllerProvider) @@ -54,9 +56,16 @@ object HttpRequestDispatcher extends LogSupport { val routeFilter = routingTable.findFilter(routeMatch.route) // Create a new context for processing the matched route with the controller val context = - new HttpEndpointExecutionContext(backend, routeMatch, responseHandler, routeFilter.controller, codecFactory) + new HttpEndpointExecutionContext( + backend, + routeMatch, + responseHandler, + routeFilter.controller, + codecFactory, + executionContext + ) val currentService = routeFilter.filter.andThen(context) - currentService(request) + currentService.apply(request) case None => // If no matching route is found, use the leaf filter if exists routingTable.leafFilter match { diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala index 60c80d04e7..86aed54e3d 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/AsyncClient.scala @@ -20,6 +20,8 @@ import wvlet.airframe.rx.Rx import wvlet.airframe.surface.Surface import wvlet.log.LogSupport +import java.util.concurrent.atomic.AtomicReference + /** * A standard async http client interface for Rx[_] */ @@ -73,8 +75,7 @@ trait AsyncClient extends AsyncClientCompat with HttpClientFactory[AsyncClient] } .recover { // Or if request has been failing, apply the response filter only to the last response - val response = lastResponse.map(config.responseFilter(_)) - HttpClients.defaultHttpClientErrorHandler(response) + HttpClients.defaultHttpClientErrorHandler(() => lastResponse.map(config.responseFilter(_))) } } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala index babc53e6dc..7d489a38df 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala @@ -102,6 +102,9 @@ case class HttpClientConfig( this.copy(codecFactory = newCodecFactory) def withRetryContext(filter: RetryContext => RetryContext): HttpClientConfig = this.copy(retryContext = filter(retryContext)) + def noRetry: HttpClientConfig = { + this.copy(retryContext = retryContext.noRetry) + } def withCircuitBreaker(f: CircuitBreaker => CircuitBreaker): HttpClientConfig = { this.copy(circuitBreaker = f(circuitBreaker)) } diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala index a515eecb82..b7ffd27ee2 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala @@ -70,7 +70,8 @@ object HttpClients extends LogSupport { } private[client] def defaultHttpClientErrorHandler( - lastResponse: Option[Response] + // Need to evaluate the last response lazily because it may not be available when this method is called + lastResponse: () => Option[Response] ): PartialFunction[Throwable, Nothing] = { case e: HttpClientException => val resp = e.response.toHttpResponse @@ -88,7 +89,7 @@ object HttpClients extends LogSupport { val resp = e.toResponse throw new HttpClientException(resp, e.status.httpStatus, e.message, e) case e: CircuitBreakerOpenException => - val resp = lastResponse.getOrElse(Http.response(HttpStatus.ServiceUnavailable_503)) + val resp = lastResponse().getOrElse(Http.response(HttpStatus.ServiceUnavailable_503)) throw new HttpClientException( resp, status = resp.status, @@ -97,12 +98,12 @@ object HttpClients extends LogSupport { ) case e: MaxRetryException => throw HttpClientMaxRetryException( - lastResponse.getOrElse(Http.response(HttpStatus.InternalServerError_500)), + lastResponse().getOrElse(Http.response(HttpStatus.InternalServerError_500)), e.retryContext, e.retryContext.lastError ) case NonFatal(e) => - val resp = lastResponse.getOrElse(Http.response(HttpStatus.InternalServerError_500)) + val resp = lastResponse().getOrElse(Http.response(HttpStatus.InternalServerError_500)) throw new HttpClientException( resp, status = resp.status, diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala index 09f1ccf198..c40f2fd4af 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala @@ -76,8 +76,7 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit } .recover { // Or if request has been failing, apply the response filter only to the last response - val response = lastResponse.map(config.responseFilter(_)) - HttpClients.defaultHttpClientErrorHandler(response) + HttpClients.defaultHttpClientErrorHandler(() => lastResponse.map(config.responseFilter(_))) } }