diff --git a/build.gradle b/build.gradle index 62ee687da..4753d4046 100644 --- a/build.gradle +++ b/build.gradle @@ -33,9 +33,9 @@ subprojects { apply plugin: 'com.github.sherter.google-java-format' apply plugin: 'com.github.vlsi.gradle-extensions' - ext['reactor-bom.version'] = '2020.0.32' + ext['reactor-bom.version'] = '2020.0.39' ext['logback.version'] = '1.2.10' - ext['netty-bom.version'] = '4.1.93.Final' + ext['netty-bom.version'] = '4.1.106.Final' ext['netty-boringssl.version'] = '2.0.61.Final' ext['hdrhistogram.version'] = '2.1.12' ext['mockito.version'] = '4.11.0' diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index 0c68db6df..4eca35bde 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -463,8 +463,14 @@ private Mono acceptSetup( return interceptors .initSocketAcceptor(acceptor) .accept(setupPayload, wrappedRSocketRequester) - .doOnError( - err -> serverSetup.sendError(wrappedDuplexConnection, rejectedSetupError(err))) + .onErrorResume( + err -> + Mono.fromRunnable( + () -> + serverSetup.sendError( + wrappedDuplexConnection, rejectedSetupError(err))) + .then(wrappedDuplexConnection.onClose()) + .then(Mono.error(err))) .doOnNext( rSocketHandler -> { RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler); diff --git a/rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java b/rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java index 66d18c8a7..c4f39ba44 100644 --- a/rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java +++ b/rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java @@ -33,11 +33,31 @@ public static String toString(ByteBuf frame) { if (FrameHeaderCodec.hasMetadata(frame)) { payload.append("\nMetadata:\n"); - ByteBufUtil.appendPrettyHexDump(payload, getMetadata(frame, frameType)); + ByteBuf metadata = getMetadata(frame, frameType); + if (metadata.readableBytes() < 100) { + ByteBufUtil.appendPrettyHexDump(payload, metadata); + } else { + payload.append( + " +-------------------------------------------------+\n" + + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |\n" + + "+--------+-------------------------------------------------+----------------+\n" + + "|00000000| too large payload | |\n" + + "+--------+-------------------------------------------------+----------------+\n"); + } } payload.append("\nData:\n"); - ByteBufUtil.appendPrettyHexDump(payload, getData(frame, frameType)); + ByteBuf data = getData(frame, frameType); + if (data.readableBytes() < 100) { + ByteBufUtil.appendPrettyHexDump(payload, data); + } else { + payload.append( + " +-------------------------------------------------+\n" + + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |\n" + + "+--------+-------------------------------------------------+----------------+\n" + + "|00000000| too large payload | |\n" + + "+--------+-------------------------------------------------+----------------+\n"); + } return payload.toString(); } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java index c4dc4d837..ad1b38375 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java @@ -287,6 +287,9 @@ public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) { @Override public void dispose() { + if (logger.isDebugEnabled()) { + logger.debug("Side[server]|Session[{}]. Disposing session", session); + } Operators.terminate(S, this); resumableConnection.dispose(); } diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/integration/AuthenticationTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/integration/AuthenticationTest.java new file mode 100644 index 000000000..5709595ec --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/integration/AuthenticationTest.java @@ -0,0 +1,78 @@ +package io.rsocket.integration; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.exceptions.RejectedSetupException; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.netty.tcp.TcpClient; +import reactor.netty.tcp.TcpServer; +import reactor.test.StepVerifier; + +public class AuthenticationTest { + + private static final Logger LOG = LoggerFactory.getLogger(AuthenticationTest.class); + private static final int PORT = 23200; + + @Test + void authTest() { + Hooks.onOperatorDebug(); + createServer().block(); + RSocket rsocketClient = createClient().block(); + + StepVerifier.create(rsocketClient.requestResponse(DefaultPayload.create("Client: Hello"))) + .expectError(RejectedSetupException.class) + .verify(); + } + + private static Mono createServer() { + LOG.info("Starting server at port {}", PORT); + RSocketServer rSocketServer = + RSocketServer.create((connectionSetupPayload, rSocket) -> Mono.just(new MyServerRsocket())); + + TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT); + + return rSocketServer + .interceptors( + interceptorRegistry -> + interceptorRegistry.forSocketAcceptor( + socketAcceptor -> + (setup, sendingSocket) -> { + if (true) { // TODO here would be an authentication check based on the + // setup payload + return Mono.error(new RejectedSetupException("ACCESS_DENIED")); + } else { + return socketAcceptor.accept(setup, sendingSocket); + } + })) + .bind(TcpServerTransport.create(tcpServer)) + .doOnNext(closeableChannel -> LOG.info("RSocket server started.")); + } + + private static Mono createClient() { + LOG.info("Connecting...."); + return RSocketConnector.create() + .connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT))) + .doOnNext(rSocket -> LOG.info("Successfully connected to server")) + .doOnError(throwable -> LOG.error("Failed to connect to server")); + } + + public static class MyServerRsocket implements RSocket { + private static final Logger LOG = LoggerFactory.getLogger(MyServerRsocket.class); + + @Override + public Mono requestResponse(Payload payload) { + LOG.info("Got a request with payload: {}", payload.getDataUtf8()); + return Mono.just("Response data blah blah blah").map(DefaultPayload::create); + } + } +} diff --git a/rsocket-transport-netty/src/test/resources/logback-test.xml b/rsocket-transport-netty/src/test/resources/logback-test.xml index b42db6df6..3650f62b3 100644 --- a/rsocket-transport-netty/src/test/resources/logback-test.xml +++ b/rsocket-transport-netty/src/test/resources/logback-test.xml @@ -23,11 +23,26 @@ + + + + + log-${bySecond}.txt + + %date{HH:mm:ss.SSS} %-10thread %-42logger %msg%n + + + + + + + - + + + - - @@ -36,8 +51,9 @@ - + +