diff --git a/.circleci/config.yml b/.circleci/config.yml index c72e67d985..2a7d0e754b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -63,7 +63,7 @@ jobs: - checkout - restore_cache: key: sbtcache - - run: sbt ++3.0.1! core/test catsInterop/compile monixInterop/compile clientJVM/test clientJS/compile zioHttp/compile tapirInterop/test + - run: sbt ++3.0.1! core/test catsInterop/compile monixInterop/compile clientJVM/test clientJS/compile zioHttp/compile tapirInterop/test http4s/test - save_cache: key: sbtcache paths: diff --git a/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala b/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala index f0175fc9ac..2b76217a43 100644 --- a/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala +++ b/adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala @@ -3,16 +3,17 @@ package caliban import caliban.ResponseValue.{ ObjectValue, StreamValue } import caliban.Value.NullValue import caliban.execution.QueryExecution +import caliban.interop.cats.CatsInterop import caliban.uploads._ -import cats.arrow.FunctionK import cats.data.{ Kleisli, OptionT } import cats.syntax.either._ import cats.syntax.traverse._ -import cats.effect.{ Blocker, Effect } -import cats.effect.syntax.all._ +import cats.effect.kernel.Async +import cats.effect.std.{ Dispatcher, Queue => CatsQueue } import cats.~> import fs2.{ Pipe, Stream } import fs2.text.utf8Decode +import fs2.io.file.Files import io.circe.Decoder.Result import io.circe.{ DecodingFailure, Json } import io.circe.parser._ @@ -29,9 +30,10 @@ import org.http4s.multipart.{ Multipart, Part } import org.typelevel.ci.CIString import zio.Exit.Failure import zio._ +import zio.blocking.Blocking import zio.clock.Clock import zio.duration.Duration -import zio.interop.catz._ +import zio.interop.catz.concurrentInstance import zio.random.Random import java.io.File @@ -114,10 +116,9 @@ object Http4sAdapter { private def parsePaths(map: Map[String, Seq[String]]): List[(String, List[Either[String, Int]])] = map.map { case (k, v) => k -> v.map(parsePath).toList }.toList.flatMap(kv => kv._2.map(kv._1 -> _)) - def makeHttpUploadService[R <: Has[_] with Random, E]( + def makeHttpUploadService[R <: Has[_] with Random with Clock with Blocking, E]( interpreter: GraphQLInterpreter[R, E], rootUploadPath: Path, - blocker: Blocker, skipValidation: Boolean = false, enableIntrospection: Boolean = true, queryExecution: QueryExecution = QueryExecution.Parallel @@ -127,6 +128,7 @@ object Http4sAdapter { HttpRoutes.of[RIO[R, *]] { case req @ POST -> Root if req.contentType.exists(_.mediaType.isMultipart) => + import zio.interop.catz.asyncInstance def getFileRefs( parts: Vector[Part[RIO[R, *]]] )(random: Random.Service): RIO[R, Map[String, (File, Part[RIO[R, *]])]] = @@ -137,14 +139,9 @@ object Http4sAdapter { random.nextUUID.flatMap { uuid => val path = rootUploadPath.resolve(uuid.toString) p.body - .through( - fs2.io.file.writeAll( - path, - blocker - ) - ) + .through(Files[RIO[R, *]].writeAll(path)) .compile - .foldMonoid + .drain .as((n, path.toFile -> p)) } } @@ -339,12 +336,12 @@ object Http4sAdapter { import dsl._ def sendMessage( - sendQueue: fs2.concurrent.Queue[Task, WebSocketFrame], + sendQueue: CatsQueue[Task, WebSocketFrame], messageType: String, id: String, payload: Json ): RIO[R, Unit] = - sendQueue.enqueue1( + sendQueue.offer( WebSocketFrame.Text( Json .obj( @@ -357,92 +354,97 @@ object Http4sAdapter { ) def processMessage( - receivingQueue: fs2.concurrent.Queue[Task, WebSocketFrame], - sendQueue: fs2.concurrent.Queue[Task, WebSocketFrame], + receivingQueue: CatsQueue[Task, WebSocketFrame], + sendQueue: CatsQueue[Task, WebSocketFrame], subscriptions: Ref[Map[String, Fiber[Throwable, Unit]]] - ) = - receivingQueue.dequeue.collect { case Text(text, _) => text }.flatMap { text => - Stream.eval { - for { - msg <- Task.fromEither(decode[Json](text)) - msgType = msg.hcursor.downField("type").success.flatMap(_.value.asString).getOrElse("") - _ <- RIO.whenCase(msgType) { - case "connection_init" => - sendQueue.enqueue1(WebSocketFrame.Text("""{"type":"connection_ack"}""")) *> - Task.whenCase(keepAliveTime) { case Some(time) => - // Save the keep-alive fiber with a key of None so that it's interrupted later - sendQueue - .enqueue1(WebSocketFrame.Text("""{"type":"ka"}""")) - .repeat(Schedule.spaced(time)) - .provideLayer(Clock.live) - .unit - .fork - } - case "connection_terminate" => Task.fromEither(WebSocketFrame.Close(1000)) >>= sendQueue.enqueue1 - case "start" => - val payload = msg.hcursor.downField("payload") - val id = msg.hcursor.downField("id").success.flatMap(_.value.asString).getOrElse("") - RIO.whenCase(payload.as[GraphQLRequest]) { case Right(req) => - (for { - result <- interpreter.executeRequest( - req, - skipValidation = skipValidation, - enableIntrospection = enableIntrospection, - queryExecution - ) - _ <- result.data match { - case ObjectValue((fieldName, StreamValue(stream)) :: Nil) => - stream.foreach { item => - sendMessage( - sendQueue, - "data", - id, - GraphQLResponse(ObjectValue(List(fieldName -> item)), result.errors).asJson - ) - }.onExit { - case Failure(cause) if !cause.interrupted => + ): RIO[R, Unit] = + Stream + .repeatEval(receivingQueue.take) + .collect { case Text(text, _) => text } + .flatMap { text => + Stream.eval { + for { + msg <- RIO.fromEither(decode[Json](text)) + msgType = msg.hcursor.downField("type").success.flatMap(_.value.asString).getOrElse("") + _ <- RIO.whenCase[R, String](msgType) { + case "connection_init" => + sendQueue.offer(WebSocketFrame.Text("""{"type":"connection_ack"}""")) *> + RIO.whenCase(keepAliveTime) { case Some(time) => + // Save the keep-alive fiber with a key of None so that it's interrupted later + sendQueue + .offer(WebSocketFrame.Text("""{"type":"ka"}""")) + .repeat(Schedule.spaced(time)) + .provideLayer(Clock.live) + .unit + .fork + } + case "connection_terminate" => RIO.fromEither(WebSocketFrame.Close(1000)) >>= sendQueue.offer + case "start" => + val payload = msg.hcursor.downField("payload") + val id = msg.hcursor.downField("id").success.flatMap(_.value.asString).getOrElse("") + RIO.whenCase(payload.as[GraphQLRequest]) { case Right(req) => + (for { + result <- interpreter.executeRequest( + req, + skipValidation = skipValidation, + enableIntrospection = enableIntrospection, + queryExecution + ) + _ <- result.data match { + case ObjectValue((fieldName, StreamValue(stream)) :: Nil) => + stream.foreach { item => sendMessage( sendQueue, - "error", + "data", id, - Json.obj("message" -> Json.fromString(cause.squash.toString)) - ).orDie - case _ => - sendQueue - .enqueue1(WebSocketFrame.Text(s"""{"type":"complete","id":"$id"}""")) - .orDie - }.fork - .flatMap(fiber => subscriptions.update(_.updated(id, fiber))) - case other => - sendMessage(sendQueue, "data", id, GraphQLResponse(other, result.errors).asJson) *> - sendQueue.enqueue1(WebSocketFrame.Text(s"""{"type":"complete","id":"$id"}""")) - } - } yield ()).catchAll(error => - sendMessage(sendQueue, "error", id, Json.obj("message" -> Json.fromString(error.toString))) - ) - } - case "stop" => - val id = msg.hcursor.downField("id").success.flatMap(_.value.asString).getOrElse("") - subscriptions - .modify(map => (map.get(id), map - id)) - .flatMap(fiber => - IO.whenCase(fiber) { case Some(fiber) => - fiber.interrupt - } - ) - } - } yield () + GraphQLResponse(ObjectValue(List(fieldName -> item)), result.errors).asJson + ) + }.onExit { + case Failure(cause) if !cause.interrupted => + sendMessage( + sendQueue, + "error", + id, + Json.obj("message" -> Json.fromString(cause.squash.toString)) + ).orDie + case _ => + sendQueue + .offer(WebSocketFrame.Text(s"""{"type":"complete","id":"$id"}""")) + .orDie + }.fork + .flatMap(fiber => subscriptions.update(_.updated(id, fiber))) + case other => + sendMessage(sendQueue, "data", id, GraphQLResponse(other, result.errors).asJson) *> + sendQueue.offer(WebSocketFrame.Text(s"""{"type":"complete","id":"$id"}""")) + } + } yield ()).catchAll(error => + sendMessage(sendQueue, "error", id, Json.obj("message" -> Json.fromString(error.toString))) + ) + } + case "stop" => + val id = msg.hcursor.downField("id").success.flatMap(_.value.asString).getOrElse("") + subscriptions + .modify(map => (map.get(id), map - id)) + .flatMap(fiber => + IO.whenCase(fiber) { case Some(fiber) => + fiber.interrupt + } + ) + } + } yield () + } } - }.compile.drain + .compile + .drain def passThroughPipe( - receivingQueue: fs2.concurrent.Queue[Task, WebSocketFrame] - ): Pipe[RIO[R, *], WebSocketFrame, Unit] = _.evalMap(receivingQueue.enqueue1) + receivingQueue: CatsQueue[Task, WebSocketFrame] + ): Pipe[RIO[R, *], WebSocketFrame, Unit] = _.evalMap(receivingQueue.offer) HttpRoutes.of[RIO[R, *]] { case GET -> Root => for { - receivingQueue <- fs2.concurrent.Queue.unbounded[Task, WebSocketFrame] - sendQueue <- fs2.concurrent.Queue.unbounded[Task, WebSocketFrame] + receivingQueue <- CatsQueue.unbounded[Task, WebSocketFrame] + sendQueue <- CatsQueue.unbounded[Task, WebSocketFrame] subscriptions <- Ref.make(Map.empty[String, Fiber[Throwable, Unit]]) // We provide fiber to process messages, which inherits the context of WebSocket connection request, // so that we can pass information available at connection request, such as authentication information, @@ -453,7 +455,7 @@ object Http4sAdapter { headers = Headers(Header.Raw(CIString("Sec-WebSocket-Protocol"), "graphql-ws")), onClose = processMessageFiber.interrupt.unit ) - .build(sendQueue.dequeue, passThroughPipe(receivingQueue)) + .build(Stream.repeatEval(sendQueue.take), passThroughPipe(receivingQueue)) } yield builder } } @@ -469,9 +471,15 @@ object Http4sAdapter { def provideLayerFromRequest[R <: Has[_]](route: HttpRoutes[RIO[R, *]], f: Request[Task] => TaskLayer[R])(implicit tagged: Tag[R] ): HttpRoutes[Task] = - Kleisli { req: Request[Task[*]] => - val to: Task ~> RIO[R, *] = FunctionK.lift[Task, RIO[R, *]](identity) - val from: RIO[R, *] ~> Task = λ[FunctionK[RIO[R, *], Task]](_.provideLayer(f(req))) + Kleisli { (req: Request[Task[*]]) => + val to: Task ~> RIO[R, *] = new (Task ~> RIO[R, *]) { + def apply[A](fa: Task[A]): RIO[R, A] = fa + } + + val from: RIO[R, *] ~> Task = new (RIO[R, *] ~> Task) { + def apply[A](fa: RIO[R, A]): Task[A] = fa.provideLayer(f(req)) + } + route(req.mapK(to)).mapK(from).map(_.mapK(from)) } @@ -488,25 +496,38 @@ object Http4sAdapter { route: HttpRoutes[RIO[R with R1, *]], f: Request[RIO[R, *]] => RLayer[R, R1] )(implicit tagged: Tag[R1]): HttpRoutes[RIO[R, *]] = - Kleisli { req: Request[RIO[R, *]] => - val to: RIO[R, *] ~> RIO[R with R1, *] = FunctionK.lift[RIO[R, *], RIO[R with R1, *]](identity) - val from: RIO[R with R1, *] ~> RIO[R, *] = - λ[FunctionK[RIO[R with R1, *], RIO[R, *]]](_.provideSomeLayer[R](f(req))) + Kleisli { (req: Request[RIO[R, *]]) => + val to: RIO[R, *] ~> RIO[R with R1, *] = new (RIO[R, *] ~> RIO[R with R1, *]) { + def apply[A](fa: RIO[R, A]): RIO[R with R1, A] = fa + } + + val from: RIO[R with R1, *] ~> RIO[R, *] = new (RIO[R with R1, *] ~> RIO[R, *]) { + def apply[A](fa: RIO[R with R1, A]): RIO[R, A] = fa.provideSomeLayer[R](f(req)) + } + route(req.mapK(to)).mapK(from).map(_.mapK(from)) } - private def wrapRoute[F[_]: Effect, R](route: HttpRoutes[RIO[R, *]])(implicit runtime: Runtime[R]): HttpRoutes[F] = { - val toF: RIO[R, *] ~> F = λ[RIO[R, *] ~> F](_.toIO.to[F]) - val toRIO: F ~> RIO[R, *] = λ[F ~> RIO[R, *]](_.toIO.to[RIO[R, *]]) + private def wrapRoute[F[_]: Async, R]( + route: HttpRoutes[RIO[R, *]] + )(implicit dispatcher: Dispatcher[F], runtime: Runtime[R]): HttpRoutes[F] = { + val toF: RIO[R, *] ~> F = CatsInterop.toEffectK + val toRIO: F ~> RIO[R, *] = CatsInterop.fromEffectK + + val to: OptionT[RIO[R, *], *] ~> OptionT[F, *] = new (OptionT[RIO[R, *], *] ~> OptionT[F, *]) { + def apply[A](fa: OptionT[RIO[R, *], A]): OptionT[F, A] = fa.mapK(toF) + } route - .mapK(λ[OptionT[RIO[R, *], *] ~> OptionT[F, *]](_.mapK(toF))) + .mapK(to) .dimap((req: Request[F]) => req.mapK(toRIO))((res: Response[RIO[R, *]]) => res.mapK(toF)) } - private def wrapApp[F[_]: Effect, R](app: HttpApp[RIO[R, *]])(implicit runtime: Runtime[R]): HttpApp[F] = { - val toF: RIO[R, *] ~> F = λ[RIO[R, *] ~> F](_.toIO.to[F]) - val toRIO: F ~> RIO[R, *] = λ[F ~> RIO[R, *]](_.toIO.to[RIO[R, *]]) + private def wrapApp[F[_]: Async, R]( + app: HttpApp[RIO[R, *]] + )(implicit dispatcher: Dispatcher[F], runtime: Runtime[R]): HttpApp[F] = { + val toF: RIO[R, *] ~> F = CatsInterop.toEffectK + val toRIO: F ~> RIO[R, *] = CatsInterop.fromEffectK app .mapK(toF) @@ -519,7 +540,7 @@ object Http4sAdapter { enableIntrospection: Boolean = true, keepAliveTime: Option[Duration] = None, queryExecution: QueryExecution = QueryExecution.Parallel - )(implicit F: Effect[F], runtime: Runtime[R]): HttpRoutes[F] = + )(implicit F: Async[F], dispatcher: Dispatcher[F], runtime: Runtime[R]): HttpRoutes[F] = wrapRoute( makeWebSocketService[R, E]( interpreter, @@ -533,7 +554,7 @@ object Http4sAdapter { @deprecated("Use makeHttpServiceF instead", "0.4.0") def makeRestServiceF[F[_], E]( interpreter: GraphQLInterpreter[Any, E] - )(implicit F: Effect[F], runtime: Runtime[Any]): HttpRoutes[F] = + )(implicit F: Async[F], dispatcher: Dispatcher[F], runtime: Runtime[Any]): HttpRoutes[F] = makeHttpServiceF(interpreter) def makeHttpServiceF[F[_], R, E]( @@ -541,7 +562,7 @@ object Http4sAdapter { skipValidation: Boolean = false, enableIntrospection: Boolean = true, queryExecution: QueryExecution = QueryExecution.Parallel - )(implicit F: Effect[F], runtime: Runtime[R]): HttpRoutes[F] = + )(implicit F: Async[F], dispatcher: Dispatcher[F], runtime: Runtime[R]): HttpRoutes[F] = wrapRoute( makeHttpService[R, E]( interpreter, @@ -556,7 +577,7 @@ object Http4sAdapter { skipValidation: Boolean = false, enableIntrospection: Boolean = true, queryExecution: QueryExecution = QueryExecution.Parallel - )(implicit F: Effect[F], runtime: Runtime[R]): HttpApp[F] = + )(implicit F: Async[F], dispatcher: Dispatcher[F], runtime: Runtime[R]): HttpApp[F] = wrapApp( executeRequest[R, R, E]( interpreter, diff --git a/adapters/http4s/src/test/scala/caliban/Http4sAdapterSpec.scala b/adapters/http4s/src/test/scala/caliban/Http4sAdapterSpec.scala index 65fa330919..d674947142 100644 --- a/adapters/http4s/src/test/scala/caliban/Http4sAdapterSpec.scala +++ b/adapters/http4s/src/test/scala/caliban/Http4sAdapterSpec.scala @@ -1,9 +1,8 @@ package caliban import caliban.GraphQL.graphQL -import caliban.schema.GenericSchema +import caliban.schema.{ GenericSchema, Schema } import caliban.uploads.{ Upload, Uploads } -import cats.effect.Blocker import cats.syntax.semigroupk._ import io.circe.parser.parse import io.circe.generic.auto._ @@ -72,7 +71,13 @@ case class UploadFileArgs(file: Upload) case class UploadFilesArgs(files: List[Upload]) object TestAPI extends GenericSchema[Blocking with Uploads with Console with Clock] { - val api: GraphQL[Blocking with Uploads with Console with Clock] = + type Env = Blocking with Uploads with Console with Clock + + implicit val uploadFileArgsSchema: Schema[Env, UploadFileArgs] = gen[UploadFileArgs] + implicit val mutationsSchema: Schema[Env, Mutations] = gen[Mutations] + implicit val queriesSchema: Schema[Env, Queries] = gen[Queries] + + val api: GraphQL[Env] = graphQL( RootResolver( Queries(args => UIO("stub")), @@ -80,10 +85,6 @@ object TestAPI extends GenericSchema[Blocking with Uploads with Console with Clo ) ) - implicit val uploadFileArgsSchema = gen[UploadFileArgs] - implicit val mutationsSchema = gen[Mutations] - implicit val queriesSchema = gen[Queries] - case class File(hash: String, path: String, filename: String, mimetype: String) case class Queries(stub: Unit => UIO[String]) @@ -102,20 +103,17 @@ object Http4sAdapterSpec extends DefaultRunnableSpec { Platform.default ) - val blocker = Blocker.liftExecutionContext(runtime.platform.executor.asEC) - val uri = Uri.unsafeParse("http://127.0.0.1:8089/") val apiLayer: RLayer[R, Has[Server]] = (for { interpreter <- TestAPI.api.interpreter.toManaged_ - server <- BlazeServerBuilder(runtime.platform.executor.asEC) + server <- BlazeServerBuilder[RIO[R, *]](runtime.platform.executor.asEC) .bindHttp(uri.port.get, uri.host.get) .withHttpApp( (Http4sAdapter.makeHttpUploadService( interpreter, - Paths.get(System.getProperty("java.io.tmpdir")), - blocker + Paths.get(System.getProperty("java.io.tmpdir")) ) <+> Http4sAdapter.makeHttpService(interpreter)).orNotFound ) .resource diff --git a/build.sbt b/build.sbt index f5a3ce52cc..e16c4c16d6 100644 --- a/build.sbt +++ b/build.sbt @@ -6,23 +6,25 @@ val scala213 = "2.13.6" val scala3 = "3.0.1" val allScala = Seq(scala212, scala213, scala3) -val akkaVersion = "2.6.15" -val catsEffectVersion = "2.5.2" -val circeVersion = "0.14.1" -val http4sVersion = "0.22.1" -val laminextVersion = "0.13.10" -val magnoliaVersion = "0.17.0" -val mercatorVersion = "0.2.1" -val playVersion = "2.8.8" -val playJsonVersion = "2.9.2" -val sttpVersion = "3.3.13" -val tapirVersion = "0.18.1" -val zioVersion = "1.0.10" -val zioInteropCatsVersion = "2.5.1.0" -val zioConfigVersion = "1.0.6" -val zqueryVersion = "0.2.9" -val zioJsonVersion = "0.1.5" -val zioHttpVersion = "1.0.0.0-RC17" +val akkaVersion = "2.6.15" +val catsEffect2Version = "2.5.2" +val catsEffect3Version = "3.2.1" +val circeVersion = "0.14.1" +val http4sVersion = "0.23.0" +val laminextVersion = "0.13.10" +val magnoliaVersion = "0.17.0" +val mercatorVersion = "0.2.1" +val playVersion = "2.8.8" +val playJsonVersion = "2.9.2" +val sttpVersion = "3.3.13" +val tapirVersion = "0.18.1" +val zioVersion = "1.0.10" +val zioInteropCats2Version = "2.5.1.0" +val zioInteropCats3Version = "3.1.1.0" +val zioConfigVersion = "1.0.6" +val zqueryVersion = "0.2.9" +val zioJsonVersion = "0.1.5" +val zioHttpVersion = "1.0.0.0-RC17" inThisBuild( List( @@ -190,9 +192,12 @@ lazy val catsInterop = project .settings(name := "caliban-cats") .settings(commonSettings) .settings( - libraryDependencies ++= Seq( - "dev.zio" %% "zio-interop-cats" % zioInteropCatsVersion, - "org.typelevel" %% "cats-effect" % catsEffectVersion + libraryDependencies ++= { + if (scalaVersion.value == scala3) Seq() + else Seq(compilerPlugin(("org.typelevel" %% "kind-projector" % "0.13.0").cross(CrossVersion.full))) + } ++ Seq( + "dev.zio" %% "zio-interop-cats" % zioInteropCats3Version, + "org.typelevel" %% "cats-effect" % catsEffect3Version ) ) .dependsOn(core) @@ -204,7 +209,7 @@ lazy val monixInterop = project .settings( libraryDependencies ++= Seq( "dev.zio" %% "zio-interop-reactivestreams" % "1.3.5", - "dev.zio" %% "zio-interop-cats" % zioInteropCatsVersion, + "dev.zio" %% "zio-interop-cats" % zioInteropCats2Version, "io.monix" %% "monix" % "3.4.0" ) ) @@ -233,24 +238,26 @@ lazy val http4s = project .settings(name := "caliban-http4s") .settings(commonSettings) .settings( - crossScalaVersions -= scala3, testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework")), - libraryDependencies ++= Seq( - "dev.zio" %% "zio-interop-cats" % zioInteropCatsVersion, - "org.typelevel" %% "cats-effect" % catsEffectVersion, - "org.http4s" %% "http4s-dsl" % http4sVersion, - "org.http4s" %% "http4s-circe" % http4sVersion, - "org.http4s" %% "http4s-blaze-server" % http4sVersion, - "io.circe" %% "circe-parser" % circeVersion, - "dev.zio" %% "zio-test" % zioVersion % Test, - "dev.zio" %% "zio-test-sbt" % zioVersion % Test, - "com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion % Test, - "com.softwaremill.sttp.client3" %% "circe" % sttpVersion % Test, - "io.circe" %% "circe-generic" % circeVersion % Test, - compilerPlugin(("org.typelevel" %% "kind-projector" % "0.13.0").cross(CrossVersion.full)) - ) + libraryDependencies ++= { + if (scalaVersion.value == scala3) Seq() + else Seq(compilerPlugin(("org.typelevel" %% "kind-projector" % "0.13.0").cross(CrossVersion.full))) + } ++ + Seq( + "dev.zio" %% "zio-interop-cats" % zioInteropCats3Version, + "org.typelevel" %% "cats-effect" % catsEffect3Version, + "org.http4s" %% "http4s-dsl" % http4sVersion, + "org.http4s" %% "http4s-circe" % http4sVersion, + "org.http4s" %% "http4s-blaze-server" % http4sVersion, + "io.circe" %% "circe-parser" % circeVersion, + "dev.zio" %% "zio-test" % zioVersion % Test, + "dev.zio" %% "zio-test-sbt" % zioVersion % Test, + "com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion % Test, + "com.softwaremill.sttp.client3" %% "circe" % sttpVersion % Test, + "io.circe" %% "circe-generic" % circeVersion % Test + ) ) - .dependsOn(core) + .dependsOn(core, catsInterop) lazy val zioHttp = project .in(file("adapters/zio-http")) @@ -298,8 +305,8 @@ lazy val finch = project libraryDependencies ++= Seq( "com.github.finagle" %% "finchx-core" % "0.32.1", "com.github.finagle" %% "finchx-circe" % "0.32.1", - "dev.zio" %% "zio-interop-cats" % zioInteropCatsVersion, - "org.typelevel" %% "cats-effect" % catsEffectVersion, + "dev.zio" %% "zio-interop-cats" % zioInteropCats2Version, + "org.typelevel" %% "cats-effect" % catsEffect2Version, "io.circe" %% "circe-parser" % circeVersion ) ) @@ -399,9 +406,9 @@ lazy val examples = project akkaHttp, http4s, catsInterop, - finch, + /*finch,*/ play, - monixInterop, + /*monixInterop,*/ tapirInterop, clientJVM, federation, diff --git a/examples/src/main/scala/example/federation/FederatedApp.scala b/examples/src/main/scala/example/federation/FederatedApp.scala index 53b9473029..e44fa80db1 100644 --- a/examples/src/main/scala/example/federation/FederatedApp.scala +++ b/examples/src/main/scala/example/federation/FederatedApp.scala @@ -6,14 +6,12 @@ import example.federation.FederationData.episodes.sampleEpisodes import caliban.Http4sAdapter import cats.data.Kleisli -import cats.effect.Blocker import org.http4s.StaticFile import org.http4s.implicits._ import org.http4s.server.Router import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.server.middleware.CORS import zio._ -import zio.blocking.Blocking import zio.interop.catz._ import scala.concurrent.ExecutionContext @@ -26,18 +24,17 @@ object FederatedApp extends CatsApp { .memoize .use(layer => for { - blocker <- ZIO.access[Blocking](_.get.blockingExecutor.asEC).map(Blocker.liftExecutionContext) interpreter <- FederatedApi.Characters.api.interpreter.map(_.provideCustomLayer(layer)) _ <- BlazeServerBuilder[ExampleTask](ExecutionContext.global) .bindHttp(8089, "localhost") .withHttpApp( Router[ExampleTask]( "/api/graphql" -> CORS(Http4sAdapter.makeHttpService(interpreter)), - "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", blocker, None)) + "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", None)) ).orNotFound ) .resource - .toManaged + .toManagedZIO .useForever } yield () ) @@ -47,18 +44,17 @@ object FederatedApp extends CatsApp { .memoize .use(layer => for { - blocker <- ZIO.access[Blocking](_.get.blockingExecutor.asEC).map(Blocker.liftExecutionContext) interpreter <- FederatedApi.Episodes.api.interpreter.map(_.provideCustomLayer(layer)) _ <- BlazeServerBuilder[ExampleTask](ExecutionContext.global) .bindHttp(8088, "localhost") .withHttpApp( Router[ExampleTask]( "/api/graphql" -> CORS(Http4sAdapter.makeHttpService(interpreter)), - "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", blocker, None)) + "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", None)) ).orNotFound ) .resource - .toManaged + .toManagedZIO .useForever } yield () ) diff --git a/examples/src/main/scala/example/finch/ExampleApp.scala b/examples/src/main/scala/example/finch/ExampleApp.scala index 4bb7d99bd9..3d0ede7d5e 100644 --- a/examples/src/main/scala/example/finch/ExampleApp.scala +++ b/examples/src/main/scala/example/finch/ExampleApp.scala @@ -1,4 +1,5 @@ package example.finch +/* import example.ExampleData.sampleCharacters import example.ExampleService.ExampleService @@ -54,3 +55,4 @@ object ExampleApp extends App with Endpoint.Module[Task] { Await.ready(server) } +*/ diff --git a/examples/src/main/scala/example/http4s/AuthExampleApp.scala b/examples/src/main/scala/example/http4s/AuthExampleApp.scala index 0c4ea13575..5c97b193b5 100644 --- a/examples/src/main/scala/example/http4s/AuthExampleApp.scala +++ b/examples/src/main/scala/example/http4s/AuthExampleApp.scala @@ -12,7 +12,6 @@ import org.http4s.server.{ Router, ServiceErrorHandler } import org.typelevel.ci.CIString import zio._ import zio.interop.catz._ -import zio.interop.catz.implicits._ import scala.concurrent.ExecutionContext @@ -62,7 +61,7 @@ object AuthExampleApp extends CatsApp { .bindHttp(8088, "localhost") .withHttpApp(Router[Task]("/api/graphql" -> route).orNotFound) .resource - .toManaged + .toManagedZIO .useForever } yield ()).exitCode } diff --git a/examples/src/main/scala/example/http4s/ExampleApp.scala b/examples/src/main/scala/example/http4s/ExampleApp.scala index f1031f5684..dd9c65789f 100644 --- a/examples/src/main/scala/example/http4s/ExampleApp.scala +++ b/examples/src/main/scala/example/http4s/ExampleApp.scala @@ -7,14 +7,12 @@ import example.{ ExampleApi, ExampleService } import caliban.Http4sAdapter import cats.data.Kleisli -import cats.effect.Blocker import org.http4s.StaticFile import org.http4s.implicits._ import org.http4s.server.Router import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.server.middleware.CORS import zio._ -import zio.blocking.Blocking import zio.interop.catz._ import scala.concurrent.ExecutionContext @@ -28,7 +26,6 @@ object ExampleApp extends App { .runtime[ZEnv with ExampleService] .flatMap(implicit runtime => for { - blocker <- ZIO.access[Blocking](_.get.blockingExecutor.asEC).map(Blocker.liftExecutionContext) interpreter <- ExampleApi.api.interpreter _ <- BlazeServerBuilder[ExampleTask](ExecutionContext.global) .bindHttp(8088, "localhost") @@ -36,11 +33,11 @@ object ExampleApp extends App { Router[ExampleTask]( "/api/graphql" -> CORS(Http4sAdapter.makeHttpService(interpreter)), "/ws/graphql" -> CORS(Http4sAdapter.makeWebSocketService(interpreter)), - "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", blocker, None)) + "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", None)) ).orNotFound ) .resource - .toManaged + .toManagedZIO .useForever } yield () ) diff --git a/examples/src/main/scala/example/interop/cats/ExampleCatsInterop.scala b/examples/src/main/scala/example/interop/cats/ExampleCatsInterop.scala index 50fdeb70c4..902b26e2b5 100644 --- a/examples/src/main/scala/example/interop/cats/ExampleCatsInterop.scala +++ b/examples/src/main/scala/example/interop/cats/ExampleCatsInterop.scala @@ -4,13 +4,14 @@ import caliban.GraphQL.graphQL import caliban.RootResolver import cats.effect.{ ExitCode, IO, IOApp } +import cats.effect.std.Dispatcher import zio.{ Runtime, ZEnv } object ExampleCatsInterop extends IOApp { import caliban.interop.cats.implicits._ - implicit val runtime: Runtime[ZEnv] = Runtime.default + implicit val zioRuntime: Runtime[ZEnv] = Runtime.default case class Number(value: Int) @@ -20,7 +21,6 @@ object ExampleCatsInterop extends IOApp { val randomNumber = IO(scala.util.Random.nextInt()).map(Number) val queries = Queries(numbers, randomNumber) - val api = graphQL(RootResolver(queries)) val query = """ { @@ -34,10 +34,14 @@ object ExampleCatsInterop extends IOApp { }""" override def run(args: List[String]): IO[ExitCode] = - for { - interpreter <- api.interpreterAsync[IO] - _ <- interpreter.checkAsync[IO](query) - result <- interpreter.executeAsync[IO](query) - _ <- IO(println(result.data)) - } yield ExitCode.Success + Dispatcher[IO].use { implicit dispatcher => // required for a derivation of the schema + val api = graphQL(RootResolver(queries)) + + for { + interpreter <- api.interpreterAsync[IO] + _ <- interpreter.checkAsync[IO](query) + result <- interpreter.executeAsync[IO](query) + _ <- IO(println(result.data)) + } yield ExitCode.Success + } } diff --git a/examples/src/main/scala/example/interop/monix/ExampleMonixInterop.scala b/examples/src/main/scala/example/interop/monix/ExampleMonixInterop.scala index f22afa01cf..e823224cd2 100644 --- a/examples/src/main/scala/example/interop/monix/ExampleMonixInterop.scala +++ b/examples/src/main/scala/example/interop/monix/ExampleMonixInterop.scala @@ -1,4 +1,5 @@ package example.interop.monix +/* import caliban.GraphQL.graphQL import caliban.ResponseValue.{ ObjectValue, StreamValue } @@ -61,3 +62,4 @@ object ExampleMonixInterop extends TaskApp { } } yield ExitCode.Success } +*/ diff --git a/examples/src/main/scala/example/tapir/ExampleApp.scala b/examples/src/main/scala/example/tapir/ExampleApp.scala index a2fd4353d1..26975ea86e 100644 --- a/examples/src/main/scala/example/tapir/ExampleApp.scala +++ b/examples/src/main/scala/example/tapir/ExampleApp.scala @@ -6,7 +6,6 @@ import caliban.interop.tapir._ import caliban.{ GraphQL, Http4sAdapter } import cats.data.Kleisli -import cats.effect.Blocker import org.http4s.StaticFile import org.http4s.implicits._ import org.http4s.server.Router @@ -14,9 +13,7 @@ import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.server.middleware.CORS import sttp.tapir.server.ServerEndpoint import zio._ -import zio.blocking.Blocking import zio.interop.catz._ -import zio.interop.catz.implicits._ import scala.concurrent.ExecutionContext @@ -43,18 +40,17 @@ object ExampleApp extends CatsApp { override def run(args: List[String]): ZIO[ZEnv, Nothing, ExitCode] = (for { - blocker <- ZIO.access[Blocking](_.get.blockingExecutor.asEC).map(Blocker.liftExecutionContext) interpreter <- graphql.interpreter _ <- BlazeServerBuilder[Task](ExecutionContext.global) .bindHttp(8088, "localhost") .withHttpApp( Router[Task]( "/api/graphql" -> CORS(Http4sAdapter.makeHttpService(interpreter)), - "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", blocker, None)) + "/graphiql" -> Kleisli.liftF(StaticFile.fromResource("/graphiql.html", None)) ).orNotFound ) .resource - .toManaged + .toManagedZIO .useForever } yield ()).exitCode } diff --git a/interop/cats/src/main/scala/caliban/interop/cats/CatsInterop.scala b/interop/cats/src/main/scala/caliban/interop/cats/CatsInterop.scala index 3000506f7f..f3a985314b 100644 --- a/interop/cats/src/main/scala/caliban/interop/cats/CatsInterop.scala +++ b/interop/cats/src/main/scala/caliban/interop/cats/CatsInterop.scala @@ -5,12 +5,16 @@ import caliban.introspection.adt.__Type import caliban.schema.Step.QueryStep import caliban.schema.{ Schema, Step } import caliban.{ CalibanError, GraphQL, GraphQLInterpreter, GraphQLResponse, InputValue } -import cats.effect.implicits._ -import cats.effect.{ Async, Effect } -import zio.interop.catz._ +import cats.~> +import cats.effect.Async +import cats.effect.std.Dispatcher +import cats.syntax.flatMap._ +import cats.syntax.functor._ import zio.{ Runtime, _ } import zio.query.ZQuery +import scala.concurrent.Future + object CatsInterop { def executeAsync[F[_]: Async, R, E](graphQL: GraphQLInterpreter[R, E])( @@ -21,32 +25,31 @@ object CatsInterop { skipValidation: Boolean = false, enableIntrospection: Boolean = true, queryExecution: QueryExecution = QueryExecution.Parallel - )(implicit runtime: Runtime[R]): F[GraphQLResponse[E]] = - Async[F].async { cb => - val execution = graphQL.execute( - query, - operationName, - variables, - extensions, - skipValidation = skipValidation, - enableIntrospection = enableIntrospection, - queryExecution - ) - - runtime.unsafeRunAsync(execution)(exit => cb(exit.toEither)) - } + )(implicit runtime: Runtime[R]): F[GraphQLResponse[E]] = { + val execution = graphQL.execute( + query, + operationName, + variables, + extensions, + skipValidation = skipValidation, + enableIntrospection = enableIntrospection, + queryExecution + ) + + toEffect(execution) + } def checkAsync[F[_]: Async, R]( graphQL: GraphQLInterpreter[R, Any] )(query: String)(implicit runtime: Runtime[Any]): F[Unit] = - Async[F].async(cb => runtime.unsafeRunAsync(graphQL.check(query))(exit => cb(exit.toEither))) + toEffect(graphQL.check(query)) def interpreterAsync[F[_]: Async, R]( graphQL: GraphQL[R] )(implicit runtime: Runtime[Any]): F[GraphQLInterpreter[R, CalibanError]] = - Async[F].async(cb => runtime.unsafeRunAsync(graphQL.interpreter)(exit => cb(exit.toEither))) + toEffect(graphQL.interpreter) - def schema[F[_]: Effect, R, A](implicit ev: Schema[R, A]): Schema[R, F[A]] = + def schema[F[_], R, A](implicit F: Dispatcher[F], ev: Schema[R, A]): Schema[R, F[A]] = new Schema[R, F[A]] { override def toType(isInput: Boolean, isSubscription: Boolean): __Type = ev.toType_(isInput, isSubscription) @@ -55,6 +58,32 @@ object CatsInterop { ev.optional override def resolve(value: F[A]): Step[R] = - QueryStep(ZQuery.fromEffect(value.toIO.to[Task].map(ev.resolve))) + QueryStep(ZQuery.fromEffect(fromEffect(value).map(ev.resolve))) + } + + def fromEffect[F[_], A](fa: F[A])(implicit F: Dispatcher[F]): Task[A] = + ZIO + .effectTotal(F.unsafeToFutureCancelable(fa)) + .flatMap { case (future, cancel) => + ZIO.fromFuture(_ => future).onInterrupt(ZIO.fromFuture(_ => cancel()).orDie).interruptible + } + .uninterruptible + + def toEffect[F[_], R, A](rio: RIO[R, A])(implicit F: Async[F], R: Runtime[R]): F[A] = + F.uncancelable { poll => + F.delay(R.unsafeRunToFuture(rio)).flatMap { future => + poll(F.onCancel(F.fromFuture(F.pure[Future[A]](future)), F.fromFuture(F.delay(future.cancel())).void)) + } } + + def fromEffectK[F[_], R](implicit F: Dispatcher[F]): F ~> RIO[R, *] = + new (F ~> RIO[R, *]) { + def apply[A](fa: F[A]): RIO[R, A] = fromEffect(fa) + } + + def toEffectK[F[_], R](implicit F: Async[F], R: Runtime[R]): RIO[R, *] ~> F = + new (RIO[R, *] ~> F) { + def apply[A](rio: RIO[R, A]): F[A] = toEffect(rio) + } + } diff --git a/interop/cats/src/main/scala/caliban/interop/cats/implicits/package.scala b/interop/cats/src/main/scala/caliban/interop/cats/implicits/package.scala index ab9d871706..9ae2e4447b 100644 --- a/interop/cats/src/main/scala/caliban/interop/cats/implicits/package.scala +++ b/interop/cats/src/main/scala/caliban/interop/cats/implicits/package.scala @@ -3,7 +3,8 @@ package caliban.interop.cats import caliban.execution.QueryExecution import caliban.schema.Schema import caliban.{ CalibanError, GraphQL, GraphQLInterpreter, GraphQLResponse, InputValue } -import cats.effect.{ Async, Effect } +import cats.effect.Async +import cats.effect.std.Dispatcher import zio.Runtime package object implicits { @@ -37,6 +38,6 @@ package object implicits { CatsInterop.interpreterAsync(underlying) } - implicit def catsEffectSchema[F[_]: Effect, R, A](implicit ev: Schema[R, A]): Schema[R, F[A]] = + implicit def catsEffectSchema[F[_], R, A](implicit F: Dispatcher[F], ev: Schema[R, A]): Schema[R, F[A]] = CatsInterop.schema } diff --git a/vuepress/docs/docs/examples.md b/vuepress/docs/docs/examples.md index b19df53baf..24b5545327 100644 --- a/vuepress/docs/docs/examples.md +++ b/vuepress/docs/docs/examples.md @@ -2,16 +2,18 @@ The [examples](https://github.com/ghostdogpr/caliban/tree/master/examples/) project in Github contains various examples: - [GraphQL API exposed with http4s](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/http4s) - [GraphQL API exposed with Akka HTTP](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/akkahttp) -- [GraphQL API exposed with finch](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/finch) - [GraphQL API exposed with play](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/play) - [GraphQL API exposed with zio-http](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/ziohttp) - [GraphQL Client usage](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/client) - [GraphQL Client integration with Laminext](https://github.com/ghostdogpr/caliban/tree/master/client-laminext/src/test/scala/caliban/client/laminext) - [Optimization with ZQuery](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/optimizations) - [Interop with Cats Effect](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/interop/cats) -- [Interop with Monix](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/interop/monix) - [Interop with Tapir](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/tapir) - [Apollo Federation usage](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/federation) - [Schema Stitching usage](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/stitching) +#### Available only with cats-effect 2.x +- [Interop with Monix](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/interop/monix) +- [GraphQL API exposed with finch](https://github.com/ghostdogpr/caliban/tree/master/examples/src/main/scala/example/finch) + You may also check out [the repository](https://github.com/ghostdogpr/caliban-blog-series) accompanying my [blog series](https://medium.com/@ghostdogpr/graphql-in-scala-with-caliban-part-1-8ceb6099c3c2) on Caliban. \ No newline at end of file diff --git a/vuepress/docs/docs/interop.md b/vuepress/docs/docs/interop.md index ac7b3692e2..9ca3a5b5ef 100644 --- a/vuepress/docs/docs/interop.md +++ b/vuepress/docs/docs/interop.md @@ -20,16 +20,16 @@ import caliban.GraphQL.graphQL import caliban.RootResolver import caliban.interop.cats.implicits._ import cats.effect.{ ExitCode, IO, IOApp } +import cats.effect.std.Dispatcher import zio.Runtime object ExampleCatsInterop extends IOApp { - implicit val runtime = Runtime.default + implicit val zioRuntime = Runtime.default case class Queries(numbers: List[Int], randomNumber: IO[Int]) - val queries = Queries(List(1, 2, 3, 4), IO(scala.util.Random.nextInt())) - val api = graphQL(RootResolver(queries)) + val queries = Queries(List(1, 2, 3, 4), IO(scala.util.Random.nextInt())) val query = """ { @@ -38,17 +38,21 @@ object ExampleCatsInterop extends IOApp { }""" override def run(args: List[String]): IO[ExitCode] = - for { - interpreter <- api.interpreterAsync[IO] - result <- interpreter.executeAsync[IO](query) - _ <- IO(println(result.data)) - } yield ExitCode.Success + Dispatcher[IO].use { implicit dispatcher => // required for a derivation of the schema + val api = graphQL(RootResolver(queries)) + + for { + interpreter <- api.interpreterAsync[IO] + result <- interpreter.executeAsync[IO](query) + _ <- IO(println(result.data)) + } yield ExitCode.Success + } } ``` You can find this example within the [examples](https://github.com/ghostdogpr/caliban/blob/master/examples/src/main/scala/example/interop/cats/ExampleCatsInterop.scala) project. -## Monix +## Monix (only with cats-effect 2.x) You first need to import `caliban.interop.monix.implicits._` and have an implicit `zio.Runtime` in scope. Then a few helpers are available: - the `GraphQL` object is enriched with `interpreterAsync`, a variant of `interpreter` that return a Monix `Task` instead of a `ZIO`. @@ -69,7 +73,7 @@ import zio.Runtime object ExampleMonixInterop extends TaskApp { - implicit val runtime = Runtime.default + implicit val zioRuntime = Runtime.default implicit val monixScheduler: Scheduler = scheduler case class Queries(numbers: List[Int], randomNumber: Task[Int])