From d9abf3662e533bd0c48be87d4a0ea027bb7f2940 Mon Sep 17 00:00:00 2001 From: Sheikah45 Date: Fri, 13 Oct 2023 18:43:22 -0400 Subject: [PATCH] intense logging --- .../faforever/commons/lobby/FafLobbyClient.kt | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt index f2a98c53..cd34507a 100644 --- a/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt +++ b/faf-commons-lobby/src/main/kotlin/com/faforever/commons/lobby/FafLobbyClient.kt @@ -17,6 +17,7 @@ import reactor.core.publisher.Sinks.EmitResult import reactor.core.scheduler.Schedulers import reactor.netty.Connection import reactor.netty.http.client.HttpClient +import reactor.netty.http.client.WebsocketClientSpec import reactor.util.retry.Retry import reactor.util.retry.Retry.RetrySignal import java.net.InetSocketAddress @@ -58,11 +59,11 @@ class FafLobbyClient( private val eventSink: Sinks.Many = Sinks.many().unicast().onBackpressureBuffer() private val rawEvents = eventSink.asFlux().publish().autoConnect() private val connectionStatusSink: Sinks.Many = Sinks.many().unicast().onBackpressureBuffer() - override val connectionStatus = connectionStatusSink.asFlux().publish().autoConnect() + override val connectionStatus: Flux = connectionStatusSink.asFlux().publish().autoConnect() private val connectionAcquiredSink: Sinks.Many = Sinks.many().unicast().onBackpressureBuffer() private val connectionAcquired: Flux = connectionAcquiredSink.asFlux().publish().autoConnect() - override val events = rawEvents.filter { + override val events: Flux = rawEvents.filter { it !is ServerPingMessage && it !is ServerPongMessage && it !is SessionResponse && @@ -75,7 +76,7 @@ class FafLobbyClient( is LoginSuccessResponse -> Mono.just(it.me) is LoginFailedResponse -> Mono.error(LoginException(it.text)) } - }.timeout(Duration.ofSeconds(10)) + }.timeout(Duration.ofSeconds(30)) .doOnError(LoginException::class.java) { kicked = true } .doFirst { prepareAuthenticateOnNextSession() @@ -150,11 +151,11 @@ class FafLobbyClient( LOG.debug("Opening connection") httpClient .wiretap(config.wiretap) - .websocket() + .websocket(WebsocketClientSpec.builder().maxFramePayloadLength(config.bufferSize).build()) .uri(config.url) .handle { inbound, outbound -> val inboundMono = inbound.receive() - .asString() + .asString(Charsets.UTF_8) .flatMapIterable { it.toCharArray().asIterable() } .windowUntil { '\n' == it } .flatMap { it.collectList().map { chars -> chars.toCharArray() }.map { charArray -> String(charArray) } } @@ -187,8 +188,8 @@ class FafLobbyClient( pingDisposable = pingWithDelay() .subscribeOn(Schedulers.single()) .subscribe() - }.log() - .then() + } + .then(Mono.never()) val outboundMono = outbound.sendString( outboundMessages @@ -217,19 +218,19 @@ class FafLobbyClient( LOG.error("Error during serialization of message {}", it, throwable) Mono.empty() } - }.log() - ).then() + } + ).neverComplete() /* The lobby protocol requires two-way communication. If either the outbound or inbound connections complete/close then we are better off closing the connection to the server. This is why we return a mono that completes when one of the connections finishes */ - Mono.firstWithSignal(inboundMono, outboundMono).log() + Mono.firstWithSignal(inboundMono, outboundMono) } .doOnCancel { LOG.info("Connection cancelled") } .doOnSubscribe { LOG.debug("Beginning connection process") connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure) - }.log() + } .subscribe(null, { LOG.warn("Error in connection", it) connectionStatusSink.emitNext(ConnectionStatus.DISCONNECTED, retrySerialFailure)