diff --git a/benchmarks/rsocket-kotlin/build.gradle.kts b/benchmarks/rsocket-kotlin/build.gradle.kts index da544f24..92d39abe 100644 --- a/benchmarks/rsocket-kotlin/build.gradle.kts +++ b/benchmarks/rsocket-kotlin/build.gradle.kts @@ -104,9 +104,9 @@ benchmark { register("ktorTcp") { include("KtorTcpRSocketKotlinBenchmark") - param("payloadSize", "0") - param("dispatcher", "IO", "DEFAULT", "UNCONFINED") - param("selectorDispatcher", "IO", "1", "2", "4", "8", "l1", "l2", "l4", "l8") + param("payloadSize", "0", "64") + param("dispatcher", "DEFAULT") + param("selectorDispatcher", "IO") } register("ktorTcpPayloadSize") { include("KtorTcpRSocketKotlinBenchmark") @@ -115,12 +115,11 @@ benchmark { register("nettyTcp") { include("NettyTcpRSocketKotlinBenchmark") - param("payloadSize", "0") - param("shareGroup", "true", "false") + param("payloadSize", "0", "64") } register("nettyQuic") { include("NettyQuicRSocketKotlinBenchmark") - param("payloadSize", "0") + param("payloadSize", "0", "64") } } } diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt index e4b2c836..30ca2fd4 100644 --- a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt @@ -16,7 +16,6 @@ package io.rsocket.kotlin.benchmarks.kotlin -import io.ktor.network.selector.* import io.rsocket.kotlin.transport.* import io.rsocket.kotlin.transport.ktor.tcp.* import kotlinx.benchmark.* @@ -40,44 +39,36 @@ class KtorTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { private val dispatcherV by lazy { when (dispatcher) { "DEFAULT" -> Dispatchers.Default - "IO" -> Dispatchers.IO "UNCONFINED" -> Dispatchers.Unconfined else -> error("wrong parameter 'dispatcher=$dispatcher'") } } - private val selectorDispatcherV by lazy { - when (selectorDispatcher) { - "DEFAULT" -> Dispatchers.Default - "IO" -> Dispatchers.IO - "1" -> newSingleThreadContext("selectorDispatcher") - "2" -> newFixedThreadPoolContext(2, "selectorDispatcher") - "4" -> newFixedThreadPoolContext(4, "selectorDispatcher") - "8" -> newFixedThreadPoolContext(8, "selectorDispatcher") - "l1" -> Dispatchers.IO.limitedParallelism(1) - "l2" -> Dispatchers.IO.limitedParallelism(2) - "l4" -> Dispatchers.IO.limitedParallelism(4) - "l8" -> Dispatchers.IO.limitedParallelism(8) - else -> error("wrong parameter 'selectorDispatcher=$selectorDispatcher'") - } - } +// private val selectorDispatcherV by lazy { +// when (selectorDispatcher) { +// "DEFAULT" -> Dispatchers.Default +// "IO" -> Dispatchers.IO +// "2" -> newFixedThreadPoolContext(2, "selectorDispatcher") +// else -> error("wrong parameter 'selectorDispatcher=$selectorDispatcher'") +// } +// } - private val selector by lazy { - SelectorManager(selectorDispatcherV) - } +// private val selector by lazy { +// SelectorManager(selectorDispatcherV) +// } override val serverTarget: RSocketServerTarget<*> by lazy { KtorTcpServerTransport(benchJob) { - dispatcher(dispatcherV) - selectorManager(selector, manage = false) - }.target(port = 9000) +// dispatcher(dispatcherV) +// selectorManager(selector, manage = false) + }.target() } - override val clientTarget: RSocketClientTarget by lazy { - KtorTcpClientTransport(benchJob) { - dispatcher(dispatcherV) - selectorManager(selector, manage = false) - }.target("0.0.0.0", port = 9000) + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return KtorTcpClientTransport(benchJob) { +// dispatcher(dispatcherV) +// selectorManager(selector, manage = false) + }.target((serverInstance as KtorTcpServerInstance).localAddress) } @Setup @@ -88,13 +79,13 @@ class KtorTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { @TearDown override fun cleanup() { super.cleanup() - selector.close() - if ( - selectorDispatcherV != Dispatchers.Default && - selectorDispatcherV != Dispatchers.IO && - selectorDispatcherV is CloseableCoroutineDispatcher - ) { - (selectorDispatcherV as CloseableCoroutineDispatcher).close() - } +// selector.close() +// if ( +// selectorDispatcherV != Dispatchers.Default && +// selectorDispatcherV != Dispatchers.IO && +// selectorDispatcherV is CloseableCoroutineDispatcher +// ) { +// (selectorDispatcherV as CloseableCoroutineDispatcher).close() +// } } } diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt index 63dacdfd..89186ea7 100644 --- a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt @@ -57,8 +57,8 @@ class LocalRSocketKotlinBenchmark : RSocketKotlinBenchmark() { }.target("local") } - override val clientTarget: RSocketClientTarget by lazy { - LocalClientTransport(benchJob) { + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return LocalClientTransport(benchJob) { dispatcher(dispatcherV) }.target("local") } diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt index b406d39f..1b5334a6 100644 --- a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt @@ -29,7 +29,7 @@ import kotlin.random.* @OptIn(ExperimentalStreamsApi::class) abstract class RSocketKotlinBenchmark : RSocketBenchmark() { - protected abstract val clientTarget: RSocketClientTarget + protected abstract fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget protected abstract val serverTarget: RSocketServerTarget<*> private val requestStrategy = PrefetchStrategy(64, 0) @@ -56,7 +56,7 @@ abstract class RSocketKotlinBenchmark : RSocketBenchmark() { payload = createPayload(payloadSize) payloadsFlow = flow { repeat(5000) { emit(createPayloadCopy()) } } - RSocketServer().startServer(serverTarget) { + val serverInstance = RSocketServer().startServer(serverTarget) { RSocketRequestHandler { requestResponse { it.close() @@ -72,7 +72,7 @@ abstract class RSocketKotlinBenchmark : RSocketBenchmark() { } } } - client = RSocketConnector().connect(clientTarget) + client = RSocketConnector().connect(clientTarget(serverInstance)) } override fun cleanup(): Unit = runBlocking { diff --git a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt index 8080d8a8..b914d5f0 100644 --- a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt +++ b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt @@ -53,17 +53,19 @@ class NettyQuicRSocketKotlinBenchmark : RSocketKotlinBenchmark() { codec { tokenHandler(InsecureQuicTokenHandler.INSTANCE) } - }.target(port = 9009) + }.target("127.0.0.1") } - override val clientTarget: RSocketClientTarget by lazy { - NettyQuicClientTransport(benchJob) { + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return NettyQuicClientTransport(benchJob) { eventLoopGroup(sharedGroup, manage = false) ssl { trustManager(InsecureTrustManagerFactory.INSTANCE) applicationProtocols(*protos) } - }.target("127.0.0.1", port = 9009) + }.target( + (serverInstance as NettyQuicServerInstance).localAddress + ) } @Setup diff --git a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt index 4bf5c5c9..8662b46b 100644 --- a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt +++ b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt @@ -32,27 +32,14 @@ class NettyTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { @Param("0") override var payloadSize: Int = 0 - @Param("true", "false") - var shareGroup: Boolean = true - - private val sharedGroup by lazy { - if (shareGroup) NioEventLoopGroup() else null - } - override val serverTarget: RSocketServerTarget<*> by lazy { - NettyTcpServerTransport(benchJob) { - if (sharedGroup != null) { - eventLoopGroup(sharedGroup!!, manage = false) - } - }.target(port = 9000) + NettyTcpServerTransport(benchJob).target() } - override val clientTarget: RSocketClientTarget by lazy { - NettyTcpClientTransport(benchJob) { - if (sharedGroup != null) { - eventLoopGroup(sharedGroup!!, manage = false) - } - }.target("0.0.0.0", port = 9000) + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return NettyTcpClientTransport(benchJob).target( + (serverInstance as NettyTcpServerInstance).localAddress + ) } @Setup @@ -63,6 +50,5 @@ class NettyTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { @TearDown override fun cleanup() { super.cleanup() - sharedGroup?.shutdownGracefully()?.await(1000) } } diff --git a/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport.kt b/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport.kt index 9675af6b..e47c4f63 100644 --- a/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport.kt +++ b/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpClientTransport.kt @@ -91,7 +91,7 @@ private class NettyTcpClientTransportBuilderImpl : NettyTcpClientTransportBuilde } return NettyTcpClientTransportImpl( - coroutineContext = context.supervisorContext() + bootstrap.config().group().asCoroutineDispatcher(), + coroutineContext = context.supervisorContext() + Dispatchers.Default, //bootstrap.config().group().asCoroutineDispatcher(), sslContext = sslContext, bootstrap = bootstrap, manageBootstrap = manageEventLoopGroup diff --git a/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport.kt b/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport.kt index 9566f451..ac3f6d61 100644 --- a/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport.kt +++ b/rsocket-transports/netty-tcp/src/jvmMain/kotlin/io/rsocket/kotlin/transport/netty/tcp/NettyTcpServerTransport.kt @@ -105,7 +105,7 @@ private class NettyTcpServerTransportBuilderImpl : NettyTcpServerTransportBuilde } return NettyTcpServerTransportImpl( - coroutineContext = context.supervisorContext() + bootstrap.config().childGroup().asCoroutineDispatcher(), + coroutineContext = context.supervisorContext() + Dispatchers.Default, //bootstrap.config().childGroup().asCoroutineDispatcher(), bootstrap = bootstrap, sslContext = sslContext, manageBootstrap = manageEventLoopGroup