Skip to content

Commit

Permalink
Added get*Port() methods on Server to read local bind port (when it's…
Browse files Browse the repository at this point in the history
… passed to '0'), issue #453
  • Loading branch information
andsel committed Feb 23, 2019
1 parent 3f7c8fe commit 8bd0dc7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
45 changes: 42 additions & 3 deletions broker/src/main/java/io/moquette/broker/NewNettyAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLEngine;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

Expand All @@ -56,6 +60,8 @@
class NewNettyAcceptor {

private static final String MQTT_SUBPROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
public static final String PLAIN_MQTT_PROTO = "TCP MQTT";
public static final String SSL_MQTT_PROTO = "SSL MQTT";

static class WebSocketFrameToByteBufDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {

Expand Down Expand Up @@ -89,10 +95,33 @@ private abstract static class PipelineInitializer {
abstract void init(SocketChannel channel) throws Exception;
}


private class LocalPortReaderFutureListener implements ChannelFutureListener {
private String transportName;

LocalPortReaderFutureListener(String transportName) {
this.transportName = transportName;
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
final SocketAddress localAddress = future.channel().localAddress();
if (localAddress instanceof InetSocketAddress) {
InetSocketAddress inetAddress = (InetSocketAddress) localAddress;
LOG.debug("bound {} port: {}", transportName, inetAddress.getPort());
int port = inetAddress.getPort();
ports.put(transportName, port);
}
}
}
}

private static final Logger LOG = LoggerFactory.getLogger(NewNettyAcceptor.class);

private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private final Map<String, Integer> ports = new HashMap<>();
private BytesMetricsCollector bytesMetricsCollector = new BytesMetricsCollector();
private MessageMetricsCollector metricsCollector = new MessageMetricsCollector();
private Optional<? extends ChannelInboundHandler> metrics;
Expand Down Expand Up @@ -187,12 +216,22 @@ public void initChannel(SocketChannel ch) throws Exception {
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(host, port);
LOG.info("Server bound to host={}, port={}, protocol={}", host, port, protocol);
f.sync().addListener(FIRE_EXCEPTION_ON_FAILURE);
f.sync()
.addListener(new LocalPortReaderFutureListener(protocol))
.addListener(FIRE_EXCEPTION_ON_FAILURE);
} catch (InterruptedException ex) {
LOG.error("An interruptedException was caught while initializing integration. Protocol={}", protocol, ex);
}
}

public int getPort() {
return ports.computeIfAbsent(PLAIN_MQTT_PROTO, i -> 0);
}

public int getSslPort() {
return ports.computeIfAbsent(SSL_MQTT_PROTO, i -> 0);
}

private void initializePlainTCPTransport(NewNettyMQTTHandler handler, IConfig props) {
LOG.debug("Configuring TCP MQTT transport");
final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
Expand All @@ -204,7 +243,7 @@ private void initializePlainTCPTransport(NewNettyMQTTHandler handler, IConfig pr
return;
}
int port = Integer.parseInt(tcpPortProp);
initFactory(host, port, "TCP MQTT", new PipelineInitializer() {
initFactory(host, port, PLAIN_MQTT_PROTO, new PipelineInitializer() {

@Override
void init(SocketChannel channel) {
Expand Down Expand Up @@ -281,7 +320,7 @@ private void initializeSSLTCPTransport(NewNettyMQTTHandler handler, IConfig prop
String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
String sNeedsClientAuth = props.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false");
final boolean needsClientAuth = Boolean.valueOf(sNeedsClientAuth);
initFactory(host, sslPort, "SSL MQTT", new PipelineInitializer() {
initFactory(host, sslPort, SSL_MQTT_PROTO, new PipelineInitializer() {

@Override
void init(SocketChannel channel) throws Exception {
Expand Down
8 changes: 8 additions & 0 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,14 @@ public void stopServer() {
LOG.info("Moquette integration has been stopped.");
}

public int getPort() {
return acceptor.getPort();
}

public int getSslPort() {
return acceptor.getSslPort();
}

/**
* SPI method used by Broker embedded applications to get list of subscribers. Returns null if
* the broker is not started.
Expand Down

0 comments on commit 8bd0dc7

Please sign in to comment.