Skip to content

Commit

Permalink
Make it easier to use tapir with impure effects (#2282)
Browse files Browse the repository at this point in the history
* Make it easier to use tapir with impure effects

* Simplify

* Revert "Simplify"

This reverts commit 1a0ccde.

* Make package private + mima

---------

Co-authored-by: kyri-petrou <[email protected]>
  • Loading branch information
ghostdogpr and kyri-petrou authored Jun 18, 2024
1 parent 3f14b94 commit 56a6879
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 113 deletions.
29 changes: 3 additions & 26 deletions adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamCon
import sttp.capabilities.WebSockets
import sttp.capabilities.akka.AkkaStreams
import sttp.capabilities.akka.AkkaStreams.Pipe
import sttp.model.{ MediaType, StatusCode }
import sttp.model.StatusCode
import sttp.tapir.Codec.JsonCodec
import sttp.tapir.PublicEndpoint
import sttp.tapir.model.ServerRequest
Expand All @@ -27,19 +27,15 @@ class AkkaHttpAdapter private (private val options: AkkaHttpServerOptions)(impli
def makeHttpService[R, E](
interpreter: HttpInterpreter[R, E]
)(implicit runtime: Runtime[R], materializer: Materializer): Route =
akkaInterpreter.toRoute(
interpreter
.serverEndpoints[R, AkkaStreams](AkkaStreams)
.map(convertHttpStreamingEndpoint[R, (GraphQLRequest, ServerRequest)])
)
akkaInterpreter.toRoute(interpreter.serverEndpointsFuture[AkkaStreams](AkkaStreams)(runtime))

def makeHttpUploadService[R, E](interpreter: HttpUploadInterpreter[R, E])(implicit
runtime: Runtime[R],
materializer: Materializer,
requestCodec: JsonCodec[GraphQLRequest],
mapCodec: JsonCodec[Map[String, Seq[String]]]
): Route =
akkaInterpreter.toRoute(convertHttpStreamingEndpoint(interpreter.serverEndpoint[R, AkkaStreams](AkkaStreams)))
akkaInterpreter.toRoute(interpreter.serverEndpointFuture[AkkaStreams](AkkaStreams)(runtime))

def makeWebSocketService[R, E](
interpreter: WebSocketInterpreter[R, E]
Expand Down Expand Up @@ -107,25 +103,6 @@ object AkkaHttpAdapter {

type AkkaPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any]

def convertHttpStreamingEndpoint[R, I](
endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[
AkkaStreams.BinaryStream
], AkkaStreams, RIO[R, *]]
)(implicit runtime: Runtime[R]): ServerEndpoint[AkkaStreams, Future] =
ServerEndpoint[
Unit,
Unit,
I,
TapirResponse,
CalibanResponse[AkkaStreams.BinaryStream],
AkkaStreams,
Future
](
endpoint.endpoint,
_ => _ => Future.successful(Right(())),
_ => _ => req => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req)))
)

