Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cats-effect 3 and http4s 0.23.0 #891

Merged
merged 14 commits into from
Aug 1, 2021
21 changes: 11 additions & 10 deletions adapters/http4s/src/main/scala/caliban/Http4sAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import caliban.Value.NullValue
import caliban.execution.QueryExecution
import caliban.interop.cats.CatsInterop
import caliban.uploads._
import cats.arrow.FunctionK
import cats.data.{ Kleisli, OptionT }
import cats.syntax.either._
import cats.syntax.traverse._
Expand Down Expand Up @@ -34,7 +33,7 @@ import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.duration.Duration
import zio.interop.catz._
import zio.interop.catz.concurrentInstance
import zio.random.Random

import java.io.File
Expand Down Expand Up @@ -129,6 +128,7 @@ object Http4sAdapter {

HttpRoutes.of[RIO[R, *]] {
case req @ POST -> Root if req.contentType.exists(_.mediaType.isMultipart) =>
import zio.interop.catz.asyncInstance
def getFileRefs(
parts: Vector[Part[RIO[R, *]]]
)(random: Random.Service): RIO[R, Map[String, (File, Part[RIO[R, *]])]] =
Expand Down Expand Up @@ -324,7 +324,7 @@ object Http4sAdapter {
} yield response
}

def makeWebSocketService[R <: Has[_] with Clock with Blocking, E](
def makeWebSocketService[R, E](
interpreter: GraphQLInterpreter[R, E],
skipValidation: Boolean = false,
enableIntrospection: Boolean = true,
Expand All @@ -336,7 +336,7 @@ object Http4sAdapter {
import dsl._

def sendMessage(
sendQueue: CatsQueue[RIO[R, *], WebSocketFrame],
sendQueue: CatsQueue[Task, WebSocketFrame],
messageType: String,
id: String,
payload: Json
Expand All @@ -354,8 +354,8 @@ object Http4sAdapter {
)

def processMessage(
receivingQueue: CatsQueue[RIO[R, *], WebSocketFrame],
sendQueue: CatsQueue[RIO[R, *], WebSocketFrame],
receivingQueue: CatsQueue[Task, WebSocketFrame],
sendQueue: CatsQueue[Task, WebSocketFrame],
subscriptions: Ref[Map[String, Fiber[Throwable, Unit]]]
): RIO[R, Unit] =
Stream
Expand All @@ -366,14 +366,15 @@ object Http4sAdapter {
for {
msg <- RIO.fromEither(decode[Json](text))
msgType = msg.hcursor.downField("type").success.flatMap(_.value.asString).getOrElse("")
_ <- RIO.whenCase(msgType) {
_ <- RIO.whenCase[R, String](msgType) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it didn't compile in Scala 3 because explicit types were missing.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah weirdly this one seems necessary with Scala 3.

case "connection_init" =>
sendQueue.offer(WebSocketFrame.Text("""{"type":"connection_ack"}""")) *>
RIO.whenCase(keepAliveTime) { case Some(time) =>
// Save the keep-alive fiber with a key of None so that it's interrupted later
sendQueue
.offer(WebSocketFrame.Text("""{"type":"ka"}"""))
.repeat(Schedule.spaced(time))
.provideLayer(Clock.live)
.unit
.fork
}
Expand Down Expand Up @@ -437,13 +438,13 @@ object Http4sAdapter {
.drain

def passThroughPipe(
receivingQueue: CatsQueue[RIO[R, *], WebSocketFrame]
receivingQueue: CatsQueue[Task, WebSocketFrame]
): Pipe[RIO[R, *], WebSocketFrame, Unit] = _.evalMap(receivingQueue.offer)

HttpRoutes.of[RIO[R, *]] { case GET -> Root =>
for {
receivingQueue <- CatsQueue.unbounded[RIO[R, *], WebSocketFrame]
sendQueue <- CatsQueue.unbounded[RIO[R, *], WebSocketFrame]
receivingQueue <- CatsQueue.unbounded[Task, WebSocketFrame]
sendQueue <- CatsQueue.unbounded[Task, WebSocketFrame]
subscriptions <- Ref.make(Map.empty[String, Fiber[Throwable, Unit]])
// We provide fiber to process messages, which inherits the context of WebSocket connection request,
// so that we can pass information available at connection request, such as authentication information,
Expand Down