Skip to content

Commit

Permalink
Merge branch 'series/0.5' into merge-to-main
Browse files Browse the repository at this point in the history
  • Loading branch information
hamnis committed Oct 4, 2024
2 parents 0b14a1d + 46c5324 commit df84de4
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 85 deletions.
14 changes: 13 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ jobs:
runs-on: ${{ matrix.os }}
timeout-minutes: 60
steps:
- name: Install sbt
if: contains(runner.os, 'macos')
run: brew install sbt

- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -104,6 +108,10 @@ jobs:
java: [temurin@17]
runs-on: ${{ matrix.os }}
steps:
- name: Install sbt
if: contains(runner.os, 'macos')
run: brew install sbt

- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -168,13 +176,17 @@ jobs:

dependency-submission:
name: Submit Dependencies
if: github.event_name != 'pull_request'
if: github.event.repository.fork == false && github.event_name != 'pull_request'
strategy:
matrix:
os: [ubuntu-latest]
java: [temurin@17]
runs-on: ${{ matrix.os }}
steps:
- name: Install sbt
if: contains(runner.os, 'macos')
run: brew install sbt

- name: Checkout current branch (full)
uses: actions/checkout@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = 3.8.0
version = 3.8.3
runner.dialect = scala213
style = default

Expand Down
28 changes: 16 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
import com.typesafe.tools.mima.core._

val Scala213 = "2.13.13"
val Scala213 = "2.13.15"

inThisBuild(
Seq(
startYear := Some(2020),
Test / fork := true,
developers := List(
// your GitHub handle and name
tlGitHubDev("hamnis", "Erlend Hamnaberg")
),
licenses := Seq(License.Apache2),
tlBaseVersion := "1.0",
tlSonatypeUseLegacyHost := false,
crossScalaVersions := Seq(Scala213, "3.3.3"),
sonatypeCredentialHost := xerial.sbt.Sonatype.sonatypeLegacy,
crossScalaVersions := Seq(Scala213, "3.3.4"),
ThisBuild / scalaVersion := Scala213,
githubWorkflowJavaVersions := Seq(JavaSpec.temurin("17"))
)
)

val http4sVersion = "1.0.0-M41"
val http4sVersion = "1.0.0-M42"

val jetty = "12.0.7"
val jetty = "12.0.14"

val netty = "4.1.107.Final"
val netty = "4.1.114.Final"

val munit = "0.7.29"
val munit = "1.0.2"
val munitScalaCheck = "1.0.0"

val io_uring = "0.0.25.Final"

Expand All @@ -47,7 +49,7 @@ lazy val core = project
name := "http4s-netty-core",
libraryDependencies ++= List(
"org.log4s" %% "log4s" % "1.10.0",
"co.fs2" %% "fs2-reactive-streams" % "3.9.4",
"co.fs2" %% "fs2-reactive-streams" % "3.11.0",
("org.playframework.netty" % "netty-reactive-streams-http" % "3.0.2")
.exclude("io.netty", "netty-codec-http")
.exclude("io.netty", "netty-handler"),
Expand All @@ -72,11 +74,12 @@ lazy val server = project
"org.http4s" %% "http4s-dsl" % http4sVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.4.5" % Test,
"org.scalameta" %% "munit" % munit % Test,
"org.scalameta" %% "munit-scalacheck" % munit % Test,
"org.scalameta" %% "munit-scalacheck" % munitScalaCheck % Test,
"org.http4s" %% "http4s-circe" % http4sVersion % Test,
"org.http4s" %% "http4s-jdk-http-client" % "1.0.0-M9" % Test,
"org.typelevel" %% "munit-cats-effect" % "2.0.0-M4" % Test
"org.typelevel" %% "munit-cats-effect" % "2.0.0" % Test
),
libraryDependencySchemes += "org.typelevel" %% "munit-cats-effect" % VersionScheme.Always, // "early-semver",
libraryDependencies ++= nativeNettyModules,
mimaBinaryIssueFilters := Nil
)
Expand All @@ -97,11 +100,12 @@ lazy val client = project
("com.github.monkeywie" % "proxyee" % "1.7.6" % Test)
.excludeAll("io.netty")
.excludeAll("org.bouncycastle"),
"com.github.bbottema" % "java-socks-proxy-server" % "3.0.0" % Test,
"com.github.bbottema" % "java-socks-proxy-server" % "4.1.1" % Test,
"org.scalameta" %% "munit" % munit % Test,
"ch.qos.logback" % "logback-classic" % "1.2.13" % Test,
"org.typelevel" %% "munit-cats-effect" % "2.0.0-M4" % Test
"org.typelevel" %% "munit-cats-effect" % "2.0.0" % Test
),
libraryDependencySchemes += "org.typelevel" %% "munit-cats-effect" % VersionScheme.Always, // "early-semver",
libraryDependencies ++= nativeNettyModules
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package client
import cats.effect.Async
import cats.effect.Resource
import cats.effect.std.Dispatcher
import cats.syntax.all._
import fs2.io.net.tls.TLSParameters
import io.netty.bootstrap.Bootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.ChannelPromise
import io.netty.channel.pool.AbstractChannelPoolHandler
import io.netty.channel.pool.AbstractChannelPoolMap
import io.netty.channel.pool.ChannelPoolHandler
Expand All @@ -50,6 +51,7 @@ import org.http4s.client.RequestKey
import org.playframework.netty.http.HttpStreamsClientHandler

