Skip to content

Commit

Permalink
Merge pull request #371 from apeun-gidaechi/feature/370-socket-error-…
Browse files Browse the repository at this point in the history
…channel-subscribe

Refactor/Socket Stability
  • Loading branch information
8954sood authored Nov 28, 2024
2 parents c86653b + bb422d5 commit 841e872
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.seugi.data.message
import com.seugi.common.model.Result
import com.seugi.data.message.model.MessageLoadModel
import com.seugi.data.message.model.MessageRoomEvent
import com.seugi.data.message.model.MessageStompErrorModel
import com.seugi.data.message.model.MessageType
import com.seugi.data.message.model.stomp.MessageStompLifecycleModel
import kotlinx.coroutines.flow.Flow
Expand All @@ -14,6 +15,10 @@ interface MessageRepository {

suspend fun subscribeRoom(chatRoomId: String, userId: Long): Flow<Result<MessageRoomEvent>>

suspend fun subscribeError(): Flow<Result<MessageStompErrorModel>>

suspend fun closeSocket(): Boolean

suspend fun reSubscribeRoom(chatRoomId: String, userId: Long): Flow<Result<MessageRoomEvent>>

suspend fun getMessage(chatRoomId: String, timestamp: LocalDateTime?, userId: Long): Flow<Result<MessageLoadModel>>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.seugi.data.message.mapper

import com.seugi.data.message.model.MessageStompErrorModel
import com.seugi.network.message.response.stomp.MessageStompErrorResponse

internal fun MessageStompErrorResponse.toModel() = MessageStompErrorModel(
status = status,
success = success,
state = state,
message = message,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.seugi.data.message.model

data class MessageStompErrorModel(
val status: Int,
val success: Boolean,
val state: String,
val message: String,
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.seugi.common.model.Result
import com.seugi.common.model.asResult
import com.seugi.common.utiles.DispatcherType
import com.seugi.common.utiles.SeugiDispatcher
import com.seugi.data.core.mapper.toModel
import com.seugi.data.core.mapper.toModels
import com.seugi.data.core.model.TimetableModel
import com.seugi.data.message.MessageRepository
Expand All @@ -16,6 +15,7 @@ import com.seugi.data.message.model.MessageBotRawKeyword
import com.seugi.data.message.model.MessageBotRawKeywordInData
import com.seugi.data.message.model.MessageLoadModel
import com.seugi.data.message.model.MessageRoomEvent
import com.seugi.data.message.model.MessageStompErrorModel
import com.seugi.data.message.model.MessageType
import com.seugi.data.message.model.stomp.MessageStompLifecycleModel
import com.seugi.local.room.dao.TokenDao
Expand Down Expand Up @@ -54,27 +54,48 @@ class MessageRepositoryImpl @Inject constructor(
override suspend fun subscribeRoom(chatRoomId: String, userId: Long): Flow<Result<MessageRoomEvent>> {
if (!datasource.getIsConnect()) {
val token = tokenDao.getToken()
datasource.connectStomp(
datasource.connectStompSocket(
token?.token ?: "",
)
}
return datasource.subscribeRoom(chatRoomId)
.flowOn(dispatcher)
.map {
/**/
it.toEventModel(userId)
}
.asResult()
}

override suspend fun subscribeError(): Flow<Result<MessageStompErrorModel>> {
if (!datasource.getIsConnect()) {
val token = tokenDao.getToken()
datasource.connectStompSocket(
token?.token ?: "",
)
}
return datasource.subscribeError()
.flowOn(dispatcher)
.map {
it.toModel()
}
.asResult()
}

override suspend fun closeSocket(): Boolean {
datasource.closeStompSocket()
return true
}

override suspend fun reSubscribeRoom(chatRoomId: String, userId: Long): Flow<Result<MessageRoomEvent>> {
val token = tokenDao.getToken()
datasource.reConnectStomp(
datasource.reConnectStompSocket(
token?.token ?: "",
token?.refreshToken ?: "",
)
delay(200)
return datasource.subscribeRoom(chatRoomId)
.flowOn(dispatcher)
// .flowOn(dispatcher)
.map {
it.toEventModel(userId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import androidx.compose.material3.Scaffold
import androidx.compose.material3.Surface
import androidx.compose.material3.Text
import androidx.compose.runtime.Composable
import androidx.compose.runtime.DisposableEffect
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.SideEffect
import androidx.compose.runtime.getValue
Expand Down Expand Up @@ -254,9 +255,15 @@ internal fun ChatDetailScreen(
)
}

DisposableEffect(Unit) {
onDispose {
viewModel.socketClose()
}
}

LifecycleStartEffect(key1 = Unit) {
viewModel.collectStompLifecycle(chatRoomId, userId)
viewModel.channelReconnect(userId, chatRoomId)
viewModel.channelConnect(userId, chatRoomId)
onStopOrDispose {
viewModel.subscribeCancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class ChatDetailViewModel @Inject constructor(
content = content,
mention = mention,
)
channelReconnect(userId)
channelConnect(userId)
return@launch
}
// 메세지가 보내고 MESSAGE_TIMEOUT 시간만큼 지났는데도 안보내지면 실패처리
Expand Down Expand Up @@ -604,12 +604,12 @@ class ChatDetailViewModel @Inject constructor(
}
}

private fun channelConnect(userId: Long) {
fun channelConnect(userId: Long, roomId: String? = null) {
viewModelScope.launch {
subscribeChat?.cancel()
val job = viewModelScope.async {
messageRepository.subscribeRoom(
chatRoomId = state.value.roomInfo?.id ?: "",
chatRoomId = roomId ?: state.value.roomInfo?.id ?: "",
userId = userId,
).collect {
it.collectMessage(userId)
Expand All @@ -628,6 +628,10 @@ class ChatDetailViewModel @Inject constructor(
subscribeChat = null
}

fun socketClose() = viewModelScope.launch {
messageRepository.closeSocket()
}

fun leftRoom(chatRoomId: String) {
viewModelScope.launch {
groupChatRepository.leftRoom(chatRoomId).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ object SeugiUrl {
const val SEND = "/pub/chat.message"
const val GET_MESSAGE = "${BASE_URL}/message/search"
const val EMOJI = "$MESSAGE/emoji"
const val ERRORS = "/user/queue/errors"
}

object PersonalChat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import com.seugi.network.core.response.BaseResponse
import com.seugi.network.core.response.Response
import com.seugi.network.message.response.MessageRoomEventResponse
import com.seugi.network.message.response.message.MessageLoadResponse
import com.seugi.network.message.response.stomp.MessageStompErrorResponse
import com.seugi.network.message.response.stomp.MessageStompLifecycleResponse
import kotlinx.coroutines.flow.Flow
import kotlinx.datetime.LocalDateTime

interface MessageDataSource {
suspend fun subscribeRoom(chatRoomId: String): Flow<MessageRoomEventResponse>

suspend fun subscribeError(): Flow<MessageStompErrorResponse>

suspend fun sendMessage(chatRoomId: String, message: String, messageUUID: String, type: String, mention: List<Long>): Boolean

suspend fun connectStomp(accessToken: String)
suspend fun connectStompSocket(accessToken: String)

suspend fun reConnectStompSocket(accessToken: String, refreshToken: String)

suspend fun reConnectStomp(accessToken: String, refreshToken: String)
suspend fun closeStompSocket()

suspend fun getIsConnect(): Boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.seugi.network.message.request.EmojiRequest
import com.seugi.network.message.request.MessageRequest
import com.seugi.network.message.response.MessageRoomEventResponse
import com.seugi.network.message.response.message.MessageLoadResponse
import com.seugi.network.message.response.stomp.MessageStompErrorResponse
import com.seugi.network.message.response.stomp.MessageStompLifecycleResponse
import com.seugi.stompclient.StompClient
import com.seugi.stompclient.dto.LifecycleEvent
Expand All @@ -27,7 +28,6 @@ import io.ktor.client.request.parameter
import io.ktor.client.request.post
import io.ktor.client.request.put
import io.ktor.client.request.setBody
import io.ktor.client.utils.EmptyContent.contentType
import io.ktor.http.ContentType
import io.ktor.http.contentType
import javax.inject.Inject
Expand All @@ -46,14 +46,18 @@ class MessageDataSourceImpl @Inject constructor(
private val httpClient: HttpClient,
) : MessageDataSource {

override suspend fun connectStomp(accessToken: String) {
override suspend fun connectStompSocket(accessToken: String) {
val header = listOf(StompHeader("Authorization", accessToken))
stompClient.connect(header)
}

override suspend fun reConnectStomp(accessToken: String, refreshToken: String): Unit = coroutineScope {
override suspend fun reConnectStompSocket(accessToken: String, refreshToken: String): Unit = coroutineScope {
stompClient.disconnectCompletable().subscribe { }
connectStompSocket(accessToken)
}

override suspend fun closeStompSocket() {
stompClient.disconnectCompletable().subscribe { }
connectStomp(accessToken)
}

override suspend fun getIsConnect(): Boolean = stompClient.isConnected
Expand Down Expand Up @@ -121,6 +125,19 @@ class MessageDataSourceImpl @Inject constructor(
}
}

override suspend fun subscribeError() = flow {
stompClient.topic(SeugiUrl.Message.ERRORS)
.asFlow()
.flowOn(dispatcher)
.catch {
it.printStackTrace()
}
.collect { message ->
val response = message.payload.toResponse(MessageStompErrorResponse::class.java)
emit(response)
}
}

override suspend fun sendMessage(chatRoomId: String, message: String, messageUUID: String, type: String, mention: List<Long>): Boolean {
// 연결이 되지 않는 경우 연결 강제성 부여
if (!stompClient.isConnected) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.seugi.network.message.response.stomp

data class MessageStompErrorResponse(
val status: Int,
val success: Boolean,
val state: String,
val message: String,
)

0 comments on commit 841e872

Please sign in to comment.