Skip to content

Commit

Permalink
Modified the startup to collect some telemetry data and send to a clo…
Browse files Browse the repository at this point in the history
…ud collector (#700)

* Modified the startup of Moquette to collect some telemetry data and send to a cloud collector

* Updated changelog
  • Loading branch information
andsel authored Dec 26, 2022
1 parent 7f0dd37 commit d1b9a49
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 7 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Version 0.16-SNAPSHOT:
[build] drop generation of broker-test, removed distribution and embedding_moquette modules from deploy phase (#616)
[fix] introduces sessions event processors to segregate changes to a session in one single thread, simplifying concurrency and code (#631)
[util] add collection of telemetry data (#700)

Version 0.15.1:
[fix] avoid double subscription (#612)
Expand Down
2 changes: 2 additions & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public final class BrokerConstants {
public static final String METRICS_LIBRATO_TOKEN_PROPERTY_NAME = "metrics.librato.token";
public static final String METRICS_LIBRATO_SOURCE_PROPERTY_NAME = "metrics.librato.source";

public static final String ENABLE_TELEMETRY_NAME = "telemetry_enabled";

public static final String BUGSNAG_ENABLE_PROPERTY_NAME = "use_bugsnag";
public static final String BUGSNAG_TOKEN_PROPERTY_NAME = "bugsnag.token";

Expand Down
173 changes: 166 additions & 7 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,48 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.config.*;
import io.moquette.broker.config.FileResourceLoader;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.IResourceLoader;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.broker.config.ResourceLoaderConfig;
import io.moquette.broker.security.ACLFileParser;
import io.moquette.broker.security.AcceptAllAuthenticator;
import io.moquette.broker.security.DenyAllAuthorizatorPolicy;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.broker.security.PermitAllAuthorizatorPolicy;
import io.moquette.broker.security.ResourceAuthenticator;
import io.moquette.interception.InterceptHandler;
import io.moquette.persistence.H2Builder;
import io.moquette.persistence.MemorySubscriptionsRepository;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.security.*;
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -43,6 +66,7 @@
public class Server {

private static final Logger LOG = LoggerFactory.getLogger(io.moquette.broker.Server.class);
public static final String MOQUETTE_VERSION = "0.16-SNAPSHOT";

private ScheduledExecutorService scheduler;
private NewNettyAcceptor acceptor;
Expand All @@ -51,15 +75,16 @@ public class Server {
private BrokerInterceptor interceptor;
private H2Builder h2Builder;
private SessionRegistry sessions;
private boolean standalone = false;

public static void main(String[] args) throws IOException {
final Server server = new Server();
try {
server.startServer();
server.startStandaloneServer();
} catch (RuntimeException e) {
System.exit(1);
}
System.out.println("Server started, version 0.16-SNAPSHOT");
System.out.println("Server started, version " + MOQUETTE_VERSION);
//Bind a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(server::stopServer));
}
Expand All @@ -77,6 +102,11 @@ public void startServer() throws IOException {
startServer(config);
}

private void startStandaloneServer() throws IOException {
this.standalone = true;
startServer();
}

private static File defaultConfigFile() {
String configPath = System.getProperty("moquette.path", null);
return new File(configPath, IConfig.DEFAULT_CONFIG);
Expand Down Expand Up @@ -195,9 +225,138 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler

final long startTime = System.currentTimeMillis() - start;
LOG.info("Moquette integration has been started successfully in {} ms", startTime);

if (config.boolProp(BrokerConstants.ENABLE_TELEMETRY_NAME, true)) {
collectAndSendTelemetryData(config);
}

initialized = true;
}

private void collectAndSendTelemetryData(IConfig config) {
final String uuid = checkOrCreateUUID(config);

final String telemetryDoc = collectTelemetryData(uuid);

try {
sendTelemetryData(telemetryDoc);
} catch (IOException e) {
LOG.info("Can't reach the telemetry collector");
if (LOG.isDebugEnabled()) {
LOG.debug("Original exception", e);
}
}
}

private String checkOrCreateUUID(IConfig config) {
final String storagePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, "");
final Path uuidFilePath = Paths.get(storagePath, ".moquette_uuid");
if (Files.exists(uuidFilePath)) {
try {
return new String(Files.readAllBytes(uuidFilePath), StandardCharsets.UTF_8);
} catch (IOException e) {
LOG.error("Problem accessing file path: {}", uuidFilePath, e);
}
}
final UUID uuid = UUID.randomUUID();
final FileWriter f;
try {
f = new FileWriter(uuidFilePath.toFile(), false);
f.write(uuid.toString());
f.close();
} catch (IOException e) {
LOG.error("Problem writing new UUID to file path: {}", uuidFilePath, e);
}

return uuid.toString();
}

/**
* @return a json string with the content of max mem, jvm version and similar telemetry data.
* @param uuid*/
private String collectTelemetryData(String uuid) {
final String os = System.getProperty("os.name");
final String cpuArch = System.getProperty("os.arch");
final String jvmVersion = System.getProperty("java.specification.version");
final String jvmVendor = System.getProperty("java.vendor");
final long maxMemory = Runtime.getRuntime().maxMemory();
final String maxHeap = maxMemory == Long.MAX_VALUE ? "undefined" : Long.toString(maxMemory);

return String.format(
"{\"os\": %s, " +
"\"cpu_arch\": %s, " +
"\"jvm_version\": %s, " +
"\"jvm_vendor\": %s, " +
"\"broker_version\": %s, " +
"\"standalone\": %s," +
"\"max_heap\": %s" +
"\"uuid\": %s}",
os, cpuArch, jvmVersion, jvmVendor, MOQUETTE_VERSION, this.standalone, maxHeap, uuid);
}

private void sendTelemetryData(String telemetryDoc) throws IOException {
URL url = new URL("http://telemetry.moquette.io/api/v1/notify");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type", "application/json");
con.setRequestProperty("Accept", "application/json");
con.setInstanceFollowRedirects(true);

// POST
con.setDoOutput(true);
final byte[] input = telemetryDoc.getBytes("utf-8");
try (OutputStream os = con.getOutputStream()) {
os.write(input, 0, input.length);
}

int status = con.getResponseCode();
LOG.trace("Response code is {}", status);

boolean redirect = false;

// normally, 3xx is redirect
if (status != HttpURLConnection.HTTP_OK) {
if (status == HttpURLConnection.HTTP_MOVED_TEMP
|| status == HttpURLConnection.HTTP_MOVED_PERM
|| status == HttpURLConnection.HTTP_SEE_OTHER)
redirect = true;
}

LOG.trace("Response Code: {} ", status);

if (redirect) {

// get redirect url from "location" header field
String newUrl = con.getHeaderField("Location");

// open the new connnection again
con = (HttpURLConnection) new URL(newUrl).openConnection();
con.addRequestProperty("Accept-Language", "en-US,en;q=0.8");
con.addRequestProperty("User-Agent", "Mozilla");
con.addRequestProperty("Referer", "google.com");
con.setRequestMethod("POST");

// POST
con.setDoOutput(true);
try (OutputStream os = con.getOutputStream()) {
os.write(input, 0, input.length);
}

LOG.trace("Redirect to URL: {}", newUrl);
}

BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String inputLine;
StringBuffer content = new StringBuffer();
while ((inputLine = in.readLine()) != null) {
content.append(inputLine);
}
in.close();
LOG.trace("Content: {}", content);

con.disconnect();
}

private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy authorizatorPolicy, IConfig props) {
LOG.debug("Configuring MQTT authorizator policy");
String authorizatorClassName = props.getProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.file.Path;
import java.util.Properties;

import static io.moquette.BrokerConstants.ENABLE_TELEMETRY_NAME;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ConfigurationClassLoaderTest implements IAuthenticator, IAuthorizatorPolicy {
Expand Down Expand Up @@ -63,6 +64,7 @@ public void tearDown() {
public void loadAuthenticator() throws Exception {
Properties props = new Properties(IntegrationUtils.prepareTestProperties(dbPath));
props.setProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, getClass().getName());
props.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
startServer(props);
assertTrue(true);
}
Expand All @@ -71,6 +73,7 @@ public void loadAuthenticator() throws Exception {
public void loadAuthorizator() throws Exception {
Properties props = new Properties(IntegrationUtils.prepareTestProperties(dbPath));
props.setProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, getClass().getName());
props.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
startServer(props);
assertTrue(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.moquette.integration;

import io.moquette.BrokerConstants;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttClient;
Expand All @@ -27,6 +28,7 @@
import java.util.Properties;

import static io.moquette.BrokerConstants.DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME;
import static io.moquette.BrokerConstants.ENABLE_TELEMETRY_NAME;
import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
import static io.moquette.BrokerConstants.PORT_PROPERTY_NAME;

Expand Down Expand Up @@ -58,6 +60,7 @@ public static Properties prepareTestProperties(String dbPath) {
Properties testProperties = new Properties();
testProperties.put(PERSISTENT_STORE_PROPERTY_NAME, dbPath);
testProperties.put(PORT_PROPERTY_NAME, "1883");
testProperties.put(ENABLE_TELEMETRY_NAME, "false");
return testProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ protected void startServer() throws IOException {
sslProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, dbPath);

sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
m_server.startServer(sslProps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected void startServer(String dbPath) {
m_server = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
configProps.setProperty(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, "true");
configProps.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
m_config = new MemoryConfig(configProps);
canRead = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ protected void startServer(String dbPath) throws IOException {
sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, dbPath);
sslProps.put(BrokerConstants.NEED_CLIENT_AUTH, "true");
sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
m_server.startServer(sslProps);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ protected void startServer() throws IOException {
sslProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv");
sslProps.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, dbPath);
sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");
m_server.startServer(sslProps);
}

Expand Down
1 change: 1 addition & 0 deletions broker/src/test/resources/config/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ key_manager_password passw0rdsrv
allow_anonymous true

reauthorize_subscriptions_on_connect false
telemetry_enabled false
11 changes: 11 additions & 0 deletions distribution/src/main/resources/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,14 @@ password_file config/password_file.conf
#*********************************************************************
# use_bugsnag false
# bugsnag.token wleifb8723784dbfeig74


#*********************************************************************
# Telemetry information sending
#
# telemetry_enabled:
# true or false to select if send or not telemetry data when
# starting up.
# default: true
#*********************************************************************
# telemetry_enabled true

0 comments on commit d1b9a49

Please sign in to comment.