import java.net.ConnectException
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

private[client] case class Key(requestKey: RequestKey, version: HttpVersion)
Expand All @@ -75,15 +77,20 @@ private[client] class Http4sChannelPoolMap[F[_]](

private def endOfPipeline(pipeline: ChannelPipeline): Unit = void {
logger.trace("building pipeline / end-of-pipeline")
pipeline.addLast("streaming-handler", new HttpStreamsClientHandler)
pipeline.addLast(
"streaming-handler",
new HttpStreamsClientHandler {
override def close(ctx: ChannelHandlerContext, future: ChannelPromise): Unit = void {
ctx.close(future)
}
})

if (config.idleTimeout.isFinite && config.idleTimeout.length > 0) {
void(
pipeline
.addLast(
"timeout",
new IdleStateHandler(0, 0, config.idleTimeout.length, config.idleTimeout.unit)))
}
val idletimeout = if (config.idleTimeout.isFinite) config.idleTimeout.toMillis else 0L
val readTimeout = if (config.readTimeout.isFinite) config.readTimeout.toMillis else 0L

pipeline.addLast(
"timeout",
new IdleStateHandler(readTimeout, 0, idletimeout, TimeUnit.MILLISECONDS))
}

private def connectAndConfigure(key: Key): Resource[F, Channel] = {
Expand Down Expand Up @@ -251,16 +258,9 @@ private[client] object Http4sChannelPoolMap {
proxy: Option[Proxy],
sslConfig: SSLContextOption,
http2: Boolean,
defaultRequestHeaders: Headers
defaultRequestHeaders: Headers,
readTimeout: Duration
)

private[client] def fromFuture[F[_]: Async, A](future: => Future[A]): F[A] =
Async[F].async { callback =>
val fut = future
void(
fut
.addListener((f: Future[A]) =>
if (f.isSuccess) callback(Right(f.getNow)) else callback(Left(f.cause()))))
Async[F].delay(Some(Async[F].delay(fut.cancel(false)).void))
}
private[client] def fromFuture[F[_]: Async, A](future: => Future[A]): F[A] = ???
}
61 changes: 44 additions & 17 deletions client/src/main/scala/org/http4s/netty/client/Http4sHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import cats.effect.std.Dispatcher
import cats.syntax.all._
import io.netty.channel._
import io.netty.handler.codec.http.HttpResponse
import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent
import org.http4s._
import org.http4s.netty.client.Http4sHandler.logger
Expand All @@ -38,10 +39,11 @@ import scala.util.Success

private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F: Async[F])
extends ChannelInboundHandlerAdapter {
type Promise = Either[Throwable, Resource[F, Response[F]]] => Unit

private val modelConversion = new NettyModelConversion[F]
private val promises =
collection.mutable.Queue[Either[Throwable, Resource[F, Response[F]]] => Unit]()
collection.mutable.Queue[Promise]()
// By using the Netty event loop assigned to this channel we get two benefits:
// 1. We can avoid the necessary hopping around of threads since Netty pipelines will
// only pass events up and down from within the event loop to which it is assigned.
Expand All @@ -51,6 +53,7 @@ private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F:
private var eventLoopContext: ExecutionContext = _

private var pending: Future[Unit] = Future.unit
private var inFlight: Option[Promise] = None

private def write2(request: Request[F], channel: Channel, key: Key): F[Unit] = {
import io.netty.handler.codec.http2._
Expand Down Expand Up @@ -145,6 +148,8 @@ private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F:
override def isSharable: Boolean = false

override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = void {
implicit val ec: ExecutionContext = eventLoopContext

msg match {
case h: HttpResponse =>
val responseResourceF = modelConversion
Expand All @@ -154,17 +159,25 @@ private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F:
}
val result = dispatcher.unsafeToFuture(responseResourceF)

pending = pending.flatMap { _ =>
if (promises.nonEmpty) {
val promise = promises.dequeue()
result.transform {
case Failure(exception) =>
promise(Left(exception))
Failure(exception)
case Success(res) =>
promise(Right(res))
Success(())
}(eventLoopContext)
}(eventLoopContext)
inFlight = Some(promise)
logger.trace("dequeuing promise")
pending = pending.flatMap { _ =>
result.transform {
case Failure(exception) =>
logger.trace("handling promise failure")
promise(Left(exception))
inFlight = None
Failure(exception)
case Success(res) =>
logger.trace("handling promise success")
promise(Right(res))
inFlight = None
Success(())
}
}
}
case _ =>
super.channelRead(ctx, msg)
}
Expand Down Expand Up @@ -193,17 +206,31 @@ private[netty] class Http4sHandler[F[_]](dispatcher: Dispatcher[F])(implicit F:
}

private def onException(channel: Channel, e: Throwable): Unit = void {
promises.foreach(cb => cb(Left(e)))
promises.clear()
implicit val ec: ExecutionContext = eventLoopContext

val allPromises =
(inFlight.toList ++ promises.dequeueAll(_ => true)).map(promise => Future(promise(Left(e))))
logger.trace(s"onException: dequeueAll(${allPromises.size})")
pending = pending.flatMap(_ => Future.sequence(allPromises).map(_ => ()))
inFlight = None

channel.close()
}

override def userEventTriggered(ctx: ChannelHandlerContext, evt: scala.Any): Unit = void {
evt match {
case _: IdleStateEvent if ctx.channel().isOpen =>
val message = s"Closing connection due to idle timeout"
logger.trace(message)
onException(ctx.channel(), new TimeoutException(message))
case e: IdleStateEvent if ctx.channel().isOpen =>
val state = e.state()
state match {
case IdleState.READER_IDLE =>
val message = "Timing out request due to missing read"
onException(ctx.channel(), new TimeoutException(message))
case IdleState.WRITER_IDLE => ()
case IdleState.ALL_IDLE =>
val message = "Closing connection due to idle timeout"
logger.trace(message)
onException(ctx.channel(), new TimeoutException(message))
}
case _ => super.userEventTriggered(ctx, evt)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import scala.concurrent.duration._

class NettyClientBuilder[F[_]](
idleTimeout: Duration,
readTimeout: Duration,
eventLoopThreads: Int,
maxInitialLength: Int,
maxHeaderSize: Int,
Expand All @@ -45,6 +46,7 @@ class NettyClientBuilder[F[_]](

private def copy(
idleTimeout: Duration = idleTimeout,
readTimeout: Duration = readTimeout,
eventLoopThreads: Int = eventLoopThreads,
maxInitialLength: Int = maxInitialLength,
maxHeaderSize: Int = maxHeaderSize,
Expand All @@ -59,6 +61,7 @@ class NettyClientBuilder[F[_]](
): NettyClientBuilder[F] =
new NettyClientBuilder[F](
idleTimeout,
readTimeout,
eventLoopThreads,
maxInitialLength,
maxHeaderSize,
Expand All @@ -79,7 +82,9 @@ class NettyClientBuilder[F[_]](
def withMaxHeaderSize(size: Int): Self = copy(maxHeaderSize = size)
def withMaxChunkSize(size: Int): Self = copy(maxChunkSize = size)
def withMaxConnectionsPerKey(size: Int): Self = copy(maxConnectionsPerKey = size)
def withIdleTimeout(duration: FiniteDuration): Self = copy(idleTimeout = duration)

def withIdleTimeout(duration: Duration): Self = copy(idleTimeout = duration)
def withReadTimeout(duration: Duration): Self = copy(readTimeout = duration)

def withSSLContext(sslContext: SSLContext): Self =
copy(sslContext = SSLContextOption.Provided(sslContext))
Expand Down Expand Up @@ -134,7 +139,8 @@ class NettyClientBuilder[F[_]](
proxy,
sslContext,
http2,
defaultRequestHeaders
defaultRequestHeaders,
readTimeout
)
Client[F](new Http4sChannelPoolMap[F](bs, config).run)
}
Expand All @@ -144,6 +150,7 @@ object NettyClientBuilder {
def apply[F[_]](implicit F: Async[F]): NettyClientBuilder[F] =
new NettyClientBuilder[F](
idleTimeout = 60.seconds,
readTimeout = 60.seconds,
eventLoopThreads = 0,
maxInitialLength = 4096,
maxHeaderSize = 8192,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.http4s.Uri
import org.http4s.client.testkit.scaffold.ServerScaffold

import java.net.ServerSocket
import scala.compat.java8.FutureConverters._

class HttpProxyTest extends IOSuite {

Expand All @@ -44,12 +43,12 @@ class HttpProxyTest extends IOSuite {
val proxy: IOFixture[HttpProxy] = resourceFixture(
for {
address <- Resource.eval(HttpProxyTest.randomSocketAddress[IO])
_ <- Resource {
_ <- Resource.make[IO, HttpProxyServer] {
val s = new HttpProxyServer()
IO.fromFuture(
IO(toScala(s.startAsync(address.host.toInetAddress.getHostAddress, address.port.value))))
.as(s -> IO.blocking(s.close()))
}
IO.fromCompletionStage(
IO(s.startAsync(address.host.toInetAddress.getHostAddress, address.port.value)))
.as(s)
}(s => IO.blocking(s.close()))
} yield HttpProxy(
Uri.Scheme.http,
address.host,
Expand Down
Loading

0 comments on commit df84de4

Please sign in to comment.