Skip to content

Commit

Permalink
migrate nodejs transport
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Nov 16, 2023
1 parent 039cb0c commit ca4659a
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

public class NodejsTcpAddress(
public val hostname: String,
public val port: Int,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public sealed interface NodejsTcpClientTransport : RSocketClientTransport {
public val address: NodejsTcpAddress

public companion object Factory : RSocketTransportFactory<
NodejsTcpAddress,
NodejsTcpClientTransport,
NodejsTcpClientTransportBuilder>({ NodejsTcpClientTransportBuilderImpl }) {

public operator fun invoke(
context: CoroutineContext,
hostname: String,
port: Int,
block: NodejsTcpClientTransportBuilder.() -> Unit = {},
): NodejsTcpClientTransport = invoke(context, NodejsTcpAddress(hostname, port), block)
}
}

public sealed interface NodejsTcpClientTransportBuilder : RSocketTransportBuilder<NodejsTcpAddress, NodejsTcpClientTransport>

private object NodejsTcpClientTransportBuilderImpl : NodejsTcpClientTransportBuilder {
@RSocketTransportApi
override fun buildTransport(context: CoroutineContext, target: NodejsTcpAddress): NodejsTcpClientTransport =
NodejsTcpClientTransportImpl(
coroutineContext = context.supervisorContext(),
address = target,
)
}

private class NodejsTcpClientTransportImpl(
override val coroutineContext: CoroutineContext,
override val address: NodejsTcpAddress,
) : NodejsTcpClientTransport {

@RSocketTransportApi
override suspend fun createSession(): RSocketTransportSession {
ensureActive()

return NodejsTcpSession(coroutineContext.childContext(), connect(address.port, address.hostname))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public sealed interface NodejsTcpServerInstance : RSocketServerInstance {
public val address: NodejsTcpAddress
}

public sealed interface NodejsTcpServerTransport : RSocketServerTransport<NodejsTcpServerInstance> {
public val address: NodejsTcpAddress

public companion object Factory : RSocketTransportFactory<
NodejsTcpAddress,
NodejsTcpServerTransport,
NodejsTcpServerTransportBuilder>({ NodejsTcpServerTransportBuilderImpl }) {

public operator fun invoke(
context: CoroutineContext,
hostname: String = "0.0.0.0",
port: Int = 0,
block: NodejsTcpServerTransportBuilder.() -> Unit = {},
): NodejsTcpServerTransport = invoke(context, NodejsTcpAddress(hostname, port), block)
}
}

public sealed interface NodejsTcpServerTransportBuilder : RSocketTransportBuilder<NodejsTcpAddress, NodejsTcpServerTransport>

private object NodejsTcpServerTransportBuilderImpl : NodejsTcpServerTransportBuilder {
@RSocketTransportApi
override fun buildTransport(context: CoroutineContext, target: NodejsTcpAddress): NodejsTcpServerTransport =
NodejsTcpServerTransportImpl(
coroutineContext = context.supervisorContext(),
address = target,
)
}

private class NodejsTcpServerTransportImpl(
override val coroutineContext: CoroutineContext,
override val address: NodejsTcpAddress,
) : NodejsTcpServerTransport {

@RSocketTransportApi
override suspend fun startServer(acceptor: RSocketServerAcceptor): NodejsTcpServerInstance {
ensureActive()

return NodejsTcpServerInstanceImpl(
coroutineContext = coroutineContext.supervisorContext(),
address = address,
acceptor = acceptor,
)
}
}

@RSocketTransportApi
private class NodejsTcpServerInstanceImpl(
override val coroutineContext: CoroutineContext,
override val address: NodejsTcpAddress,
private val acceptor: RSocketServerAcceptor,
) : NodejsTcpServerInstance {
init {
val server = createServer(address.port, address.hostname, {
coroutineContext.job.cancel("Server closed")
}) {
launch {
acceptor.acceptSession(NodejsTcpSession(coroutineContext.childContext(), it))
}
}
launch {
try {
awaitCancellation()
} catch (cause: Throwable) {
suspendCoroutine { cont -> server.close { cont.resume(Unit) } }
throw cause
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.transport.nodejs.tcp

import io.ktor.utils.io.core.*
import io.ktor.utils.io.js.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.khronos.webgl.*
import kotlin.coroutines.*

@RSocketTransportApi
internal class NodejsTcpSession(
override val coroutineContext: CoroutineContext,
private val socket: Socket,
) : RSocketTransportSession.Sequential {

private val sendChannel = channelForCloseable<ByteReadPacket>(8)
private val receiveChannel = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)

init {
launch {
sendChannel.consumeEach { packet ->
socket.write(Uint8Array(packet.withLength().readArrayBuffer()))
}
}

coroutineContext.job.invokeOnCompletion {
when (it) {
null -> socket.destroy()
else -> socket.destroy(Error(it.message, it.cause))
}
}

val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO
socket.on(
onData = { frameAssembler.write { writeFully(it.buffer) } },
onError = { coroutineContext.job.cancel("Socket error", it) },
onClose = { if (!it) coroutineContext.job.cancel("Socket closed") }
)
}

override suspend fun sendFrame(frame: ByteReadPacket) {
sendChannel.send(frame)
}

override suspend fun receiveFrame(): ByteReadPacket {
return receiveChannel.receive()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,11 @@ class TcpTransportTest : TransportTest() {
server.close()
}
}

class NodejsTcpTransportTest : TransportTest() {
override suspend fun before() {
val port = PortProvider.next()
val server = startServer(NodejsTcpServerTransport(testContext, port = port))
client = connectClient(NodejsTcpClientTransport(testContext, server.address))
}
}

0 comments on commit ca4659a

Please sign in to comment.