From 1a4afa332ad2144a543c835a2412f2b654bfc415 Mon Sep 17 00:00:00 2001 From: Sheikah45 <66929319+Sheikah45@users.noreply.github.com> Date: Wed, 17 May 2023 18:36:25 -0400 Subject: [PATCH] Fix reconnect logic (#101) --- faf-commons-lobby/build.gradle.kts | 6 +- .../faforever/commons/lobby/ConnectionApi.kt | 20 +-- .../faforever/commons/lobby/FafLobbyClient.kt | 134 ++++++++---------- .../commons/lobby/LobbyClientTest.kt | 39 ++++- 4 files changed, 108 insertions(+), 91 deletions(-) diff --git a/faf-commons-lobby/build.gradle.kts b/faf-commons-lobby/build.gradle.kts index 91525311..62ca19d7 100644 --- a/faf-commons-lobby/build.gradle.kts +++ b/faf-commons-lobby/build.gradle.kts @@ -13,10 +13,10 @@ version = "1.0.0-SNAPSHOT" java.sourceCompatibility = JavaVersion.VERSION_11 dependencies { - implementation("io.projectreactor.netty:reactor-netty:1.0.4") - api("io.projectreactor:reactor-core:3.4.3") + implementation("io.projectreactor.netty:reactor-netty:1.1.6") + api("io.projectreactor:reactor-core:3.5.5") testImplementation("io.projectreactor:reactor-test:3.4.3") - testImplementation("ch.qos.logback:logback-classic:1.2.3") + testImplementation("ch.qos.logback:logback-classic:1.2.9") testImplementation("org.slf4j:slf4j-api:1.7.25") val jacksonVersion = "2.12.2" testImplementation("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}") diff --git a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/ConnectionApi.kt b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/ConnectionApi.kt index fad9385d..4a8fe1b1 100644 --- a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/ConnectionApi.kt +++ b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/ConnectionApi.kt @@ -11,7 +11,7 @@ import reactor.core.publisher.Mono */ interface ConnectionApi { - fun connectAndLogin(config: FafLobbyClient.Config): Mono + fun connectAndLogin(config: FafLobbyClient.Config): Mono fun disconnect() @@ -62,13 +62,6 @@ data class SessionResponse( val session: Long, ) : ServerMessage -/** - * The server confirms a successful login and sends us our player info - */ -data class LoginSuccessResponse( - val me: Player, -) : ServerMessage - /** * Randomly assigned password to login into the irc */ @@ -112,13 +105,22 @@ data class Player( } } +sealed interface LoginResponse : ServerMessage + +/** + * The server confirms a successful login and sends us our player info + */ +data class LoginSuccessResponse( + val me: Player, +) : LoginResponse + /** * Response if the login failed with an all english response on what failed. * FIXME: This should send error codes instead of translated text. */ data class LoginFailedResponse( val text: String?, -) : ServerMessage +) : LoginResponse @Deprecated("Ice Servers should be queried from the FAF API") data class IceServer( diff --git a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt index 8b49c280..1b76b69d 100644 --- a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt +++ b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt @@ -45,17 +45,17 @@ class FafLobbyClient( ) private lateinit var config: Config - private lateinit var outboundSink: Sinks.Many - private lateinit var outboundMessages: Flux private var connection: Connection? = null private var pingDisposable: Disposable? = null - private var loginMono: Mono? = null private var connecting: Boolean = false var minPingIntervalSeconds: Long = 60 - var autoReconnect: Boolean = true + private var autoReconnect: Boolean = false + private var kicked: Boolean = false + private val outboundSink: Sinks.Many = Sinks.many().unicast().onBackpressureBuffer() + private val outboundMessages: Flux = outboundSink.asFlux().publish().autoConnect() private val eventSink: Sinks.Many = Sinks.many().unicast().onBackpressureBuffer() private val rawEvents = eventSink.asFlux().publish().autoConnect() private val connectionStatusSink: Sinks.Many = Sinks.many().unicast().onBackpressureBuffer() @@ -69,6 +69,33 @@ class FafLobbyClient( it !is LoginFailedResponse } + private val loginResponseMono = rawEvents.ofType(LoginResponse::class.java).next().flatMap { + when (it) { + is LoginSuccessResponse -> Mono.just(it.me) + is LoginFailedResponse -> Mono.error(LoginException(it.text)) + } + }.timeout(Duration.ofMinutes(1)) + .doOnError(LoginException::class.java) { kicked = true } + .doOnSubscribe { + prepareAuthenticateOnNextSession() + send(SessionRequest(config.version, config.userAgent)) + } + + private val loginMono = Mono.defer { + openConnection() + .then(loginResponseMono) + .retryWhen(createRetrySpec(config)) + } + .doOnError { LOG.error("Error during connection", it); connection?.dispose() } + .doOnCancel { LOG.debug("Login cancelled"); disconnect() } + .doOnSuccess { + connectionStatusSink.emitNext(ConnectionStatus.CONNECTED, retrySerialFailure) + } + .materialize() + .cacheInvalidateIf { it.isOnError || (!connecting && (connection == null || connection?.isDisposed == true)) } + .dematerialize() + + private val retrySerialFailure = EmitFailureHandler { _: SignalType?, emitResult: EmitResult -> (emitResult == EmitResult.FAIL_NON_SERIALIZED) @@ -79,30 +106,37 @@ class FafLobbyClient( .doOnResolveError { connection, throwable -> LOG.error("Could not find server", throwable) connection.dispose() - }.doOnConnected { + }.doOnConnect { + connection?.dispose() + } + .doOnConnected { val address = it.channel().remoteAddress() as InetSocketAddress LOG.info("Connected to {} on port {}", address.hostName, address.port) - it.addHandler(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below - .addHandler(LineBasedFrameDecoder(config.bufferSize)) + it.addHandlerFirst(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below + .addHandlerLast(LineBasedFrameDecoder(config.bufferSize)) connection = it }.doOnDisconnected { LOG.info("Disconnected from server") it.dispose() pingDisposable?.dispose() connectionStatusSink.emitNext(ConnectionStatus.DISCONNECTED, retrySerialFailure) + }.doOnResolveError { conn, throwable -> + LOG.error("Error resolving", throwable) + conn.dispose() } init { rawEvents.filter { it is ServerPingMessage }.doOnNext { send(ClientPongMessage()) }.subscribe() + rawEvents.ofType(NoticeInfo::class.java).filter { it.style == "kick" }.doOnNext { kicked = true }.subscribe() connectionStatus.doOnNext { when (it) { ConnectionStatus.CONNECTING -> connecting = true ConnectionStatus.CONNECTED -> connecting = false ConnectionStatus.DISCONNECTED, null -> { connecting = false - if (autoReconnect) { + if (autoReconnect && !kicked) { LOG.info("Attempting to reconnect") - connectAndLogin(this.config).subscribeOn(Schedulers.immediate()).subscribe() + connectAndLogin(this.config).subscribeOn(Schedulers.single()).subscribe() } } } @@ -110,8 +144,6 @@ class FafLobbyClient( } private fun openConnection(): Mono { - outboundSink = Sinks.many().unicast().onBackpressureBuffer() - outboundMessages = outboundSink.asFlux().publish().autoConnect() return client .wiretap(config.wiretap) .host(config.host) @@ -190,58 +222,24 @@ class FafLobbyClient( .doOnSubscribe { connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure) } } - override fun connectAndLogin(config: Config): Mono { + override fun connectAndLogin(config: Config): Mono { this.config = config - - if (loginMono == null || (!connecting && (connection == null || connection?.isDisposed == true))) { - val retry = createRetrySpec(config) - val autoReconnect = this.autoReconnect - this.autoReconnect = false - - authenticateOnNextSession(config) - - val loginSink = Sinks.one() - emitNextLoginResponse(loginSink) - - loginMono = openConnection() - .then(Mono.fromCallable { send(SessionRequest(config.version, config.userAgent)) }) - .retryWhen(retry) - .doOnError { LOG.error("Error during connection", it) } - .flatMap { loginSink.asMono().timeout(Duration.ofMinutes(1)) } - .doOnNext { - connectionStatusSink.emitNext(ConnectionStatus.CONNECTED, retrySerialFailure) - this.autoReconnect = autoReconnect - }.cache() - } - - return loginMono as Mono - } - - private fun emitNextLoginResponse(loginSink: Sinks.One) { - LOG.debug("Starting login listener") - rawEvents.filter { - it is LoginSuccessResponse || it is LoginFailedResponse - }.next().doOnNext { - when (it) { - is LoginSuccessResponse -> loginSink.emitValue(it, retrySerialFailure) - is LoginFailedResponse -> loginSink.emitError(LoginException(it.text), retrySerialFailure) - } - }.subscribeOn(Schedulers.immediate()).subscribe() + kicked = false + autoReconnect = true + return loginMono } - private fun authenticateOnNextSession(config: Config) { + private fun prepareAuthenticateOnNextSession() { LOG.debug("Starting session listener") - rawEvents.filter { - it is SessionResponse - }.next().cast(SessionResponse::class.java).doOnNext { message -> - config.tokenMono.doOnNext { token -> - send(AuthenticateRequest(token, message.session, config.generateUid.apply(message.session))) - }.subscribeOn(Schedulers.immediate()).subscribe() - }.subscribe() + rawEvents.ofType(SessionResponse::class.java).next() + .zipWith(config.tokenMono).doOnNext { messageTokenTuple -> + send(AuthenticateRequest(messageTokenTuple.t2, messageTokenTuple.t1.session, config.generateUid.apply(messageTokenTuple.t1.session))) + }.subscribeOn(Schedulers.single()).subscribe() } private fun createRetrySpec(config: Config) = Retry.fixedDelay(config.maxRetryAttempts, Duration.ofSeconds(config.retryWaitSeconds)) + .filter { it !is LoginException } .doBeforeRetry { retry: RetrySignal -> LOG.warn( "Could not reach server retrying: Attempt #{} of {}", @@ -258,19 +256,7 @@ class FafLobbyClient( override fun disconnect() { autoReconnect = false - when (val conn = connection) { - null -> LOG.warn("Attempting to disconnect while never connected") - else -> { - if (conn.isDisposed) { - LOG.info("Already disconnected") - return - } - - LOG.info("Disconnecting from server") - outboundSink.emitComplete(retrySerialFailure) - conn.dispose() - } - } + connection?.dispose() } private fun pingWithDelay(): Mono = ping().delaySubscription(Duration.ofSeconds(minPingIntervalSeconds)) @@ -323,8 +309,7 @@ class FafLobbyClient( ) }.then( events - .filter { it is GameLaunchResponse } - .cast(GameLaunchResponse::class.java) + .ofType(GameLaunchResponse::class.java) .next() ) @@ -333,8 +318,7 @@ class FafLobbyClient( send(JoinGameRequest(gameId, password)) }.then( events - .filter { it is GameLaunchResponse } - .cast(GameLaunchResponse::class.java) + .ofType(GameLaunchResponse::class.java) .next() ) @@ -345,8 +329,7 @@ class FafLobbyClient( Mono.fromCallable { send(IceServerListRequest()) } .thenMany( events - .filter { it is IceServerListResponse } - .cast(IceServerListResponse::class.java) + .ofType(IceServerListResponse::class.java) .next() .flatMapIterable { it.iceServers } ) @@ -364,8 +347,7 @@ class FafLobbyClient( override fun getAvailableAvatars(): Flux = Mono.fromCallable { send(AvatarListRequest()) } .thenMany( events - .filter { it is AvatarListInfo } - .cast(AvatarListInfo::class.java) + .ofType(AvatarListInfo::class.java) .next() .flatMapIterable { it.avatarList } ) diff --git a/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt b/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt index a884cd0e..784166f7 100644 --- a/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt +++ b/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt @@ -1,5 +1,6 @@ package com.faforever.commons.lobby +import ch.qos.logback.classic.Level import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule @@ -8,6 +9,7 @@ import io.netty.handler.codec.LineBasedFrameDecoder import io.netty.handler.codec.string.LineEncoder import io.netty.handler.codec.string.LineSeparator import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeAll import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.skyscreamer.jsonassert.JSONAssert @@ -29,10 +31,17 @@ import java.time.temporal.ChronoUnit class LobbyClientTest { companion object { - val TIMEOUT: Long = 5000; + val TIMEOUT: Long = 10000; val TIMEOUT_UNIT = ChronoUnit.MILLIS val LOOPBACK_ADDRESS = InetAddress.getLoopbackAddress() val LOG: Logger = LoggerFactory.getLogger(FafLobbyClient::class.java) + + @JvmStatic + @BeforeAll + fun setLogLevel(): Unit { + val lobbyLogger = LoggerFactory.getLogger("com.faforever.commons.lobby") as ch.qos.logback.classic.Logger + lobbyLogger.level = Level.TRACE + } } private val objectMapper: ObjectMapper = ObjectMapper().registerModule(KotlinModule()) @@ -67,8 +76,8 @@ class LobbyClientTest { .doOnConnection { LOG.debug("New Client connected to server") currentConnection = it - it.addHandler(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below - .addHandler(LineBasedFrameDecoder(1024 * 1024)) + it.addHandlerFirst(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below + .addHandlerLast(LineBasedFrameDecoder(1024 * 1024)) }.doOnBound { disposableServer: DisposableServer -> LOG.debug( "Fake server listening at {} on port {}", @@ -529,6 +538,30 @@ class LobbyClientTest { stepVerifierServer.verify(verificationDuration) } + @Test + fun testOnlySingleConnect() { + val config = FafLobbyClient.Config( + Mono.just(token), + "0", + "downlords-faf-client", + disposableServer.host(), + disposableServer.port(), + { "abc" }, + 1024 * 1024, + false, + 5, + 5 + ) + + val verifyLater = StepVerifier.create(serverMessagesReceived.take(Duration.ofSeconds(5))) + .expectComplete() + .verifyLater(); + + StepVerifier.create(instance.connectAndLogin(config)).expectNextCount(1).verifyComplete() + + verifyLater.verify() + } + @Test fun testAutoReconnect() { currentConnection.dispose()