Skip to content

Commit

Permalink
Fix async query collection in PrimalApiClient
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksandarIlic committed Feb 22, 2024
1 parent 8b351be commit 3a459ea
Showing 1 changed file with 20 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import javax.inject.Inject
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -114,38 +116,32 @@ class PrimalApiClient @Inject constructor(
suspend fun query(message: PrimalCacheFilter): PrimalQueryResult {
val queryResult = runCatching {
retrySendMessage(MAX_RETRIES) {
ensureSocketClientConnection()
val subscriptionId = UUID.randomUUID()
val deferredQueryResult = scope.async { collectQueryResult(subscriptionId) }
sendMessageAndAwaitForResultOrThrow(
subscriptionId = subscriptionId,
data = message.toPrimalJsonObject(),
deferredQueryResult = deferredQueryResult,
)
val deferredQueryResult = asyncQueryCollection(subscriptionId)

try {
sendMessageOrThrow(subscriptionId = subscriptionId, data = message.toPrimalJsonObject())
} catch (error: SocketSendMessageException) {
deferredQueryResult.cancel(CancellationException("Unable to send socket message for $subscriptionId."))
throw error
}

try {
deferredQueryResult.await()
} catch (error: CancellationException) {
throw error.cause ?: error
}
}
}
val result = queryResult.getOrNull()
val error = queryResult.exceptionOrNull().takeAsWssException()
return result ?: throw error
}

private suspend fun sendMessageAndAwaitForResultOrThrow(
subscriptionId: UUID,
data: JsonObject,
deferredQueryResult: Deferred<PrimalQueryResult>,
): PrimalQueryResult {
ensureSocketClientConnection()

try {
sendMessageOrThrow(subscriptionId = subscriptionId, data = data)
} catch (error: SocketSendMessageException) {
deferredQueryResult.cancel(CancellationException("Unable to send socket message."))
throw error
}

return try {
deferredQueryResult.await()
} catch (error: CancellationException) {
throw error.cause ?: error
private fun asyncQueryCollection(subscriptionId: UUID): Deferred<PrimalQueryResult> {
return scope.async(SupervisorJob() + CoroutineName("$subscriptionId queryResult collector")) {
collectQueryResult(subscriptionId)
}
}

Expand Down

0 comments on commit 3a459ea

Please sign in to comment.