def convertWebSocketEndpoint[R](
endpoint: ServerEndpoint.Full[
Unit,
Expand Down
35 changes: 6 additions & 29 deletions adapters/pekko-http/src/main/scala/caliban/PekkoHttpAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package caliban

import caliban.PekkoHttpAdapter._
import caliban.interop.tapir.TapirAdapter._
import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamConstructor, WebSocketInterpreter }
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }
import org.apache.pekko.stream.{ Materializer, OverflowStrategy }
import org.apache.pekko.util.ByteString
import caliban.PekkoHttpAdapter._
import caliban.interop.tapir.TapirAdapter._
import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamConstructor, WebSocketInterpreter }
import sttp.capabilities.WebSockets
import sttp.capabilities.pekko.PekkoStreams
import sttp.capabilities.pekko.PekkoStreams.Pipe
import sttp.model.{ MediaType, StatusCode }
import sttp.model.StatusCode
import sttp.tapir.Codec.JsonCodec
import sttp.tapir.PublicEndpoint
import sttp.tapir.model.ServerRequest
Expand All @@ -27,19 +27,15 @@ class PekkoHttpAdapter private (val options: PekkoHttpServerOptions)(implicit ec
def makeHttpService[R, E](
interpreter: HttpInterpreter[R, E]
)(implicit runtime: Runtime[R], materializer: Materializer): Route =
pekkoInterpreter.toRoute(
interpreter
.serverEndpoints[R, PekkoStreams](PekkoStreams)
.map(convertHttpStreamingEndpoint[R, (GraphQLRequest, ServerRequest)])
)
pekkoInterpreter.toRoute(interpreter.serverEndpointsFuture[PekkoStreams](PekkoStreams)(runtime))

def makeHttpUploadService[R, E](interpreter: HttpUploadInterpreter[R, E])(implicit
runtime: Runtime[R],
materializer: Materializer,
requestCodec: JsonCodec[GraphQLRequest],
mapCodec: JsonCodec[Map[String, Seq[String]]]
): Route =
pekkoInterpreter.toRoute(convertHttpStreamingEndpoint(interpreter.serverEndpoint[R, PekkoStreams](PekkoStreams)))
pekkoInterpreter.toRoute(interpreter.serverEndpointFuture[PekkoStreams](PekkoStreams)(runtime))

def makeWebSocketService[R, E](
interpreter: WebSocketInterpreter[R, E]
Expand Down Expand Up @@ -107,25 +103,6 @@ object PekkoHttpAdapter {

type PekkoPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any]

def convertHttpStreamingEndpoint[R, I](
endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[
PekkoStreams.BinaryStream
], PekkoStreams, RIO[R, *]]
)(implicit runtime: Runtime[R]): ServerEndpoint[PekkoStreams, Future] =
ServerEndpoint[
Unit,
Unit,
I,
TapirResponse,
CalibanResponse[PekkoStreams.BinaryStream],
PekkoStreams,
Future
](
endpoint.endpoint,
_ => _ => Future.successful(Right(())),
_ => _ => req => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req)))
)

def convertWebSocketEndpoint[R](
endpoint: ServerEndpoint.Full[
Unit,
Expand Down
39 changes: 2 additions & 37 deletions adapters/play/src/main/scala/caliban/PlayAdapter.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package caliban

import caliban.PlayAdapter.convertHttpStreamingEndpoint
import caliban.interop.tapir.TapirAdapter._
import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamConstructor, WebSocketInterpreter }
import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source }
Expand Down Expand Up @@ -28,19 +27,15 @@ class PlayAdapter private (private val options: Option[PlayServerOptions]) {
def makeHttpService[R, E](
interpreter: HttpInterpreter[R, E]
)(implicit runtime: Runtime[R], materializer: Materializer): Routes =
playInterpreter.toRoutes(
interpreter
.serverEndpoints[R, PekkoStreams](PekkoStreams)
.map(convertHttpStreamingEndpoint[R, (GraphQLRequest, ServerRequest)])
)
playInterpreter.toRoutes(interpreter.serverEndpointsFuture[PekkoStreams](PekkoStreams)(runtime))

def makeHttpUploadService[R, E](interpreter: HttpUploadInterpreter[R, E])(implicit
runtime: Runtime[R],
materializer: Materializer,
requestCodec: JsonCodec[GraphQLRequest],
mapCodec: JsonCodec[Map[String, Seq[String]]]
): Routes =
playInterpreter.toRoutes(convertHttpStreamingEndpoint(interpreter.serverEndpoint[R, PekkoStreams](PekkoStreams)))
playInterpreter.toRoutes(interpreter.serverEndpointFuture[PekkoStreams](PekkoStreams)(runtime))

def makeWebSocketService[R, E](
interpreter: WebSocketInterpreter[R, E]
Expand Down Expand Up @@ -105,36 +100,6 @@ object PlayAdapter extends PlayAdapter(None) {

type AkkaPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any]

def convertHttpStreamingEndpoint[R, Input](
endpoint: ServerEndpoint.Full[
Unit,
Unit,
Input,
TapirResponse,
CalibanResponse[PekkoStreams.BinaryStream],
PekkoStreams,
RIO[R, *]
]
)(implicit runtime: Runtime[R], mat: Materializer): ServerEndpoint[PekkoStreams, Future] =
ServerEndpoint[
Unit,
Unit,
Input,
TapirResponse,
CalibanResponse[PekkoStreams.BinaryStream],
PekkoStreams,
Future
](
endpoint.endpoint,
_ => _ => Future.successful(Right(())),
_ =>
_ =>
req =>
Unsafe.unsafe { implicit u =>
runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req))
}
)

