Skip to content

Commit

Permalink
somehow working QUIC implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Apr 13, 2024
1 parent a1e408d commit 76c184f
Show file tree
Hide file tree
Showing 13 changed files with 900 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@ public final class io/rsocket/kotlin/transport/netty/internal/CoroutinesKt {
public static final fun awaitChannel (Lio/netty/channel/ChannelFuture;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun awaitFuture (Lio/netty/util/concurrent/Future;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun callOnCancellation (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function1;)V
public static final fun toByteBuf (Lio/ktor/utils/io/core/ByteReadPacket;)Lio/netty/buffer/ByteBuf;
public static final fun toByteReadPacket (Lio/netty/buffer/ByteBuf;)Lio/ktor/utils/io/core/ByteReadPacket;
}

44 changes: 44 additions & 0 deletions rsocket-transports/netty-quic/api/rsocket-transport-netty-quic.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
public static final field Factory Lio/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport$Factory;
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketClientTarget;
public abstract fun target (Ljava/net/InetSocketAddress;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
}

public final class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
}

public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
public abstract fun channel (Lkotlin/reflect/KClass;)V
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
public abstract fun codec (Lkotlin/jvm/functions/Function1;)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun quicBootstrap (Lkotlin/jvm/functions/Function1;)V
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
}

public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
public abstract fun getLocalAddress ()Ljava/net/InetSocketAddress;
}

public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
public static final field Factory Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport$Factory;
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public abstract fun target (Ljava/net/InetSocketAddress;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport;Ljava/lang/String;IILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport;Ljava/net/InetSocketAddress;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
}

public final class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
}

public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
public abstract fun channel (Lkotlin/reflect/KClass;)V
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
public abstract fun codec (Lkotlin/jvm/functions/Function1;)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
}

57 changes: 57 additions & 0 deletions rsocket-transports/netty-quic/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import rsocketbuild.*

plugins {
id("rsocketbuild.multiplatform-library")
}

description = "rsocket-kotlin Netty QUIC client/server transport implementation"

