diff --git a/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/BackInTimeDebugServiceImpl.kt b/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/BackInTimeDebugServiceImpl.kt index 1ce4a095..300ec7c2 100644 --- a/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/BackInTimeDebugServiceImpl.kt +++ b/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/BackInTimeDebugServiceImpl.kt @@ -4,7 +4,12 @@ import com.kitakkun.backintime.core.runtime.connector.BackInTimeWebSocketConnect import com.kitakkun.backintime.core.runtime.event.BackInTimeDebuggableInstanceEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebugServiceEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebuggerEvent -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.datetime.Clock import kotlinx.serialization.SerializationException diff --git a/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/connector/BackInTimeKtorWebSocketConnector.kt b/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/connector/BackInTimeKtorWebSocketConnector.kt index a782616b..ba5cbc05 100644 --- a/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/connector/BackInTimeKtorWebSocketConnector.kt +++ b/core/runtime/src/commonMain/kotlin/com/kitakkun/backintime/core/runtime/connector/BackInTimeKtorWebSocketConnector.kt @@ -1,12 +1,9 @@ package com.kitakkun.backintime.core.runtime.connector import com.kitakkun.backintime.core.websocket.client.BackInTimeWebSocketClient -import com.kitakkun.backintime.core.websocket.client.BackInTimeWebSocketClientEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebugServiceEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebuggerEvent import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filterIsInstance -import kotlinx.coroutines.flow.flow /** * This class is responsible for sending and receiving events @@ -22,12 +19,7 @@ class BackInTimeKtorWebSocketConnector( override suspend fun connect(): Flow { client.openSession() - - return flow { - client.clientEventFlow.filterIsInstance().collect { - emit(it.debuggerEvent) - } - } + return client.receivedDebuggerEventFlow } override suspend fun sendEventToDebugger(event: BackInTimeDebugServiceEvent) { diff --git a/core/websocket/client/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClient.kt b/core/websocket/client/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClient.kt index 3cab19cd..459b5897 100644 --- a/core/websocket/client/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClient.kt +++ b/core/websocket/client/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClient.kt @@ -2,6 +2,7 @@ package com.kitakkun.backintime.core.websocket.client import com.kitakkun.backintime.core.websocket.event.BackInTimeDebugServiceEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebuggerEvent +import com.kitakkun.backintime.core.websocket.event.BackInTimeSessionNegotiationEvent import io.ktor.client.HttpClient import io.ktor.client.engine.HttpClientEngine import io.ktor.client.engine.cio.CIO @@ -19,12 +20,8 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.serialization.json.Json - -sealed interface BackInTimeWebSocketClientEvent { - data class ReceiveDebuggerEvent(val debuggerEvent: BackInTimeDebuggerEvent) : BackInTimeWebSocketClientEvent - data object CloseSuccessfully : BackInTimeWebSocketClientEvent - data class CloseWithError(val error: Throwable) : BackInTimeWebSocketClientEvent -} +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine /** * This class is responsible for connecting to the back-in-time debugger server @@ -35,10 +32,12 @@ class BackInTimeWebSocketClient( engine: HttpClientEngine = CIO.create(), client: HttpClient? = null, ) { + private var sessionId: String? = null + private var session: DefaultClientWebSocketSession? = null - private val mutableClientEventFlow = MutableSharedFlow() - val clientEventFlow = mutableClientEventFlow.asSharedFlow() + private val mutableReceivedDebuggerEventFlow = MutableSharedFlow() + val receivedDebuggerEventFlow = mutableReceivedDebuggerEventFlow.asSharedFlow() private val eventDispatchQueueMutex = Mutex() private val eventDispatchQueue = mutableListOf() @@ -50,40 +49,50 @@ class BackInTimeWebSocketClient( } } - private suspend fun DefaultClientWebSocketSession.handleSession() { - val receiveJob = launch { - while (true) { - val debuggerEvent = receiveDeserialized() - mutableClientEventFlow.emit(BackInTimeWebSocketClientEvent.ReceiveDebuggerEvent(debuggerEvent)) - } - } + private suspend fun DefaultClientWebSocketSession.setupSessionHandling() { + suspendCoroutine { + launch { + // sessionId negotiation + sendSerialized(BackInTimeSessionNegotiationEvent.Request(sessionId)) + sessionId = receiveDeserialized().sessionId + + clientLog("sessionId: $sessionId") + + val sendJob = launch { + while (true) { + delay(500) + eventDispatchQueueMutex.withLock { + eventDispatchQueue.forEach { sendSerialized(it) } + eventDispatchQueue.clear() + } + } + } - val sendJob = launch { - while (true) { - delay(500) - eventDispatchQueueMutex.withLock { - eventDispatchQueue.forEach { sendSerialized(it) } - eventDispatchQueue.clear() + val receiveJob = launch { + while (true) { + val debuggerEvent = receiveDeserialized() + mutableReceivedDebuggerEventFlow.emit(debuggerEvent) + } } - } - } - closeReason.invokeOnCompletion { error -> - session = null + closeReason.invokeOnCompletion { error -> + if (error == null) { + clientLog("session closed successfully") + } else { + clientLog("session closed with error: $error") + } - receiveJob.cancel() - sendJob.cancel() + session = null - val event = error?.let { - BackInTimeWebSocketClientEvent.CloseWithError(it) - } ?: BackInTimeWebSocketClientEvent.CloseSuccessfully + sendJob.cancel() + receiveJob.cancel() + } - launch { - mutableClientEventFlow.emit(event) + println("client is ready") + it.resume(Unit) // setup completed: meaning that client is ready. + closeReason.await() } } - - closeReason.await() } suspend fun awaitClose() { @@ -95,10 +104,8 @@ class BackInTimeWebSocketClient( host = host, port = port, path = "/backintime", - ) - - session?.launch { - session?.handleSession() + ).also { + it.setupSessionHandling() } } @@ -110,4 +117,8 @@ class BackInTimeWebSocketClient( session?.close() session = null } + + private fun clientLog(message: String) { + println("[${this::class.simpleName}] $message") + } } diff --git a/core/websocket/client/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClientTest.kt b/core/websocket/client/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClientTest.kt index 9db04fdb..bd7642d2 100644 --- a/core/websocket/client/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClientTest.kt +++ b/core/websocket/client/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/client/BackInTimeWebSocketClientTest.kt @@ -2,6 +2,7 @@ package com.kitakkun.backintime.core.websocket.client import com.kitakkun.backintime.core.websocket.event.BackInTimeDebugServiceEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebuggerEvent +import com.kitakkun.backintime.core.websocket.event.BackInTimeSessionNegotiationEvent import io.ktor.serialization.kotlinx.KotlinxWebsocketSerializationConverter import io.ktor.server.application.install import io.ktor.server.engine.connector @@ -13,8 +14,6 @@ import io.ktor.server.websocket.receiveDeserialized import io.ktor.server.websocket.sendSerialized import io.ktor.server.websocket.webSocket import io.ktor.websocket.close -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -22,6 +21,8 @@ import kotlinx.serialization.json.Json import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.uuid.ExperimentalUuidApi +import kotlin.uuid.Uuid // FIXME: This test fails for native targets class BackInTimeWebSocketClientTest { @@ -30,6 +31,7 @@ class BackInTimeWebSocketClientTest { private const val TEST_PORT = 50026 } + @OptIn(ExperimentalUuidApi::class) private fun ApplicationTestBuilder.configureServer( host: String, port: Int, @@ -49,7 +51,32 @@ class BackInTimeWebSocketClientTest { routing { webSocket( path = "/backintime", - handler = serverSession, + handler = { + println("New websocket session established!") + println("waiting sessionId negotiation request from the client...") + + val requestedSessionId = receiveDeserialized().sessionId + println("starting sessionId negotiation...") + + if (requestedSessionId == null) { + println("requested sessionId is null. generating new sessionId...") + + val sessionId = Uuid.random().toString() + println("generated new sessionId: $sessionId") + + sendSerialized(BackInTimeSessionNegotiationEvent.Accept(sessionId)) + } else { + sendSerialized(BackInTimeSessionNegotiationEvent.Accept(requestedSessionId)) + } + + println("sessionId negotiation completed!") + println("start server session...") + + serverSession() + + println("keeping session active...") + this.closeReason.await() + } ) } } @@ -64,7 +91,6 @@ class BackInTimeWebSocketClientTest { assertFailsWith(Throwable::class) { client.openSession() - client.awaitClose() } } @@ -73,7 +99,7 @@ class BackInTimeWebSocketClientTest { configureServer( host = TEST_HOST, port = TEST_PORT, - serverSession = { /* Do nothing */ }, + serverSession = { close() }, ) val client = BackInTimeWebSocketClient( @@ -83,7 +109,7 @@ class BackInTimeWebSocketClientTest { ) client.openSession() - client.awaitClose() + client.close() } @Test @@ -94,7 +120,7 @@ class BackInTimeWebSocketClientTest { host = TEST_HOST, port = TEST_PORT, serverSession = { - serverReceivedEvent = receiveDeserialized() + serverReceivedEvent = receiveDeserialized() close() }, ) @@ -115,14 +141,12 @@ class BackInTimeWebSocketClientTest { @Test fun `test success to receive event`() = testApplication { + var clientReceivedEvent: BackInTimeDebuggerEvent? = null + configureServer( host = TEST_HOST, port = TEST_PORT, - serverSession = { - sendSerialized(BackInTimeDebuggerEvent.Ping) - delay(100) // need this to pass the test - close() - }, + serverSession = { sendSerialized(BackInTimeDebuggerEvent.Ping) }, ) val client = BackInTimeWebSocketClient( @@ -133,13 +157,14 @@ class BackInTimeWebSocketClientTest { runTest { launch { - assertEquals( - expected = BackInTimeDebuggerEvent.Ping, - actual = client.clientEventFlow.filterIsInstance().first().debuggerEvent - ) + clientReceivedEvent = client.receivedDebuggerEventFlow.first() + client.close() } client.openSession() + client.awaitClose() + + assertEquals(expected = BackInTimeDebuggerEvent.Ping, actual = clientReceivedEvent) } } } \ No newline at end of file diff --git a/core/websocket/event/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/event/BackInTimeSessionNegotiationEvent.kt b/core/websocket/event/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/event/BackInTimeSessionNegotiationEvent.kt new file mode 100644 index 00000000..babc0d07 --- /dev/null +++ b/core/websocket/event/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/event/BackInTimeSessionNegotiationEvent.kt @@ -0,0 +1,11 @@ +package com.kitakkun.backintime.core.websocket.event + +import kotlinx.serialization.Serializable + +sealed interface BackInTimeSessionNegotiationEvent { + @Serializable + data class Request(val sessionId: String?) : BackInTimeSessionNegotiationEvent + + @Serializable + data class Accept(val sessionId: String) : BackInTimeSessionNegotiationEvent +} diff --git a/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeWebSocketServer.kt b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeWebSocketServer.kt index 8e5bcbb6..b119f048 100644 --- a/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeWebSocketServer.kt +++ b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeWebSocketServer.kt @@ -1,57 +1,61 @@ package com.kitakkun.backintime.core.websocket.server -import com.benasher44.uuid.uuid4 import com.kitakkun.backintime.core.websocket.event.BackInTimeDebugServiceEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebuggerEvent +import com.kitakkun.backintime.core.websocket.event.BackInTimeSessionNegotiationEvent import io.ktor.serialization.kotlinx.KotlinxWebsocketSerializationConverter import io.ktor.server.application.install import io.ktor.server.cio.CIO import io.ktor.server.engine.ApplicationEngine import io.ktor.server.engine.embeddedServer +import io.ktor.server.plugins.origin import io.ktor.server.routing.routing -import io.ktor.server.websocket.DefaultWebSocketServerSession import io.ktor.server.websocket.WebSockets import io.ktor.server.websocket.receiveDeserialized +import io.ktor.server.websocket.sendSerialized import io.ktor.server.websocket.webSocket -import io.ktor.websocket.send import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.isActive -import kotlinx.serialization.encodeToString +import kotlinx.coroutines.launch import kotlinx.serialization.json.Json - -data class Connection( - val session: DefaultWebSocketServerSession, - val id: String = uuid4().toString(), -) +import kotlin.uuid.ExperimentalUuidApi +import kotlin.uuid.Uuid class BackInTimeWebSocketServer { private var server: ApplicationEngine? = null - val isRunning: Boolean get() = server?.application?.isActive == true - private val connections = mutableListOf() - private val mutableConnectionEstablishedFlow = MutableSharedFlow() - val connectionEstablishedFlow = mutableConnectionEstablishedFlow.asSharedFlow() + private val mutableSessionInfoList = mutableSetOf() + val sessionInfoList: List get() = mutableSessionInfoList.toList() + + private val mutableNewSessionFlow = MutableSharedFlow() + val newSessionFlow = mutableNewSessionFlow.asSharedFlow() + + private val mutableSessionClosedFlow = MutableSharedFlow() + val sessionClosedFlow = mutableSessionClosedFlow.asSharedFlow() + + private val mutableEventFromClientFlow = MutableSharedFlow() + val eventFromClientFlow = mutableEventFromClientFlow.asSharedFlow() - private val mutableReceivedEventFlow = MutableSharedFlow>() - val receivedEventFlow = mutableReceivedEventFlow.asSharedFlow() + private val sendEventFlow = MutableSharedFlow() fun start(host: String, port: Int) { server = configureServer(host, port) server?.start() } - suspend fun send(sessionId: String, event: BackInTimeDebuggerEvent) { - val session = connections.find { it.id == sessionId }?.session ?: return - session.send(Json.encodeToString(event)) - } - fun stop() { server?.stop() server = null } + suspend fun send(sessionId: String, event: BackInTimeDebuggerEvent) { + sendEventFlow.emit(EventToClient(sessionId, event)) + } + + @OptIn(ExperimentalUuidApi::class) private fun configureServer(host: String, port: Int) = embeddedServer( factory = CIO, port = port, @@ -64,14 +68,46 @@ class BackInTimeWebSocketServer { routing { webSocket("/backintime") { - val connection = Connection(this) - connections.add(connection) - mutableConnectionEstablishedFlow.emit(connection.id) + // sessionId negotiation + val requestedSessionId = receiveDeserialized().sessionId + val sessionId = requestedSessionId ?: Uuid.random().toString() + sendSerialized(BackInTimeSessionNegotiationEvent.Accept(sessionId)) + + val sendEventJob = launch { + sendEventFlow.filter { it.sessionId == sessionId }.collect { + sendSerialized(it.event) + } + } + + val receiveEventJob = launch { + while (true) { + val event = receiveDeserialized() + mutableEventFromClientFlow.emit(EventFromClient(sessionId, event)) + } + } - while (true) { - val event = receiveDeserialized() - mutableReceivedEventFlow.emit(connection.id to event) + val sessionInfo = SessionInfo( + id = sessionId, + host = this.call.request.origin.remoteHost, + port = this.call.request.origin.remotePort, + address = this.call.request.origin.remoteAddress, + ) + + closeReason.invokeOnCompletion { + mutableSessionInfoList.remove(sessionInfo) + + sendEventJob.cancel() + receiveEventJob.cancel() + + launch { + mutableSessionClosedFlow.emit(sessionInfo) + } } + + mutableSessionInfoList += sessionInfo + mutableNewSessionFlow.emit(sessionInfo) + + closeReason.await() } } } diff --git a/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/EventFromClient.kt b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/EventFromClient.kt new file mode 100644 index 00000000..ee0a1939 --- /dev/null +++ b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/EventFromClient.kt @@ -0,0 +1,8 @@ +package com.kitakkun.backintime.core.websocket.server + +import com.kitakkun.backintime.core.websocket.event.BackInTimeDebugServiceEvent + +data class EventFromClient( + val sessionId: String, + val event: BackInTimeDebugServiceEvent, +) diff --git a/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/EventToClient.kt b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/EventToClient.kt new file mode 100644 index 00000000..28826ca3 --- /dev/null +++ b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/EventToClient.kt @@ -0,0 +1,8 @@ +package com.kitakkun.backintime.core.websocket.server + +import com.kitakkun.backintime.core.websocket.event.BackInTimeDebuggerEvent + +data class EventToClient( + val sessionId: String, + val event: BackInTimeDebuggerEvent, +) diff --git a/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/SessionInfo.kt b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/SessionInfo.kt new file mode 100644 index 00000000..3bfd3911 --- /dev/null +++ b/core/websocket/server/src/commonMain/kotlin/com/kitakkun/backintime/core/websocket/server/SessionInfo.kt @@ -0,0 +1,8 @@ +package com.kitakkun.backintime.core.websocket.server + +data class SessionInfo( + val id: String, + val host: String, + val port: Int, + val address: String, +) diff --git a/core/websocket/server/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeDebugServerTest.kt b/core/websocket/server/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeDebugServerTest.kt index 71c2eacd..f379a905 100644 --- a/core/websocket/server/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeDebugServerTest.kt +++ b/core/websocket/server/src/commonTest/kotlin/com/kitakkun/backintime/core/websocket/server/BackInTimeDebugServerTest.kt @@ -1,10 +1,8 @@ package com.kitakkun.backintime.core.websocket.server import com.kitakkun.backintime.core.websocket.client.BackInTimeWebSocketClient -import com.kitakkun.backintime.core.websocket.client.BackInTimeWebSocketClientEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebugServiceEvent import com.kitakkun.backintime.core.websocket.event.BackInTimeDebuggerEvent -import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -39,23 +37,31 @@ class BackInTimeDebugServerTest { @Test fun `test send event from server`() = runTest { - launch { - val sessionId = server.connectionEstablishedFlow.first() - server.send(sessionId, BackInTimeDebuggerEvent.Ping) + var clientReceivedEvent: BackInTimeDebuggerEvent? = null + + val connectedThenSendEventJob = launch { + val sessionInfo = server.newSessionFlow.first() + server.send(sessionInfo.id, BackInTimeDebuggerEvent.Ping) + } + + val receiveServerEventJob = launch { + clientReceivedEvent = client.receivedDebuggerEventFlow.first() } client.openSession() + connectedThenSendEventJob.join() + receiveServerEventJob.join() assertEquals( expected = BackInTimeDebuggerEvent.Ping, - actual = client.clientEventFlow.filterIsInstance().first().debuggerEvent + actual = clientReceivedEvent, ) } @Test fun `test receive event from client`() = runTest { launch { - val (_, event) = server.receivedEventFlow.first() + val (_, event) = server.eventFromClientFlow.first() assertEquals(BackInTimeDebugServiceEvent.Ping, event) }