From 56a687963e8247455335453565293053be63e41f Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Tue, 18 Jun 2024 18:14:41 +0900 Subject: [PATCH] Make it easier to use tapir with impure effects (#2282) * Make it easier to use tapir with impure effects * Simplify * Revert "Simplify" This reverts commit 1a0ccdeeeff87500ac36b85c21a6ac74dace104e. * Make package private + mima --------- Co-authored-by: kyri-petrou <67301607+kyri-petrou@users.noreply.github.com> --- .../main/scala/caliban/AkkaHttpAdapter.scala | 29 ++------------ .../main/scala/caliban/PekkoHttpAdapter.scala | 35 +++-------------- .../src/main/scala/caliban/PlayAdapter.scala | 39 +------------------ .../interop/tapir/HttpInterpreter.scala | 22 +++++++++++ .../interop/tapir/HttpUploadInterpreter.scala | 23 ++++++++++- .../interop/tapir/StreamConstructor.scala | 8 ++-- .../caliban/interop/tapir/TapirAdapter.scala | 34 +++++++++------- 7 files changed, 77 insertions(+), 113 deletions(-) diff --git a/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala b/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala index 43ce2935e7..a0715b9b84 100644 --- a/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala +++ b/adapters/akka-http/src/main/scala/caliban/AkkaHttpAdapter.scala @@ -10,7 +10,7 @@ import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamCon import sttp.capabilities.WebSockets import sttp.capabilities.akka.AkkaStreams import sttp.capabilities.akka.AkkaStreams.Pipe -import sttp.model.{ MediaType, StatusCode } +import sttp.model.StatusCode import sttp.tapir.Codec.JsonCodec import sttp.tapir.PublicEndpoint import sttp.tapir.model.ServerRequest @@ -27,11 +27,7 @@ class AkkaHttpAdapter private (private val options: AkkaHttpServerOptions)(impli def makeHttpService[R, E]( interpreter: HttpInterpreter[R, E] )(implicit runtime: Runtime[R], materializer: Materializer): Route = - akkaInterpreter.toRoute( - interpreter - .serverEndpoints[R, AkkaStreams](AkkaStreams) - .map(convertHttpStreamingEndpoint[R, (GraphQLRequest, ServerRequest)]) - ) + akkaInterpreter.toRoute(interpreter.serverEndpointsFuture[AkkaStreams](AkkaStreams)(runtime)) def makeHttpUploadService[R, E](interpreter: HttpUploadInterpreter[R, E])(implicit runtime: Runtime[R], @@ -39,7 +35,7 @@ class AkkaHttpAdapter private (private val options: AkkaHttpServerOptions)(impli requestCodec: JsonCodec[GraphQLRequest], mapCodec: JsonCodec[Map[String, Seq[String]]] ): Route = - akkaInterpreter.toRoute(convertHttpStreamingEndpoint(interpreter.serverEndpoint[R, AkkaStreams](AkkaStreams))) + akkaInterpreter.toRoute(interpreter.serverEndpointFuture[AkkaStreams](AkkaStreams)(runtime)) def makeWebSocketService[R, E]( interpreter: WebSocketInterpreter[R, E] @@ -107,25 +103,6 @@ object AkkaHttpAdapter { type AkkaPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any] - def convertHttpStreamingEndpoint[R, I]( - endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[ - AkkaStreams.BinaryStream - ], AkkaStreams, RIO[R, *]] - )(implicit runtime: Runtime[R]): ServerEndpoint[AkkaStreams, Future] = - ServerEndpoint[ - Unit, - Unit, - I, - TapirResponse, - CalibanResponse[AkkaStreams.BinaryStream], - AkkaStreams, - Future - ]( - endpoint.endpoint, - _ => _ => Future.successful(Right(())), - _ => _ => req => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req))) - ) - def convertWebSocketEndpoint[R]( endpoint: ServerEndpoint.Full[ Unit, diff --git a/adapters/pekko-http/src/main/scala/caliban/PekkoHttpAdapter.scala b/adapters/pekko-http/src/main/scala/caliban/PekkoHttpAdapter.scala index bc4caa0076..d0f2c519f4 100644 --- a/adapters/pekko-http/src/main/scala/caliban/PekkoHttpAdapter.scala +++ b/adapters/pekko-http/src/main/scala/caliban/PekkoHttpAdapter.scala @@ -1,16 +1,16 @@ package caliban +import caliban.PekkoHttpAdapter._ +import caliban.interop.tapir.TapirAdapter._ +import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamConstructor, WebSocketInterpreter } import org.apache.pekko.http.scaladsl.server.Route import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source } import org.apache.pekko.stream.{ Materializer, OverflowStrategy } import org.apache.pekko.util.ByteString -import caliban.PekkoHttpAdapter._ -import caliban.interop.tapir.TapirAdapter._ -import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamConstructor, WebSocketInterpreter } import sttp.capabilities.WebSockets import sttp.capabilities.pekko.PekkoStreams import sttp.capabilities.pekko.PekkoStreams.Pipe -import sttp.model.{ MediaType, StatusCode } +import sttp.model.StatusCode import sttp.tapir.Codec.JsonCodec import sttp.tapir.PublicEndpoint import sttp.tapir.model.ServerRequest @@ -27,11 +27,7 @@ class PekkoHttpAdapter private (val options: PekkoHttpServerOptions)(implicit ec def makeHttpService[R, E]( interpreter: HttpInterpreter[R, E] )(implicit runtime: Runtime[R], materializer: Materializer): Route = - pekkoInterpreter.toRoute( - interpreter - .serverEndpoints[R, PekkoStreams](PekkoStreams) - .map(convertHttpStreamingEndpoint[R, (GraphQLRequest, ServerRequest)]) - ) + pekkoInterpreter.toRoute(interpreter.serverEndpointsFuture[PekkoStreams](PekkoStreams)(runtime)) def makeHttpUploadService[R, E](interpreter: HttpUploadInterpreter[R, E])(implicit runtime: Runtime[R], @@ -39,7 +35,7 @@ class PekkoHttpAdapter private (val options: PekkoHttpServerOptions)(implicit ec requestCodec: JsonCodec[GraphQLRequest], mapCodec: JsonCodec[Map[String, Seq[String]]] ): Route = - pekkoInterpreter.toRoute(convertHttpStreamingEndpoint(interpreter.serverEndpoint[R, PekkoStreams](PekkoStreams))) + pekkoInterpreter.toRoute(interpreter.serverEndpointFuture[PekkoStreams](PekkoStreams)(runtime)) def makeWebSocketService[R, E]( interpreter: WebSocketInterpreter[R, E] @@ -107,25 +103,6 @@ object PekkoHttpAdapter { type PekkoPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any] - def convertHttpStreamingEndpoint[R, I]( - endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[ - PekkoStreams.BinaryStream - ], PekkoStreams, RIO[R, *]] - )(implicit runtime: Runtime[R]): ServerEndpoint[PekkoStreams, Future] = - ServerEndpoint[ - Unit, - Unit, - I, - TapirResponse, - CalibanResponse[PekkoStreams.BinaryStream], - PekkoStreams, - Future - ]( - endpoint.endpoint, - _ => _ => Future.successful(Right(())), - _ => _ => req => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req))) - ) - def convertWebSocketEndpoint[R]( endpoint: ServerEndpoint.Full[ Unit, diff --git a/adapters/play/src/main/scala/caliban/PlayAdapter.scala b/adapters/play/src/main/scala/caliban/PlayAdapter.scala index 4d929b0dd5..f2aaee6d93 100644 --- a/adapters/play/src/main/scala/caliban/PlayAdapter.scala +++ b/adapters/play/src/main/scala/caliban/PlayAdapter.scala @@ -1,6 +1,5 @@ package caliban -import caliban.PlayAdapter.convertHttpStreamingEndpoint import caliban.interop.tapir.TapirAdapter._ import caliban.interop.tapir.{ HttpInterpreter, HttpUploadInterpreter, StreamConstructor, WebSocketInterpreter } import org.apache.pekko.stream.scaladsl.{ Flow, Sink, Source } @@ -28,11 +27,7 @@ class PlayAdapter private (private val options: Option[PlayServerOptions]) { def makeHttpService[R, E]( interpreter: HttpInterpreter[R, E] )(implicit runtime: Runtime[R], materializer: Materializer): Routes = - playInterpreter.toRoutes( - interpreter - .serverEndpoints[R, PekkoStreams](PekkoStreams) - .map(convertHttpStreamingEndpoint[R, (GraphQLRequest, ServerRequest)]) - ) + playInterpreter.toRoutes(interpreter.serverEndpointsFuture[PekkoStreams](PekkoStreams)(runtime)) def makeHttpUploadService[R, E](interpreter: HttpUploadInterpreter[R, E])(implicit runtime: Runtime[R], @@ -40,7 +35,7 @@ class PlayAdapter private (private val options: Option[PlayServerOptions]) { requestCodec: JsonCodec[GraphQLRequest], mapCodec: JsonCodec[Map[String, Seq[String]]] ): Routes = - playInterpreter.toRoutes(convertHttpStreamingEndpoint(interpreter.serverEndpoint[R, PekkoStreams](PekkoStreams))) + playInterpreter.toRoutes(interpreter.serverEndpointFuture[PekkoStreams](PekkoStreams)(runtime)) def makeWebSocketService[R, E]( interpreter: WebSocketInterpreter[R, E] @@ -105,36 +100,6 @@ object PlayAdapter extends PlayAdapter(None) { type AkkaPipe = Flow[GraphQLWSInput, Either[GraphQLWSClose, GraphQLWSOutput], Any] - def convertHttpStreamingEndpoint[R, Input]( - endpoint: ServerEndpoint.Full[ - Unit, - Unit, - Input, - TapirResponse, - CalibanResponse[PekkoStreams.BinaryStream], - PekkoStreams, - RIO[R, *] - ] - )(implicit runtime: Runtime[R], mat: Materializer): ServerEndpoint[PekkoStreams, Future] = - ServerEndpoint[ - Unit, - Unit, - Input, - TapirResponse, - CalibanResponse[PekkoStreams.BinaryStream], - PekkoStreams, - Future - ]( - endpoint.endpoint, - _ => _ => Future.successful(Right(())), - _ => - _ => - req => - Unsafe.unsafe { implicit u => - runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req)) - } - ) - def convertWebSocketEndpoint[R]( endpoint: ServerEndpoint.Full[ Unit, diff --git a/interop/tapir/src/main/scala/caliban/interop/tapir/HttpInterpreter.scala b/interop/tapir/src/main/scala/caliban/interop/tapir/HttpInterpreter.scala index 6c0ce5f231..ac8e6b7fc5 100644 --- a/interop/tapir/src/main/scala/caliban/interop/tapir/HttpInterpreter.scala +++ b/interop/tapir/src/main/scala/caliban/interop/tapir/HttpInterpreter.scala @@ -4,11 +4,15 @@ import caliban._ import caliban.interop.tapir.TapirAdapter._ import sttp.capabilities.Streams import sttp.model.{ headers => _, _ } +import sttp.shared.Identity import sttp.tapir.Codec.JsonCodec import sttp.tapir.model.ServerRequest import sttp.tapir._ +import sttp.tapir.server.ServerEndpoint import zio._ +import scala.concurrent.Future + sealed trait HttpInterpreter[-R, E] { self => protected def endpoints[S](streams: Streams[S]): List[ PublicEndpoint[(GraphQLRequest, ServerRequest), TapirResponse, CalibanResponse[streams.BinaryStream], S] @@ -31,6 +35,24 @@ sealed trait HttpInterpreter[-R, E] { self => endpoints[S](streams).map(_.serverLogic(logic(_))) } + def serverEndpointsFuture[S](streams: Streams[S])(runtime: Runtime[R])(implicit + streamConstructor: StreamConstructor[streams.BinaryStream] + ): List[ServerEndpoint[S, Future]] = { + implicit val r: Runtime[R] = runtime + serverEndpoints(streams).map( + convertHttpEndpointToFuture[R, streams.BinaryStream, S, (GraphQLRequest, ServerRequest)] + ) + } + + def serverEndpointsIdentity[S](streams: Streams[S])(runtime: Runtime[R])(implicit + streamConstructor: StreamConstructor[streams.BinaryStream] + ): List[ServerEndpoint[S, Identity]] = { + implicit val r: Runtime[R] = runtime + serverEndpoints(streams).map( + convertHttpEndpointToIdentity[R, streams.BinaryStream, S, (GraphQLRequest, ServerRequest)] + ) + } + def intercept[R1](interceptor: Interceptor[R1, R]): HttpInterpreter[R1, E] = HttpInterpreter.Intercepted(self, interceptor) diff --git a/interop/tapir/src/main/scala/caliban/interop/tapir/HttpUploadInterpreter.scala b/interop/tapir/src/main/scala/caliban/interop/tapir/HttpUploadInterpreter.scala index d6109aadfd..4b9ec400ac 100644 --- a/interop/tapir/src/main/scala/caliban/interop/tapir/HttpUploadInterpreter.scala +++ b/interop/tapir/src/main/scala/caliban/interop/tapir/HttpUploadInterpreter.scala @@ -1,18 +1,19 @@ package caliban.interop.tapir -import caliban.Value.{ IntValue, StringValue } import caliban._ import caliban.interop.tapir.TapirAdapter._ import caliban.uploads.{ FileMeta, GraphQLUploadRequest, Uploads } import sttp.capabilities.Streams import sttp.model._ +import sttp.shared.Identity import sttp.tapir.Codec.JsonCodec import sttp.tapir._ import sttp.tapir.model.ServerRequest +import sttp.tapir.server.ServerEndpoint import zio._ import java.nio.charset.StandardCharsets -import scala.util.Try +import scala.concurrent.Future sealed trait HttpUploadInterpreter[-R, E] { self => protected def endpoint[S]( @@ -81,6 +82,24 @@ sealed trait HttpUploadInterpreter[-R, E] { self => endpoint(streams).serverLogic(logic(_)) } + def serverEndpointFuture[S](streams: Streams[S])(runtime: Runtime[R])(implicit + streamConstructor: StreamConstructor[streams.BinaryStream], + requestCodec: JsonCodec[GraphQLRequest], + mapCodec: JsonCodec[Map[String, Seq[String]]] + ): ServerEndpoint[S, Future] = { + implicit val r: Runtime[R] = runtime + convertHttpEndpointToFuture[R, streams.BinaryStream, S, UploadRequest](serverEndpoint(streams)) + } + + def serverEndpointIdentity[S](streams: Streams[S])(runtime: Runtime[R])(implicit + streamConstructor: StreamConstructor[streams.BinaryStream], + requestCodec: JsonCodec[GraphQLRequest], + mapCodec: JsonCodec[Map[String, Seq[String]]] + ): ServerEndpoint[S, Identity] = { + implicit val r: Runtime[R] = runtime + convertHttpEndpointToIdentity[R, streams.BinaryStream, S, UploadRequest](serverEndpoint(streams)) + } + def intercept[R1](interceptor: Interceptor[R1, R]): HttpUploadInterpreter[R1, E] = HttpUploadInterpreter.Intercepted(self, interceptor) diff --git a/interop/tapir/src/main/scala/caliban/interop/tapir/StreamConstructor.scala b/interop/tapir/src/main/scala/caliban/interop/tapir/StreamConstructor.scala index bb23d24e0a..39da8d1719 100644 --- a/interop/tapir/src/main/scala/caliban/interop/tapir/StreamConstructor.scala +++ b/interop/tapir/src/main/scala/caliban/interop/tapir/StreamConstructor.scala @@ -1,6 +1,5 @@ package caliban.interop.tapir -import sttp.capabilities.Streams import zio.stream.ZStream trait StreamConstructor[BS] { @@ -9,7 +8,8 @@ trait StreamConstructor[BS] { object StreamConstructor { implicit val zioStreams: StreamConstructor[ZStream[Any, Throwable, Byte]] = - new StreamConstructor[ZStream[Any, Throwable, Byte]] { - override def apply(stream: ZStream[Any, Throwable, Byte]): ZStream[Any, Throwable, Byte] = stream - } + (stream: ZStream[Any, Throwable, Byte]) => stream + + implicit val noStreams: StreamConstructor[Nothing] = + (_: ZStream[Any, Throwable, Byte]) => throw new UnsupportedOperationException("Streams are not supported") } diff --git a/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala b/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala index cb942053d6..a97e00ef56 100644 --- a/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala +++ b/interop/tapir/src/main/scala/caliban/interop/tapir/TapirAdapter.scala @@ -8,6 +8,7 @@ import sttp.capabilities.{ Streams, WebSockets } import sttp.model.sse.ServerSentEvent import sttp.model.{ headers => _, _ } import sttp.monad.MonadError +import sttp.shared.Identity import sttp.tapir.Codec.JsonCodec import sttp.tapir.model.ServerRequest import sttp.tapir.server.ServerEndpoint @@ -197,24 +198,27 @@ object TapirAdapter { ) = Left(response.toResponseValue(keepDataOnErrors, excludeExtensions)) - def convertHttpEndpointToFuture[R]( - endpoint: ServerEndpoint[ZioStreams, RIO[R, *]] - )(implicit runtime: Runtime[R]): ServerEndpoint[ZioStreams, Future] = - ServerEndpoint[ - endpoint.SECURITY_INPUT, - endpoint.PRINCIPAL, - endpoint.INPUT, - endpoint.ERROR_OUTPUT, - endpoint.OUTPUT, - ZioStreams, - Future - ]( + private val rightUnit: Right[Nothing, Unit] = Right(()) + + private[caliban] def convertHttpEndpointToFuture[R, BS, S, I]( + endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, RIO[R, *]] + )(implicit runtime: Runtime[R]): ServerEndpoint[S, Future] = + ServerEndpoint[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, Future]( endpoint.endpoint, + _ => _ => Future.successful(rightUnit), _ => - a => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.securityLogic(zioMonadError)(a)).future), + _ => + req => Unsafe.unsafe(implicit u => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(())(req)).future) + ) + + private[caliban] def convertHttpEndpointToIdentity[R, BS, S, I]( + endpoint: ServerEndpoint.Full[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, RIO[R, *]] + )(implicit runtime: Runtime[R]): ServerEndpoint[S, Identity] = + ServerEndpoint[Unit, Unit, I, TapirResponse, CalibanResponse[BS], S, Identity]( + endpoint.endpoint, + _ => _ => rightUnit, _ => - u => - req => Unsafe.unsafe(implicit un => runtime.unsafe.runToFuture(endpoint.logic(zioMonadError)(u)(req)).future) + _ => req => Unsafe.unsafe(implicit u => runtime.unsafe.run(endpoint.logic(zioMonadError)(())(req)).getOrThrow()) ) def zioMonadError[R]: MonadError[RIO[R, *]] = new MonadError[RIO[R, *]] {