Skip to content

Commit

Permalink
intense logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheikah45 committed Oct 13, 2023
1 parent 315a2c7 commit d9abf36
Showing 1 changed file with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import reactor.core.publisher.Sinks.EmitResult
import reactor.core.scheduler.Schedulers
import reactor.netty.Connection
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.WebsocketClientSpec
import reactor.util.retry.Retry
import reactor.util.retry.Retry.RetrySignal
import java.net.InetSocketAddress
Expand Down Expand Up @@ -58,11 +59,11 @@ class FafLobbyClient(
private val eventSink: Sinks.Many<ServerMessage> = Sinks.many().unicast().onBackpressureBuffer()
private val rawEvents = eventSink.asFlux().publish().autoConnect()
private val connectionStatusSink: Sinks.Many<ConnectionStatus> = Sinks.many().unicast().onBackpressureBuffer()
override val connectionStatus = connectionStatusSink.asFlux().publish().autoConnect()
override val connectionStatus: Flux<ConnectionStatus> = connectionStatusSink.asFlux().publish().autoConnect()
private val connectionAcquiredSink: Sinks.Many<Any> = Sinks.many().unicast().onBackpressureBuffer()
private val connectionAcquired: Flux<Any> = connectionAcquiredSink.asFlux().publish().autoConnect()

override val events = rawEvents.filter {
override val events: Flux<ServerMessage> = rawEvents.filter {
it !is ServerPingMessage &&
it !is ServerPongMessage &&
it !is SessionResponse &&
Expand All @@ -75,7 +76,7 @@ class FafLobbyClient(
is LoginSuccessResponse -> Mono.just(it.me)
is LoginFailedResponse -> Mono.error(LoginException(it.text))
}
}.timeout(Duration.ofSeconds(10))
}.timeout(Duration.ofSeconds(30))
.doOnError(LoginException::class.java) { kicked = true }
.doFirst {
prepareAuthenticateOnNextSession()
Expand Down Expand Up @@ -150,11 +151,11 @@ class FafLobbyClient(
LOG.debug("Opening connection")
httpClient
.wiretap(config.wiretap)
.websocket()
.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(config.bufferSize).build())
.uri(config.url)
.handle { inbound, outbound ->
val inboundMono = inbound.receive()
.asString()
.asString(Charsets.UTF_8)
.flatMapIterable { it.toCharArray().asIterable() }
.windowUntil { '\n' == it }
.flatMap { it.collectList().map { chars -> chars.toCharArray() }.map { charArray -> String(charArray) } }
Expand Down Expand Up @@ -187,8 +188,8 @@ class FafLobbyClient(
pingDisposable = pingWithDelay()
.subscribeOn(Schedulers.single())
.subscribe()
}.log()
.then()
}
.then(Mono.never<Unit>())

val outboundMono = outbound.sendString(
outboundMessages
Expand Down Expand Up @@ -217,19 +218,19 @@ class FafLobbyClient(
LOG.error("Error during serialization of message {}", it, throwable)
Mono.empty()
}
}.log()
).then()
}
).neverComplete()

/* The lobby protocol requires two-way communication. If either the outbound or inbound connections complete/close
then we are better off closing the connection to the server. This is why we return a mono that completes when one
of the connections finishes */
Mono.firstWithSignal(inboundMono, outboundMono).log()
Mono.firstWithSignal(inboundMono, outboundMono)
}
.doOnCancel { LOG.info("Connection cancelled") }
.doOnSubscribe {
LOG.debug("Beginning connection process")
connectionStatusSink.emitNext(ConnectionStatus.CONNECTING, retrySerialFailure)
}.log()
}
.subscribe(null, {
LOG.warn("Error in connection", it)
connectionStatusSink.emitNext(ConnectionStatus.DISCONNECTED, retrySerialFailure)
Expand Down

0 comments on commit d9abf36

Please sign in to comment.