Skip to content

Commit

Permalink
icerockdev#4: fixed case when multiple filter requests are lost
Browse files Browse the repository at this point in the history
  • Loading branch information
y9san9 committed Aug 5, 2021
1 parent c42168b commit ba66c69
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
30 changes: 16 additions & 14 deletions web3/src/commonMain/kotlin/dev.icerock.moko.web3/Web3Socket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.ktor.client.*
import io.ktor.client.features.websocket.*
import io.ktor.http.cio.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand All @@ -27,11 +27,13 @@ import kotlinx.serialization.json.encodeToJsonElement
*/
class Web3Socket(
private val httpClient: HttpClient,
val json: Json,
private val webSocketUrl: String,
private val coroutineScope: CoroutineScope
) {

val json = Json {
isLenient = true
ignoreUnknownKeys = true
}
/**
* channel to receive data from webSocket
*/
Expand All @@ -41,30 +43,30 @@ class Web3Socket(
val responsesFlow: SharedFlow<Web3SocketResponse<JsonElement>> = responsesFlowSource.asSharedFlow()

/**
* subscription filter's flow, here we emit new
* incremental field to filter different incoming messages from websocket
*/
private val requestsFlow: MutableSharedFlow<InfuraRequest<JsonElement>> =
MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.SUSPEND)
private var queueID: Int = 0

/**
* incremental field to filter different incoming messages from websocket
* subscription filter's flow, here we emit new
*/
private var queueID: Int = 0
private val requestsChannel: Channel<InfuraRequest<JsonElement>> = Channel(capacity = 1)

init {
// launch websocket connection to work with in over web3Socket lifecycle
coroutineScope.launch {
httpClient.webSocket(webSocketUrl) {
requestsFlow
requestsChannel
.consumeAsFlow()
.map { request ->
json.encodeToString(
serializer = InfuraRequest.serializer(JsonElement.serializer()),
value = request
)
}
.map { encoded -> Frame.Text(encoded) }
.onEach { frame -> outgoing.send(frame) }
.launchIn(this)
.map(Frame::Text)
.onEach(outgoing::send)
.launchIn(scope = this)

incoming
.consumeAsFlow()
Expand Down Expand Up @@ -93,7 +95,7 @@ class Web3Socket(
val id = request.id

coroutineScope.launch {
requestsFlow.emit(request)
requestsChannel.send(request)
}

return responsesFlowSource.first { it.id == id }.result
Expand Down Expand Up @@ -138,7 +140,7 @@ class Web3Socket(
method = "eth_unsubscribe",
params = listOf(json.encodeToJsonElement(subId))
)
sendRpcRequestRaw(request).also(::println)
sendRpcRequestRaw(request)
}
}
}
18 changes: 15 additions & 3 deletions web3/src/commonTest/kotlin/dev.icerock.moko.web3/Web3SocketTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class Web3SocketTest {

web3Socket = Web3Socket(
httpClient = httpClient,
json = json,
webSocketUrl = "wss://rinkeby.infura.io/ws/v3/59d7fae3226b40e09d84d713e588305b",
coroutineScope = GlobalScope
)
Expand All @@ -48,10 +47,23 @@ class Web3SocketTest {
println("flow is empty")
}
.take(2)
.collect {
println("received on inherited channel: $it")
.launchIn(scope = this)

web3Socket.subscribeWebSocketWithFilter(SubscriptionParam.Logs)
.onEach(::println)
.onEmpty {
println("flow is empty")
}
.take(2)
.launchIn(scope = this)

web3Socket.subscribeWebSocketWithFilter(SubscriptionParam.Logs)
.onEach(::println)
.onEmpty {
println("flow is empty")
}
.take(2)
.launchIn(scope = this)
}
}
}

0 comments on commit ba66c69

Please sign in to comment.