diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index feeb9124..c7c14ca6 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -488,7 +488,7 @@ public void shutdown() { private Mono doShutdown() { return Mono.defer( () -> { - LOGGER.info("[{}] Cluster member is shutting down", localMember); + LOGGER.info("[{}][doShutdown] Shutting down", localMember); return Flux.concatDelayError( leaveCluster(), dispose(), @@ -496,8 +496,7 @@ private Mono doShutdown() { Mono.fromRunnable(this::stopJmxMonitor)) .then() .doFinally(s -> scheduler.dispose()) - .doOnSuccess( - avoid -> LOGGER.info("[{}] Cluster member has been shutdown", localMember)); + .doOnSuccess(avoid -> LOGGER.info("[{}][doShutdown] Shutdown", localMember)); }); } @@ -505,8 +504,8 @@ private Mono leaveCluster() { return membership .leaveCluster() .subscribeOn(scheduler) - .doOnSubscribe(s -> LOGGER.info("[{}] Cluster member is leaving a cluster", localMember)) - .doOnSuccess(s -> LOGGER.info("[{}] Cluster member has left a cluster", localMember)) + .doOnSubscribe(s -> LOGGER.info("[{}] Leaving a cluster", localMember)) + .doOnSuccess(s -> LOGGER.info("[{}] Left a cluster", localMember)) .doOnError( ex -> LOGGER.warn( diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java index 7db73d0f..b0b23a88 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/ExceptionHandler.java @@ -18,6 +18,6 @@ final class ExceptionHandler extends ChannelDuplexHandler { @Override public final void exceptionCaught(ChannelHandlerContext ctx, Throwable ex) { - LOGGER.debug("Exception caught for channel {}, cause: {}", ctx.channel(), ex.toString()); + LOGGER.debug("Exception caught on channel {}, cause: {}", ctx.channel(), ex.toString()); } } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index fe6541ff..daeeca35 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -181,15 +181,16 @@ public Mono bind0() { return newTcpServer() .handle(this::onMessage) .bind() + .map(server -> new TransportImpl(server, this)) + .cast(Transport.class) + .doOnSubscribe(s -> LOGGER.info("Bind cluster transport on port={}", config.port())) + .doOnSuccess(t -> LOGGER.info("[{}] Bound cluster transport", t.address())) .doOnError( ex -> LOGGER.error( "Failed to bind cluster transport on port={}, cause: {}", config.port(), - ex.toString())) - .map(server -> new TransportImpl(server, this)) - .doOnSuccess(t -> LOGGER.info("[{}] Bound cluster transport", t.address())) - .cast(Transport.class); + ex.toString())); } @Override @@ -214,13 +215,13 @@ public final Mono stop() { private Mono doStop() { return Mono.defer( () -> { - LOGGER.info("[{}] Transport is shutting down", address); + LOGGER.info("[{}][doStop] Stopping", address); // Complete incoming messages observable messageSink.complete(); return Flux.concatDelayError(closeServer(), shutdownLoopResources()) .then() .doFinally(s -> connections.clear()) - .doOnSuccess(avoid -> LOGGER.info("[{}] Transport has been shut down", address)); + .doOnSuccess(avoid -> LOGGER.info("[{}][doStop] Stopped", address)); }); }