From 0a464d34c5b7369d141a1b1990b030f426bb9e03 Mon Sep 17 00:00:00 2001 From: Muzzammil Shahid Date: Fri, 18 Oct 2024 15:46:44 +0500 Subject: [PATCH 1/3] Add BaseSession --- build.gradle.kts | 2 + src/main/kotlin/io/xconn/xconn/Types.kt | 94 +++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 src/main/kotlin/io/xconn/xconn/Types.kt diff --git a/build.gradle.kts b/build.gradle.kts index 1fc8d9c..99de213 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,6 +11,8 @@ repositories { dependencies { implementation("io.xconn:wampproto:0.1.0") + implementation("io.ktor:ktor-client-websockets:2.3.12") + implementation("io.ktor:ktor-client-cio:2.3.12") testImplementation(kotlin("test")) } diff --git a/src/main/kotlin/io/xconn/xconn/Types.kt b/src/main/kotlin/io/xconn/xconn/Types.kt new file mode 100644 index 0000000..61aad52 --- /dev/null +++ b/src/main/kotlin/io/xconn/xconn/Types.kt @@ -0,0 +1,94 @@ +package io.xconn.xconn + +import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession +import io.ktor.websocket.* +import io.xconn.wampproto.SessionDetails +import io.xconn.wampproto.messages.Message +import io.xconn.wampproto.serializers.Serializer + +interface IBaseSession { + fun id(): Long + + fun realm(): String + + fun authid(): String + + fun authrole(): String + + fun serializer(): Serializer + + suspend fun send(data: Any) + + suspend fun receive(): Any + + suspend fun sendMessage(msg: Message) + + suspend fun receiveMessage(): Message + + suspend fun close() +} + +class BaseSession( + private val webSocketSession: DefaultClientWebSocketSession, + private val sessionDetails: SessionDetails, + private val serializer: Serializer, +) : IBaseSession { + override fun id(): Long { + return sessionDetails.sessionID + } + + override fun realm(): String { + return sessionDetails.realm + } + + override fun authid(): String { + return sessionDetails.authid + } + + override fun authrole(): String { + return sessionDetails.authrole + } + + override fun serializer(): Serializer { + return serializer + } + + override suspend fun send(data: Any) { + webSocketSession.sendFrame(data) + } + + override suspend fun sendMessage(msg: Message) { + val serializedData = serializer.serialize(msg) + send(serializedData) + } + + override suspend fun receive(): Any { + val frame = webSocketSession.incoming.receive() + return receiveFrame(frame) + } + + override suspend fun receiveMessage(): Message { + return serializer.deserialize(receive()) + } + + override suspend fun close() { + webSocketSession.close() + } +} + + +internal suspend fun DefaultWebSocketSession.sendFrame(data: Any) { + when (data) { + is String -> outgoing.send(Frame.Text(data)) + is ByteArray -> outgoing.send(Frame.Binary(true, data)) + else -> throw IllegalArgumentException("Unsupported frame type") + } +} + +internal fun receiveFrame(frame: Frame): Any { + return when (frame) { + is Frame.Text -> frame.readText() + is Frame.Binary -> frame.readBytes() + else -> throw Exception("Unsupported frame type") + } +} From 7caf376cf1a6f273d51242c022118a1a723e8178 Mon Sep 17 00:00:00 2001 From: Muzzammil Shahid Date: Fri, 18 Oct 2024 15:47:25 +0500 Subject: [PATCH 2/3] Add WAMPSessionJoiner --- src/main/kotlin/io/xconn/xconn/Types.kt | 19 +---- .../io/xconn/xconn/WAMPSessionJoiner.kt | 79 +++++++++++++++++++ 2 files changed, 80 insertions(+), 18 deletions(-) create mode 100644 src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt diff --git a/src/main/kotlin/io/xconn/xconn/Types.kt b/src/main/kotlin/io/xconn/xconn/Types.kt index 61aad52..771b862 100644 --- a/src/main/kotlin/io/xconn/xconn/Types.kt +++ b/src/main/kotlin/io/xconn/xconn/Types.kt @@ -1,7 +1,7 @@ package io.xconn.xconn import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession -import io.ktor.websocket.* +import io.ktor.websocket.close import io.xconn.wampproto.SessionDetails import io.xconn.wampproto.messages.Message import io.xconn.wampproto.serializers.Serializer @@ -75,20 +75,3 @@ class BaseSession( webSocketSession.close() } } - - -internal suspend fun DefaultWebSocketSession.sendFrame(data: Any) { - when (data) { - is String -> outgoing.send(Frame.Text(data)) - is ByteArray -> outgoing.send(Frame.Binary(true, data)) - else -> throw IllegalArgumentException("Unsupported frame type") - } -} - -internal fun receiveFrame(frame: Frame): Any { - return when (frame) { - is Frame.Text -> frame.readText() - is Frame.Binary -> frame.readBytes() - else -> throw Exception("Unsupported frame type") - } -} diff --git a/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt b/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt new file mode 100644 index 0000000..bc9763d --- /dev/null +++ b/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt @@ -0,0 +1,79 @@ +package io.xconn.xconn + +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.plugins.websocket.webSocket +import io.ktor.client.request.header +import io.ktor.http.HttpMethod +import io.ktor.websocket.DefaultWebSocketSession +import io.ktor.websocket.Frame +import io.ktor.websocket.readBytes +import io.ktor.websocket.readText +import io.xconn.wampproto.Joiner +import io.xconn.wampproto.auth.AnonymousAuthenticator +import io.xconn.wampproto.auth.ClientAuthenticator +import io.xconn.wampproto.serializers.JSONSerializer +import io.xconn.wampproto.serializers.Serializer +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.launch + +class WAMPSessionJoiner( + private val authenticator: ClientAuthenticator = AnonymousAuthenticator(""), + private val serializer: Serializer = JSONSerializer(), +) { + private val client = HttpClient(CIO) { install(WebSockets) } + + suspend fun join(host: String, port: Int, realm: String): BaseSession { + val welcomeCompleter = CompletableDeferred() + val subProtocol = getSubProtocol(serializer) + val joiner = Joiner(realm, serializer, authenticator) + + client.webSocket( + method = HttpMethod.Get, + host = host, + port = port, + request = { header("Sec-WebSocket-Protocol", subProtocol) }, + ) { + // Send initial Hello message + sendFrame(joiner.sendHello()) + + // Handle incoming messages + launch { + for (frame in incoming) { + try { + val receivedData = receiveFrame(frame) + val toSend = joiner.receive(receivedData) + + if (toSend == null) { + // Complete handshake and session creation + welcomeCompleter.complete(BaseSession(this@webSocket, joiner.getSessionDetails(), serializer)) + } else { + sendFrame(toSend) + } + } catch (error: Exception) { + welcomeCompleter.completeExceptionally(error) + } + } + } + } + + return welcomeCompleter.await() + } +} + +internal suspend fun DefaultWebSocketSession.sendFrame(data: Any) { + when (data) { + is String -> outgoing.send(Frame.Text(data)) + is ByteArray -> outgoing.send(Frame.Binary(true, data)) + else -> throw IllegalArgumentException("Unsupported frame type") + } +} + +internal fun receiveFrame(frame: Frame): Any { + return when (frame) { + is Frame.Text -> frame.readText() + is Frame.Binary -> frame.readBytes() + else -> throw Exception("Unsupported frame type") + } +} From b7314d3a187d56a5407b0a5acbf22bc2312ed5dc Mon Sep 17 00:00:00 2001 From: Mahad Date: Tue, 22 Oct 2024 15:04:40 +0500 Subject: [PATCH 3/3] Don't close session when join exits --- .../io/xconn/xconn/WAMPSessionJoiner.kt | 61 +++++++++++-------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt b/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt index bc9763d..9f22dbb 100644 --- a/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt +++ b/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt @@ -2,8 +2,9 @@ package io.xconn.xconn import io.ktor.client.HttpClient import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.defaultRequest import io.ktor.client.plugins.websocket.WebSockets -import io.ktor.client.plugins.websocket.webSocket +import io.ktor.client.plugins.websocket.webSocketSession import io.ktor.client.request.header import io.ktor.http.HttpMethod import io.ktor.websocket.DefaultWebSocketSession @@ -16,46 +17,54 @@ import io.xconn.wampproto.auth.ClientAuthenticator import io.xconn.wampproto.serializers.JSONSerializer import io.xconn.wampproto.serializers.Serializer import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.launch +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope class WAMPSessionJoiner( private val authenticator: ClientAuthenticator = AnonymousAuthenticator(""), private val serializer: Serializer = JSONSerializer(), ) { - private val client = HttpClient(CIO) { install(WebSockets) } + private val subProtocol = getSubProtocol(serializer) + private val client = + HttpClient(CIO) { + install(WebSockets) + defaultRequest { + header("Sec-WebSocket-Protocol", subProtocol) + } + } suspend fun join(host: String, port: Int, realm: String): BaseSession { val welcomeCompleter = CompletableDeferred() - val subProtocol = getSubProtocol(serializer) val joiner = Joiner(realm, serializer, authenticator) - client.webSocket( - method = HttpMethod.Get, - host = host, - port = port, - request = { header("Sec-WebSocket-Protocol", subProtocol) }, - ) { - // Send initial Hello message - sendFrame(joiner.sendHello()) + val session = client.webSocketSession(HttpMethod.Get, host, port) + + // Send initial Hello message + session.sendFrame(joiner.sendHello()) - // Handle incoming messages - launch { - for (frame in incoming) { - try { - val receivedData = receiveFrame(frame) - val toSend = joiner.receive(receivedData) + coroutineScope { + val handshakeJob = + async { + for (frame in session.incoming) { + try { + val receivedData = receiveFrame(frame) + val toSend = joiner.receive(receivedData) - if (toSend == null) { - // Complete handshake and session creation - welcomeCompleter.complete(BaseSession(this@webSocket, joiner.getSessionDetails(), serializer)) - } else { - sendFrame(toSend) + if (toSend == null) { + // Complete handshake and session creation + welcomeCompleter.complete(BaseSession(session, joiner.getSessionDetails(), serializer)) + break + } else { + session.sendFrame(toSend) + } + } catch (error: Exception) { + welcomeCompleter.completeExceptionally(error) + break } - } catch (error: Exception) { - welcomeCompleter.completeExceptionally(error) } } - } + + handshakeJob.await() } return welcomeCompleter.await()