Skip to content

Commit

Permalink
Exposed method to retrieve the list of Server's connected clients (is…
Browse files Browse the repository at this point in the history
…sue #448)
  • Loading branch information
andsel committed Feb 24, 2019
1 parent 8bd0dc7 commit b827ad4
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 18 deletions.
52 changes: 52 additions & 0 deletions broker/src/main/java/io/moquette/broker/ClientDescriptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.moquette.broker;

import java.util.Objects;

public class ClientDescriptor {

private final String clientID;
private final String address;
private final int port;

ClientDescriptor(String clientID, String address, int port) {
this.clientID = clientID;
this.address = address;
this.port = port;
}

public String getClientID() {
return clientID;
}

public String getAddress() {
return address;
}

public int getPort() {
return port;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClientDescriptor that = (ClientDescriptor) o;
return port == that.port &&
Objects.equals(clientID, that.clientID) &&
Objects.equals(address, that.address);
}

@Override
public int hashCode() {
return Objects.hash(clientID, address, port);
}

@Override
public String toString() {
return "ClientDescriptor{" +
"clientID='" + clientID + '\'' +
", address='" + address + '\'' +
", port=" + port +
'}';
}
}
5 changes: 5 additions & 0 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -488,4 +489,8 @@ int nextPacketId() {
public String toString() {
return "MQTTConnection{channel=" + channel + ", connected=" + connected + '}';
}

InetSocketAddress remoteAddress() {
return (InetSocketAddress) channel.remoteAddress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MQTTConnectionFactory {
this.postOffice = postOffice;
}

public MQTTConnection create(Channel channel) {
MQTTConnection create(Channel channel) {
return new MQTTConnection(channel, brokerConfig, authenticator, sessionRegistry, postOffice);
}
}
29 changes: 12 additions & 17 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -53,6 +50,7 @@ public class Server {
private PostOffice dispatcher;
private BrokerInterceptor interceptor;
private H2Builder h2Builder;
private SessionRegistry sessions;

public static void main(String[] args) throws IOException {
final Server server = new Server();
Expand Down Expand Up @@ -136,7 +134,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
}

public void startServer(IConfig config, List<? extends InterceptHandler> handlers, ISslContextCreator sslCtxCreator,
IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException {
IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) {
final long start = System.currentTimeMillis();
if (handlers == null) {
handlers = Collections.emptyList();
Expand Down Expand Up @@ -179,7 +177,7 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory();
subscriptions.init(subscriptionsRepository);
final Authorizator authorizator = new Authorizator(authorizatorPolicy);
SessionRegistry sessions = new SessionRegistry(subscriptions, queueRepository, authorizator);
sessions = new SessionRegistry(subscriptions, queueRepository, authorizator);
dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, interceptor, authorizator);
final BrokerConfiguration brokerConfig = new BrokerConfiguration(config);
MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, sessions,
Expand All @@ -195,11 +193,6 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
}

private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy authorizatorPolicy, IConfig props) {
// if (authorizatorPolicy == null) {
// authorizatorPolicy = new PermitAllAuthorizatorPolicy();
// }
// return authorizatorPolicy;

LOG.debug("Configuring MQTT authorizator policy");
String authorizatorClassName = props.getProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, "");
if (authorizatorPolicy == null && !authorizatorClassName.isEmpty()) {
Expand All @@ -215,7 +208,7 @@ private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy aut
IResourceLoader resourceLoader = props.getResourceLoader();
authorizatorPolicy = ACLFileParser.parse(resourceLoader.loadResource(aclFilePath));
} catch (ParseException pex) {
LOG.error("Unable to parse ACL file. path=" + aclFilePath, pex);
LOG.error("Unable to parse ACL file. path = {}", aclFilePath, pex);
}
} else {
authorizatorPolicy = new PermitAllAuthorizatorPolicy();
Expand All @@ -226,11 +219,6 @@ private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy aut
}

private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, IConfig props) {
// if (authenticator == null) {
// authenticator = new AcceptAllAuthenticator();
// }
// return authenticator;

LOG.debug("Configuring MQTT authenticator");
String authenticatorClassName = props.getProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, "");

Expand Down Expand Up @@ -391,4 +379,11 @@ public void removeInterceptHandler(InterceptHandler interceptHandler) {
LOG.info("Removing MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
interceptor.removeInterceptHandler(interceptHandler);
}

/**
* Return a list of descriptors of connected clients.
* */
public Collection<ClientDescriptor> listConnectedClients() {
return sessions.listConnectedClients();
}
}
8 changes: 8 additions & 0 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
Expand Down Expand Up @@ -368,6 +369,13 @@ public void receivedPubRelQos2(int messageID) {
ReferenceCountUtil.release(removedMsg);
}

Optional<InetSocketAddress> remoteAddress() {
if (connected()) {
return Optional.of(mqttConnection.remoteAddress());
}
return Optional.empty();
}

@Override
public String toString() {
return "Session{" +
Expand Down
19 changes: 19 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class SessionRegistry {

Expand Down Expand Up @@ -245,4 +249,19 @@ public void disconnect(String clientID) {
private void dropQueuesForClient(String clientId) {
queues.remove(clientId);
}

Collection<ClientDescriptor> listConnectedClients() {
return pool.values().stream()
.filter(Session::connected)
.map(this::createClientDescriptor)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}

private Optional<ClientDescriptor> createClientDescriptor(Session s) {
final String clientID = s.getClientID();
final Optional<InetSocketAddress> remoteAddressOpt = s.remoteAddress();
return remoteAddressOpt.map(r -> new ClientDescriptor(clientID, r.getHostString(), r.getPort()));
}
}

0 comments on commit b827ad4

Please sign in to comment.