From c399ff1c643ef7470ab35e63b0ded5d7cf004286 Mon Sep 17 00:00:00 2001 From: Benjamin Geer Date: Thu, 10 Aug 2023 00:14:34 +0200 Subject: [PATCH] Add Pekko support (#2883) Co-authored-by: Krzysiek Ciesielski Co-authored-by: Sergio Noviello --- build.sbt | 19 +++ doc/index.md | 2 + doc/server/pekkohttp.md | 122 +++++++++++++++ doc/stability.md | 1 + .../examples/HelloWorldPekkkoServer.scala | 37 +++++ .../MultipartFormUploadPekkoServer.scala | 62 ++++++++ .../BasicAuthenticationPekkoServer.scala | 46 ++++++ .../security/ServerSecurityLogicPekko.scala | 89 +++++++++++ ...rverSecurityLogicRefreshCookiesPekko.scala | 70 +++++++++ .../StaticContentFromFilesPekkoServer.scala | 54 +++++++ ...taticContentFromResourcesPekkoServer.scala | 33 ++++ .../StaticContentSecurePekkoServer.scala | 59 +++++++ .../streaming/StreamingPekkoServer.scala | 41 +++++ .../PekkoServerStubInterpreterExample.scala | 68 ++++++++ .../websocket/WebSocketPekkoServer.scala | 74 +++++++++ project/Versions.scala | 6 +- .../server/armeria/ArmeriaServerRequest.scala | 4 +- .../server/pekkohttp/PekkoBodyListener.scala | 42 +++++ .../PekkoHttpServerInterpreter.scala | 123 +++++++++++++++ .../pekkohttp/PekkoHttpServerOptions.scala | 84 ++++++++++ .../tapir/server/pekkohttp/PekkoModel.scala | 24 +++ .../server/pekkohttp/PekkoRequestBody.scala | 68 ++++++++ .../server/pekkohttp/PekkoServerRequest.scala | 48 ++++++ .../pekkohttp/PekkoServerSentEvents.scala | 25 +++ .../pekkohttp/PekkoToResponseBody.scala | 145 ++++++++++++++++++ .../server/pekkohttp/PekkoWebSockets.scala | 55 +++++++ .../sttp/tapir/server/pekkohttp/package.scala | 19 +++ .../pekkohttp/PekkoHttpServerStubTest.scala | 26 ++++ .../pekkohttp/PekkoHttpServerTest.scala | 112 ++++++++++++++ .../PekkoHttpTestServerInterpreter.scala | 29 ++++ .../pekkohttp/PekkoServerSentEventsTest.scala | 109 +++++++++++++ 31 files changed, 1692 insertions(+), 4 deletions(-) create mode 100644 doc/server/pekkohttp.md create mode 100644 examples/src/main/scala/sttp/tapir/examples/HelloWorldPekkkoServer.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/multipart/MultipartFormUploadPekkoServer.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/security/BasicAuthenticationPekkoServer.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicPekko.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicRefreshCookiesPekko.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromFilesPekkoServer.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromResourcesPekkoServer.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentSecurePekkoServer.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/streaming/StreamingPekkoServer.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/testing/PekkoServerStubInterpreterExample.scala create mode 100644 examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketPekkoServer.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoBodyListener.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerInterpreter.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerOptions.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoModel.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoRequestBody.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerRequest.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEvents.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoToResponseBody.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoWebSockets.scala create mode 100644 server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/package.scala create mode 100644 server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerStubTest.scala create mode 100644 server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala create mode 100644 server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala create mode 100644 server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEventsTest.scala diff --git a/build.sbt b/build.sbt index c126c86c77..bd3b40cc40 100644 --- a/build.sbt +++ b/build.sbt @@ -197,6 +197,7 @@ lazy val rawAllAggregates = core.projectRefs ++ serverCore.projectRefs ++ akkaHttpServer.projectRefs ++ akkaGrpcServer.projectRefs ++ + pekkoHttpServer.projectRefs ++ armeriaServer.projectRefs ++ armeriaServerCats.projectRefs ++ armeriaServerZio.projectRefs ++ @@ -1159,6 +1160,21 @@ lazy val akkaHttpServer: ProjectMatrix = (projectMatrix in file("server/akka-htt .jvmPlatform(scalaVersions = scala2Versions) .dependsOn(serverCore, serverTests % Test) +lazy val pekkoHttpServer: ProjectMatrix = (projectMatrix in file("server/pekko-http-server")) + .settings(commonJvmSettings) + .settings( + name := "tapir-pekko-http-server", + libraryDependencies ++= Seq( + "org.apache.pekko" %% "pekko-http" % Versions.pekkoHttp, + "org.apache.pekko" %% "pekko-stream" % Versions.pekkoStreams, + "org.apache.pekko" %% "pekko-slf4j" % Versions.pekkoStreams, + "com.softwaremill.sttp.shared" %% "pekko" % Versions.sttpShared, + "com.softwaremill.sttp.client3" %% "pekko-http-backend" % Versions.sttp % Test + ) + ) + .jvmPlatform(scalaVersions = scala2Versions) + .dependsOn(serverCore, serverTests % Test) + lazy val akkaGrpcServer: ProjectMatrix = (projectMatrix in file("server/akka-grpc-server")) .settings(commonJvmSettings) .settings( @@ -1950,6 +1966,7 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples")) "org.http4s" %% "http4s-circe" % Versions.http4s, "org.http4s" %% "http4s-blaze-server" % Versions.http4sBlazeServer, "com.softwaremill.sttp.client3" %% "akka-http-backend" % Versions.sttp, + "com.softwaremill.sttp.client3" %% "pekko-http-backend" % Versions.sttp, "com.softwaremill.sttp.client3" %% "async-http-client-backend-fs2" % Versions.sttp, "com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % Versions.sttp, "com.softwaremill.sttp.client3" %% "async-http-client-backend-cats" % Versions.sttp, @@ -1965,6 +1982,7 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples")) .jvmPlatform(scalaVersions = examplesScalaVersions) .dependsOn( akkaHttpServer, + pekkoHttpServer, armeriaServer, jdkhttpServer, http4sServer, @@ -2049,6 +2067,7 @@ lazy val documentation: ProjectMatrix = (projectMatrix in file("generated-doc")) core % "compile->test", testing, akkaHttpServer, + pekkoHttpServer, armeriaServer, armeriaServerCats, armeriaServerZio, diff --git a/doc/index.md b/doc/index.md index 9f9eb21721..bb95d4745d 100644 --- a/doc/index.md +++ b/doc/index.md @@ -13,6 +13,7 @@ input and output parameters. An endpoint specification can be interpreted as: * [Http4s](server/http4s.md) `HttpRoutes[F]` (using cats-effect or [ZIO](server/zio-http4s.md)) * [Netty](server/netty.md) (using `Future`s, cats-effect or ZIO) * [Finatra](server/finatra.md) `http.Controller` + * [Pekko HTTP](server/pekkohttp.md) `Route`s/`Directive`s * [Play](server/play.md) `Route` * [Vert.X](server/vertx.md) `Router => Route` (using `Future`s, cats-effect or ZIO) * [ZIO Http](server/ziohttp.md) `Http` @@ -230,6 +231,7 @@ We offer commercial support for sttp and related technologies, as well as develo server/zio-http4s server/netty server/finatra + server/pekkohttp server/play server/vertx server/ziohttp diff --git a/doc/server/pekkohttp.md b/doc/server/pekkohttp.md new file mode 100644 index 0000000000..0c66500297 --- /dev/null +++ b/doc/server/pekkohttp.md @@ -0,0 +1,122 @@ +# Running as a pekko-http server + +To expose an endpoint as a [pekko-http](https://pekko.apache.org/docs/pekko-http/current/) server, first add the following +dependency: + +```scala +"com.softwaremill.sttp.tapir" %% "tapir-pekko-http-server" % "@VERSION@" +``` + +This will transitively pull some Pekko modules. If you want to force +your own Pekko version, use sbt exclusion. Mind the Scala version in artifact name: + +```scala +"com.softwaremill.sttp.tapir" %% "tapir-pekko-http-server" % "@VERSION@" exclude("org.apache.pekko", "pekko-stream_2.12") +``` + +Now import the object: + +```scala mdoc:compile-only +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter +``` + +## Using `toRoute` + +The `toRoute` method requires a single, or a list of `ServerEndpoint`s, which can be created by adding +[server logic](logic.md) to an endpoint. + +For example: + +```scala mdoc:compile-only +import sttp.tapir._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter +import scala.concurrent.Future +import org.apache.pekko.http.scaladsl.server.Route +import scala.concurrent.ExecutionContext.Implicits.global + +def countCharacters(s: String): Future[Either[Unit, Int]] = + Future.successful(Right[Unit, Int](s.length)) + +val countCharactersEndpoint: PublicEndpoint[String, Unit, Int, Any] = + endpoint.in(stringBody).out(plainBody[Int]) + +val countCharactersRoute: Route = + PekkoHttpServerInterpreter().toRoute(countCharactersEndpoint.serverLogic(countCharacters)) +``` + +## Combining directives + +The tapir-generated `Route` captures from the request only what is described by the endpoint. Combine +with other pekko-http directives to add additional behavior, or get more information from the request. + +For example, wrap the tapir-generated route in a metrics route, or nest a security directive in the +tapir-generated directive. + +Edge-case endpoints, which require special logic not expressible using tapir, can be implemented directly +using pekko-http. For example: + +```scala mdoc:compile-only +import sttp.tapir._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter +import org.apache.pekko.http.scaladsl.server._ + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +class Special +def metricsDirective: Directive0 = ??? +def specialDirective: Directive1[Special] = ??? +val tapirEndpoint: PublicEndpoint[String, Unit, Unit, Any] = endpoint.in(path[String]("input")) + +val myRoute: Route = metricsDirective { + specialDirective { special => + PekkoHttpServerInterpreter().toRoute(tapirEndpoint.serverLogic[Future] { input => + ??? + /* here we can use both `special` and `input` values */ + }) + } +} +``` + +## Streaming + +The pekko-http interpreter accepts streaming bodies of type `Source[ByteString, Any]`, as described by the `PekkoStreams` +capability. Both response bodies and request bodies can be streamed. Usage: `streamBody(PekkoStreams)(schema, format)`. + +The capability can be added to the classpath independently of the interpreter through the +`"com.softwaremill.sttp.shared" %% "pekko"` dependency. + +## Web sockets + +The interpreter supports web sockets, with pipes of type `Flow[REQ, RESP, Any]`. See [web sockets](../endpoint/websockets.md) +for more details. + +pekko-http does not expose control frames (`Ping`, `Pong` and `Close`), so any setting regarding them are discarded, and +ping/pong frames which are sent explicitly are ignored. [Automatic pings](https://pekko.apache.org/docs/pekko-http/current/server-side/websocket-support.html#automatic-keep-alive-ping-support) can be instead enabled through configuration. + +## Server Sent Events + +The interpreter supports [SSE (Server Sent Events)](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events). + +For example, to define an endpoint that returns event stream: + +```scala mdoc:compile-only +import org.apache.pekko.stream.scaladsl.Source +import sttp.model.sse.ServerSentEvent +import sttp.tapir._ +import sttp.tapir.server.pekkohttp.{PekkoHttpServerInterpreter, serverSentEventsBody} + +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +val sseEndpoint = endpoint.get.out(serverSentEventsBody) + +val routes = PekkoHttpServerInterpreter().toRoute(sseEndpoint.serverLogicSuccess[Future](_ => + Future.successful(Source.single(ServerSentEvent(Some("data"), None, None, None))) +)) +``` + +## Configuration + +The interpreter can be configured by providing an `PekkoHttpServerOptions` value, see +[server options](options.md) for details. diff --git a/doc/stability.md b/doc/stability.md index 5118a9b46e..28ff011420 100644 --- a/doc/stability.md +++ b/doc/stability.md @@ -25,6 +25,7 @@ The modules are categorised using the following levels: | finatra | stabilising | | http4s | stabilising | | netty | experimental | +| pekko-http| stabilising | | play | stabilising | | vertx | stabilising | | zio1-http | experimental | diff --git a/examples/src/main/scala/sttp/tapir/examples/HelloWorldPekkkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/HelloWorldPekkkoServer.scala new file mode 100644 index 0000000000..49856a5625 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/HelloWorldPekkkoServer.scala @@ -0,0 +1,37 @@ +package sttp.tapir.examples + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import sttp.client3._ +import sttp.tapir._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +object HelloWorldPekkoServer extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + // the endpoint: single fixed path input ("hello"), single query parameter + // corresponds to: GET /hello?name=... + val helloWorld: PublicEndpoint[String, Unit, String, Any] = + endpoint.get.in("hello").in(query[String]("name")).out(stringBody) + + // converting an endpoint to a route (providing server-side logic); extension method comes from imported packages + val helloWorldRoute: Route = + PekkoHttpServerInterpreter().toRoute(helloWorld.serverLogicSuccess(name => Future.successful(s"Hello, $name!"))) + + // starting the server + val bindAndCheck = Http().newServerAt("localhost", 8080).bindFlow(helloWorldRoute).map { _ => + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val result: String = basicRequest.response(asStringAlways).get(uri"http://localhost:8080/hello?name=Frodo").send(backend).body + println("Got result: " + result) + + assert(result == "Hello, Frodo!") + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/multipart/MultipartFormUploadPekkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/multipart/MultipartFormUploadPekkoServer.scala new file mode 100644 index 0000000000..4285b8b826 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/multipart/MultipartFormUploadPekkoServer.scala @@ -0,0 +1,62 @@ +package sttp.tapir.examples.multipart + +import java.io.PrintWriter + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import sttp.client3._ +import sttp.tapir.generic.auto._ +import sttp.model.Part +import sttp.tapir._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +object MultipartFormUploadPekkoServer extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + // the class representing the multipart data + // + // parts can be referenced directly; if part metadata is needed, we define the type wrapped with Part[_]. + // + // note that for binary parts need to be buffered either in-memory or in the filesystem anyway (the whole request + // has to be read to find out what are the parts), so handling multipart requests in a purely streaming fashion is + // not possible + case class UserProfile(name: String, hobby: Option[String], age: Int, photo: Part[TapirFile]) + + // corresponds to: POST /user/profile [multipart form data with fields name, hobby, age, photo] + val setProfile: PublicEndpoint[UserProfile, Unit, String, Any] = + endpoint.post.in("user" / "profile").in(multipartBody[UserProfile]).out(stringBody) + + // converting an endpoint to a route (providing server-side logic); extension method comes from imported packages + val setProfileRoute: Route = PekkoHttpServerInterpreter().toRoute(setProfile.serverLogicSuccess { data => + Future { + val response = s"Received: ${data.name} / ${data.hobby} / ${data.age} / ${data.photo.fileName} (${data.photo.body.length()})" + data.photo.body.delete() + response + } + }) + + // starting the server + val bindAndCheck = Http().newServerAt("localhost", 8080).bindFlow(setProfileRoute).map { _ => + val testFile = java.io.File.createTempFile("user-123", ".jpg") + val pw = new PrintWriter(testFile); pw.write("This is not a photo"); pw.close() + + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val result: String = basicRequest + .response(asStringAlways) + .get(uri"http://localhost:8080/user/profile") + .multipartBody(multipart("name", "Frodo"), multipart("hobby", "hiking"), multipart("age", "33"), multipartFile("photo", testFile)) + .send(backend) + .body + println("Got result: " + result) + + assert(result == s"Received: Frodo / Some(hiking) / 33 / Some(${testFile.getName}) (19)") + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/security/BasicAuthenticationPekkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/security/BasicAuthenticationPekkoServer.scala new file mode 100644 index 0000000000..d82aca2eee --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/security/BasicAuthenticationPekkoServer.scala @@ -0,0 +1,46 @@ +package sttp.tapir.examples.security + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import sttp.client3._ +import sttp.model.StatusCode +import sttp.model.headers.WWWAuthenticateChallenge +import sttp.tapir._ +import sttp.tapir.model._ +import sttp.tapir.server.pekkohttp._ + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} + +object BasicAuthenticationPekkoServer extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + val secret: Endpoint[UsernamePassword, Unit, Unit, String, Any] = + endpoint.get.securityIn("secret").securityIn(auth.basic[UsernamePassword](WWWAuthenticateChallenge.basic("example"))).out(stringBody) + + val secretRoute: Route = + PekkoHttpServerInterpreter().toRoute( + secret + .serverSecurityLogic(credentials => Future.successful(Right(credentials.username): Either[Unit, String])) + .serverLogic(username => _ => Future.successful(Right(s"Hello, $username!"))) + ) + + // starting the server + val bindAndCheck = Http().newServerAt("localhost", 8080).bindFlow(secretRoute).map { _ => + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val unauthorized = basicRequest.get(uri"http://localhost:8080/secret").send(backend) + println("Got result: " + unauthorized) + assert(unauthorized.code == StatusCode.Unauthorized) + assert(unauthorized.header("WWW-Authenticate").contains("""Basic realm="example"""")) + + val result = basicRequest.get(uri"http://localhost:8080/secret").header("Authorization", "Basic dXNlcjpzZWNyZXQ=").send(backend) + println("Got result: " + result) + assert(result.code == StatusCode.Ok) + assert(result.body == Right("Hello, user!")) + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicPekko.scala b/examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicPekko.scala new file mode 100644 index 0000000000..adfbcee2b9 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicPekko.scala @@ -0,0 +1,89 @@ +package sttp.tapir.examples.security + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Directives._ +import org.apache.pekko.http.scaladsl.server.Route +import sttp.client3._ +import sttp.model.HeaderNames +import sttp.tapir._ +import sttp.tapir.server.{PartialServerEndpoint, ServerEndpoint} +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} + +object ServerSecurityLogicPekko extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + // authentication data structure & logic + case class User(name: String) + case class AuthenticationToken(value: String) + case class AuthenticationError(code: Int) + + def authenticate(token: AuthenticationToken): Future[Either[AuthenticationError, User]] = + Future { + if (token.value == "berries") Right(User("Papa Smurf")) + else if (token.value == "smurf") Right(User("Gargamel")) + else Left(AuthenticationError(1001)) + } + + // defining a base endpoint, which has the authentication logic built-in + val secureEndpoint: PartialServerEndpoint[AuthenticationToken, User, Unit, AuthenticationError, Unit, Any, Future] = endpoint + .securityIn(auth.bearer[String]().mapTo[AuthenticationToken]) + // returning the authentication error code to the user + .errorOut(plainBody[Int].mapTo[AuthenticationError]) + .serverSecurityLogic(authenticate) + + // the errors that might occur in the /hello endpoint - either a wrapped authentication error, or refusal to greet + sealed trait HelloError + case class AuthenticationHelloError(wrapped: AuthenticationError) extends HelloError + case class NoHelloError(why: String) extends HelloError + + // extending the base endpoint with hello-endpoint-specific inputs + val secureHelloWorldWithLogic: ServerEndpoint[Any, Future] = secureEndpoint.get + .in("hello") + .in(query[String]("salutation")) + .out(stringBody) + .mapErrorOut(AuthenticationHelloError)(_.wrapped) + // returning a 400 with the "why" field from the exception + .errorOutVariant[HelloError](oneOfVariant(stringBody.mapTo[NoHelloError])) + // defining the remaining server logic (which uses the authenticated user) + .serverLogic { user => salutation => + Future( + if (user.name == "Gargamel") Left(NoHelloError(s"Not saying hello to ${user.name}!")) + else Right(s"$salutation, ${user.name}!") + ) + } + + // --- + + // interpreting as routes + val helloWorld1Route: Route = PekkoHttpServerInterpreter().toRoute(secureHelloWorldWithLogic) + + // starting the server + val bindAndCheck = Http().newServerAt("localhost", 8080).bind(concat(helloWorld1Route)).map { _ => + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + + def testWith(path: String, salutation: String, token: String): String = { + val result: String = basicRequest + .response(asStringAlways) + .get(uri"http://localhost:8080/$path?salutation=$salutation") + .header(HeaderNames.Authorization, s"Bearer $token") + .send(backend) + .body + + println(s"For path: $path, salutation: $salutation, token: $token, got result: $result") + result + } + + assert(testWith("hello", "Hello", "berries") == "Hello, Papa Smurf!") + assert(testWith("hello", "Cześć", "berries") == "Cześć, Papa Smurf!") + assert(testWith("hello", "Hello", "apple") == "1001") + assert(testWith("hello", "Hello", "smurf") == "Not saying hello to Gargamel!") + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicRefreshCookiesPekko.scala b/examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicRefreshCookiesPekko.scala new file mode 100644 index 0000000000..b6f59316b6 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/security/ServerSecurityLogicRefreshCookiesPekko.scala @@ -0,0 +1,70 @@ +package sttp.tapir.examples.security + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Directives.concat +import org.apache.pekko.http.scaladsl.server.Route +import sttp.client3._ +import sttp.model.StatusCode +import sttp.model.headers.CookieValueWithMeta +import sttp.tapir._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter +import sttp.tapir.server.{PartialServerEndpointWithSecurityOutput, ServerEndpoint} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} + +object ServerSecurityLogicRefreshCookiesPekko extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + case class User(name: String) + + // we're always refreshing the cookie + def authenticate(token: Option[String]): Either[Unit, (Option[CookieValueWithMeta], User)] = + token match { + case None => Left(()) + case Some(token) => Right((Some(CookieValueWithMeta.unsafeApply("new token")), User(token))) + } + + // defining a base endpoint, which has the authentication logic built-in + val secureEndpoint + : PartialServerEndpointWithSecurityOutput[Option[String], User, Unit, Unit, Option[CookieValueWithMeta], Unit, Any, Future] = endpoint + .securityIn(auth.apiKey(cookie[Option[String]]("token"))) + // optionally returning a refreshed cookie + .out(setCookieOpt("token")) + .errorOut(statusCode(StatusCode.Unauthorized)) + .serverSecurityLogicPureWithOutput(authenticate) + + // extending the base endpoint with hello-endpoint-specific inputs + val secureHelloWorldWithLogic: ServerEndpoint[Any, Future] = secureEndpoint.get + .in("hello") + .in(query[String]("salutation")) + .out(stringBody) + // defining the remaining server logic (which uses the authenticated user) + .serverLogic { user => salutation => + Future(Right(s"$salutation, ${user.name}!")) + } + + // --- + + // interpreting as routes + val helloWorld1Route: Route = PekkoHttpServerInterpreter().toRoute(secureHelloWorldWithLogic) + + // starting the server + val bindAndCheck = Http().newServerAt("localhost", 8080).bind(concat(helloWorld1Route)).map { _ => + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + + val response = basicRequest + .response(asStringAlways) + .cookie("token", "Steve") + .get(uri"http://localhost:8080/hello?salutation=Welcome") + .send(backend) + + assert(response.body == "Welcome, Steve!") + assert(response.unsafeCookies.map(_.value).toList == List("new token")) + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromFilesPekkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromFilesPekkoServer.scala new file mode 100644 index 0000000000..42e3c03e7c --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromFilesPekkoServer.scala @@ -0,0 +1,54 @@ +package sttp.tapir.examples.static_content + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import sttp.client3._ +import sttp.model.{ContentRangeUnits, Header, HeaderNames, StatusCode} +import sttp.tapir._ +import sttp.tapir.files._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter + +import java.nio.file.{Files, Path, StandardOpenOption} +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future} + +object StaticContentFromFilesPekkoServer extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + val content = "f1 content" + val exampleDirectory: Path = Files.createTempDirectory("pekko-static-example") + Files.write(exampleDirectory.resolve("f1"), content.getBytes, StandardOpenOption.CREATE_NEW) + + val fileEndpoints = staticFilesServerEndpoints[Future]("range-example")(exampleDirectory.toFile.getAbsolutePath) + val route: Route = PekkoHttpServerInterpreter().toRoute(fileEndpoints) + + // starting the server + val bindAndCheck: Future[Unit] = Http().newServerAt("localhost", 8080).bindFlow(route).map { _ => + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val headResponse = basicRequest + .head(uri"http://localhost:8080/range-example/f1") + .response(asStringAlways) + .send(backend) + + assert(headResponse.code == StatusCode.Ok) + assert(headResponse.headers.contains(Header(HeaderNames.AcceptRanges, ContentRangeUnits.Bytes))) + assert(headResponse.headers.contains(Header(HeaderNames.ContentLength, content.length.toString))) + + val getResponse = basicRequest + .headers(Header(HeaderNames.Range, "bytes=3-6")) + .get(uri"http://localhost:8080/range-example/f1") + .response(asStringAlways) + .send(backend) + + assert(getResponse.body == "cont") + assert(getResponse.code == StatusCode.PartialContent) + assert(getResponse.body.length == 4) + assert(getResponse.headers.contains(Header(HeaderNames.ContentRange, "bytes 3-6/10"))) + + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromResourcesPekkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromResourcesPekkoServer.scala new file mode 100644 index 0000000000..82579de4b5 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentFromResourcesPekkoServer.scala @@ -0,0 +1,33 @@ +package sttp.tapir.examples.static_content + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import sttp.tapir._ +import sttp.tapir.files._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future} +import scala.io.StdIn + +object StaticContentFromResourcesPekkoServer extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + // we're pretending to be a SPA application, that is we serve index.html if the requested resource cannot be found + val resourceEndpoints = staticResourcesGetServerEndpoint[Future](emptyInput)( + StaticContentFromResourcesPekkoServer.getClass.getClassLoader, + "webapp", + FilesOptions.default.defaultFile(List("index.html")) + ) + val route: Route = PekkoHttpServerInterpreter().toRoute(resourceEndpoints) + + // starting the server + val bind = Http().newServerAt("localhost", 8080).bindFlow(route) + Await.result(bind, 1.minute) + println("Open: http://localhost:8080 and experiment with various paths.") + println("Press any key to exit ...") + StdIn.readLine() + Await.result(actorSystem.terminate(), 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentSecurePekkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentSecurePekkoServer.scala new file mode 100644 index 0000000000..fd3724a674 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/static_content/StaticContentSecurePekkoServer.scala @@ -0,0 +1,59 @@ +package sttp.tapir.examples.static_content + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import sttp.client3._ +import sttp.model.StatusCode +import sttp.tapir._ +import sttp.tapir.files._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter + +import java.nio.file.{Files, Path, StandardOpenOption} +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future} + +object StaticContentSecurePekkoServer extends App { + // creating test files + val exampleDirectory: Path = Files.createTempDirectory("pekko-static-secure-example") + Files.write(exampleDirectory.resolve("f1"), "f1 content".getBytes, StandardOpenOption.CREATE_NEW) + + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + // defining the endpoints + val secureFileEndpoints = staticFilesServerEndpoints[Future]("secure")(exampleDirectory.toFile.getAbsolutePath) + .map(_.prependSecurityPure(auth.bearer[String](), statusCode(StatusCode.Forbidden)) { token => + // Right means success, Left - an error, here mapped to a constant status code + if (token.startsWith("secret")) Right(()) else Left(()) + }) + + // starting the server + val route: Route = PekkoHttpServerInterpreter().toRoute(secureFileEndpoints) + + val bindAndCheck: Future[Unit] = Http().newServerAt("localhost", 8080).bindFlow(route).map { _ => + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val response1 = basicRequest + .get(uri"http://localhost:8080/secure/f1") + .auth + .bearer("hacker") + .response(asStringAlways) + .send(backend) + + assert(response1.code == StatusCode.Forbidden) + + val response2 = basicRequest + .get(uri"http://localhost:8080/secure/f1") + .auth + .bearer("secret123") + .response(asStringAlways) + .send(backend) + + assert(response2.code == StatusCode.Ok) + assert(response2.body == "f1 content") + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} + diff --git a/examples/src/main/scala/sttp/tapir/examples/streaming/StreamingPekkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/streaming/StreamingPekkoServer.scala new file mode 100644 index 0000000000..f057292f90 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/streaming/StreamingPekkoServer.scala @@ -0,0 +1,41 @@ +package sttp.tapir.examples.streaming + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter +import sttp.tapir._ + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +object StreamingPekkoServer extends App { + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + // The endpoint: corresponds to GET /receive. + // We need to provide both the schema of the value (for documentation), as well as the format (media type) of the + // body. Here, the schema is a `string` and the media type is `text/plain`. + val streamingEndpoint: PublicEndpoint[Unit, Unit, Source[ByteString, Any], PekkoStreams] = + endpoint.get.in("receive").out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + + // converting an endpoint to a route (providing server-side logic); extension method comes from imported packages + val testStream: Source[ByteString, Any] = Source.repeat("Hello!").take(10).map(s => ByteString(s)) + val streamingRoute: Route = PekkoHttpServerInterpreter().toRoute(streamingEndpoint.serverLogicSuccess(_ => Future.successful(testStream))) + + // starting the server + val bindAndCheck = Http().newServerAt("localhost", 8080).bindFlow(streamingRoute).map { _ => + // testing + val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend() + val result: String = basicRequest.response(asStringAlways).get(uri"http://localhost:8080/receive").send(backend).body + println("Got result: " + result) + + assert(result == "Hello!" * 10) + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} diff --git a/examples/src/main/scala/sttp/tapir/examples/testing/PekkoServerStubInterpreterExample.scala b/examples/src/main/scala/sttp/tapir/examples/testing/PekkoServerStubInterpreterExample.scala new file mode 100644 index 0000000000..93a0449de4 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/testing/PekkoServerStubInterpreterExample.scala @@ -0,0 +1,68 @@ +package sttp.tapir.examples.testing + +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should.Matchers +import sttp.client3._ +import sttp.client3.testing.SttpBackendStub +import sttp.model.StatusCode +import sttp.tapir._ +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.pekkohttp.PekkoHttpServerOptions +import sttp.tapir.server.interceptor.exception.ExceptionHandler +import sttp.tapir.server.interceptor.CustomiseInterceptors +import sttp.tapir.server.model.ValuedEndpointOutput +import sttp.tapir.server.stub.TapirStubInterpreter + +import scala.concurrent.{ExecutionContext, Future} + +class PekkoServerStubInterpreterExample extends AsyncFlatSpec with Matchers { + + it should "use custom exception handler" in { + val stubBackend: SttpBackend[Future, Any] = TapirStubInterpreter(PekkoUsersApi.options, SttpBackendStub.asynchronousFuture) + .whenServerEndpoint(PekkoUsersApi.greetUser) + .thenThrowException(new RuntimeException("error")) + .backend() + + sttp.client3.basicRequest + .get(uri"http://test.com/api/users/greet") + .send(stubBackend) + .map(_.body shouldBe Left("failed due to error")) + } + + it should "run greet users logic" in { + val stubBackend: SttpBackend[Future, Any] = TapirStubInterpreter(PekkoUsersApi.options, SttpBackendStub.asynchronousFuture) + .whenServerEndpoint(PekkoUsersApi.greetUser) + .thenRunLogic() + .backend() + + val response = sttp.client3.basicRequest + .get(uri"http://test.com/api/users/greet") + .header("Authorization", "Bearer password") + .send(stubBackend) + + // then + response.map(_.body shouldBe Right("hello user123")) + } +} + +object PekkoUsersApi { + + val greetUser: ServerEndpoint[Any, Future] = endpoint.get + .in("api" / "users" / "greet") + .securityIn(auth.bearer[String]()) + .out(stringBody) + .errorOut(stringBody) + .serverSecurityLogic(token => + Future.successful { + if (token == "password") Right("user123") else Left("unauthorized") + } + ) + .serverLogic(user => _ => Future.successful(Right(s"hello $user"))) + + val exceptionHandler = ExceptionHandler.pure[Future](ctx => + Option(ValuedEndpointOutput(stringBody.and(statusCode), (s"failed due to ${ctx.e.getMessage}", StatusCode.InternalServerError))) + ) + def options(implicit ec: ExecutionContext): CustomiseInterceptors[Future, PekkoHttpServerOptions] = + PekkoHttpServerOptions.customiseInterceptors.exceptionHandler(exceptionHandler) +} + diff --git a/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketPekkoServer.scala b/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketPekkoServer.scala new file mode 100644 index 0000000000..cd533bf2e5 --- /dev/null +++ b/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketPekkoServer.scala @@ -0,0 +1,74 @@ +package sttp.tapir.examples.websocket + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Route +import org.apache.pekko.stream.scaladsl.Flow +import io.circe.generic.auto._ +import sttp.tapir.generic.auto._ +import sttp.capabilities.WebSockets +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3._ +import sttp.client3.pekkohttp.PekkoHttpBackend +import sttp.apispec.asyncapi.Server +import sttp.apispec.asyncapi.circe.yaml._ +import sttp.tapir._ +import sttp.tapir.docs.asyncapi.AsyncAPIInterpreter +import sttp.tapir.json.circe._ +import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter +import sttp.ws.WebSocket + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} + +object WebSocketPekkoServer extends App { + case class Response(hello: String) + + // The web socket endpoint: GET /ping. + // We need to provide both the type & media type for the requests, and responses. Here, the requests will be + // strings, and responses will be returned as json. + val wsEndpoint: PublicEndpoint[Unit, Unit, Flow[String, Response, Any], PekkoStreams with WebSockets] = + endpoint.get.in("ping").out(webSocketBody[String, CodecFormat.TextPlain, Response, CodecFormat.Json](PekkoStreams)) + + implicit val actorSystem: ActorSystem = ActorSystem() + import actorSystem.dispatcher + + // Implementation of the web socket: a flow which echoes incoming messages + val wsRoute: Route = + PekkoHttpServerInterpreter().toRoute( + wsEndpoint.serverLogicSuccess(_ => Future.successful(Flow.fromFunction((in: String) => Response(in)): Flow[String, Response, Any])) + ) + + // Documentation + val apiDocs = AsyncAPIInterpreter().toAsyncAPI(wsEndpoint, "JSON echo", "1.0", List("dev" -> Server("localhost:8080", "ws"))).toYaml + println(s"Paste into https://playground.asyncapi.io/ to see the docs for this endpoint:\n$apiDocs") + + // Starting the server + val bindAndCheck = Http() + .newServerAt("localhost", 8080) + .bindFlow(wsRoute) + .flatMap { _ => + // We could have interpreted wsEndpoint as a client, but here we are using sttp client directly + val backend: SttpBackend[Future, WebSockets] = PekkoHttpBackend.usingActorSystem(actorSystem) + // Client which interacts with the web socket + basicRequest + .response(asWebSocket { ws: WebSocket[Future] => + for { + _ <- ws.sendText("world") + _ <- ws.sendText("there") + r1 <- ws.receiveText() + _ = println(r1) + r2 <- ws.receiveText() + _ = println(r2) + _ <- ws.sendText("how are you") + r3 <- ws.receiveText() + _ = println(r3) + } yield () + }) + .get(uri"ws://localhost:8080/ping") + .send(backend) + } + + Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute) +} + diff --git a/project/Versions.scala b/project/Versions.scala index 02ef640b62..c9d5bb47ce 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -7,12 +7,14 @@ object Versions { val circe = "0.14.3" val circeGenericExtras = "0.14.3" val circeYaml = "0.14.2" - val sttp = "3.8.16" + val sttp = "3.9.0" val sttpModel = "1.7.2" - val sttpShared = "1.3.15" + val sttpShared = "1.3.16" val sttpApispec = "0.6.0" val akkaHttp = "10.2.10" val akkaStreams = "2.6.20" + val pekkoHttp = "1.0.0" + val pekkoStreams = "1.0.1" val swaggerUi = "5.3.1" val upickle = "3.1.2" val playJson = "2.9.4" diff --git a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerRequest.scala b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerRequest.scala index c86d71b092..ec2190ebfa 100644 --- a/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerRequest.scala +++ b/server/armeria-server/src/main/scala/sttp/tapir/server/armeria/ArmeriaServerRequest.scala @@ -16,10 +16,10 @@ private[armeria] final case class ArmeriaServerRequest(ctx: ServiceRequestContex private lazy val request: HttpRequest = ctx.request lazy val connectionInfo: ConnectionInfo = { - val remotePort = ctx.remoteAddress[InetSocketAddress]().getPort + val remotePort = ctx.remoteAddress().getPort val clientAddress = InetSocketAddress.createUnresolved(ctx.clientAddress().getHostAddress, remotePort) ConnectionInfo( - Some(ctx.localAddress[InetSocketAddress]()), + Some(ctx.localAddress()), Some(clientAddress), Some(ctx.sessionProtocol().isTls) ) diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoBodyListener.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoBodyListener.scala new file mode 100644 index 0000000000..7e18297845 --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoBodyListener.scala @@ -0,0 +1,42 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.stream.scaladsl.Flow +import org.apache.pekko.util.ByteString +import org.apache.pekko.http.scaladsl.model.HttpEntity +import org.apache.pekko.http.scaladsl.model.UniversalEntity +import sttp.tapir.server.interpreter.BodyListener + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} + +class PekkoBodyListener(implicit ec: ExecutionContext) extends BodyListener[Future, PekkoResponseBody] { + override def onComplete(body: PekkoResponseBody)(cb: Try[Unit] => Future[Unit]): Future[PekkoResponseBody] = { + body match { + case ws @ Left(_) => cb(Success(())).map(_ => ws) + case Right(e@HttpEntity.Empty) => + Future.successful(Right(e)).andThen { case _ => cb(Success(())) } + case Right(e: UniversalEntity) => + Future.successful( + Right( + e.transformDataBytes( + e.contentLength, + Flow[ByteString].watchTermination() { case (_, f) => + f.onComplete { + case Failure(ex) => cb(Failure(ex)) + case Success(_) => cb(Success(())) + } + } + ) + ) + ) + case Right(e) => + Future.successful(Right(e.transformDataBytes(Flow[ByteString].watchTermination() { case (_, f) => + f.onComplete { + case Failure(ex) => cb(Failure(ex)) + case Success(_) => cb(Success(())) + } + }))) + } + } +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerInterpreter.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerInterpreter.scala new file mode 100644 index 0000000000..a796316ee4 --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerInterpreter.scala @@ -0,0 +1,123 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.http.scaladsl.server.Directives.{ + complete, + extractExecutionContext, + extractMaterializer, + extractRequestContext, + handleWebSocketMessages, + onSuccess, + reject, + respondWithHeaders +} +import org.apache.pekko.http.scaladsl.server.Route +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.util.ByteString +import sttp.capabilities.WebSockets +import sttp.capabilities.pekko.PekkoStreams +import sttp.model.Method +import sttp.monad.FutureMonad +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.pekkohttp.PekkoModel.parseHeadersOrThrowWithoutContentHeaders +import sttp.tapir.server.interceptor.RequestResult +import sttp.tapir.server.interceptor.reject.RejectInterceptor +import sttp.tapir.server.interpreter.{BodyListener, FilterServerEndpoints, RequestBody, ServerInterpreter, ToResponseBody} +import sttp.tapir.server.model.ServerResponse + +import scala.concurrent.{ExecutionContext, Future} + +trait PekkoHttpServerInterpreter { + + implicit def executionContext: ExecutionContext + + def pekkoHttpServerOptions: PekkoHttpServerOptions = PekkoHttpServerOptions.default + + def toRoute(se: ServerEndpoint[PekkoStreams with WebSockets, Future]): Route = toRoute(List(se)) + + def toRoute(ses: List[ServerEndpoint[PekkoStreams with WebSockets, Future]]): Route = + toRoute(new PekkoRequestBody(pekkoHttpServerOptions)(_, _), new PekkoToResponseBody()(_, _))(ses) + + protected def toRoute( + requestBody: (Materializer, ExecutionContext) => RequestBody[Future, PekkoStreams], + toResponseBody: (Materializer, ExecutionContext) => ToResponseBody[PekkoResponseBody, PekkoStreams] + )(ses: List[ServerEndpoint[PekkoStreams with WebSockets, Future]]): Route = { + val filterServerEndpoints = FilterServerEndpoints(ses) + val interceptors = RejectInterceptor.disableWhenSingleEndpoint(pekkoHttpServerOptions.interceptors, ses) + + extractExecutionContext { implicit ec => + extractMaterializer { implicit mat => + implicit val monad: FutureMonad = new FutureMonad() + implicit val bodyListener: BodyListener[Future, PekkoResponseBody] = new PekkoBodyListener + + val interpreter = new ServerInterpreter( + filterServerEndpoints, + requestBody(mat, ec), + toResponseBody(mat, ec), + interceptors, + pekkoHttpServerOptions.deleteFile + ) + + extractRequestContext { ctx => + val serverRequest = PekkoServerRequest(ctx) + onSuccess(interpreter(serverRequest)) { + case RequestResult.Failure(_) => reject + case RequestResult.Response(response) => serverResponseToPekko(response, serverRequest.method) + } + } + } + } + } + + private def serverResponseToPekko(response: ServerResponse[PekkoResponseBody], requestMethod: Method): Route = { + val statusCode = StatusCodes.getForKey(response.code.code).getOrElse(StatusCodes.custom(response.code.code, "")) + val pekkoHeaders = parseHeadersOrThrowWithoutContentHeaders(response) + + response.body match { + case Some(Left(flow)) => + respondWithHeaders(pekkoHeaders) { + handleWebSocketMessages(flow) + } + case Some(Right(entity)) => + complete(HttpResponse(entity = entity, status = statusCode, headers = pekkoHeaders)) + case None => + if (requestMethod.is(Method.HEAD) && response.contentLength.isDefined) { + val contentLength: Long = response.contentLength.getOrElse(0) + val contentType: ContentType = response.contentType match { + case Some(t) => ContentType.parse(t).getOrElse(ContentTypes.NoContentType) + case None => ContentTypes.NoContentType + } + complete( + HttpResponse( + status = statusCode, + headers = pekkoHeaders, + entity = HttpEntity.Default(contentType, contentLength, Source.empty) + ) + ) + } else + response.contentType match { + case Some(t) => + val contentType = ContentType.parse(t).getOrElse(ContentTypes.NoContentType) + complete(HttpResponse(statusCode, headers = pekkoHeaders, entity = HttpEntity.Strict(contentType, ByteString.empty))) + case None => complete(HttpResponse(statusCode, headers = pekkoHeaders)) + } + } + } +} + +object PekkoHttpServerInterpreter { + def apply()(implicit _ec: ExecutionContext): PekkoHttpServerInterpreter = { + new PekkoHttpServerInterpreter { + override implicit def executionContext: ExecutionContext = _ec + } + } + + def apply(serverOptions: PekkoHttpServerOptions)(implicit _ec: ExecutionContext): PekkoHttpServerInterpreter = { + new PekkoHttpServerInterpreter { + override implicit def executionContext: ExecutionContext = _ec + + override def pekkoHttpServerOptions: PekkoHttpServerOptions = serverOptions + } + } +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerOptions.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerOptions.scala new file mode 100644 index 0000000000..015b09c94b --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerOptions.scala @@ -0,0 +1,84 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.event.LoggingAdapter +import org.slf4j.LoggerFactory +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.interceptor._ +import sttp.tapir.server.interceptor.log.DefaultServerLog +import sttp.tapir.{Defaults, TapirFile} + +import scala.annotation.nowarn +import scala.concurrent.{ExecutionContext, Future, blocking} + +case class PekkoHttpServerOptions( + createFile: ServerRequest => Future[TapirFile], + deleteFile: TapirFile => Future[Unit], + interceptors: List[Interceptor[Future]] +) { + def prependInterceptor(i: Interceptor[Future]): PekkoHttpServerOptions = copy(interceptors = i :: interceptors) + def appendInterceptor(i: Interceptor[Future]): PekkoHttpServerOptions = copy(interceptors = interceptors :+ i) +} + +object PekkoHttpServerOptions { + + /** Allows customising the interceptors used by the server interpreter. */ + def customiseInterceptors(implicit + ec: ExecutionContext + ): CustomiseInterceptors[Future, PekkoHttpServerOptions] = + CustomiseInterceptors( + createOptions = (ci: CustomiseInterceptors[Future, PekkoHttpServerOptions]) => + PekkoHttpServerOptions(defaultCreateFile(_), defaultDeleteFile(_), ci.interceptors) + ).serverLog(defaultSlf4jServerLog) + + def defaultCreateFile(@nowarn r: ServerRequest)(implicit ec: ExecutionContext): Future[TapirFile] = { + Future(blocking(Defaults.createTempFile())) + } + + def defaultDeleteFile(file: TapirFile)(implicit ec: ExecutionContext): Future[Unit] = { + Future(blocking(Defaults.deleteFile()(file))) + } + + val defaultSlf4jServerLog: DefaultServerLog[Future] = { + val log = LoggerFactory.getLogger(PekkoHttpServerInterpreter.getClass) + + def debugLog(msg: String, exOpt: Option[Throwable]): Future[Unit] = Future.successful { + exOpt match { + case None => log.debug(msg) + case Some(ex) => log.debug(s"$msg; exception: {}", ex) + } + } + + def errorLog(msg: String, ex: Throwable): Future[Unit] = Future.successful(log.error(msg, ex)) + + DefaultServerLog[Future]( + doLogWhenReceived = debugLog(_, None), + doLogWhenHandled = debugLog, + doLogAllDecodeFailures = debugLog, + doLogExceptions = errorLog, + noLog = Future.successful(()) + ) + } + + def defaultServerLog(loggingAdapter: LoggingAdapter): DefaultServerLog[Future] = { + def debugLog(msg: String, exOpt: Option[Throwable]): Future[Unit] = Future.successful { + exOpt match { + case None => loggingAdapter.debug(msg) + case Some(ex) => loggingAdapter.debug(s"$msg; exception: {}", ex) + } + } + + def errorLog(msg: String, ex: Throwable): Future[Unit] = Future.successful { + loggingAdapter.error(ex, msg) + } + + DefaultServerLog[Future]( + doLogWhenReceived = debugLog(_, None), + doLogWhenHandled = debugLog, + doLogAllDecodeFailures = debugLog, + doLogExceptions = errorLog, + noLog = Future.successful(()) + ) + } + + def default(implicit ec: ExecutionContext): PekkoHttpServerOptions = customiseInterceptors.options +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoModel.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoModel.scala new file mode 100644 index 0000000000..c997e1d351 --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoModel.scala @@ -0,0 +1,24 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.http.scaladsl.model.HttpHeader +import org.apache.pekko.http.scaladsl.model.HttpHeader.ParsingResult +import sttp.model.{HasHeaders, Header, HeaderNames} + +import scala.collection.immutable.Seq + +private[pekkohttp] object PekkoModel { + private val ctHeaderNameLowerCase = HeaderNames.ContentType.toLowerCase + private val clHeaderNameLowerCase = HeaderNames.ContentLength.toLowerCase + private val teHeaderNameLowerCase = HeaderNames.TransferEncoding.toLowerCase + + def parseHeadersOrThrowWithoutContentHeaders(hs: HasHeaders): Seq[HttpHeader] = + hs.headers + .map(parseHeaderOrThrow) + .filterNot(h => h.is(ctHeaderNameLowerCase) || h.is(clHeaderNameLowerCase) || h.is(teHeaderNameLowerCase)) + + def parseHeaderOrThrow(h: Header): HttpHeader = + HttpHeader.parse(h.name, h.value) match { + case ParsingResult.Ok(h, _) => h + case ParsingResult.Error(error) => throw new IllegalArgumentException(s"Cannot parse header (${h.name}, ${h.value}): $error") + } +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoRequestBody.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoRequestBody.scala new file mode 100644 index 0000000000..8a341dd9b8 --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoRequestBody.scala @@ -0,0 +1,68 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.http.scaladsl.model.{HttpEntity, Multipart} +import org.apache.pekko.http.scaladsl.server.RequestContext +import org.apache.pekko.http.scaladsl.unmarshalling.FromEntityUnmarshaller +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.{FileIO, Sink} +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.model.{Header, Part} +import sttp.tapir.model.ServerRequest +import sttp.tapir.server.interpreter.{RawValue, RequestBody} +import sttp.tapir.{FileRange, RawBodyType, RawPart, InputStreamRange} + +import java.io.ByteArrayInputStream + +import scala.concurrent.{ExecutionContext, Future} + +private[pekkohttp] class PekkoRequestBody(serverOptions: PekkoHttpServerOptions)(implicit + mat: Materializer, + ec: ExecutionContext +) extends RequestBody[Future, PekkoStreams] { + override val streams: PekkoStreams = PekkoStreams + override def toRaw[R](request: ServerRequest, bodyType: RawBodyType[R]): Future[RawValue[R]] = + toRawFromEntity(request, akkeRequestEntity(request), bodyType) + override def toStream(request: ServerRequest): streams.BinaryStream = akkeRequestEntity(request).dataBytes + + private def akkeRequestEntity(request: ServerRequest) = request.underlying.asInstanceOf[RequestContext].request.entity + + private def toRawFromEntity[R](request: ServerRequest, body: HttpEntity, bodyType: RawBodyType[R]): Future[RawValue[R]] = { + bodyType match { + case RawBodyType.StringBody(_) => implicitly[FromEntityUnmarshaller[String]].apply(body).map(RawValue(_)) + case RawBodyType.ByteArrayBody => implicitly[FromEntityUnmarshaller[Array[Byte]]].apply(body).map(RawValue(_)) + case RawBodyType.ByteBufferBody => implicitly[FromEntityUnmarshaller[ByteString]].apply(body).map(b => RawValue(b.asByteBuffer)) + case RawBodyType.InputStreamBody => + implicitly[FromEntityUnmarshaller[Array[Byte]]].apply(body).map(b => RawValue(new ByteArrayInputStream(b))) + case RawBodyType.FileBody => + serverOptions + .createFile(request) + .flatMap(file => body.dataBytes.runWith(FileIO.toPath(file.toPath)).map(_ => FileRange(file)).map(f => RawValue(f, Seq(f)))) + case RawBodyType.InputStreamRangeBody => + implicitly[FromEntityUnmarshaller[Array[Byte]]] + .apply(body) + .map(b => RawValue(InputStreamRange(() => new ByteArrayInputStream(b)))) + case m: RawBodyType.MultipartBody => + implicitly[FromEntityUnmarshaller[Multipart.FormData]].apply(body).flatMap { fd => + fd.parts + .mapConcat(part => m.partType(part.name).map((part, _)).toList) + .mapAsync[RawPart](1) { case (part, codecMeta) => toRawPart(request, part, codecMeta) } + .runWith[Future[scala.collection.immutable.Seq[RawPart]]](Sink.seq) + .map(RawValue.fromParts) + .asInstanceOf[Future[RawValue[R]]] + } + } + } + + private def toRawPart[R](request: ServerRequest, part: Multipart.FormData.BodyPart, bodyType: RawBodyType[R]): Future[Part[R]] = { + toRawFromEntity(request, part.entity, bodyType) + .map(r => + Part( + part.name, + r.value, + otherDispositionParams = part.additionalDispositionParams, + headers = part.additionalHeaders.map(h => Header(h.name, h.value)) + ).contentType(part.entity.contentType.toString()) + ) + } +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerRequest.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerRequest.scala new file mode 100644 index 0000000000..df9f3387a7 --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerRequest.scala @@ -0,0 +1,48 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.http.scaladsl.server.RequestContext +import org.apache.pekko.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`} +import org.apache.pekko.http.scaladsl.model.{Uri => PekkoUri} +import sttp.model.{Header, Method, QueryParams, Uri} +import sttp.tapir.{AttributeKey, AttributeMap} +import sttp.tapir.model.{ConnectionInfo, ServerRequest} + +import scala.annotation.tailrec +import scala.collection.immutable.Seq + +private[pekkohttp] case class PekkoServerRequest(ctx: RequestContext, attributes: AttributeMap = AttributeMap.Empty) extends ServerRequest { + override def protocol: String = ctx.request.protocol.value + override lazy val connectionInfo: ConnectionInfo = ConnectionInfo(None, None, None) + override def underlying: Any = ctx + + override lazy val pathSegments: List[String] = { + @tailrec + def run(p: PekkoUri.Path, acc: List[String]): List[String] = p match { + case PekkoUri.Path.Slash(pathTail) => run(pathTail, acc) + case PekkoUri.Path.Segment(s, pathTail) => run(pathTail, s :: acc) + case _ => acc.reverse + } + + run(ctx.unmatchedPath, Nil) + } + override lazy val queryParameters: QueryParams = QueryParams.fromMultiMap(ctx.request.uri.query().toMultiMap) + override lazy val method: Method = Method(ctx.request.method.value.toUpperCase) + override lazy val uri: Uri = Uri.unsafeParse(ctx.request.uri.toString()) + + private val EmptyContentType = "none/none" + + // Add low-level headers that have been removed by pekko-http. + // https://doc.pekko.io/docs/pekko-http/current/common/http-model.html?language=scala#http-headers + // https://github.com/softwaremill/tapir/issues/331 + override lazy val headers: Seq[Header] = { + val contentLength = ctx.request.entity.contentLengthOption.map(`Content-Length`(_)) + val contentType = `Content-Type`(ctx.request.entity.contentType) + val pekkoHeaders = contentType :: contentLength.toList ++ ctx.request.headers + pekkoHeaders.filterNot(_.value == EmptyContentType).map(h => Header(h.name(), h.value())) + } + + override def attribute[T](k: AttributeKey[T]): Option[T] = attributes.get(k) + override def attribute[T](k: AttributeKey[T], v: T): PekkoServerRequest = copy(attributes = attributes.put(k, v)) + + override def withUnderlying(underlying: Any): ServerRequest = PekkoServerRequest(ctx = underlying.asInstanceOf[RequestContext], attributes) +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEvents.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEvents.scala new file mode 100644 index 0000000000..bf728ffa9f --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEvents.scala @@ -0,0 +1,25 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.scaladsl.{Flow, Framing, Source} +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.model.sse.ServerSentEvent + +object PekkoServerSentEvents { + + def serialiseSSEToBytes: Source[ServerSentEvent, Any] => PekkoStreams.BinaryStream = sseStream => + sseStream.map(sse => { + ByteString(s"${sse.toString()}\n\n") + }) + + def parseBytesToSSE: PekkoStreams.BinaryStream => Source[ServerSentEvent, Any] = stream => stream.via(parse) + + private val parse: Flow[ByteString, ServerSentEvent, NotUsed] = + Framing + .delimiter(ByteString("\n\n"), maximumFrameLength = Int.MaxValue, allowTruncation = true) + .map(_.utf8String) + .map(_.split("\n").toList) + .map(ServerSentEvent.parse) + +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoToResponseBody.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoToResponseBody.scala new file mode 100644 index 0000000000..2b84d6c85b --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoToResponseBody.scala @@ -0,0 +1,145 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.stream.scaladsl.{FileIO, StreamConverters} +import org.apache.pekko.stream.Materializer +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.model.{HasHeaders, HeaderNames, Part} +import sttp.tapir.internal.charset +import sttp.tapir.server.pekkohttp.PekkoModel.parseHeadersOrThrowWithoutContentHeaders +import sttp.tapir.server.interpreter.ToResponseBody +import sttp.tapir.{CodecFormat, FileRange, RawBodyType, RawPart, WebSocketBodyOutput} +import java.nio.charset.{Charset, StandardCharsets} +import scala.concurrent.ExecutionContext +import scala.util.Try + +private[pekkohttp] class PekkoToResponseBody(implicit m: Materializer, ec: ExecutionContext) + extends ToResponseBody[PekkoResponseBody, PekkoStreams] { + override val streams: PekkoStreams = PekkoStreams + private val ChunkSize = 8192 + + override def fromRawValue[R](v: R, headers: HasHeaders, format: CodecFormat, bodyType: RawBodyType[R]): PekkoResponseBody = + Right( + overrideContentTypeIfDefined( + rawValueToResponseEntity(bodyType, formatToContentType(format, charset(bodyType)), headers.contentLength, v), + headers + ) + ) + + override def fromStreamValue( + v: streams.BinaryStream, + headers: HasHeaders, + format: CodecFormat, + charset: Option[Charset] + ): PekkoResponseBody = + Right(overrideContentTypeIfDefined(streamToEntity(formatToContentType(format, charset), headers.contentLength, v), headers)) + + override def fromWebSocketPipe[REQ, RESP]( + pipe: streams.Pipe[REQ, RESP], + o: WebSocketBodyOutput[streams.Pipe[REQ, RESP], REQ, RESP, _, PekkoStreams] + ): PekkoResponseBody = Left(PekkoWebSockets.pipeToBody(pipe, o)) + + private def rawValueToResponseEntity[CF <: CodecFormat, R]( + bodyType: RawBodyType[R], + ct: ContentType, + contentLength: Option[Long], + r: R + ): ResponseEntity = { + bodyType match { + case RawBodyType.StringBody(charset) => + ct match { + case nb: ContentType.NonBinary => HttpEntity(nb, r) + case _ => HttpEntity(ct, r.getBytes(charset)) + } + case RawBodyType.ByteArrayBody => HttpEntity(ct, r) + case RawBodyType.ByteBufferBody => HttpEntity(ct, ByteString(r)) + case RawBodyType.InputStreamBody => streamToEntity(ct, contentLength, StreamConverters.fromInputStream(() => r, ChunkSize)) + case RawBodyType.InputStreamRangeBody => + val resource = r + val initialStream = StreamConverters.fromInputStream(resource.inputStreamFromRangeStart, ChunkSize) + resource.range + .map(r => streamToEntity(ct, contentLength, toRangedStream(initialStream, bytesTotal = r.contentLength))) + .getOrElse(streamToEntity(ct, contentLength, initialStream)) + case RawBodyType.FileBody => + val tapirFile = r + tapirFile.range + .flatMap(r => r.startAndEnd.map(s => HttpEntity(ct, createFileSource(tapirFile, s._1, r.contentLength)))) + .getOrElse(HttpEntity.fromPath(ct, tapirFile.file.toPath)) + case m: RawBodyType.MultipartBody => + val parts = (r: Seq[RawPart]).flatMap(rawPartToBodyPart(m, _)) + val body = Multipart.FormData(parts: _*) + body.toEntity() + } + } + + private def createFileSource( + tapirFile: FileRange, + start: Long, + bytesTotal: Long + ): PekkoStreams.BinaryStream = + toRangedStream(FileIO.fromPath(tapirFile.file.toPath, ChunkSize, startPosition = start), bytesTotal) + + private def toRangedStream(initialStream: PekkoStreams.BinaryStream, bytesTotal: Long): PekkoStreams.BinaryStream = + initialStream + .scan((0L, ByteString.empty)) { case ((bytesConsumed, _), next) => + val bytesInNext = next.length + val bytesFromNext = Math.max(0, Math.min(bytesTotal - bytesConsumed, bytesInNext.toLong)) + (bytesConsumed + bytesInNext, next.take(bytesFromNext.toInt)) + } + .takeWhile(_._1 < bytesTotal, inclusive = true) + .map(_._2) + + private def streamToEntity(contentType: ContentType, contentLength: Option[Long], stream: PekkoStreams.BinaryStream): ResponseEntity = { + contentLength match { + case None => HttpEntity(contentType, stream) + case Some(l) => HttpEntity(contentType, l, stream) + } + } + + private def rawPartToBodyPart[T](m: RawBodyType.MultipartBody, part: Part[T]): Option[Multipart.FormData.BodyPart] = { + m.partType(part.name).map { partType => + val partContentType = part.contentType.map(parseContentType).getOrElse(ContentTypes.`application/octet-stream`) + val partContentLength = part.header(HeaderNames.ContentLength).flatMap(v => Try(v.toLong).toOption) + val body = rawValueToResponseEntity(partType.asInstanceOf[RawBodyType[Any]], partContentType, partContentLength, part.body) match { + case b: BodyPartEntity => overrideContentTypeIfDefined(b, part) + case _ => throw new IllegalArgumentException(s"$partType is not supported in multipart bodies") + } + + Multipart.FormData + .BodyPart( + part.name, + body, + part.otherDispositionParams, + parseHeadersOrThrowWithoutContentHeaders(part).toList + ) + } + } + + private def formatToContentType(format: CodecFormat, charset: Option[Charset]): ContentType = { + format match { + case CodecFormat.Json() => ContentTypes.`application/json` + case CodecFormat.TextPlain() => MediaTypes.`text/plain`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8))) + case CodecFormat.TextHtml() => MediaTypes.`text/html`.withCharset(charsetToHttpCharset(charset.getOrElse(StandardCharsets.UTF_8))) + case CodecFormat.OctetStream() => MediaTypes.`application/octet-stream` + case CodecFormat.Zip() => MediaTypes.`application/zip` + case CodecFormat.XWwwFormUrlencoded() => MediaTypes.`application/x-www-form-urlencoded` + case CodecFormat.MultipartFormData() => MediaTypes.`multipart/form-data` + case f => + val mt = if (f.mediaType.isText) charset.fold(f.mediaType)(f.mediaType.charset(_)) else f.mediaType + parseContentType(mt.toString()) + } + } + + private def parseContentType(ct: String): ContentType = + ContentType.parse(ct).getOrElse(throw new IllegalArgumentException(s"Cannot parse content type: $ct")) + + private def charsetToHttpCharset(charset: Charset): HttpCharset = HttpCharset.custom(charset.name()) + + private def overrideContentTypeIfDefined[RE <: ResponseEntity](re: RE, headers: HasHeaders): RE = { + headers.contentType match { + case Some(ct) => re.withContentType(parseContentType(ct)).asInstanceOf[RE] + case None => re + } + } +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoWebSockets.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoWebSockets.scala new file mode 100644 index 0000000000..5bdb918665 --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/PekkoWebSockets.scala @@ -0,0 +1,55 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Flow +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.tapir.model.WebSocketFrameDecodeFailure +import sttp.tapir.{DecodeResult, WebSocketBodyOutput} +import sttp.ws.{WebSocketClosed, WebSocketFrame} + +import scala.concurrent.{ExecutionContext, Future} + +private[pekkohttp] object PekkoWebSockets { + def pipeToBody[REQ, RESP]( + pipe: Flow[REQ, RESP, Any], + o: WebSocketBodyOutput[Flow[REQ, RESP, Any], REQ, RESP, _, PekkoStreams] + )(implicit ec: ExecutionContext, mat: Materializer): Flow[Message, Message, Any] = { + Flow[Message] + .mapAsync(1)(messageToFrame(_)) + .map(f => + o.requests.decode(f) match { + case failure: DecodeResult.Failure => throw new WebSocketFrameDecodeFailure(f, failure) + case DecodeResult.Value(v) => v + } + ) + .via(pipe) + .map(o.responses.encode) + .takeWhile { + case WebSocketFrame.Close(_, _) => false + case _ => true + } + .mapConcat(frameToMessage(_).toList) + } + + private def messageToFrame( + m: Message + )(implicit ec: ExecutionContext, mat: Materializer): Future[WebSocketFrame.Data[_]] = + m match { + case msg: TextMessage => + msg.textStream.runFold("")(_ + _).map(t => WebSocketFrame.text(t)) + case msg: BinaryMessage => + msg.dataStream.runFold(ByteString.empty)(_ ++ _).map(b => WebSocketFrame.binary(b.toArray)) + } + + private def frameToMessage(w: WebSocketFrame): Option[Message] = { + w match { + case WebSocketFrame.Text(p, _, _) => Some(TextMessage(p)) + case WebSocketFrame.Binary(p, _, _) => Some(BinaryMessage(ByteString(p))) + case WebSocketFrame.Ping(_) => None + case WebSocketFrame.Pong(_) => None + case WebSocketFrame.Close(_, _) => throw WebSocketClosed(None) + } + } +} diff --git a/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/package.scala b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/package.scala new file mode 100644 index 0000000000..c4861f7b8c --- /dev/null +++ b/server/pekko-http-server/src/main/scala/sttp/tapir/server/pekkohttp/package.scala @@ -0,0 +1,19 @@ +package sttp.tapir.server + +import org.apache.pekko.http.scaladsl.model.ResponseEntity +import org.apache.pekko.http.scaladsl.model.ws.Message +import org.apache.pekko.stream.scaladsl.{Flow, Source} +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.model.sse.ServerSentEvent +import sttp.tapir.{CodecFormat, StreamBodyIO, streamTextBody} + +import java.nio.charset.Charset + +package object pekkohttp { + type PekkoResponseBody = Either[Flow[Message, Message, Any], ResponseEntity] + + val serverSentEventsBody: StreamBodyIO[Source[ByteString, Any], Source[ServerSentEvent, Any], PekkoStreams] = + streamTextBody(PekkoStreams)(CodecFormat.TextEventStream(), Some(Charset.forName("UTF-8"))) + .map(PekkoServerSentEvents.parseBytesToSSE)(PekkoServerSentEvents.serialiseSSEToBytes) +} diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerStubTest.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerStubTest.scala new file mode 100644 index 0000000000..a103e7919e --- /dev/null +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerStubTest.scala @@ -0,0 +1,26 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.util.ByteString +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3.testing.SttpBackendStub +import sttp.monad.FutureMonad +import sttp.tapir.server.interceptor.CustomiseInterceptors +import sttp.tapir.server.tests.{CreateServerStubTest, ServerStubStreamingTest, ServerStubTest} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +object PekkoCreateServerStubTest extends CreateServerStubTest[Future, PekkoHttpServerOptions] { + override def customiseInterceptors: CustomiseInterceptors[Future, PekkoHttpServerOptions] = PekkoHttpServerOptions.customiseInterceptors + override def stub[R]: SttpBackendStub[Future, R] = SttpBackendStub(new FutureMonad()) + override def asFuture[A]: Future[A] => Future[A] = identity +} + +class PekkoHttpServerStubTest extends ServerStubTest(PekkoCreateServerStubTest) + +class PekkoHttpServerStubStreamingTest extends ServerStubStreamingTest(PekkoCreateServerStubTest, PekkoStreams) { + + /** Must be an instance of streams.BinaryStream */ + override def sampleStream: Any = Source.single(ByteString("hello")) +} diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala new file mode 100644 index 0000000000..a442c74eee --- /dev/null +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpServerTest.scala @@ -0,0 +1,112 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.model.HttpEntity +import org.apache.pekko.http.scaladsl.server.{Directives, RequestContext} +import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source} +import cats.data.NonEmptyList +import cats.effect.unsafe.implicits.global +import cats.effect.{IO, Resource} +import cats.implicits._ +import org.scalatest.EitherValues +import org.scalatest.matchers.should.Matchers._ +import sttp.capabilities.pekko.PekkoStreams +import sttp.client3._ +import sttp.client3.pekkohttp.PekkoHttpBackend +import sttp.model.sse.ServerSentEvent +import sttp.monad.FutureMonad +import sttp.monad.syntax._ +import sttp.tapir._ +import sttp.tapir.server.interceptor._ +import sttp.tapir.server.tests._ +import sttp.tapir.tests.{Test, TestSuite} + +import java.util.UUID +import scala.concurrent.Future +import scala.util.Random + +class PekkoHttpServerTest extends TestSuite with EitherValues { + private def randomUUID = Some(UUID.randomUUID().toString) + private val sse1 = ServerSentEvent(randomUUID, randomUUID, randomUUID, Some(Random.nextInt(200))) + private val sse2 = ServerSentEvent(randomUUID, randomUUID, randomUUID, Some(Random.nextInt(200))) + + def actorSystemResource: Resource[IO, ActorSystem] = + Resource.make(IO.delay(ActorSystem()))(actorSystem => IO.fromFuture(IO.delay(actorSystem.terminate())).void) + + override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend => + actorSystemResource.map { implicit actorSystem => + implicit val m: FutureMonad = new FutureMonad()(actorSystem.dispatcher) + + val interpreter = new PekkoHttpTestServerInterpreter()(actorSystem) + val createServerTest = new DefaultCreateServerTest(backend, interpreter) + + def additionalTests(): List[Test] = List( + Test("endpoint nested in a path directive") { + val e = endpoint.get.in("test" and "directive").out(stringBody).serverLogic(_ => ("ok".asRight[Unit]).unit) + val route = Directives.pathPrefix("api")(PekkoHttpServerInterpreter().toRoute(e)) + interpreter + .server(NonEmptyList.of(route)) + .use { port => + basicRequest.get(uri"http://localhost:$port/api/test/directive").send(backend).map(_.body shouldBe Right("ok")) + } + .unsafeToFuture() + }, + Test("Send and receive SSE") { + implicit val ec = actorSystem.dispatcher + val e = endpoint.get + .in("sse") + .out(serverSentEventsBody) + .serverLogicSuccess[Future](_ => { + Future.successful(Source(List(sse1, sse2))) + }) + val route = PekkoHttpServerInterpreter().toRoute(e) + interpreter + .server(NonEmptyList.of(route)) + .use { port => + IO.fromFuture { + IO( + basicRequest + .get(uri"http://localhost:$port/sse") + .response( + asStreamUnsafe(PekkoStreams).mapRight(stream => + PekkoServerSentEvents.parseBytesToSSE(stream).runFold(List.empty[ServerSentEvent])((acc, sse) => acc :+ sse) + ) + ) + .send(PekkoHttpBackend.usingActorSystem(actorSystem)) + .flatMap(_.body.value.transform(sse => sse shouldBe List(sse1, sse2), ex => fail(ex))) + ) + } + } + .unsafeToFuture() + }, + Test("replace body using a request interceptor") { + val e = endpoint.post.in(stringBody).out(stringBody).serverLogicSuccess[Future](body => Future.successful(body)) + + val route = PekkoHttpServerInterpreter( + PekkoHttpServerOptions.customiseInterceptors + .prependInterceptor(RequestInterceptor.transformServerRequest { request => + val underlying = request.underlying.asInstanceOf[RequestContext] + val changedUnderlying = underlying.withRequest(underlying.request.withEntity(HttpEntity("replaced"))) + Future.successful(request.withUnderlying(changedUnderlying)) + }) + .options + ).toRoute(e) + + interpreter + .server(NonEmptyList.of(route)) + .use { port => + basicRequest.post(uri"http://localhost:$port").body("test123").send(backend).map(_.body shouldBe Right("replaced")) + } + .unsafeToFuture() + } + ) + new AllServerTests(createServerTest, interpreter, backend).tests() ++ + new ServerStreamingTests(createServerTest, PekkoStreams).tests() ++ + new ServerWebSocketTests(createServerTest, PekkoStreams) { + override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = Flow.fromFunction(f) + override def emptyPipe[A, B]: Flow[A, B, Any] = Flow.fromSinkAndSource(Sink.ignore, Source.empty) + }.tests() ++ + additionalTests() + } + } +} diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala new file mode 100644 index 0000000000..ed60ecddf0 --- /dev/null +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoHttpTestServerInterpreter.scala @@ -0,0 +1,29 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.server.Directives.concat +import org.apache.pekko.http.scaladsl.server.Route +import cats.data.NonEmptyList +import cats.effect.{IO, Resource} +import sttp.capabilities.WebSockets +import sttp.capabilities.pekko.PekkoStreams +import sttp.tapir.server.ServerEndpoint +import sttp.tapir.server.tests.TestServerInterpreter +import sttp.tapir.tests.Port + +import scala.concurrent.Future + +class PekkoHttpTestServerInterpreter(implicit actorSystem: ActorSystem) + extends TestServerInterpreter[Future, PekkoStreams with WebSockets, PekkoHttpServerOptions, Route] { + override def route(es: List[ServerEndpoint[PekkoStreams with WebSockets, Future]], interceptors: Interceptors): Route = { + import actorSystem.dispatcher + val serverOptions: PekkoHttpServerOptions = interceptors(PekkoHttpServerOptions.customiseInterceptors).options + PekkoHttpServerInterpreter(serverOptions).toRoute(es) + } + + override def server(routes: NonEmptyList[Route]): Resource[IO, Port] = { + val bind = IO.fromFuture(IO(Http().newServerAt("localhost", 0).bind(concat(routes.toList: _*)))) + Resource.make(bind)(binding => IO.fromFuture(IO(binding.unbind())).void).map(_.localAddress.getPort) + } +} diff --git a/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEventsTest.scala b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEventsTest.scala new file mode 100644 index 0000000000..d74888878d --- /dev/null +++ b/server/pekko-http-server/src/test/scala/sttp/tapir/server/pekkohttp/PekkoServerSentEventsTest.scala @@ -0,0 +1,109 @@ +package sttp.tapir.server.pekkohttp + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.util.ByteString +import org.scalatest.funsuite.AsyncFunSuite +import org.scalatest.matchers.should.Matchers +import sttp.model.sse.ServerSentEvent + +class PekkoServerSentEventsTest extends AsyncFunSuite with Matchers { + + implicit val materializer: Materializer = Materializer(ActorSystem("PekkoHttpServerInterpreterTest")) + + test("serialiseSSEToBytes should successfully serialise simple Server Sent Event to ByteString") { + val sse = Source.single(ServerSentEvent(Some("data"), Some("event"), Some("id1"), Some(10))) + val serialised = PekkoServerSentEvents.serialiseSSEToBytes(sse) + val futureEvents = serialised.runFold(List.empty[ByteString])((acc, event) => acc :+ event) + futureEvents.map(sseEvents => { + sseEvents shouldBe List( + ByteString( + s"""data: data + |event: event + |id: id1 + |retry: 10 + | + |""".stripMargin + ) + ) + }) + } + + test("serialiseSSEToBytes should omit fields that are not set") { + val sse = Source.single(ServerSentEvent(Some("data"), None, Some("id1"), None)) + val serialised = PekkoServerSentEvents.serialiseSSEToBytes(sse) + val futureEvents = serialised.runFold(List.empty[ByteString])((acc, event) => acc :+ event) + futureEvents.map(sseEvents => { + sseEvents shouldBe List( + ByteString( + s"""data: data + |id: id1 + | + |""".stripMargin + ) + ) + }) + } + + test("serialiseSSEToBytes should successfully serialise multiline data event") { + val sse = Source.single( + ServerSentEvent( + Some("""some data info 1 + |some data info 2 + |some data info 3""".stripMargin), + None, + None, + None + ) + ) + val serialised = PekkoServerSentEvents.serialiseSSEToBytes(sse) + val futureEvents = serialised.runFold(List.empty[ByteString])((acc, event) => acc :+ event) + futureEvents.map(sseEvents => { + sseEvents shouldBe List( + ByteString( + s"""data: some data info 1 + |data: some data info 2 + |data: some data info 3 + | + |""".stripMargin + ) + ) + }) + } + + test("parseBytesToSSE should successfully parse SSE bytes to SSE structure") { + val sseBytes = Source.single( + ByteString( + """data: event1 data + |event: event1 + |id: id1 + |retry: 5 + | + | + |data: event2 data1 + |data: event2 data2 + |data: event2 data3 + |id: id2 + | + |""".stripMargin + ) + ) + val parsed = PekkoServerSentEvents.parseBytesToSSE(sseBytes) + val futureEvents = parsed.runFold(List.empty[ServerSentEvent])((acc, event) => acc :+ event) + futureEvents.map(events => + events shouldBe List( + ServerSentEvent(Some("event1 data"), Some("event1"), Some("id1"), Some(5)), + ServerSentEvent( + Some("""event2 data1 + |event2 data2 + |event2 data3""".stripMargin), + None, + Some("id2"), + None + ) + ) + ) + } + +}