From ba66c6965e0720ee96b114a755030f59add96641 Mon Sep 17 00:00:00 2001 From: Alex Sokol Date: Thu, 5 Aug 2021 13:06:49 +0300 Subject: [PATCH] #4: fixed case when multiple filter requests are lost --- .../dev.icerock.moko.web3/Web3Socket.kt | 30 ++++++++++--------- .../dev.icerock.moko.web3/Web3SocketTest.kt | 18 +++++++++-- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/web3/src/commonMain/kotlin/dev.icerock.moko.web3/Web3Socket.kt b/web3/src/commonMain/kotlin/dev.icerock.moko.web3/Web3Socket.kt index 0011e09..328024c 100644 --- a/web3/src/commonMain/kotlin/dev.icerock.moko.web3/Web3Socket.kt +++ b/web3/src/commonMain/kotlin/dev.icerock.moko.web3/Web3Socket.kt @@ -11,7 +11,7 @@ import io.ktor.client.* import io.ktor.client.features.websocket.* import io.ktor.http.cio.websocket.* import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex @@ -27,11 +27,13 @@ import kotlinx.serialization.json.encodeToJsonElement */ class Web3Socket( private val httpClient: HttpClient, - val json: Json, private val webSocketUrl: String, private val coroutineScope: CoroutineScope ) { - + val json = Json { + isLenient = true + ignoreUnknownKeys = true + } /** * channel to receive data from webSocket */ @@ -41,30 +43,30 @@ class Web3Socket( val responsesFlow: SharedFlow> = responsesFlowSource.asSharedFlow() /** - * subscription filter's flow, here we emit new + * incremental field to filter different incoming messages from websocket */ - private val requestsFlow: MutableSharedFlow> = - MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.SUSPEND) + private var queueID: Int = 0 /** - * incremental field to filter different incoming messages from websocket + * subscription filter's flow, here we emit new */ - private var queueID: Int = 0 + private val requestsChannel: Channel> = Channel(capacity = 1) init { // launch websocket connection to work with in over web3Socket lifecycle coroutineScope.launch { httpClient.webSocket(webSocketUrl) { - requestsFlow + requestsChannel + .consumeAsFlow() .map { request -> json.encodeToString( serializer = InfuraRequest.serializer(JsonElement.serializer()), value = request ) } - .map { encoded -> Frame.Text(encoded) } - .onEach { frame -> outgoing.send(frame) } - .launchIn(this) + .map(Frame::Text) + .onEach(outgoing::send) + .launchIn(scope = this) incoming .consumeAsFlow() @@ -93,7 +95,7 @@ class Web3Socket( val id = request.id coroutineScope.launch { - requestsFlow.emit(request) + requestsChannel.send(request) } return responsesFlowSource.first { it.id == id }.result @@ -138,7 +140,7 @@ class Web3Socket( method = "eth_unsubscribe", params = listOf(json.encodeToJsonElement(subId)) ) - sendRpcRequestRaw(request).also(::println) + sendRpcRequestRaw(request) } } } diff --git a/web3/src/commonTest/kotlin/dev.icerock.moko.web3/Web3SocketTest.kt b/web3/src/commonTest/kotlin/dev.icerock.moko.web3/Web3SocketTest.kt index abb6415..474f93e 100644 --- a/web3/src/commonTest/kotlin/dev.icerock.moko.web3/Web3SocketTest.kt +++ b/web3/src/commonTest/kotlin/dev.icerock.moko.web3/Web3SocketTest.kt @@ -33,7 +33,6 @@ class Web3SocketTest { web3Socket = Web3Socket( httpClient = httpClient, - json = json, webSocketUrl = "wss://rinkeby.infura.io/ws/v3/59d7fae3226b40e09d84d713e588305b", coroutineScope = GlobalScope ) @@ -48,10 +47,23 @@ class Web3SocketTest { println("flow is empty") } .take(2) - .collect { - println("received on inherited channel: $it") + .launchIn(scope = this) + + web3Socket.subscribeWebSocketWithFilter(SubscriptionParam.Logs) + .onEach(::println) + .onEmpty { + println("flow is empty") } + .take(2) + .launchIn(scope = this) + web3Socket.subscribeWebSocketWithFilter(SubscriptionParam.Logs) + .onEach(::println) + .onEmpty { + println("flow is empty") + } + .take(2) + .launchIn(scope = this) } } } \ No newline at end of file