From ba146b2482b652f7ed8a9342928c7b477cfa90e5 Mon Sep 17 00:00:00 2001 From: valentunn <70131744+valentunn@users.noreply.github.com> Date: Thu, 16 Mar 2023 14:00:33 +0700 Subject: [PATCH] Fix/infura batches (#51) * Suppport batch responses arriving in separate requests/differrent batches * Code style * Accumulating batch requests * Accumulating batch requests * Bump versions * Fixes * Code style * Code style * Fix tests --- build.gradle | 2 +- .../fearless_utils/wsrpc/SocketService.kt | 71 ++++++----- .../wsrpc/request/SingleSendable.kt | 26 +++- .../wsrpc/state/SocketStateMachine.kt | 114 +++++++++++++----- .../wsrpc/state/SocketStateMachineTest.kt | 34 ++++-- 5 files changed, 173 insertions(+), 74 deletions(-) diff --git a/build.gradle b/build.gradle index e95ed6be..e2cf50a9 100644 --- a/build.gradle +++ b/build.gradle @@ -1,7 +1,7 @@ buildscript { ext { // App version - versionName = '1.7.2' + versionName = '1.7.3' versionCode = 1 // SDK and tools diff --git a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt index 617f026d..4e8a4a9b 100644 --- a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt +++ b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/SocketService.kt @@ -138,8 +138,22 @@ class SocketService( fun executeBatchRequest( rpcRequests: List, deliveryType: DeliveryType = DeliveryType.AT_LEAST_ONCE, - callback: ResponseListener> + callback: ResponseListener + ): Cancellable { + val sendable = BatchSendable(rpcRequests, deliveryType, callback) + + updateState(Event.Send(sendable)) + + return RequestCancellable(sendable) + } + + @Synchronized + fun executeAccumulatingBatchRequest( + rpcRequests: List, + deliveryType: DeliveryType = DeliveryType.AT_LEAST_ONCE, + allBatchDoneCallback: ResponseListener> ): Cancellable { + val callback = AccumulatingBatchRequest(allBatchDoneCallback, rpcRequests.size) val sendable = BatchSendable(rpcRequests, deliveryType, callback) updateState(Event.Send(sendable)) @@ -200,14 +214,10 @@ class SocketService( logger.log("[STATE MACHINE][SIDE EFFECT] $sideEffect") when (sideEffect) { - is SideEffect.ResponseToSendable -> respondToSingleRequest( + is SideEffect.ResponseToSendable -> respondToRequest( sideEffect.sendable, sideEffect.response ) - is SideEffect.ResponseToBatchSendable -> respondToBatchRequest( - sideEffect.sendable, - sideEffect.responses - ) is SideEffect.RespondSendablesError -> respondError( sideEffect.sendables, sideEffect.error @@ -224,31 +234,15 @@ class SocketService( } } - private fun respondToSingleRequest( + private fun respondToRequest( sendable: SocketStateMachine.Sendable, response: RpcResponse ) { - require(sendable is SingleSendable) - sendable.callback.onNext(response) } - private fun respondToBatchRequest( - sendable: SocketStateMachine.Sendable, - responses: List - ) { - require(sendable is BatchSendable) - - sendable.callback.onNext(responses) - } - private fun respondError(sendables: Set, throwable: Throwable) { - sendables.forEach { sendable -> - when (sendable) { - is SingleSendable -> sendable.callback.onError(throwable) - is BatchSendable -> sendable.callback.onError(throwable) - } - } + sendables.forEach { sendable -> sendable.callback.onError(throwable) } } private fun respondToSubscription( @@ -260,14 +254,9 @@ class SocketService( subscription.callback.onNext(response) } - private fun sendToSocket(sendables: Set) { + private fun sendToSocket(sendables: Collection) { requestExecutor.execute { - sendables.forEach { sendable -> - when (sendable) { - is SingleSendable -> socket!!.sendRpcRequest(sendable.request) - is BatchSendable -> socket!!.sendBatchRpcRequests(sendable.requests) - } - } + sendables.forEach { sendable -> sendable.sendTo(socket!!) } } } @@ -366,4 +355,24 @@ class SocketService( // do nothing } } + + private class AccumulatingBatchRequest( + private val allDoneCallBack: ResponseListener>, + private val batchSize: Int + ) : ResponseListener { + + private var arrivedResponses: MutableList = ArrayList(batchSize) + + override fun onNext(response: RpcResponse) { + arrivedResponses.add(response) + + if (arrivedResponses.size == batchSize) { + allDoneCallBack.onNext(arrivedResponses) + } + } + + override fun onError(throwable: Throwable) { + allDoneCallBack.onError(throwable) + } + } } diff --git a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/request/SingleSendable.kt b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/request/SingleSendable.kt index 065c2b71..b9ed3f39 100644 --- a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/request/SingleSendable.kt +++ b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/request/SingleSendable.kt @@ -3,27 +3,43 @@ package jp.co.soramitsu.fearless_utils.wsrpc.request import jp.co.soramitsu.fearless_utils.wsrpc.SocketService import jp.co.soramitsu.fearless_utils.wsrpc.request.base.RpcRequest import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse +import jp.co.soramitsu.fearless_utils.wsrpc.socket.RpcSocket import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine internal class SingleSendable( val request: RpcRequest, override val deliveryType: DeliveryType, - val callback: SocketService.ResponseListener + override val callback: SocketService.ResponseListener ) : SocketStateMachine.Sendable { - override val id: Int = request.id + + override val numberOfNeededResponses: Int = 1 + + override fun relatesTo(id: Int): Boolean = request.id == id + + override fun sendTo(rpcSocket: RpcSocket) { + rpcSocket.sendRpcRequest(request) + } override fun toString(): String { - return "Sendable($id)" + return "Sendable(${request.id})" } } internal class BatchSendable( val requests: List, override val deliveryType: DeliveryType, - val callback: SocketService.ResponseListener> + override val callback: SocketService.ResponseListener ) : SocketStateMachine.Sendable { - override val id: Int = requests.first().id + override val numberOfNeededResponses: Int = requests.size + + private val ids = requests.mapTo(mutableSetOf(), RpcRequest::id) + + override fun relatesTo(id: Int): Boolean = id in ids + + override fun sendTo(rpcSocket: RpcSocket) { + rpcSocket.sendBatchRpcRequests(requests) + } override fun toString(): String { val jointIds = requests.joinToString { it.id.toString() } diff --git a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachine.kt b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachine.kt index a48daa5e..02cc6914 100644 --- a/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachine.kt +++ b/fearless-utils/src/main/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachine.kt @@ -1,20 +1,32 @@ package jp.co.soramitsu.fearless_utils.wsrpc.state +import jp.co.soramitsu.fearless_utils.wsrpc.SocketService import jp.co.soramitsu.fearless_utils.wsrpc.request.DeliveryType import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse +import jp.co.soramitsu.fearless_utils.wsrpc.socket.RpcSocket import jp.co.soramitsu.fearless_utils.wsrpc.subscription.response.SubscriptionChange typealias Transition = Pair> +private typealias ResponseCounter = Int + object SocketStateMachine { interface Sendable { - val id: Int + + val numberOfNeededResponses: Int + + fun relatesTo(id: Int): Boolean val deliveryType: DeliveryType + + val callback: SocketService.ResponseListener + + fun sendTo(rpcSocket: RpcSocket) } interface Subscription { + val id: String val initiatorId: Int @@ -40,7 +52,7 @@ object SocketStateMachine { val url: String, internal val toResendOnReconnect: Set, internal val unknownSubscriptionResponses: Map, - internal val waitingForResponse: Set, + internal val waitingForResponse: Map, internal val subscriptions: Set ) : State() @@ -61,10 +73,7 @@ object SocketStateMachine { data class SendableResponse(val response: RpcResponse) : Event() - data class SendableBatchResponse(val responses: List) : Event() { - - val responseId = responses.first().id - } + data class SendableBatchResponse(val responses: List) : Event() data class Subscribed(val subscription: Subscription) : Event() @@ -94,11 +103,6 @@ object SocketStateMachine { data class ResponseToSendable(val sendable: Sendable, val response: RpcResponse) : SideEffect() - data class ResponseToBatchSendable( - val sendable: Sendable, - val responses: List - ) : SideEffect() - /** * For [DeliveryType.AT_MOST_ONCE] errors */ @@ -190,7 +194,7 @@ object SocketStateMachine { toResendOnReconnect = state.pendingSendables.filterByDeliveryType( DeliveryType.ON_RECONNECT ), - waitingForResponse = state.pendingSendables, + waitingForResponse = state.pendingSendables.withCounter(), subscriptions = emptySet(), unknownSubscriptionResponses = emptyMap() ) @@ -226,7 +230,7 @@ object SocketStateMachine { state.copy( toResendOnReconnect = toResendOnReconnect, - waitingForResponse = state.waitingForResponse + sendable + waitingForResponse = state.waitingForResponse.add(sendable) ) } @@ -250,30 +254,50 @@ object SocketStateMachine { } is Event.SendableResponse -> { - val sendable = findSendableById(state.waitingForResponse, event.response.id) + val entry = findSendableById(state.waitingForResponse, event.response.id) - if (sendable != null) { + if (entry != null) { + val (sendable, counter) = entry sideEffects += SideEffect.ResponseToSendable(sendable, event.response) - state.copy(waitingForResponse = state.waitingForResponse - sendable) + val newCounter = counter + 1 + + state.copy( + waitingForResponse = state.waitingForResponse.keepUntilThreshold( + key = sendable, + newValue = newCounter, + threshold = sendable.numberOfNeededResponses + ) + ) } else { state } } is Event.SendableBatchResponse -> { - val sendable = findSendableById(state.waitingForResponse, event.responseId) + val newWaitingForResponse = state.waitingForResponse.toMutableMap() - if (sendable != null) { - sideEffects += SideEffect.ResponseToBatchSendable( - sendable = sendable, - responses = event.responses - ) + event.responses.forEach { response -> + val entry = findSendableById(state.waitingForResponse, response.id) - state.copy(waitingForResponse = state.waitingForResponse - sendable) - } else { - state + if (entry != null) { + val (sendable, counter) = entry + val newCounter = counter + 1 + + sideEffects += SideEffect.ResponseToSendable( + sendable = sendable, + response = response + ) + + newWaitingForResponse.keepUntilThreshold( + key = sendable, + newValue = newCounter, + threshold = sendable.numberOfNeededResponses + ) + } } + + state.copy(waitingForResponse = newWaitingForResponse) } is Event.Subscribed -> { @@ -392,12 +416,41 @@ object SocketStateMachine { return Transition(newState, sideEffects) } + private fun MutableMap.keepUntilThreshold( + key: Sendable, + newValue: ResponseCounter, + threshold: ResponseCounter + ) { + if (newValue >= threshold) { + minusAssign(key) + } else { + plusAssign(key to newValue) + } + } + + // we keep separate method for read-only `minus` since it optimizes result internally + private fun Map.keepUntilThreshold( + key: Sendable, + newValue: ResponseCounter, + threshold: ResponseCounter + ): Map { + return if (newValue >= threshold) { + minus(key) + } else { + plus(key to newValue) + } + } + + private fun Map.add(sendable: Sendable) = plus(sendable to 0) + private fun getRequestsToResendAndReportErrorToOthers( state: State.Connected, mutableSideEffects: MutableList, error: Throwable ): Set { - val toReportError = state.waitingForResponse.filterByDeliveryType(DeliveryType.AT_MOST_ONCE) + val waitingSendables = state.waitingForResponse.keys + + val toReportError = waitingSendables.filterByDeliveryType(DeliveryType.AT_MOST_ONCE) if (toReportError.isNotEmpty()) { mutableSideEffects += SideEffect.RespondSendablesError( @@ -406,16 +459,17 @@ object SocketStateMachine { ) } - return state.waitingForResponse - toReportError + state.toResendOnReconnect + return waitingSendables - toReportError + state.toResendOnReconnect } private fun findSubscriptionById(subscriptions: Set, id: String) = subscriptions.find { it.id == id } - private fun findSendableById(sendables: Set, id: Int) = sendables.find { it.id == id } + private fun findSendableById(sendables: Map, id: Int) = + sendables.entries.find { (sendable, _) -> sendable.relatesTo(id) } private fun findSubscriptionByInitiator(subscriptions: Set, initiator: Sendable) = - subscriptions.find { it.initiatorId == initiator.id } + subscriptions.find { initiator.relatesTo(it.initiatorId) } private fun handleStop(sideEffects: MutableList): State { sideEffects += SideEffect.Disconnect @@ -436,3 +490,5 @@ object SocketStateMachine { it.deliveryType == deliveryType } } + +internal fun Set.withCounter() = associateWith { 0 } diff --git a/fearless-utils/src/test/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachineTest.kt b/fearless-utils/src/test/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachineTest.kt index 91ec83f8..03b6741c 100644 --- a/fearless-utils/src/test/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachineTest.kt +++ b/fearless-utils/src/test/java/jp/co/soramitsu/fearless_utils/wsrpc/state/SocketStateMachineTest.kt @@ -1,10 +1,14 @@ package jp.co.soramitsu.fearless_utils.wsrpc.state import jp.co.soramitsu.fearless_utils.common.assertInstance +import jp.co.soramitsu.fearless_utils.wsrpc.SocketService +import jp.co.soramitsu.fearless_utils.wsrpc.SocketService.* import jp.co.soramitsu.fearless_utils.wsrpc.request.DeliveryType import jp.co.soramitsu.fearless_utils.wsrpc.request.runtime.createFakeChange import jp.co.soramitsu.fearless_utils.wsrpc.response.RpcResponse +import jp.co.soramitsu.fearless_utils.wsrpc.socket.RpcSocket import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine.Event +import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine.Sendable import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine.SideEffect import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine.SideEffect.Connect import jp.co.soramitsu.fearless_utils.wsrpc.state.SocketStateMachine.SideEffect.Disconnect @@ -30,13 +34,27 @@ private const val SWITCH_URL = "SWITCHED" val emptyConnectedState = State.Connected( url = URL, toResendOnReconnect = emptySet(), - waitingForResponse = emptySet(), + waitingForResponse = emptyMap(), subscriptions = emptySet(), unknownSubscriptionResponses = emptyMap() ) -class TestSendable(override val id: Int, override val deliveryType: DeliveryType) : - SocketStateMachine.Sendable +class TestSendable(val id: Int, override val deliveryType: DeliveryType) : Sendable { + + override val numberOfNeededResponses: Int = 1 + + override fun relatesTo(id: Int): Boolean { + return this.id == id + } + + override val callback = object : ResponseListener { + override fun onNext(response: RpcResponse) {} + + override fun onError(throwable: Throwable) {} + } + + override fun sendTo(rpcSocket: RpcSocket) {} +} class TestSubscription(override val id: String, override val initiatorId: Int) : SocketStateMachine.Subscription @@ -107,7 +125,7 @@ open class SocketStateMachineTest { state = transition(state, Event.Send(sendable)) - assertEquals(state, emptyConnectedState.copy(waitingForResponse = sendables)) + assertEquals(state, emptyConnectedState.copy(waitingForResponse = sendables.withCounter())) assertEquals(listOf(CONNECT_SIDE_EFFECT, SendSendables(sendables)), sideEffectLog) } @@ -125,7 +143,7 @@ open class SocketStateMachineTest { val expectedConnectedState = emptyConnectedState.copy( toResendOnReconnect = sendables, - waitingForResponse = sendables + waitingForResponse = sendables.withCounter() ) assertEquals(expectedConnectedState, state) @@ -169,7 +187,7 @@ open class SocketStateMachineTest { state = transition(state, Event.Connected) - assertEquals(emptyConnectedState.copy(waitingForResponse = sendables), state) + assertEquals(emptyConnectedState.copy(waitingForResponse = sendables.withCounter()), state) assertEquals(listOf(CONNECT_SIDE_EFFECT, SendSendables(sendables)), sideEffectLog) } @@ -569,7 +587,7 @@ open class SocketStateMachineTest { var state: State = State.Connected( URL, toResendOnReconnect = emptySet(), - waitingForResponse = sendables, + waitingForResponse = sendables.withCounter(), subscriptions = emptySet(), unknownSubscriptionResponses = emptyMap() ) @@ -766,7 +784,7 @@ open class SocketStateMachineTest { url = "test", toResendOnReconnect = emptySet(), unknownSubscriptionResponses = emptyMap(), - waitingForResponse = emptySet(), + waitingForResponse = emptyMap(), subscriptions = emptySet() )