Skip to content

Commit

Permalink
Fix reconnect logic (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 authored May 17, 2023
1 parent bbb28b8 commit 1a4afa3
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 91 deletions.
6 changes: 3 additions & 3 deletions faf-commons-lobby/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ version = "1.0.0-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_11

dependencies {
implementation("io.projectreactor.netty:reactor-netty:1.0.4")
api("io.projectreactor:reactor-core:3.4.3")
implementation("io.projectreactor.netty:reactor-netty:1.1.6")
api("io.projectreactor:reactor-core:3.5.5")
testImplementation("io.projectreactor:reactor-test:3.4.3")
testImplementation("ch.qos.logback:logback-classic:1.2.3")
testImplementation("ch.qos.logback:logback-classic:1.2.9")
testImplementation("org.slf4j:slf4j-api:1.7.25")
val jacksonVersion = "2.12.2"
testImplementation("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import reactor.core.publisher.Mono
*/
interface ConnectionApi {

fun connectAndLogin(config: FafLobbyClient.Config): Mono<LoginSuccessResponse>
fun connectAndLogin(config: FafLobbyClient.Config): Mono<Player>

fun disconnect()

Expand Down Expand Up @@ -62,13 +62,6 @@ data class SessionResponse(
val session: Long,
) : ServerMessage

/**
* The server confirms a successful login and sends us our player info
*/
data class LoginSuccessResponse(
val me: Player,
) : ServerMessage

/**
* Randomly assigned password to login into the irc
*/
Expand Down Expand Up @@ -112,13 +105,22 @@ data class Player(
}
}

sealed interface LoginResponse : ServerMessage

/**
* The server confirms a successful login and sends us our player info
*/
data class LoginSuccessResponse(
val me: Player,
) : LoginResponse

/**
* Response if the login failed with an all english response on what failed.
* FIXME: This should send error codes instead of translated text.
*/
data class LoginFailedResponse(
val text: String?,
) : ServerMessage
) : LoginResponse

@Deprecated("Ice Servers should be queried from the FAF API")
data class IceServer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ class FafLobbyClient(
)

private lateinit var config: Config
private lateinit var outboundSink: Sinks.Many<ClientMessage>
private lateinit var outboundMessages: Flux<ClientMessage>

private var connection: Connection? = null
private var pingDisposable: Disposable? = null
private var loginMono: Mono<LoginSuccessResponse>? = null
private var connecting: Boolean = false

var minPingIntervalSeconds: Long = 60
var autoReconnect: Boolean = true

private var autoReconnect: Boolean = false
private var kicked: Boolean = false
private val outboundSink: Sinks.Many<ClientMessage> = Sinks.many().unicast().onBackpressureBuffer()
private val outboundMessages: Flux<ClientMessage> = outboundSink.asFlux().publish().autoConnect()
private val eventSink: Sinks.Many<ServerMessage> = Sinks.many().unicast().onBackpressureBuffer()
private val rawEvents = eventSink.asFlux().publish().autoConnect()
private val connectionStatusSink: Sinks.Many<ConnectionStatus> = Sinks.many().unicast().onBackpressureBuffer()
Expand All @@ -69,6 +69,33 @@ class FafLobbyClient(
it !is LoginFailedResponse
}

private val loginResponseMono = rawEvents.ofType(LoginResponse::class.java).next().flatMap {
when (it) {
is LoginSuccessResponse -> Mono.just(it.me)
is LoginFailedResponse -> Mono.error(LoginException(it.text))
}
}.timeout(Duration.ofMinutes(1))
.doOnError(LoginException::class.java) { kicked = true }
.doOnSubscribe {
prepareAuthenticateOnNextSession()
send(SessionRequest(config.version, config.userAgent))
}

private val loginMono = Mono.defer {
openConnection()
.then(loginResponseMono)
.retryWhen(createRetrySpec(config))
}
.doOnError { LOG.error("Error during connection", it); connection?.dispose() }
.doOnCancel { LOG.debug("Login cancelled"); disconnect() }
.doOnSuccess {
connectionStatusSink.emitNext(ConnectionStatus.CONNECTED, retrySerialFailure)
}
.materialize()
.cacheInvalidateIf { it.isOnError || (!connecting && (connection == null || connection?.isDisposed == true)) }
.dematerialize<Player>()


private val retrySerialFailure =
EmitFailureHandler { _: SignalType?, emitResult: EmitResult ->
(emitResult == EmitResult.FAIL_NON_SERIALIZED)
Expand All @@ -79,39 +106,44 @@ class FafLobbyClient(
.doOnResolveError { connection, throwable ->
LOG.error("Could not find server", throwable)
connection.dispose()
}.doOnConnected {
}.doOnConnect {
connection?.dispose()
}
.doOnConnected {
val address = it.channel().remoteAddress() as InetSocketAddress
LOG.info("Connected to {} on port {}", address.hostName, address.port)
it.addHandler(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below
.addHandler(LineBasedFrameDecoder(config.bufferSize))
it.addHandlerFirst(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below
.addHandlerLast(LineBasedFrameDecoder(config.bufferSize))
connection = it
}.doOnDisconnected {
LOG.info("Disconnected from server")
it.dispose()
pingDisposable?.dispose()
connectionStatusSink.emitNext(ConnectionStatus.DISCONNECTED, retrySerialFailure)
}.doOnResolveError { conn, throwable ->
LOG.error("Error resolving", throwable)
conn.dispose()
}

init {
rawEvents.filter { it is ServerPingMessage }.doOnNext { send(ClientPongMessage()) }.subscribe()
rawEvents.ofType(NoticeInfo::class.java).filter { it.style == "kick" }.doOnNext { kicked = true }.subscribe()
connectionStatus.doOnNext {
when (it) {
ConnectionStatus.CONNECTING -> connecting = true
ConnectionStatus.CONNECTED -> connecting = false
ConnectionStatus.DISCONNECTED, null -> {
connecting = false
if (autoReconnect) {
if (autoReconnect && !kicked) {
LOG.info("Attempting to reconnect")
connectAndLogin(this.config).subscribeOn(Schedulers.immediate()).subscribe()
connectAndLogin(this.config).subscribeOn(Schedulers.single()).subscribe()
}
}
}
}.subscribe()
}

private fun openConnection(): Mono<out Connection> {
outboundSink = Sinks.many().unicast().onBackpressureBuffer()
outboundMessages = outboundSink.asFlux().publish().autoConnect()
return client
.wiretap(config.wiretap)
.host(config.host)
Expand Down Expand Up @@ -190,58 +222,24 @@ class FafLobbyClient(
.doOnSubscribe { connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure) }
}

override fun connectAndLogin(config: Config): Mono<LoginSuccessResponse> {
override fun connectAndLogin(config: Config): Mono<Player> {
this.config = config

if (loginMono == null || (!connecting && (connection == null || connection?.isDisposed == true))) {
val retry = createRetrySpec(config)
val autoReconnect = this.autoReconnect
this.autoReconnect = false

authenticateOnNextSession(config)

val loginSink = Sinks.one<LoginSuccessResponse>()
emitNextLoginResponse(loginSink)

loginMono = openConnection()
.then(Mono.fromCallable { send(SessionRequest(config.version, config.userAgent)) })
.retryWhen(retry)
.doOnError { LOG.error("Error during connection", it) }
.flatMap { loginSink.asMono().timeout(Duration.ofMinutes(1)) }
.doOnNext {
connectionStatusSink.emitNext(ConnectionStatus.CONNECTED, retrySerialFailure)
this.autoReconnect = autoReconnect
}.cache()
}

return loginMono as Mono<LoginSuccessResponse>
}

private fun emitNextLoginResponse(loginSink: Sinks.One<LoginSuccessResponse>) {
LOG.debug("Starting login listener")
rawEvents.filter {
it is LoginSuccessResponse || it is LoginFailedResponse
}.next().doOnNext {
when (it) {
is LoginSuccessResponse -> loginSink.emitValue(it, retrySerialFailure)
is LoginFailedResponse -> loginSink.emitError(LoginException(it.text), retrySerialFailure)
}
}.subscribeOn(Schedulers.immediate()).subscribe()
kicked = false
autoReconnect = true
return loginMono
}

private fun authenticateOnNextSession(config: Config) {
private fun prepareAuthenticateOnNextSession() {
LOG.debug("Starting session listener")
rawEvents.filter {
it is SessionResponse
}.next().cast(SessionResponse::class.java).doOnNext { message ->
config.tokenMono.doOnNext { token ->
send(AuthenticateRequest(token, message.session, config.generateUid.apply(message.session)))
}.subscribeOn(Schedulers.immediate()).subscribe()
}.subscribe()
rawEvents.ofType(SessionResponse::class.java).next()
.zipWith(config.tokenMono).doOnNext { messageTokenTuple ->
send(AuthenticateRequest(messageTokenTuple.t2, messageTokenTuple.t1.session, config.generateUid.apply(messageTokenTuple.t1.session)))
}.subscribeOn(Schedulers.single()).subscribe()
}

private fun createRetrySpec(config: Config) =
Retry.fixedDelay(config.maxRetryAttempts, Duration.ofSeconds(config.retryWaitSeconds))
.filter { it !is LoginException }
.doBeforeRetry { retry: RetrySignal ->
LOG.warn(
"Could not reach server retrying: Attempt #{} of {}",
Expand All @@ -258,19 +256,7 @@ class FafLobbyClient(

override fun disconnect() {
autoReconnect = false
when (val conn = connection) {
null -> LOG.warn("Attempting to disconnect while never connected")
else -> {
if (conn.isDisposed) {
LOG.info("Already disconnected")
return
}

LOG.info("Disconnecting from server")
outboundSink.emitComplete(retrySerialFailure)
conn.dispose()
}
}
connection?.dispose()
}

private fun pingWithDelay(): Mono<Unit> = ping().delaySubscription(Duration.ofSeconds(minPingIntervalSeconds))
Expand Down Expand Up @@ -323,8 +309,7 @@ class FafLobbyClient(
)
}.then(
events
.filter { it is GameLaunchResponse }
.cast(GameLaunchResponse::class.java)
.ofType(GameLaunchResponse::class.java)
.next()
)

Expand All @@ -333,8 +318,7 @@ class FafLobbyClient(
send(JoinGameRequest(gameId, password))
}.then(
events
.filter { it is GameLaunchResponse }
.cast(GameLaunchResponse::class.java)
.ofType(GameLaunchResponse::class.java)
.next()
)

Expand All @@ -345,8 +329,7 @@ class FafLobbyClient(
Mono.fromCallable { send(IceServerListRequest()) }
.thenMany(
events
.filter { it is IceServerListResponse }
.cast(IceServerListResponse::class.java)
.ofType(IceServerListResponse::class.java)
.next()
.flatMapIterable { it.iceServers }
)
Expand All @@ -364,8 +347,7 @@ class FafLobbyClient(
override fun getAvailableAvatars(): Flux<Player.Avatar> = Mono.fromCallable { send(AvatarListRequest()) }
.thenMany(
events
.filter { it is AvatarListInfo }
.cast(AvatarListInfo::class.java)
.ofType(AvatarListInfo::class.java)
.next()
.flatMapIterable { it.avatarList }
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.faforever.commons.lobby

import ch.qos.logback.classic.Level
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
Expand All @@ -8,6 +9,7 @@ import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.LineEncoder
import io.netty.handler.codec.string.LineSeparator
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.skyscreamer.jsonassert.JSONAssert
Expand All @@ -29,10 +31,17 @@ import java.time.temporal.ChronoUnit

class LobbyClientTest {
companion object {
val TIMEOUT: Long = 5000;
val TIMEOUT: Long = 10000;
val TIMEOUT_UNIT = ChronoUnit.MILLIS
val LOOPBACK_ADDRESS = InetAddress.getLoopbackAddress()
val LOG: Logger = LoggerFactory.getLogger(FafLobbyClient::class.java)

@JvmStatic
@BeforeAll
fun setLogLevel(): Unit {
val lobbyLogger = LoggerFactory.getLogger("com.faforever.commons.lobby") as ch.qos.logback.classic.Logger
lobbyLogger.level = Level.TRACE
}
}

private val objectMapper: ObjectMapper = ObjectMapper().registerModule(KotlinModule())
Expand Down Expand Up @@ -67,8 +76,8 @@ class LobbyClientTest {
.doOnConnection {
LOG.debug("New Client connected to server")
currentConnection = it
it.addHandler(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below
.addHandler(LineBasedFrameDecoder(1024 * 1024))
it.addHandlerFirst(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below
.addHandlerLast(LineBasedFrameDecoder(1024 * 1024))
}.doOnBound { disposableServer: DisposableServer ->
LOG.debug(
"Fake server listening at {} on port {}",
Expand Down Expand Up @@ -529,6 +538,30 @@ class LobbyClientTest {
stepVerifierServer.verify(verificationDuration)
}

@Test
fun testOnlySingleConnect() {
val config = FafLobbyClient.Config(
Mono.just(token),
"0",
"downlords-faf-client",
disposableServer.host(),
disposableServer.port(),
{ "abc" },
1024 * 1024,
false,
5,
5
)

val verifyLater = StepVerifier.create(serverMessagesReceived.take(Duration.ofSeconds(5)))
.expectComplete()
.verifyLater();

StepVerifier.create(instance.connectAndLogin(config)).expectNextCount(1).verifyComplete()

verifyLater.verify()
}

@Test
fun testAutoReconnect() {
currentConnection.dispose()
Expand Down

0 comments on commit 1a4afa3

Please sign in to comment.