Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSockets don't work with .enableRequestStreaming #2977

Closed
adamw opened this issue Jul 24, 2024 · 1 comment · Fixed by #2978
Closed

WebSockets don't work with .enableRequestStreaming #2977

adamw opened this issue Jul 24, 2024 · 1 comment · Fixed by #2978
Labels
bug Something isn't working

Comments

@adamw
Copy link

adamw commented Jul 24, 2024

Happens in 3.0.0-RC9, worked fine in 3.0.0-RC8

When the enableRequestStreaming option is enabled, attempting to connect to a web socket from a client causes the following exception:

timestamp=2024-07-24T14:32:48.256913Z level=WARN thread=#zio-fiber-1655323318 message="Fatal exception in Netty" cause="Exception in thread "zio-fiber-" io.netty.handler.codec.PrematureChannelClosureException: Channel closed while still aggregating message
	at io.netty.handler.codec.MessageAggregator.channelInactive(MessageAggregator.java:436)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)"

Blocks softwaremill/tapir#3876 & softwaremill/tapir#3690

Code to reproduce:

//> using dep dev.zio::zio-http:3.0.0-RC9
//> using dep com.softwaremill.sttp.client3::core:3.9.7

package sttp.tapir.examples

import sttp.ws.WebSocket
import zio.*
import zio.http.*
import zio.http.ChannelEvent.Read

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}

object Test extends ZIOAppDefault {
  private val socketApp: WebSocketApp[Any] =
    Handler.webSocket { channel =>
      channel.receiveAll {
        case Read(WebSocketFrame.Text(s)) =>
          channel.send(Read(WebSocketFrame.Text("echo: " + s)))
        case _ =>
          ZIO.unit
      }
    }

  private val app: Routes[Any, Response] = Routes(Method.GET / "sub" -> handler(socketApp.toResponse))

  override val run = Server
    .install(app)
    .flatMap { _ => test }
    .provide(
      ZLayer.succeed(Server.Config.default.enableRequestStreaming),
      Server.live
    )

  val test = ZIO.attempt {
    println("STARTED")
    import sttp.client3.*
    val backend = HttpClientFutureBackend()
    val f = basicRequest
      .response(asWebSocket { (ws: WebSocket[Future]) =>
        ws.sendText("test1")
      })
      .get(uri"ws://localhost:8080/sub")
      .send(backend)
    Await.result(f, scala.concurrent.duration.Duration.Inf)
    println("OK")
  }
}
@kyri-petrou
Copy link
Collaborator

Hey @adamw thanks for the reproducer, I'll look into what might be causing it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants