diff --git a/build.gradle.kts b/build.gradle.kts index 1fc8d9c..99de213 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,6 +11,8 @@ repositories { dependencies { implementation("io.xconn:wampproto:0.1.0") + implementation("io.ktor:ktor-client-websockets:2.3.12") + implementation("io.ktor:ktor-client-cio:2.3.12") testImplementation(kotlin("test")) } diff --git a/src/main/kotlin/io/xconn/xconn/Types.kt b/src/main/kotlin/io/xconn/xconn/Types.kt new file mode 100644 index 0000000..771b862 --- /dev/null +++ b/src/main/kotlin/io/xconn/xconn/Types.kt @@ -0,0 +1,77 @@ +package io.xconn.xconn + +import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession +import io.ktor.websocket.close +import io.xconn.wampproto.SessionDetails +import io.xconn.wampproto.messages.Message +import io.xconn.wampproto.serializers.Serializer + +interface IBaseSession { + fun id(): Long + + fun realm(): String + + fun authid(): String + + fun authrole(): String + + fun serializer(): Serializer + + suspend fun send(data: Any) + + suspend fun receive(): Any + + suspend fun sendMessage(msg: Message) + + suspend fun receiveMessage(): Message + + suspend fun close() +} + +class BaseSession( + private val webSocketSession: DefaultClientWebSocketSession, + private val sessionDetails: SessionDetails, + private val serializer: Serializer, +) : IBaseSession { + override fun id(): Long { + return sessionDetails.sessionID + } + + override fun realm(): String { + return sessionDetails.realm + } + + override fun authid(): String { + return sessionDetails.authid + } + + override fun authrole(): String { + return sessionDetails.authrole + } + + override fun serializer(): Serializer { + return serializer + } + + override suspend fun send(data: Any) { + webSocketSession.sendFrame(data) + } + + override suspend fun sendMessage(msg: Message) { + val serializedData = serializer.serialize(msg) + send(serializedData) + } + + override suspend fun receive(): Any { + val frame = webSocketSession.incoming.receive() + return receiveFrame(frame) + } + + override suspend fun receiveMessage(): Message { + return serializer.deserialize(receive()) + } + + override suspend fun close() { + webSocketSession.close() + } +} diff --git a/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt b/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt new file mode 100644 index 0000000..9f22dbb --- /dev/null +++ b/src/main/kotlin/io/xconn/xconn/WAMPSessionJoiner.kt @@ -0,0 +1,88 @@ +package io.xconn.xconn + +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.defaultRequest +import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.plugins.websocket.webSocketSession +import io.ktor.client.request.header +import io.ktor.http.HttpMethod +import io.ktor.websocket.DefaultWebSocketSession +import io.ktor.websocket.Frame +import io.ktor.websocket.readBytes +import io.ktor.websocket.readText +import io.xconn.wampproto.Joiner +import io.xconn.wampproto.auth.AnonymousAuthenticator +import io.xconn.wampproto.auth.ClientAuthenticator +import io.xconn.wampproto.serializers.JSONSerializer +import io.xconn.wampproto.serializers.Serializer +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope + +class WAMPSessionJoiner( + private val authenticator: ClientAuthenticator = AnonymousAuthenticator(""), + private val serializer: Serializer = JSONSerializer(), +) { + private val subProtocol = getSubProtocol(serializer) + private val client = + HttpClient(CIO) { + install(WebSockets) + defaultRequest { + header("Sec-WebSocket-Protocol", subProtocol) + } + } + + suspend fun join(host: String, port: Int, realm: String): BaseSession { + val welcomeCompleter = CompletableDeferred() + val joiner = Joiner(realm, serializer, authenticator) + + val session = client.webSocketSession(HttpMethod.Get, host, port) + + // Send initial Hello message + session.sendFrame(joiner.sendHello()) + + coroutineScope { + val handshakeJob = + async { + for (frame in session.incoming) { + try { + val receivedData = receiveFrame(frame) + val toSend = joiner.receive(receivedData) + + if (toSend == null) { + // Complete handshake and session creation + welcomeCompleter.complete(BaseSession(session, joiner.getSessionDetails(), serializer)) + break + } else { + session.sendFrame(toSend) + } + } catch (error: Exception) { + welcomeCompleter.completeExceptionally(error) + break + } + } + } + + handshakeJob.await() + } + + return welcomeCompleter.await() + } +} + +internal suspend fun DefaultWebSocketSession.sendFrame(data: Any) { + when (data) { + is String -> outgoing.send(Frame.Text(data)) + is ByteArray -> outgoing.send(Frame.Binary(true, data)) + else -> throw IllegalArgumentException("Unsupported frame type") + } +} + +internal fun receiveFrame(frame: Frame): Any { + return when (frame) { + is Frame.Text -> frame.readText() + is Frame.Binary -> frame.readBytes() + else -> throw Exception("Unsupported frame type") + } +}