kotlin {
jvmTarget()

sourceSets {
jvmMain.dependencies {
implementation(projects.rsocketTransportNettyInternal)
implementation(projects.rsocketInternalIo)
api(projects.rsocketCore)
api(libs.netty.handler)
api(libs.netty.codec.quic)
}
jvmTest.dependencies {
implementation(projects.rsocketTransportTests)
implementation(libs.bouncycastle)
implementation(libs.netty.codec.quic.map {
val javaOsName = System.getProperty("os.name")
val javaOsArch = System.getProperty("os.arch")
val suffix = when {
javaOsName.contains("mac", ignoreCase = true) -> "osx"
javaOsName.contains("linux", ignoreCase = true) -> "linux"
javaOsName.contains("windows", ignoreCase = true) -> "windows"
else -> error("Unknown os.name: $javaOsName")
} + "-" + when (javaOsArch) {
"x86_64", "amd64" -> "x86_64"
"arm64", "aarch64" -> "aarch_64"
else -> error("Unknown os.arch: $javaOsArch")
}
"$it:$suffix"
})
//implementation("ch.qos.logback:logback-classic:1.2.11")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.netty.quic

import io.netty.bootstrap.*
import io.netty.channel.*
import io.netty.channel.ChannelFactory
import io.netty.channel.nio.*
import io.netty.channel.socket.*
import io.netty.channel.socket.nio.*
import io.netty.incubator.codec.quic.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.netty.internal.*
import kotlinx.coroutines.*
import java.net.*
import kotlin.coroutines.*
import kotlin.reflect.*

public sealed interface NettyQuicClientTransport : RSocketTransport {
public fun target(remoteAddress: InetSocketAddress): RSocketClientTarget
public fun target(host: String, port: Int): RSocketClientTarget

public companion object Factory :
RSocketTransportFactory<NettyQuicClientTransport, NettyQuicClientTransportBuilder>(::NettyQuicClientTransportBuilderImpl)
}

public sealed interface NettyQuicClientTransportBuilder : RSocketTransportBuilder<NettyQuicClientTransport> {
public fun channel(cls: KClass<out DatagramChannel>)
public fun channelFactory(factory: ChannelFactory<out DatagramChannel>)
public fun eventLoopGroup(group: EventLoopGroup, manage: Boolean)

public fun bootstrap(block: Bootstrap.() -> Unit)
public fun codec(block: QuicClientCodecBuilder.() -> Unit)
public fun ssl(block: QuicSslContextBuilder.() -> Unit)
public fun quicBootstrap(block: QuicChannelBootstrap.() -> Unit)
}

private class NettyQuicClientTransportBuilderImpl : NettyQuicClientTransportBuilder {
private var channelFactory: ChannelFactory<out DatagramChannel>? = null
private var eventLoopGroup: EventLoopGroup? = null
private var manageEventLoopGroup: Boolean = false
private var bootstrap: (Bootstrap.() -> Unit)? = null
private var codec: (QuicClientCodecBuilder.() -> Unit)? = null
private var ssl: (QuicSslContextBuilder.() -> Unit)? = null
private var quicBootstrap: (QuicChannelBootstrap.() -> Unit)? = null

override fun channel(cls: KClass<out DatagramChannel>) {
this.channelFactory = ReflectiveChannelFactory(cls.java)
}

override fun channelFactory(factory: ChannelFactory<out DatagramChannel>) {
this.channelFactory = factory
}

override fun eventLoopGroup(group: EventLoopGroup, manage: Boolean) {
this.eventLoopGroup = group
this.manageEventLoopGroup = manage
}

override fun bootstrap(block: Bootstrap.() -> Unit) {
bootstrap = block
}

override fun codec(block: QuicClientCodecBuilder.() -> Unit) {
codec = block
}

override fun ssl(block: QuicSslContextBuilder.() -> Unit) {
ssl = block
}

override fun quicBootstrap(block: QuicChannelBootstrap.() -> Unit) {
quicBootstrap = block
}

@RSocketTransportApi
override fun buildTransport(context: CoroutineContext): NettyQuicClientTransport {
val codecHandler = QuicClientCodecBuilder().apply {
// by default, we allow Int.MAX_VALUE of active stream
initialMaxStreamsBidirectional(Int.MAX_VALUE.toLong())
codec?.invoke(this)
ssl?.let {
sslContext(QuicSslContextBuilder.forClient().apply(it).build())
}
}.build()
val bootstrap = Bootstrap().apply {
bootstrap?.invoke(this)
localAddress(0)
handler(codecHandler)
channelFactory(channelFactory ?: ReflectiveChannelFactory(NioDatagramChannel::class.java))
group(eventLoopGroup ?: NioEventLoopGroup())
}

return NettyQuicClientTransportImpl(
coroutineContext = context.supervisorContext() + bootstrap.config().group().asCoroutineDispatcher(),
bootstrap = bootstrap,
quicBootstrap = quicBootstrap,
manageBootstrap = manageEventLoopGroup
)
}
}

private class NettyQuicClientTransportImpl(
override val coroutineContext: CoroutineContext,
private val bootstrap: Bootstrap,
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
manageBootstrap: Boolean,
) : NettyQuicClientTransport {
init {
if (manageBootstrap) callOnCancellation {
bootstrap.config().group().shutdownGracefully().awaitFuture()
}
}

override fun target(remoteAddress: InetSocketAddress): NettyQuicClientTargetImpl = NettyQuicClientTargetImpl(
coroutineContext = coroutineContext.supervisorContext(),
bootstrap = bootstrap,
quicBootstrap = quicBootstrap,
remoteAddress = remoteAddress
)

override fun target(host: String, port: Int): RSocketClientTarget = target(InetSocketAddress(host, port))
}

private class NettyQuicClientTargetImpl(
override val coroutineContext: CoroutineContext,
private val bootstrap: Bootstrap,
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
private val remoteAddress: SocketAddress,
) : RSocketClientTarget {
@RSocketTransportApi
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
QuicChannel.newBootstrap(bootstrap.bind().awaitChannel()).also { quicBootstrap?.invoke(it) }
.handler(
NettyQuicConnectionInitializer(handler, coroutineContext, isClient = true)
).remoteAddress(remoteAddress).connect().awaitFuture()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.netty.quic

import io.netty.channel.*
import io.netty.channel.socket.*
import io.netty.incubator.codec.quic.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.netty.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

@RSocketTransportApi
internal class NettyQuicConnectionHandler(
private val channel: QuicChannel,
private val handler: RSocketConnectionHandler,
scope: CoroutineScope,
private val isClient: Boolean,
) : ChannelInboundHandlerAdapter() {
private val inbound = Channel<RSocketMultiplexedConnection.Stream>(Channel.UNLIMITED)

private val connectionJob = Job(scope.coroutineContext.job)
private val streamsContext = scope.coroutineContext + SupervisorJob(connectionJob)

private val handlerJob = scope.launch(connectionJob, start = CoroutineStart.LAZY) {
try {
handler.handleConnection(NettyQuicConnection(channel, inbound, streamsContext, isClient))
} catch (cause: Throwable) {
//println(cause)
throw cause
} finally {
inbound.cancel()
withContext(NonCancellable) {
streamsContext.job.cancelAndJoin()
channel.close().awaitFuture()
}
}
}

override fun channelActive(ctx: ChannelHandlerContext) {
handlerJob.start()
connectionJob.complete()
ctx.pipeline().addLast("rsocket-inbound", NettyQuicConnectionInboundHandler(inbound, streamsContext, isClient))

ctx.fireChannelActive()
}

override fun channelInactive(ctx: ChannelHandlerContext) {
handlerJob.cancel("Channel is not active")

ctx.fireChannelInactive()
}

@Suppress("OVERRIDE_DEPRECATION")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable?) {
handlerJob.cancel("exceptionCaught", cause)
}
}

// TODO: implement support for isAutoRead=false to support `inbound` backpressure
@RSocketTransportApi
private class NettyQuicConnectionInboundHandler(
private val inbound: SendChannel<RSocketMultiplexedConnection.Stream>,
private val streamsContext: CoroutineContext,
private val isClient: Boolean,
) : ChannelInboundHandlerAdapter() {
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
msg as QuicStreamChannel
// TODO:
// quic streams could be received unordered, so f.e we could receive first stream with id 4 and then with id 0

//println("$isClient: ${msg.streamId()}")

val state = NettyQuicStreamState(false)
if (inbound.trySend(state.wrapStream(msg)).isSuccess) {
msg.pipeline().addLast(NettyQuicStreamInitializer(streamsContext, state, isClient))
}
ctx.fireChannelRead(msg)
}

override fun userEventTriggered(ctx: ChannelHandlerContext?, evt: Any?) {
if (evt is ChannelInputShutdownEvent) {
inbound.close()
}
super.userEventTriggered(ctx, evt)
}
}

@RSocketTransportApi
private class NettyQuicConnection(
private val channel: QuicChannel,
private val inbound: ReceiveChannel<RSocketMultiplexedConnection.Stream>,
private val streamsContext: CoroutineContext,
private val isClient: Boolean,
) : RSocketMultiplexedConnection {
private val first = AtomicBoolean(true)
override suspend fun createStream(): RSocketMultiplexedConnection.Stream {
val state = NettyQuicStreamState(first.getAndSet(false))
val stream = try {
channel.createStream(
QuicStreamType.BIDIRECTIONAL,
NettyQuicStreamInitializer(streamsContext, state, isClient)
).awaitFuture()
} catch (cause: Throwable) {
state.closeMarker.complete()
throw cause
}

return state.wrapStream(stream)
}

override suspend fun acceptStream(): RSocketMultiplexedConnection.Stream? {
return inbound.receiveCatching().getOrNull()
}
}
Loading

0 comments on commit 76c184f

Please sign in to comment.