Skip to content
This repository has been archived by the owner on Jul 6, 2024. It is now read-only.

Commit

Permalink
Add client reconnect mechanism when error occur during connect or obs…
Browse files Browse the repository at this point in the history
…erving incoming messages
  • Loading branch information
mklkj committed Mar 10, 2024
1 parent 8285d28 commit 9871720
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ sealed interface ConnectionStatus {

data object Connected : ConnectionStatus

data class Error(val error: Throwable) : ConnectionStatus {
override fun toString(): String = "Connection error"
data class Error(
val error: Throwable?,
val retryIn: Int = RECONNECTION_WAIT_SECONDS,
) : ConnectionStatus {
override fun toString(): String = "Connection error. Retry in $retryIn seconds"
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.github.mklkj.kommunicator.data.ws

import co.touchlab.kermit.Logger
import io.github.mklkj.kommunicator.data.models.ChatReadPush
import io.github.mklkj.kommunicator.data.models.MessageBroadcast
import io.github.mklkj.kommunicator.data.models.MessageEvent
import io.github.mklkj.kommunicator.data.models.MessagePush
import io.github.mklkj.kommunicator.data.models.MessageRequest
import io.github.mklkj.kommunicator.data.models.ParticipantReadBroadcast
import io.github.mklkj.kommunicator.data.models.ChatReadPush
import io.github.mklkj.kommunicator.data.models.TypingBroadcast
import io.github.mklkj.kommunicator.data.models.TypingPush
import io.github.mklkj.kommunicator.data.repository.MessagesRepository
Expand All @@ -20,6 +20,7 @@ import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
Expand All @@ -39,6 +40,7 @@ import kotlin.time.Duration.Companion.seconds

private const val TAG = "ConversationClient"
private val TYPING_SAMPLE_DURATION = 3.seconds
const val RECONNECTION_WAIT_SECONDS = 3

@Factory
class ConversationClient(
Expand All @@ -57,32 +59,72 @@ class ConversationClient(
val typingParticipants = MutableStateFlow<Map<UUID, Instant>>(emptyMap())
val connectionStatus = MutableStateFlow<ConnectionStatus>(ConnectionStatus.NotConnected)

private val isConnectionShouldRetry = Channel<Boolean>()
private var connectJob: Job? = null

init {
initializeTypingObserver()
initializeTypingStaleTimer()
}

fun connect(chatId: UUID) {
Logger.withTag(TAG).i("Connecting to websocket on $chatId chat")
this.chatId = chatId
scope.launch(errorExceptionHandler) {
connectionStatus.update { ConnectionStatus.Connecting }
chatSession = runCatching { messagesRepository.getChatSession(chatId) }
.onFailure { error -> connectionStatus.update { ConnectionStatus.Error(error) } }
.getOrThrow()
initializeConnectionRetryingObserver(chatId)
connectAndObserveIncomingMessages(chatId)
}

initializeTypingObserver()
initializeTypingStaleTimer()
observeIncomingMessages()
private fun initializeConnectionRetryingObserver(chatId: UUID) {
scope.launch(errorExceptionHandler) {
isConnectionShouldRetry.consumeAsFlow()
.throttleFirst(1.seconds)
.onEach { shouldReconnect ->
if (shouldReconnect) {
val previousError = connectionStatus.value as? ConnectionStatus.Error
var retryIn = previousError?.retryIn ?: RECONNECTION_WAIT_SECONDS
while (retryIn > 0) {
val updatedState = ConnectionStatus.Error(
error = previousError?.error,
retryIn = retryIn,
)
connectionStatus.update { updatedState }
retryIn--
delay(1.seconds)
}
connectAndObserveIncomingMessages(chatId)
}
}
.collect()
}
}

private suspend fun observeIncomingMessages() {
try {
connectionStatus.update { ConnectionStatus.Connected }
for (frame in chatSession?.incoming ?: return) {
if (frame !is Frame.Text) continue
private fun connectAndObserveIncomingMessages(chatId: UUID) {
connectJob?.cancel()
connectJob = scope.launch(errorExceptionHandler) {
connectionStatus.update { ConnectionStatus.Connecting }
chatSession = runCatching { messagesRepository.getChatSession(chatId) }
.onFailure { error ->
connectionStatus.update { ConnectionStatus.Error(error) }
isConnectionShouldRetry.send(true)
}
.onSuccess {
connectionStatus.update { ConnectionStatus.Connected }
isConnectionShouldRetry.send(false)
}
.getOrThrow()

handleIncomingFrame(frame)
}
} catch (e: Throwable) {
connectionStatus.update { ConnectionStatus.Error(e) }
chatSession = null
chatSession?.incoming?.consumeAsFlow()
?.onEach { frame ->
if (frame is Frame.Text) {
handleIncomingFrame(frame)
}
}
?.catch { error ->
chatSession = null
isConnectionShouldRetry.send(true)
connectionStatus.update { ConnectionStatus.Error(error) }
}
?.collect()
}
}

Expand Down Expand Up @@ -147,7 +189,7 @@ class ConversationClient(
private fun initializeTypingObserver() {
scope.launch(errorExceptionHandler) {
typingChannel.consumeAsFlow()
.throttleFirst(TYPING_SAMPLE_DURATION.inWholeMilliseconds)
.throttleFirst(TYPING_SAMPLE_DURATION)
.onEach {
chatSession?.sendSerialized<MessageEvent>(TypingPush(it))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package io.github.mklkj.kommunicator.utils
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.datetime.Clock
import kotlin.time.Duration

/**
* @see [https://proandroiddev.com/from-rxjava-to-kotlin-flow-throttling-ed1778847619]
*/
fun <T> Flow<T>.throttleFirst(periodMillis: Long): Flow<T> {
fun <T> Flow<T>.throttleFirst(periodDuration: Duration): Flow<T> {
val periodMillis = periodDuration.inWholeMilliseconds
return flow {
var lastTime = 0L
var lastvalue: T? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ fun Route.chatRoutes() {
notificationService.notifyParticipants(
chatId = chatId,
messageId = message.id,
alreadyNotifiedUsers = connections.map { it.userId },
alreadyNotifiedUsers = connections.map { it.userId } + userId,
)

call.respond(message = HttpStatusCode.Created)
Expand Down

0 comments on commit 9871720

Please sign in to comment.