diff --git a/app/src/main/kotlin/net/primal/android/networking/primal/PrimalApiClient.kt b/app/src/main/kotlin/net/primal/android/networking/primal/PrimalApiClient.kt index d3c99305b..04063f825 100644 --- a/app/src/main/kotlin/net/primal/android/networking/primal/PrimalApiClient.kt +++ b/app/src/main/kotlin/net/primal/android/networking/primal/PrimalApiClient.kt @@ -6,9 +6,11 @@ import javax.inject.Inject import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow @@ -114,13 +116,22 @@ class PrimalApiClient @Inject constructor( suspend fun query(message: PrimalCacheFilter): PrimalQueryResult { val queryResult = runCatching { retrySendMessage(MAX_RETRIES) { + ensureSocketClientConnection() val subscriptionId = UUID.randomUUID() - val deferredQueryResult = scope.async { collectQueryResult(subscriptionId) } - sendMessageAndAwaitForResultOrThrow( - subscriptionId = subscriptionId, - data = message.toPrimalJsonObject(), - deferredQueryResult = deferredQueryResult, - ) + val deferredQueryResult = asyncQueryCollection(subscriptionId) + + try { + sendMessageOrThrow(subscriptionId = subscriptionId, data = message.toPrimalJsonObject()) + } catch (error: SocketSendMessageException) { + deferredQueryResult.cancel(CancellationException("Unable to send socket message for $subscriptionId.")) + throw error + } + + try { + deferredQueryResult.await() + } catch (error: CancellationException) { + throw error.cause ?: error + } } } val result = queryResult.getOrNull() @@ -128,24 +139,9 @@ class PrimalApiClient @Inject constructor( return result ?: throw error } - private suspend fun sendMessageAndAwaitForResultOrThrow( - subscriptionId: UUID, - data: JsonObject, - deferredQueryResult: Deferred, - ): PrimalQueryResult { - ensureSocketClientConnection() - - try { - sendMessageOrThrow(subscriptionId = subscriptionId, data = data) - } catch (error: SocketSendMessageException) { - deferredQueryResult.cancel(CancellationException("Unable to send socket message.")) - throw error - } - - return try { - deferredQueryResult.await() - } catch (error: CancellationException) { - throw error.cause ?: error + private fun asyncQueryCollection(subscriptionId: UUID): Deferred { + return scope.async(SupervisorJob() + CoroutineName("$subscriptionId queryResult collector")) { + collectQueryResult(subscriptionId) } }