Skip to content

Commit

Permalink
Merge pull request #17 from denmusic1992/task/chornyy-add-websocket
Browse files Browse the repository at this point in the history
chornyy add web socket implementation
  • Loading branch information
anton6tak authored Aug 6, 2021
2 parents 90db8b5 + d88ec0a commit 9b9f23c
Show file tree
Hide file tree
Showing 17 changed files with 683 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ allprojects {
project build.gradle
```groovy
dependencies {
commonMainApi("dev.icerock.moko:web3:0.3.0")
commonMainApi("dev.icerock.moko:web3:0.4.0")
}
```

Expand Down
7 changes: 5 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ kbignumVersion = "2.2.0"
klockVersion = "2.2.2"
ktorClientVersion = "1.6.1"
mokoTestVersion = "0.4.0"
mokoWeb3Version = "0.3.0"
mokoWeb3Version = "0.4.0"
multidexVersion = "2.0.1"

[libraries]
appCompat = { module = "androidx.appcompat:appcompat", version.ref = "androidAppCompatVersion" }
Expand All @@ -30,9 +31,11 @@ kotlinSerialization = { module = "org.jetbrains.kotlinx:kotlinx-serialization-js
ktorClient = { module = "io.ktor:ktor-client-core", version.ref = "ktorClientVersion" }
ktorClientLogigng = { module = "io.ktor:ktor-client-logging", version.ref = "ktorClientVersion" }
ktorClientOkHttp = { module = "io.ktor:ktor-client-okhttp", version.ref = "ktorClientVersion" }
ktorWebsockets = { module = "io.ktor:ktor-client-websockets", version.ref = "ktorClientVersion" }
ktorClientIos = { module = "io.ktor:ktor-client-ios", version.ref = "ktorClientVersion" }
klock = { module = "com.soywiz.korlibs.klock:klock", version.ref = "klockVersion" }
kotlinTestCommon = { module = "org.jetbrains.kotlin:kotlin-test-common", version.ref = "kotlinVersion" }
kotlinTestAnnotations = { module = "org.jetbrains.kotlin:kotlin-test-annotations-common", version.ref = "kotlinVersion" }
kotlinTest = { module = "org.jetbrains.kotlin:kotlin-test", version.ref = "kotlinVersion" }
kotlinTestJunit = { module = "org.jetbrains.kotlin:kotlin-test-junit", version.ref = "kotlinVersion" }
kotlinTestJunit = { module = "org.jetbrains.kotlin:kotlin-test-junit", version.ref = "kotlinVersion" }
multidex = { module = "androidx.multidex:multidex", version.ref = "multidexVersion" }
2 changes: 2 additions & 0 deletions sample/android-app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ android {
defaultConfig {
applicationId = "dev.icerock.moko.samples.web3"

multiDexEnabled = true
versionCode = 1
versionName = "0.1.0"
}
Expand All @@ -18,6 +19,7 @@ android {
dependencies {
implementation(libs.appCompat)
implementation(libs.material)
implementation(libs.multidex)

implementation(projects.sample.mppLibrary)
}
11 changes: 10 additions & 1 deletion web3/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,30 @@ plugins {
id("kotlinx-serialization")
}

android {
testOptions {
unitTests.isReturnDefaultValues = true
}
}

dependencies {
commonMainImplementation(libs.coroutines)
commonMainImplementation(libs.kbignum)
commonMainImplementation(libs.kotlinSerialization)
commonMainImplementation(libs.klock)
commonMainImplementation(libs.ktorClient)
commonMainImplementation(libs.ktorClientLogigng)
commonMainImplementation(libs.ktorWebsockets)

commonTestImplementation(libs.kotlinTestCommon)
commonTestImplementation(libs.kotlinTestAnnotations)


androidMainImplementation(libs.ktorClientOkHttp)
androidTestImplementation(libs.ktorClientOkHttp)
androidTestImplementation(libs.kotlinTest)
androidTestImplementation(libs.kotlinTestJunit)

iosMainImplementation(libs.ktorClientIos)
iosTestImplementation(libs.ktorClientIos)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 IceRock MAG Inc. Use of this source code is governed by the Apache 2.0 license.
*/

package dev.icerock.moko.web3.websockets

import android.util.Log
import io.ktor.client.engine.HttpClientEngine
import io.ktor.client.engine.okhttp.OkHttp
import java.util.concurrent.TimeUnit

actual fun log(message: String) {
Log.d("KCWS", message)
}

actual fun createHttpClientEngine(): HttpClientEngine {
return OkHttp.create {
config {
retryOnConnectionFailure(true)
pingInterval(30, TimeUnit.SECONDS)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2021 IceRock MAG Inc. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("EXPERIMENTAL_API_USAGE")

package dev.icerock.moko.web3

import dev.icerock.moko.web3.websockets.SubscriptionParam
import dev.icerock.moko.web3.websockets.createHttpClientEngine
import io.ktor.client.*
import io.ktor.client.features.logging.*
import io.ktor.client.features.websocket.*
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.Json
import kotlin.test.BeforeTest
import kotlin.test.Test

class Web3SocketTest {

private lateinit var web3Socket: Web3Socket

@BeforeTest
fun `create socket`() {
val httpClient = HttpClient(createHttpClientEngine()) {
install(WebSockets) {
pingInterval = 30
}
}

web3Socket = Web3Socket(
httpClient = httpClient,
webSocketUrl = "wss://rinkeby.infura.io/ws/v3/59d7fae3226b40e09d84d713e588305b",
coroutineScope = GlobalScope
)
}

@Test
fun `test web socket flow`() {
runBlocking {
web3Socket.subscribeWebSocketWithFilter(SubscriptionParam.Logs)
.onEach(::println)
.take(2)
.launchIn(scope = this)

web3Socket.subscribeWebSocketWithFilter(SubscriptionParam.Logs)
.onEach(::println)
.take(2)
.launchIn(scope = this)

web3Socket.subscribeWebSocketWithFilter(SubscriptionParam.Logs)
.onEach(::println)
.take(2)
.launchIn(scope = this)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2021 IceRock MAG Inc. Use of this source code is governed by the Apache 2.0 license.
*/

package dev.icerock.moko.web3

import kotlinx.serialization.Serializable


@Serializable
data class LogsWeb3SocketEvent(
val removed: Boolean,
val logIndex: String,
val transactionIndex: String,
val transactionHash: String,
val blockHash: String,
val blockNumber: String,
val address: String,
val data: String,
val topics: List<String>
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2021 IceRock MAG Inc. Use of this source code is governed by the Apache 2.0 license.
*/

package dev.icerock.moko.web3

import kotlinx.serialization.Serializable

@Serializable
data class NewHeadsWeb3SocketEvent(
val difficulty: String,
val extraData: String,
val gasLimit: String,
val gasUsed: String,
val logsBloom: String,
val miner: String,
val nonce: String,
val number: String,
val parentHash: String,
val receiptsRoot: String,
val sha3Uncles: String,
val stateRoot: String,
val timestamp: String,
val transactionsRoot: String,
val hash: String
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021 IceRock MAG Inc. Use of this source code is governed by the Apache 2.0 license.
*/

package dev.icerock.moko.web3

import kotlinx.serialization.Serializable

@Serializable
data class SyncingWeb3SocketEvent(
val syncing: Boolean,
val status: Status
) {
@Serializable
data class Status(
val startingBlock: Int,
val currentBlock: Int,
val highestBlock: Int,
val pulledStates: Int,
val knownStates: Int
)
}
146 changes: 146 additions & 0 deletions web3/src/commonMain/kotlin/dev.icerock.moko.web3/Web3Socket.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2020 IceRock MAG Inc. Use of this source code is governed by the Apache 2.0 license.
*/

package dev.icerock.moko.web3

import dev.icerock.moko.web3.entity.InfuraRequest
import dev.icerock.moko.web3.entity.Web3SocketResponse
import dev.icerock.moko.web3.websockets.SubscriptionParam
import io.ktor.client.*
import io.ktor.client.features.websocket.*
import io.ktor.http.cio.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.encodeToJsonElement

/**
* Class to work with webSocket connection in Etherium network
* @param httpClient client to work with WebSocket
*/
class Web3Socket(
private val httpClient: HttpClient,
private val webSocketUrl: String,
private val coroutineScope: CoroutineScope
) {
val json = Json {
isLenient = true
ignoreUnknownKeys = true
}
/**
* channel to receive data from webSocket
*/
private val responsesFlowSource: MutableSharedFlow<Web3SocketResponse<JsonElement>> =
MutableSharedFlow()

val responsesFlow: SharedFlow<Web3SocketResponse<JsonElement>> = responsesFlowSource.asSharedFlow()

/**
* incremental field to filter different incoming messages from websocket
*/
private var queueID: Int = 0

/**
* subscription filter's flow, here we emit new
*/
private val requestsChannel: Channel<InfuraRequest<JsonElement>> = Channel(capacity = 1)

init {
// launch websocket connection to work with in over web3Socket lifecycle
coroutineScope.launch {
httpClient.webSocket(webSocketUrl) {
requestsChannel
.consumeAsFlow()
.map { request ->
json.encodeToString(
serializer = InfuraRequest.serializer(JsonElement.serializer()),
value = request
)
}
.map(Frame::Text)
.onEach(outgoing::send)
.launchIn(scope = this)

incoming
.consumeAsFlow()
.mapNotNull { frame ->
val textFrame = frame as? Frame.Text
return@mapNotNull textFrame?.readText()
}
.map { text ->
json.decodeFromString(
deserializer = Web3SocketResponse.serializer(JsonElement.serializer()),
string = text
)
}
.collect {
responsesFlowSource.emit(it)
}
}
}
}

suspend inline fun <reified T> sendRpcRequest(request: InfuraRequest<JsonElement>): T? {
val response = sendRpcRequestRaw(request) ?: return null
return json.decodeFromJsonElement(response)
}
suspend fun sendRpcRequestRaw(request: InfuraRequest<JsonElement>): JsonElement? {
val id = request.id

coroutineScope.launch {
requestsChannel.send(request)
}

return responsesFlowSource.first { it.id == id }.result
}


private val queueMutex = Mutex()

/**
* Subscription function for current filter
*/
@OptIn(ExperimentalStdlibApi::class)
fun <T> subscribeWebSocketWithFilter(param: SubscriptionParam<T>): Flow<T> {
var subscriptionID: String? = null
return flow {
val id = queueMutex.withLock { queueID++ }
val request = InfuraRequest(
id = id,
method = "eth_subscribe",
params = buildList {
add(json.encodeToJsonElement(param.name))
if(param.params != null)
add(json.encodeToJsonElement(param.params))
}
)
subscriptionID = sendRpcRequest(request) ?: return@flow

val responses = responsesFlowSource
.filter {
it.params?.subscription == subscriptionID
}.mapNotNull {
json.decodeFromJsonElement (
param.serializer,
element = it.params?.result ?: return@mapNotNull null
)
}

emitAll(responses)
}.onCompletion {
val subId = subscriptionID ?: return@onCompletion
val request = InfuraRequest(
method = "eth_unsubscribe",
params = listOf(json.encodeToJsonElement(subId))
)
sendRpcRequestRaw(request)
}
}
}
Loading

0 comments on commit 9b9f23c

Please sign in to comment.