diff --git a/ChangeLog.txt b/ChangeLog.txt index c18fa96da..686ce0198 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,6 +1,7 @@ Version 0.16-SNAPSHOT: [build] drop generation of broker-test, removed distribution and embedding_moquette modules from deploy phase (#616) [fix] introduces sessions event processors to segregate changes to a session in one single thread, simplifying concurrency and code (#631) + [util] add collection of telemetry data (#700) Version 0.15.1: [fix] avoid double subscription (#612) diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index 95a60df93..e1efd97bc 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -78,6 +78,8 @@ public final class BrokerConstants { public static final String METRICS_LIBRATO_TOKEN_PROPERTY_NAME = "metrics.librato.token"; public static final String METRICS_LIBRATO_SOURCE_PROPERTY_NAME = "metrics.librato.source"; + public static final String ENABLE_TELEMETRY_NAME = "telemetry_enabled"; + public static final String BUGSNAG_ENABLE_PROPERTY_NAME = "use_bugsnag"; public static final String BUGSNAG_TOKEN_PROPERTY_NAME = "bugsnag.token"; diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index 94388a616..8210dd900 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -16,25 +16,48 @@ package io.moquette.broker; import io.moquette.BrokerConstants; -import io.moquette.broker.config.*; +import io.moquette.broker.config.FileResourceLoader; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.IResourceLoader; +import io.moquette.broker.config.MemoryConfig; +import io.moquette.broker.config.ResourceLoaderConfig; +import io.moquette.broker.security.ACLFileParser; +import io.moquette.broker.security.AcceptAllAuthenticator; +import io.moquette.broker.security.DenyAllAuthorizatorPolicy; +import io.moquette.broker.security.IAuthenticator; +import io.moquette.broker.security.IAuthorizatorPolicy; +import io.moquette.broker.security.PermitAllAuthorizatorPolicy; +import io.moquette.broker.security.ResourceAuthenticator; import io.moquette.interception.InterceptHandler; import io.moquette.persistence.H2Builder; import io.moquette.persistence.MemorySubscriptionsRepository; import io.moquette.interception.BrokerInterceptor; -import io.moquette.broker.security.*; import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; import io.moquette.broker.subscriptions.ISubscriptionsDirectory; -import io.moquette.broker.security.IAuthenticator; -import io.moquette.broker.security.IAuthorizatorPolicy; import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.text.ParseException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -43,6 +66,7 @@ public class Server { private static final Logger LOG = LoggerFactory.getLogger(io.moquette.broker.Server.class); + public static final String MOQUETTE_VERSION = "0.16-SNAPSHOT"; private ScheduledExecutorService scheduler; private NewNettyAcceptor acceptor; @@ -51,15 +75,16 @@ public class Server { private BrokerInterceptor interceptor; private H2Builder h2Builder; private SessionRegistry sessions; + private boolean standalone = false; public static void main(String[] args) throws IOException { final Server server = new Server(); try { - server.startServer(); + server.startStandaloneServer(); } catch (RuntimeException e) { System.exit(1); } - System.out.println("Server started, version 0.16-SNAPSHOT"); + System.out.println("Server started, version " + MOQUETTE_VERSION); //Bind a shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(server::stopServer)); } @@ -77,6 +102,11 @@ public void startServer() throws IOException { startServer(config); } + private void startStandaloneServer() throws IOException { + this.standalone = true; + startServer(); + } + private static File defaultConfigFile() { String configPath = System.getProperty("moquette.path", null); return new File(configPath, IConfig.DEFAULT_CONFIG); @@ -195,9 +225,138 @@ public void startServer(IConfig config, List handler final long startTime = System.currentTimeMillis() - start; LOG.info("Moquette integration has been started successfully in {} ms", startTime); + + if (config.boolProp(BrokerConstants.ENABLE_TELEMETRY_NAME, true)) { + collectAndSendTelemetryData(config); + } + initialized = true; } + private void collectAndSendTelemetryData(IConfig config) { + final String uuid = checkOrCreateUUID(config); + + final String telemetryDoc = collectTelemetryData(uuid); + + try { + sendTelemetryData(telemetryDoc); + } catch (IOException e) { + LOG.info("Can't reach the telemetry collector"); + if (LOG.isDebugEnabled()) { + LOG.debug("Original exception", e); + } + } + } + + private String checkOrCreateUUID(IConfig config) { + final String storagePath = config.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, ""); + final Path uuidFilePath = Paths.get(storagePath, ".moquette_uuid"); + if (Files.exists(uuidFilePath)) { + try { + return new String(Files.readAllBytes(uuidFilePath), StandardCharsets.UTF_8); + } catch (IOException e) { + LOG.error("Problem accessing file path: {}", uuidFilePath, e); + } + } + final UUID uuid = UUID.randomUUID(); + final FileWriter f; + try { + f = new FileWriter(uuidFilePath.toFile(), false); + f.write(uuid.toString()); + f.close(); + } catch (IOException e) { + LOG.error("Problem writing new UUID to file path: {}", uuidFilePath, e); + } + + return uuid.toString(); + } + + /** + * @return a json string with the content of max mem, jvm version and similar telemetry data. + * @param uuid*/ + private String collectTelemetryData(String uuid) { + final String os = System.getProperty("os.name"); + final String cpuArch = System.getProperty("os.arch"); + final String jvmVersion = System.getProperty("java.specification.version"); + final String jvmVendor = System.getProperty("java.vendor"); + final long maxMemory = Runtime.getRuntime().maxMemory(); + final String maxHeap = maxMemory == Long.MAX_VALUE ? "undefined" : Long.toString(maxMemory); + + return String.format( + "{\"os\": %s, " + + "\"cpu_arch\": %s, " + + "\"jvm_version\": %s, " + + "\"jvm_vendor\": %s, " + + "\"broker_version\": %s, " + + "\"standalone\": %s," + + "\"max_heap\": %s" + + "\"uuid\": %s}", + os, cpuArch, jvmVersion, jvmVendor, MOQUETTE_VERSION, this.standalone, maxHeap, uuid); + } + + private void sendTelemetryData(String telemetryDoc) throws IOException { + URL url = new URL("http://telemetry.moquette.io/api/v1/notify"); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("POST"); + con.setRequestProperty("Content-Type", "application/json"); + con.setRequestProperty("Accept", "application/json"); + con.setInstanceFollowRedirects(true); + + // POST + con.setDoOutput(true); + final byte[] input = telemetryDoc.getBytes("utf-8"); + try (OutputStream os = con.getOutputStream()) { + os.write(input, 0, input.length); + } + + int status = con.getResponseCode(); + LOG.trace("Response code is {}", status); + + boolean redirect = false; + + // normally, 3xx is redirect + if (status != HttpURLConnection.HTTP_OK) { + if (status == HttpURLConnection.HTTP_MOVED_TEMP + || status == HttpURLConnection.HTTP_MOVED_PERM + || status == HttpURLConnection.HTTP_SEE_OTHER) + redirect = true; + } + + LOG.trace("Response Code: {} ", status); + + if (redirect) { + + // get redirect url from "location" header field + String newUrl = con.getHeaderField("Location"); + + // open the new connnection again + con = (HttpURLConnection) new URL(newUrl).openConnection(); + con.addRequestProperty("Accept-Language", "en-US,en;q=0.8"); + con.addRequestProperty("User-Agent", "Mozilla"); + con.addRequestProperty("Referer", "google.com"); + con.setRequestMethod("POST"); + + // POST + con.setDoOutput(true); + try (OutputStream os = con.getOutputStream()) { + os.write(input, 0, input.length); + } + + LOG.trace("Redirect to URL: {}", newUrl); + } + + BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream())); + String inputLine; + StringBuffer content = new StringBuffer(); + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + in.close(); + LOG.trace("Content: {}", content); + + con.disconnect(); + } + private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy authorizatorPolicy, IConfig props) { LOG.debug("Configuring MQTT authorizator policy"); String authorizatorClassName = props.getProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, ""); diff --git a/broker/src/test/java/io/moquette/integration/ConfigurationClassLoaderTest.java b/broker/src/test/java/io/moquette/integration/ConfigurationClassLoaderTest.java index d613b355d..ea32ea2bc 100644 --- a/broker/src/test/java/io/moquette/integration/ConfigurationClassLoaderTest.java +++ b/broker/src/test/java/io/moquette/integration/ConfigurationClassLoaderTest.java @@ -32,6 +32,7 @@ import java.nio.file.Path; import java.util.Properties; +import static io.moquette.BrokerConstants.ENABLE_TELEMETRY_NAME; import static org.junit.jupiter.api.Assertions.assertTrue; public class ConfigurationClassLoaderTest implements IAuthenticator, IAuthorizatorPolicy { @@ -63,6 +64,7 @@ public void tearDown() { public void loadAuthenticator() throws Exception { Properties props = new Properties(IntegrationUtils.prepareTestProperties(dbPath)); props.setProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, getClass().getName()); + props.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false"); startServer(props); assertTrue(true); } @@ -71,6 +73,7 @@ public void loadAuthenticator() throws Exception { public void loadAuthorizator() throws Exception { Properties props = new Properties(IntegrationUtils.prepareTestProperties(dbPath)); props.setProperty(BrokerConstants.AUTHORIZATOR_CLASS_NAME, getClass().getName()); + props.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false"); startServer(props); assertTrue(true); } diff --git a/broker/src/test/java/io/moquette/integration/IntegrationUtils.java b/broker/src/test/java/io/moquette/integration/IntegrationUtils.java index 7f4cff61b..de15253fb 100644 --- a/broker/src/test/java/io/moquette/integration/IntegrationUtils.java +++ b/broker/src/test/java/io/moquette/integration/IntegrationUtils.java @@ -16,6 +16,7 @@ package io.moquette.integration; +import io.moquette.BrokerConstants; import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttClient; @@ -27,6 +28,7 @@ import java.util.Properties; import static io.moquette.BrokerConstants.DEFAULT_MOQUETTE_STORE_H2_DB_FILENAME; +import static io.moquette.BrokerConstants.ENABLE_TELEMETRY_NAME; import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; import static io.moquette.BrokerConstants.PORT_PROPERTY_NAME; @@ -58,6 +60,7 @@ public static Properties prepareTestProperties(String dbPath) { Properties testProperties = new Properties(); testProperties.put(PERSISTENT_STORE_PROPERTY_NAME, dbPath); testProperties.put(PORT_PROPERTY_NAME, "1883"); + testProperties.put(ENABLE_TELEMETRY_NAME, "false"); return testProperties; } diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationOpenSSLTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationOpenSSLTest.java index 4a94a86f9..fc9606135 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationOpenSSLTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationOpenSSLTest.java @@ -56,6 +56,8 @@ protected void startServer() throws IOException { sslProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv"); sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv"); sslProps.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, dbPath); + + sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false"); m_server.startServer(sslProps); } } diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java index 337ee504c..088645a90 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java @@ -77,6 +77,7 @@ protected void startServer(String dbPath) { m_server = new Server(); final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath); configProps.setProperty(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, "true"); + configProps.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false"); m_config = new MemoryConfig(configProps); canRead = true; diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLClientAuthTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLClientAuthTest.java index 3d0054c1c..f03ed420a 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLClientAuthTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLClientAuthTest.java @@ -183,6 +183,7 @@ protected void startServer(String dbPath) throws IOException { sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv"); sslProps.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, dbPath); sslProps.put(BrokerConstants.NEED_CLIENT_AUTH, "true"); + sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false"); m_server.startServer(sslProps); } diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLTest.java index 99ffa335d..2732e4895 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationSSLTest.java @@ -114,6 +114,7 @@ protected void startServer() throws IOException { sslProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "passw0rdsrv"); sslProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "passw0rdsrv"); sslProps.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, dbPath); + sslProps.put(BrokerConstants.ENABLE_TELEMETRY_NAME, "false"); m_server.startServer(sslProps); } diff --git a/broker/src/test/resources/config/moquette.conf b/broker/src/test/resources/config/moquette.conf index 11473022b..86949acb3 100644 --- a/broker/src/test/resources/config/moquette.conf +++ b/broker/src/test/resources/config/moquette.conf @@ -22,3 +22,4 @@ key_manager_password passw0rdsrv allow_anonymous true reauthorize_subscriptions_on_connect false +telemetry_enabled false diff --git a/distribution/src/main/resources/moquette.conf b/distribution/src/main/resources/moquette.conf index 21991d19c..d32de1957 100644 --- a/distribution/src/main/resources/moquette.conf +++ b/distribution/src/main/resources/moquette.conf @@ -161,3 +161,14 @@ password_file config/password_file.conf #********************************************************************* # use_bugsnag false # bugsnag.token wleifb8723784dbfeig74 + + +#********************************************************************* +# Telemetry information sending +# +# telemetry_enabled: +# true or false to select if send or not telemetry data when +# starting up. +# default: true +#********************************************************************* +# telemetry_enabled true