Skip to content

Commit

Permalink
Add Pekko support (#2883)
Browse files Browse the repository at this point in the history
Co-authored-by: Krzysiek Ciesielski <[email protected]>
Co-authored-by: Sergio Noviello <[email protected]>
  • Loading branch information
3 people authored Aug 9, 2023
1 parent cfaaced commit c399ff1
Show file tree
Hide file tree
Showing 31 changed files with 1,692 additions and 4 deletions.
19 changes: 19 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ lazy val rawAllAggregates = core.projectRefs ++
serverCore.projectRefs ++
akkaHttpServer.projectRefs ++
akkaGrpcServer.projectRefs ++
pekkoHttpServer.projectRefs ++
armeriaServer.projectRefs ++
armeriaServerCats.projectRefs ++
armeriaServerZio.projectRefs ++
Expand Down Expand Up @@ -1159,6 +1160,21 @@ lazy val akkaHttpServer: ProjectMatrix = (projectMatrix in file("server/akka-htt
.jvmPlatform(scalaVersions = scala2Versions)
.dependsOn(serverCore, serverTests % Test)

lazy val pekkoHttpServer: ProjectMatrix = (projectMatrix in file("server/pekko-http-server"))
.settings(commonJvmSettings)
.settings(
name := "tapir-pekko-http-server",
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % Versions.pekkoHttp,
"org.apache.pekko" %% "pekko-stream" % Versions.pekkoStreams,
"org.apache.pekko" %% "pekko-slf4j" % Versions.pekkoStreams,
"com.softwaremill.sttp.shared" %% "pekko" % Versions.sttpShared,
"com.softwaremill.sttp.client3" %% "pekko-http-backend" % Versions.sttp % Test
)
)
.jvmPlatform(scalaVersions = scala2Versions)
.dependsOn(serverCore, serverTests % Test)

lazy val akkaGrpcServer: ProjectMatrix = (projectMatrix in file("server/akka-grpc-server"))
.settings(commonJvmSettings)
.settings(
Expand Down Expand Up @@ -1950,6 +1966,7 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples"))
"org.http4s" %% "http4s-circe" % Versions.http4s,
"org.http4s" %% "http4s-blaze-server" % Versions.http4sBlazeServer,
"com.softwaremill.sttp.client3" %% "akka-http-backend" % Versions.sttp,
"com.softwaremill.sttp.client3" %% "pekko-http-backend" % Versions.sttp,
"com.softwaremill.sttp.client3" %% "async-http-client-backend-fs2" % Versions.sttp,
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % Versions.sttp,
"com.softwaremill.sttp.client3" %% "async-http-client-backend-cats" % Versions.sttp,
Expand All @@ -1965,6 +1982,7 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples"))
.jvmPlatform(scalaVersions = examplesScalaVersions)
.dependsOn(
akkaHttpServer,
pekkoHttpServer,
armeriaServer,
jdkhttpServer,
http4sServer,
Expand Down Expand Up @@ -2049,6 +2067,7 @@ lazy val documentation: ProjectMatrix = (projectMatrix in file("generated-doc"))
core % "compile->test",
testing,
akkaHttpServer,
pekkoHttpServer,
armeriaServer,
armeriaServerCats,
armeriaServerZio,
Expand Down
2 changes: 2 additions & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ input and output parameters. An endpoint specification can be interpreted as:
* [Http4s](server/http4s.md) `HttpRoutes[F]` (using cats-effect or [ZIO](server/zio-http4s.md))
* [Netty](server/netty.md) (using `Future`s, cats-effect or ZIO)
* [Finatra](server/finatra.md) `http.Controller`
* [Pekko HTTP](server/pekkohttp.md) `Route`s/`Directive`s
* [Play](server/play.md) `Route`
* [Vert.X](server/vertx.md) `Router => Route` (using `Future`s, cats-effect or ZIO)
* [ZIO Http](server/ziohttp.md) `Http`
Expand Down Expand Up @@ -230,6 +231,7 @@ We offer commercial support for sttp and related technologies, as well as develo
server/zio-http4s
server/netty
server/finatra
server/pekkohttp
server/play
server/vertx
server/ziohttp
Expand Down
122 changes: 122 additions & 0 deletions doc/server/pekkohttp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Running as a pekko-http server

To expose an endpoint as a [pekko-http](https://pekko.apache.org/docs/pekko-http/current/) server, first add the following
dependency:

```scala
"com.softwaremill.sttp.tapir" %% "tapir-pekko-http-server" % "@VERSION@"
```

This will transitively pull some Pekko modules. If you want to force
your own Pekko version, use sbt exclusion. Mind the Scala version in artifact name:

```scala
"com.softwaremill.sttp.tapir" %% "tapir-pekko-http-server" % "@VERSION@" exclude("org.apache.pekko", "pekko-stream_2.12")
```

Now import the object:

```scala mdoc:compile-only
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter
```

## Using `toRoute`

The `toRoute` method requires a single, or a list of `ServerEndpoint`s, which can be created by adding
[server logic](logic.md) to an endpoint.

For example:

```scala mdoc:compile-only
import sttp.tapir._
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter
import scala.concurrent.Future
import org.apache.pekko.http.scaladsl.server.Route
import scala.concurrent.ExecutionContext.Implicits.global

def countCharacters(s: String): Future[Either[Unit, Int]] =
Future.successful(Right[Unit, Int](s.length))

val countCharactersEndpoint: PublicEndpoint[String, Unit, Int, Any] =
endpoint.in(stringBody).out(plainBody[Int])

val countCharactersRoute: Route =
PekkoHttpServerInterpreter().toRoute(countCharactersEndpoint.serverLogic(countCharacters))
```

## Combining directives

The tapir-generated `Route` captures from the request only what is described by the endpoint. Combine
with other pekko-http directives to add additional behavior, or get more information from the request.

For example, wrap the tapir-generated route in a metrics route, or nest a security directive in the
tapir-generated directive.

Edge-case endpoints, which require special logic not expressible using tapir, can be implemented directly
using pekko-http. For example:

```scala mdoc:compile-only
import sttp.tapir._
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter
import org.apache.pekko.http.scaladsl.server._

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

class Special
def metricsDirective: Directive0 = ???
def specialDirective: Directive1[Special] = ???
val tapirEndpoint: PublicEndpoint[String, Unit, Unit, Any] = endpoint.in(path[String]("input"))

val myRoute: Route = metricsDirective {
specialDirective { special =>
PekkoHttpServerInterpreter().toRoute(tapirEndpoint.serverLogic[Future] { input =>
???
/* here we can use both `special` and `input` values */
})
}
}
```

## Streaming

The pekko-http interpreter accepts streaming bodies of type `Source[ByteString, Any]`, as described by the `PekkoStreams`
capability. Both response bodies and request bodies can be streamed. Usage: `streamBody(PekkoStreams)(schema, format)`.

The capability can be added to the classpath independently of the interpreter through the
`"com.softwaremill.sttp.shared" %% "pekko"` dependency.

## Web sockets

The interpreter supports web sockets, with pipes of type `Flow[REQ, RESP, Any]`. See [web sockets](../endpoint/websockets.md)
for more details.

pekko-http does not expose control frames (`Ping`, `Pong` and `Close`), so any setting regarding them are discarded, and
ping/pong frames which are sent explicitly are ignored. [Automatic pings](https://pekko.apache.org/docs/pekko-http/current/server-side/websocket-support.html#automatic-keep-alive-ping-support) can be instead enabled through configuration.

## Server Sent Events

The interpreter supports [SSE (Server Sent Events)](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).

For example, to define an endpoint that returns event stream:

```scala mdoc:compile-only
import org.apache.pekko.stream.scaladsl.Source
import sttp.model.sse.ServerSentEvent
import sttp.tapir._
import sttp.tapir.server.pekkohttp.{PekkoHttpServerInterpreter, serverSentEventsBody}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val sseEndpoint = endpoint.get.out(serverSentEventsBody)

val routes = PekkoHttpServerInterpreter().toRoute(sseEndpoint.serverLogicSuccess[Future](_ =>
Future.successful(Source.single(ServerSentEvent(Some("data"), None, None, None)))
))
```

## Configuration

The interpreter can be configured by providing an `PekkoHttpServerOptions` value, see
[server options](options.md) for details.
1 change: 1 addition & 0 deletions doc/stability.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The modules are categorised using the following levels:
| finatra | stabilising |
| http4s | stabilising |
| netty | experimental |
| pekko-http| stabilising |
| play | stabilising |
| vertx | stabilising |
| zio1-http | experimental |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package sttp.tapir.examples

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Route
import sttp.client3._
import sttp.tapir._
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object HelloWorldPekkoServer extends App {
implicit val actorSystem: ActorSystem = ActorSystem()
import actorSystem.dispatcher

// the endpoint: single fixed path input ("hello"), single query parameter
// corresponds to: GET /hello?name=...
val helloWorld: PublicEndpoint[String, Unit, String, Any] =
endpoint.get.in("hello").in(query[String]("name")).out(stringBody)

// converting an endpoint to a route (providing server-side logic); extension method comes from imported packages
val helloWorldRoute: Route =
PekkoHttpServerInterpreter().toRoute(helloWorld.serverLogicSuccess(name => Future.successful(s"Hello, $name!")))

// starting the server
val bindAndCheck = Http().newServerAt("localhost", 8080).bindFlow(helloWorldRoute).map { _ =>
// testing
val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend()
val result: String = basicRequest.response(asStringAlways).get(uri"http://localhost:8080/hello?name=Frodo").send(backend).body
println("Got result: " + result)

assert(result == "Hello, Frodo!")
}

Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package sttp.tapir.examples.multipart

import java.io.PrintWriter

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Route
import sttp.client3._
import sttp.tapir.generic.auto._
import sttp.model.Part
import sttp.tapir._
import sttp.tapir.server.pekkohttp.PekkoHttpServerInterpreter

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MultipartFormUploadPekkoServer extends App {
implicit val actorSystem: ActorSystem = ActorSystem()
import actorSystem.dispatcher

// the class representing the multipart data
//
// parts can be referenced directly; if part metadata is needed, we define the type wrapped with Part[_].
//
// note that for binary parts need to be buffered either in-memory or in the filesystem anyway (the whole request
// has to be read to find out what are the parts), so handling multipart requests in a purely streaming fashion is
// not possible
case class UserProfile(name: String, hobby: Option[String], age: Int, photo: Part[TapirFile])

// corresponds to: POST /user/profile [multipart form data with fields name, hobby, age, photo]
val setProfile: PublicEndpoint[UserProfile, Unit, String, Any] =
endpoint.post.in("user" / "profile").in(multipartBody[UserProfile]).out(stringBody)

// converting an endpoint to a route (providing server-side logic); extension method comes from imported packages
val setProfileRoute: Route = PekkoHttpServerInterpreter().toRoute(setProfile.serverLogicSuccess { data =>
Future {
val response = s"Received: ${data.name} / ${data.hobby} / ${data.age} / ${data.photo.fileName} (${data.photo.body.length()})"
data.photo.body.delete()
response
}
})

// starting the server
val bindAndCheck = Http().newServerAt("localhost", 8080).bindFlow(setProfileRoute).map { _ =>
val testFile = java.io.File.createTempFile("user-123", ".jpg")
val pw = new PrintWriter(testFile); pw.write("This is not a photo"); pw.close()

// testing
val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend()
val result: String = basicRequest
.response(asStringAlways)
.get(uri"http://localhost:8080/user/profile")
.multipartBody(multipart("name", "Frodo"), multipart("hobby", "hiking"), multipart("age", "33"), multipartFile("photo", testFile))
.send(backend)
.body
println("Got result: " + result)

assert(result == s"Received: Frodo / Some(hiking) / 33 / Some(${testFile.getName}) (19)")
}

Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package sttp.tapir.examples.security

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.server.Route
import sttp.client3._
import sttp.model.StatusCode
import sttp.model.headers.WWWAuthenticateChallenge
import sttp.tapir._
import sttp.tapir.model._
import sttp.tapir.server.pekkohttp._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object BasicAuthenticationPekkoServer extends App {
implicit val actorSystem: ActorSystem = ActorSystem()
import actorSystem.dispatcher

val secret: Endpoint[UsernamePassword, Unit, Unit, String, Any] =
endpoint.get.securityIn("secret").securityIn(auth.basic[UsernamePassword](WWWAuthenticateChallenge.basic("example"))).out(stringBody)

val secretRoute: Route =
PekkoHttpServerInterpreter().toRoute(
secret
.serverSecurityLogic(credentials => Future.successful(Right(credentials.username): Either[Unit, String]))
.serverLogic(username => _ => Future.successful(Right(s"Hello, $username!")))
)

// starting the server
val bindAndCheck = Http().newServerAt("localhost", 8080).bindFlow(secretRoute).map { _ =>
// testing
val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend()
val unauthorized = basicRequest.get(uri"http://localhost:8080/secret").send(backend)
println("Got result: " + unauthorized)
assert(unauthorized.code == StatusCode.Unauthorized)
assert(unauthorized.header("WWW-Authenticate").contains("""Basic realm="example""""))

val result = basicRequest.get(uri"http://localhost:8080/secret").header("Authorization", "Basic dXNlcjpzZWNyZXQ=").send(backend)
println("Got result: " + result)
assert(result.code == StatusCode.Ok)
assert(result.body == Right("Hello, user!"))
}

Await.result(bindAndCheck.transformWith { r => actorSystem.terminate().transform(_ => r) }, 1.minute)
}
Loading

0 comments on commit c399ff1

Please sign in to comment.