def convertWebSocketEndpoint[R](
endpoint: ServerEndpoint.Full[
Unit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ import caliban._
import caliban.interop.tapir.TapirAdapter._
import sttp.capabilities.Streams
import sttp.model.{ headers => _, _ }
import sttp.shared.Identity
import sttp.tapir.Codec.JsonCodec
import sttp.tapir.model.ServerRequest
import sttp.tapir._
import sttp.tapir.server.ServerEndpoint
import zio._

import scala.concurrent.Future

sealed trait HttpInterpreter[-R, E] { self =>
protected def endpoints[S](streams: Streams[S]): List[
PublicEndpoint[(GraphQLRequest, ServerRequest), TapirResponse, CalibanResponse[streams.BinaryStream], S]
Expand All @@ -31,6 +35,24 @@ sealed trait HttpInterpreter[-R, E] { self =>
endpoints[S](streams).map(_.serverLogic(logic(_)))
}

def serverEndpointsFuture[S](streams: Streams[S])(runtime: Runtime[R])(implicit
streamConstructor: StreamConstructor[streams.BinaryStream]
): List[ServerEndpoint[S, Future]] = {
implicit val r: Runtime[R] = runtime
serverEndpoints(streams).map(
convertHttpEndpointToFuture[R, streams.BinaryStream, S, (GraphQLRequest, ServerRequest)]
)
}

def serverEndpointsIdentity[S](streams: Streams[S])(runtime: Runtime[R])(implicit
streamConstructor: StreamConstructor[streams.BinaryStream]
): List[ServerEndpoint[S, Identity]] = {
implicit val r: Runtime[R] = runtime
serverEndpoints(streams).map(
convertHttpEndpointToIdentity[R, streams.BinaryStream, S, (GraphQLRequest, ServerRequest)]
)
}

def intercept[R1](interceptor: Interceptor[R1, R]): HttpInterpreter[R1, E] =
HttpInterpreter.Intercepted(self, interceptor)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package caliban.interop.tapir

import caliban.Value.{ IntValue, StringValue }
import caliban._
import caliban.interop.tapir.TapirAdapter._
import caliban.uploads.{ FileMeta, GraphQLUploadRequest, Uploads }
import sttp.capabilities.Streams
import sttp.model._
import sttp.shared.Identity
import sttp.tapir.Codec.JsonCodec
import sttp.tapir._
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.ServerEndpoint
import zio._

import java.nio.charset.StandardCharsets
import scala.util.Try
import scala.concurrent.Future

sealed trait HttpUploadInterpreter[-R, E] { self =>
protected def endpoint[S](
Expand Down Expand Up @@ -81,6 +82,24 @@ sealed trait HttpUploadInterpreter[-R, E] { self =>
endpoint(streams).serverLogic(logic(_))
}

def serverEndpointFuture[S](streams: Streams[S])(runtime: Runtime[R])(implicit
streamConstructor: StreamConstructor[streams.BinaryStream],
requestCodec: JsonCodec[GraphQLRequest],
mapCodec: JsonCodec[Map[String, Seq[String]]]
): ServerEndpoint[S, Future] = {
implicit val r: Runtime[R] = runtime
convertHttpEndpointToFuture[R, streams.BinaryStream, S, UploadRequest](serverEndpoint(streams))
}

def serverEndpointIdentity[S](streams: Streams[S])(runtime: Runtime[R])(implicit
streamConstructor: StreamConstructor[streams.BinaryStream],
requestCodec: JsonCodec[GraphQLRequest],
mapCodec: JsonCodec[Map[String, Seq[String]]]
): ServerEndpoint[S, Identity] = {
implicit val r: Runtime[R] = runtime
convertHttpEndpointToIdentity[R, streams.BinaryStream, S, UploadRequest](serverEndpoint(streams))
}

def intercept[R1](interceptor: Interceptor[R1, R]): HttpUploadInterpreter[R1, E] =
HttpUploadInterpreter.Intercepted(self, interceptor)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package caliban.interop.tapir

import sttp.capabilities.Streams
import zio.stream.ZStream

trait StreamConstructor[BS] {
Expand All @@ -9,7 +8,8 @@ trait StreamConstructor[BS] {

object StreamConstructor {
implicit val zioStreams: StreamConstructor[ZStream[Any, Throwable, Byte]] =
new StreamConstructor[ZStream[Any, Throwable, Byte]] {
override def apply(stream: ZStream[Any, Throwable, Byte]): ZStream[Any, Throwable, Byte] = stream
}
(stream: ZStream[Any, Throwable, Byte]) => stream

implicit val noStreams: StreamConstructor[Nothing] =
(_: ZStream[Any, Throwable, Byte]) => throw new UnsupportedOperationException("Streams are not supported")
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import sttp.capabilities.{ Streams, WebSockets }
import sttp.model.sse.ServerSentEvent
import sttp.model.{ headers => _, _ }
import sttp.monad.MonadError
import sttp.shared.Identity
import sttp.tapir.Codec.JsonCodec
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.ServerEndpoint
Expand Down Expand Up @@ -197,24 +198,27 @@ object TapirAdapter {
) =
Left(response.toResponseValue(keepDataOnErrors, excludeExtensions))

def convertHttpEndpointToFuture[R](
endpoint: ServerEndpoint[ZioStreams, RIO[R, *]]
)(implicit runtime: Runtime[R]): ServerEndpoint[ZioStreams, Future] =
ServerEndpoint[
endpoint.SECURITY_INPUT,
endpoint.PRINCIPAL,
endpoint.INPUT,
endpoint.ERROR_OUTPUT,
endpoint.OUTPUT,
ZioStreams,
Future
](
private val rightUnit: Right[Nothing, Unit] = Right(())

private[caliban] def convertHttpEndpointToFuture[R, BS, S, I](
endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, RIO[R, *]]
)(implicit runtime: Runtime[R]): ServerEndpoint[S, Future] =
ServerEndpoint[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, Future](
endpoint.endpoint,
_ => _ => Future.successful(rightUnit),
_ =>
a => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.securityLogic(zioMonadError)(a)).future),
_ =>
req => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req)).future)
)

private[caliban] def convertHttpEndpointToIdentity[R, BS, S, I](
endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, RIO[R, *]]
)(implicit runtime: Runtime[R]): ServerEndpoint[S, Identity] =
ServerEndpoint[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, Identity](
endpoint.endpoint,
_ => _ => rightUnit,
_ =>
u =>
req => Unsafe.unsafe(implicit un => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(u)(req)).future)
_ => req => Unsafe.unsafe(implicit u => runtime.unsafe.run(endpoint.logic(zioMonadError)(())(req)).getOrThrow())
)

def zioMonadError[R]: MonadError[RIO[R, *]] = new MonadError[RIO[R, *]] {
Expand Down

0 comments on commit 56a6879

Please sign in to comment.