diff --git a/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt b/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt index 236177d1..f8082c50 100644 --- a/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt +++ b/faf-commons-lobby/src/test/kotlin/com/faforever/commons/lobby/LobbyClientTest.kt @@ -23,6 +23,7 @@ import reactor.netty.Connection import reactor.netty.DisposableServer import reactor.netty.NettyInbound import reactor.netty.NettyOutbound +import reactor.netty.http.server.HttpServer import reactor.netty.tcp.TcpServer import reactor.test.StepVerifier import java.net.InetAddress @@ -30,7 +31,6 @@ import java.nio.charset.StandardCharsets import java.time.Duration import java.time.temporal.ChronoUnit -@Disabled class LobbyClientTest { companion object { val TIMEOUT: Long = 10000; @@ -46,10 +46,7 @@ class LobbyClientTest { } } - private val objectMapper: ObjectMapper = ObjectMapper().registerModule(KotlinModule()) - .registerModule(JavaTimeModule()) - .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) - .enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE); + private val objectMapper: ObjectMapper = ObjectMapper().registerModule(KotlinModule()).registerModule(JavaTimeModule()).disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE); private val token = "abc" private val serverReceivedSink = Sinks.many().unicast().onBackpressureBuffer() private val serverMessagesReceived = serverReceivedSink.asFlux().publish().autoConnect() @@ -57,14 +54,14 @@ class LobbyClientTest { private val serverMessagesSent = serverSentSink.asFlux().publish().autoConnect() private lateinit var disposableServer: DisposableServer private lateinit var currentConnection: Connection + private lateinit var url: String private val instance: FafLobbyClient = FafLobbyClient(objectMapper) private val playerUid = 123 private val sessionId: Long = 456 private val verificationDuration = Duration.of(TIMEOUT, TIMEOUT_UNIT) - private val retrySerialFailure = - Sinks.EmitFailureHandler { _: SignalType?, emitResult: Sinks.EmitResult -> - (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) - } + private val retrySerialFailure = Sinks.EmitFailureHandler { _: SignalType?, emitResult: Sinks.EmitResult -> + (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) + } @BeforeEach fun setUp() { @@ -74,83 +71,49 @@ class LobbyClientTest { } private fun startFakeFafLobbyServer() { - disposableServer = TcpServer.create() - .doOnConnection { + disposableServer = HttpServer.create().doOnConnection { LOG.debug("New Client connected to server") currentConnection = it it.addHandlerFirst(LineEncoder(LineSeparator.UNIX)) // TODO: This is not working. Raise a bug ticket! Workaround below .addHandlerLast(LineBasedFrameDecoder(1024 * 1024)) }.doOnBound { disposableServer: DisposableServer -> - LOG.debug( - "Fake server listening at {} on port {}", - disposableServer.host(), - disposableServer.port() - ) - }.noSSL() - .host(LOOPBACK_ADDRESS.hostAddress) - .handle { inbound: NettyInbound, outbound: NettyOutbound -> - val inboundMono = inbound.receive() - .asString(StandardCharsets.UTF_8) - .doOnNext { message: String? -> - LOG.debug("Received message at server {}", message) - LOG.debug("Emit Result is {}", serverReceivedSink.tryEmitNext(message!!)) - } - .then() - val outboundMono = outbound.sendString( - serverMessagesSent - .doOnNext { LOG.debug("Sending message from fake server {}", it) } - .map { message: String -> + LOG.debug("Fake server listening at {} on port {}", disposableServer.host(), disposableServer.port()) + }.noSSL().host(LOOPBACK_ADDRESS.hostAddress).route { + it.ws("/") { inbound: NettyInbound, outbound: NettyOutbound -> + val inboundMono = inbound.receive().asString(StandardCharsets.UTF_8).doOnNext { message: String? -> + LOG.debug("Received message at server {}", message) + LOG.debug("Emit Result is {}", serverReceivedSink.tryEmitNext(message!!)) + }.then() + val outboundMono = outbound.sendString(serverMessagesSent.doOnNext { LOG.debug("Sending message from fake server {}", it) }.map { message: String -> message + "\n" - }, StandardCharsets.UTF_8 - ).then() - inboundMono.mergeWith(outboundMono) - } - .bindNow() + }, StandardCharsets.UTF_8).then() + Mono.firstWithSignal(inboundMono, outboundMono) + } + }.bindNow() + url = "http://${disposableServer.host()}:${disposableServer.port()}" } - private fun commandMatches(message: String, command: String) = message.contains( - "\"command\":\"$command\"" - ) + private fun commandMatches(message: String, command: String) = message.contains("\"command\":\"$command\"") - private fun assertCommandMatch(message: String, command: ClientMessage) = - JSONAssert.assertEquals(objectMapper.writeValueAsString(command), message, true) + private fun assertCommandMatch(message: String, command: ClientMessage) = JSONAssert.assertEquals(objectMapper.writeValueAsString(command), message, true) private fun connectAndLogIn() { - val config = FafLobbyClient.Config( - Mono.just(token), - "0", - "downlords-faf-client", - disposableServer.host(), - { "abc" }, - 1024 * 1024, - false, - 5, - 5 - ) + val config = FafLobbyClient.Config(Mono.just(token), "0", "downlords-faf-client", url, { "abc" }, 1024 * 1024, false, 5, 5) - serverMessagesReceived.filter { commandMatches(it, "ask_session") } - .next() - .doOnNext { + serverMessagesReceived.filter { commandMatches(it, "ask_session") }.next().doOnNext { val sessionMessage = SessionResponse(sessionId) sendFromServer(sessionMessage) }.subscribe() - serverMessagesReceived.filter { commandMatches(it, "auth") } - .next() - .doOnNext { + serverMessagesReceived.filter { commandMatches(it, "auth") }.next().doOnNext { val me = Player(playerUid, "Junit", null, null, "", HashMap(), HashMap()) val loginServerMessage = LoginSuccessResponse(me) sendFromServer(loginServerMessage) }.subscribe() - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(2)) - .assertNext { assertCommandMatch(it, SessionRequest(config.version, config.userAgent)) } - .assertNext { assertCommandMatch(it, AuthenticateRequest(token, sessionId, config.generateUid.apply(sessionId))) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(2)).assertNext { assertCommandMatch(it, SessionRequest(config.version, config.userAgent)) }.assertNext { assertCommandMatch(it, AuthenticateRequest(token, sessionId, config.generateUid.apply(sessionId))) }.expectComplete().verifyLater() - StepVerifier.create(instance.connectAndLogin(config)).expectNextCount(1).expectComplete() - .verify(verificationDuration) + StepVerifier.create(instance.connectAndLogin(config)).expectNextCount(1).expectComplete().verify(verificationDuration) stepVerifier.verify(verificationDuration) } @@ -167,10 +130,7 @@ class LobbyClientTest { @Test fun testBroadcast() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, BroadcastRequest("test")) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, BroadcastRequest("test")) }.expectComplete().verifyLater() instance.broadcastMessage("test") @@ -179,10 +139,7 @@ class LobbyClientTest { @Test fun testClosePlayerGame() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, ClosePlayerGameRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, ClosePlayerGameRequest(0)) }.expectComplete().verifyLater() instance.closePlayerGame(0) @@ -191,10 +148,7 @@ class LobbyClientTest { @Test fun testClosePlayerLobby() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, ClosePlayerLobbyRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, ClosePlayerLobbyRequest(0)) }.expectComplete().verifyLater() instance.closePlayerLobby(0) @@ -203,26 +157,9 @@ class LobbyClientTest { @Test fun testHostGame() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { - assertCommandMatch( - it, HostGameRequest( - "map", - "blah", - "faf", - BooleanArray(0), - GameAccess.PUBLIC, - 0, - null, - GameVisibility.PUBLIC, - null, - null, - false - ) - ) - } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { + assertCommandMatch(it, HostGameRequest("map", "blah", "faf", BooleanArray(0), GameAccess.PUBLIC, 0, null, GameVisibility.PUBLIC, null, null, false)) + }.expectComplete().verifyLater() instance.requestHostGame( "blah", @@ -240,10 +177,7 @@ class LobbyClientTest { @Test fun testJoinGame() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, JoinGameRequest(0, null)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, JoinGameRequest(0, null)) }.expectComplete().verifyLater() instance.requestJoinGame(0, null).subscribe() @@ -252,10 +186,7 @@ class LobbyClientTest { @Test fun testRestoreGameSession() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, RestoreGameSessionRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, RestoreGameSessionRequest(0)) }.expectComplete().verifyLater() instance.restoreGameSession(0) @@ -264,10 +195,7 @@ class LobbyClientTest { @Test fun testGetIceServers() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, IceServerListRequest()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, IceServerListRequest()) }.expectComplete().verifyLater() instance.getIceServers().subscribe() @@ -276,10 +204,7 @@ class LobbyClientTest { @Test fun testAddFriend() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, AddFriendRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, AddFriendRequest(0)) }.expectComplete().verifyLater() instance.addFriend(0) @@ -288,10 +213,7 @@ class LobbyClientTest { @Test fun testAddFoe() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, AddFoeRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, AddFoeRequest(0)) }.expectComplete().verifyLater() instance.addFoe(0) @@ -300,10 +222,7 @@ class LobbyClientTest { @Test fun testRemoveFriend() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, RemoveFriendRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, RemoveFriendRequest(0)) }.expectComplete().verifyLater() instance.removeFriend(0) @@ -312,10 +231,7 @@ class LobbyClientTest { @Test fun testRemoveFoe() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, RemoveFoeRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, RemoveFoeRequest(0)) }.expectComplete().verifyLater() instance.removeFoe(0) @@ -324,10 +240,7 @@ class LobbyClientTest { @Test fun testSelectAvatar() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, SelectAvatarRequest(null)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, SelectAvatarRequest(null)) }.expectComplete().verifyLater() instance.selectAvatar(null) @@ -336,10 +249,7 @@ class LobbyClientTest { @Test fun testGetAvailableAvatars() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, AvatarListRequest()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, AvatarListRequest()) }.expectComplete().verifyLater() instance.getAvailableAvatars().subscribe() @@ -348,10 +258,7 @@ class LobbyClientTest { @Test fun testRequestMatchmakerInfo() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, MatchmakerInfoRequest()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, MatchmakerInfoRequest()) }.expectComplete().verifyLater() instance.requestMatchmakerInfo() @@ -360,10 +267,7 @@ class LobbyClientTest { @Test fun testGameMatchmaking() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, GameMatchmakingRequest("test", MatchmakerState.START)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, GameMatchmakingRequest("test", MatchmakerState.START)) }.expectComplete().verifyLater() instance.gameMatchmaking("test", MatchmakerState.START) @@ -372,10 +276,7 @@ class LobbyClientTest { @Test fun testInviteToParty() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, InviteToPartyRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, InviteToPartyRequest(0)) }.expectComplete().verifyLater() instance.inviteToParty(0) @@ -384,10 +285,7 @@ class LobbyClientTest { @Test fun testAcceptInvite() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, AcceptInviteToPartyRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, AcceptInviteToPartyRequest(0)) }.expectComplete().verifyLater() instance.acceptPartyInvite(0) @@ -396,10 +294,7 @@ class LobbyClientTest { @Test fun testKickPlayer() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, KickPlayerFromPartyRequest(0)) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, KickPlayerFromPartyRequest(0)) }.expectComplete().verifyLater() instance.kickPlayerFromParty(0) @@ -408,10 +303,7 @@ class LobbyClientTest { @Test fun testUnreadyParty() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, UnreadyPartyRequest()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, UnreadyPartyRequest()) }.expectComplete().verifyLater() instance.unreadyParty() @@ -420,10 +312,7 @@ class LobbyClientTest { @Test fun testLeaveParty() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, LeavePartyRequest()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, LeavePartyRequest()) }.expectComplete().verifyLater() instance.leaveParty() @@ -432,10 +321,7 @@ class LobbyClientTest { @Test fun testSetPartyFactions() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, SelectPartyFactionsRequest(setOf())) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, SelectPartyFactionsRequest(setOf())) }.expectComplete().verifyLater() instance.setPartyFactions(setOf()) @@ -444,10 +330,7 @@ class LobbyClientTest { @Test fun testSendGpgGameMessage() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, GpgGameOutboundMessage("Test", listOf())) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, GpgGameOutboundMessage("Test", listOf())) }.expectComplete().verifyLater() instance.sendGpgGameMessage(GpgGameOutboundMessage("Test", listOf())) @@ -456,10 +339,7 @@ class LobbyClientTest { @Test fun testPingInterval() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, ClientPingMessage()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, ClientPingMessage()) }.expectComplete().verifyLater() instance.minPingIntervalSeconds = 1 @@ -470,10 +350,7 @@ class LobbyClientTest { @Test fun testPingOnceInterval() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(Duration.ofSeconds(2))) - .assertNext { assertCommandMatch(it, ClientPingMessage()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(Duration.ofSeconds(2))).assertNext { assertCommandMatch(it, ClientPingMessage()) }.expectComplete().verifyLater() instance.minPingIntervalSeconds = 1 @@ -485,10 +362,7 @@ class LobbyClientTest { @Test fun testPingResponse() { - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)) - .assertNext { assertCommandMatch(it, ClientPongMessage()) } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(1)).assertNext { assertCommandMatch(it, ClientPongMessage()) }.expectComplete().verifyLater() sendFromServer(ServerPingMessage()) @@ -499,88 +373,37 @@ class LobbyClientTest { fun testOnAuthenticationFailed() { instance.disconnect() - val config = FafLobbyClient.Config( - Mono.just(token), - "0", - "downlords-faf-client", - disposableServer.host(), - { "abc" }, - 1024 * 1024, - false, - 5, - 5 - ) - - val stepVerifierServer = StepVerifier.create(serverMessagesReceived.take(2)) - .assertNext { assertCommandMatch(it, SessionRequest(config.version, config.userAgent)) } - .assertNext { assertCommandMatch(it, AuthenticateRequest(token, sessionId, config.generateUid.apply(sessionId))) } - .expectComplete() - .verifyLater() - - serverMessagesReceived.filter { commandMatches(it, "ask_session") } - .next() - .doOnNext { + val config = FafLobbyClient.Config(Mono.just(token), "0", "downlords-faf-client", url, { "abc" }, 1024 * 1024, false, 5, 5) + + val stepVerifierServer = StepVerifier.create(serverMessagesReceived.take(2)).assertNext { assertCommandMatch(it, SessionRequest(config.version, config.userAgent)) }.assertNext { assertCommandMatch(it, AuthenticateRequest(token, sessionId, config.generateUid.apply(sessionId))) }.expectComplete().verifyLater() + + serverMessagesReceived.filter { commandMatches(it, "ask_session") }.next().doOnNext { val sessionMessage = SessionResponse(sessionId) sendFromServer(sessionMessage) }.subscribe() - serverMessagesReceived.filter { commandMatches(it, "auth") } - .next() - .doOnNext { + serverMessagesReceived.filter { commandMatches(it, "auth") }.next().doOnNext { val authenticationFailedMessage = LoginFailedResponse("boo") sendFromServer(authenticationFailedMessage) }.subscribe() - StepVerifier.create(instance.connectAndLogin(config)) - .expectError(LoginException::class.java) - .verify(verificationDuration) + StepVerifier.create(instance.connectAndLogin(config)).expectError(LoginException::class.java).verify(verificationDuration) stepVerifierServer.verify(verificationDuration) } - @Test - fun testOnlySingleConnect() { - val config = FafLobbyClient.Config( - Mono.just(token), - "0", - "downlords-faf-client", - disposableServer.host(), - { "abc" }, - 1024 * 1024, - false, - 5, - 5 - ) - - val verifyLater = StepVerifier.create(serverMessagesReceived.take(Duration.ofSeconds(5))) - .expectComplete() - .verifyLater(); - - StepVerifier.create(instance.connectAndLogin(config)).expectNextCount(1).verifyComplete() - - verifyLater.verify() - } - @Test fun testAutoReconnect() { currentConnection.dispose() - val stepVerifier = StepVerifier.create(serverMessagesReceived.take(2)) - .assertNext { commandMatches(it, "ask_session") } - .assertNext { commandMatches(it, "auth") } - .expectComplete() - .verifyLater() + val stepVerifier = StepVerifier.create(serverMessagesReceived.take(2)).assertNext { commandMatches(it, "ask_session") }.assertNext { commandMatches(it, "auth") }.expectComplete().verifyLater() - serverMessagesReceived.filter { commandMatches(it, "ask_session") } - .next() - .doOnNext { + serverMessagesReceived.filter { commandMatches(it, "ask_session") }.next().doOnNext { val sessionMessage = SessionResponse(sessionId) sendFromServer(sessionMessage) }.subscribe() - serverMessagesReceived.filter { commandMatches(it, "auth") } - .next() - .doOnNext { + serverMessagesReceived.filter { commandMatches(it, "auth") }.next().doOnNext { val me = Player(playerUid, "Junit", null, null, "", HashMap(), HashMap()) val loginServerMessage = LoginSuccessResponse(me) sendFromServer(loginServerMessage) @@ -593,7 +416,6 @@ class LobbyClientTest { fun testNoAutoReconnect() { instance.disconnect() - StepVerifier.create(serverMessagesReceived.take(Duration.ofSeconds(5))) - .verifyComplete() + StepVerifier.create(serverMessagesReceived.take(Duration.ofSeconds(5))).verifyComplete() } }