Skip to content

Commit

Permalink
Add support for mqtt v5 #1
Browse files Browse the repository at this point in the history
  • Loading branch information
deen13 committed Oct 29, 2020
1 parent dbc7b85 commit 5719d65
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 44 deletions.
29 changes: 0 additions & 29 deletions src/main/kotlin/de/smartsquare/kortance/ClientFactory.kt

This file was deleted.

60 changes: 60 additions & 0 deletions src/main/kotlin/de/smartsquare/kortance/mqtt/ClientFactory.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package de.smartsquare.kortance.mqtt

import com.hivemq.client.mqtt.mqtt3.Mqtt3Client
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client
import de.smartsquare.kortance.CredentialOptions

object ClientFactory {

fun create(host: String, port: Int, cred: CredentialOptions?, ssl: Boolean, ver: SupportedMqttVersion): MqttClient {
return if (ver == SupportedMqttVersion.V5) {
buildFive(host, port, cred, ssl)
} else {
buildThree(host, port, cred, ssl)
}
}

private fun buildThree(host: String, port: Int, credentials: CredentialOptions?, ssl: Boolean): Mqtt3ClientWrapper {
val baseClient = Mqtt3Client.builder()
.serverHost(host)
.serverPort(port)

val authClient = if (credentials != null) {
baseClient
.simpleAuth()
.username(credentials.username)
.password(credentials.password.toByteArray())
.applySimpleAuth()
} else {
baseClient
}

return if (ssl) {
Mqtt3ClientWrapper(authClient.sslWithDefaultConfig().build().toBlocking())
} else {
Mqtt3ClientWrapper(authClient.build().toBlocking())
}
}

private fun buildFive(host: String, port: Int, credentials: CredentialOptions?, ssl: Boolean): Mqtt5ClientWrapper {
val baseClient = Mqtt5Client.builder()
.serverHost(host)
.serverPort(port)

val authClient = if (credentials != null) {
baseClient
.simpleAuth()
.username(credentials.username)
.password(credentials.password.toByteArray())
.applySimpleAuth()
} else {
baseClient
}

return if (ssl) {
Mqtt5ClientWrapper(authClient.sslWithDefaultConfig().build().toBlocking())
} else {
Mqtt5ClientWrapper(authClient.build().toBlocking())
}
}
}
18 changes: 18 additions & 0 deletions src/main/kotlin/de/smartsquare/kortance/mqtt/Mqtt3ClientWrapper.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package de.smartsquare.kortance.mqtt

import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient

class Mqtt3ClientWrapper(private val hivemqClient: Mqtt3BlockingClient) : MqttClient {

override fun connect() {
hivemqClient.connect()
}

override fun publish(topic: String, payload: ByteArray) {
hivemqClient.publishWith().topic(topic).payload(payload).send()
}

override fun disconnect() {
hivemqClient.disconnect()
}
}
18 changes: 18 additions & 0 deletions src/main/kotlin/de/smartsquare/kortance/mqtt/Mqtt5ClientWrapper.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package de.smartsquare.kortance.mqtt

import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient

class Mqtt5ClientWrapper(private val hivemqClient: Mqtt5BlockingClient) : MqttClient {

override fun connect() {
hivemqClient.connect()
}

override fun publish(topic: String, payload: ByteArray) {
hivemqClient.publishWith().topic(topic).payload(payload).send()
}

override fun disconnect() {
hivemqClient.disconnect()
}
}
10 changes: 10 additions & 0 deletions src/main/kotlin/de/smartsquare/kortance/mqtt/MqttClient.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package de.smartsquare.kortance.mqtt

interface MqttClient {

fun publish(topic: String, payload: ByteArray)

fun connect()

fun disconnect()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package de.smartsquare.kortance.mqtt

enum class SupportedMqttVersion {
V3, V5;
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package de.smartsquare.kortance.scenarios.soak

import de.smartsquare.kortance.ClientFactory
import de.smartsquare.kortance.CredentialOptions
import de.smartsquare.kortance.WaveOptions
import de.smartsquare.kortance.delay
import de.smartsquare.kortance.mqtt.ClientFactory
import de.smartsquare.kortance.mqtt.SupportedMqttVersion
import de.smartsquare.kortance.randomPayload
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -31,22 +32,22 @@ class SoakCommand : Callable<Int> {
@CommandLine.Option(names = ["-j", "--jobs"])
private var jobs: Int = 10

@CommandLine.Option(names = ["-v", "--version"])
private var mqttVersion: SupportedMqttVersion = SupportedMqttVersion.V3

override fun call(): Int {
repeat(waveOptions.waves) { wave ->
println("Launching wave ${wave + 1} / ${waveOptions.waves}")

repeat(jobs) { job ->
val client = ClientFactory.createClient(host, port, credentialOptions, ssl)
val client = ClientFactory.create(host, port, credentialOptions, ssl, mqttVersion)

GlobalScope.launch {
client.connect()

try {
while (true) {
client.publishWith()
.topic("internal/kortance/${wave + 1}/${job + 1}")
.payload(randomPayload(size = 150))
.send()
client.publish("internal/kortance/${wave + 1}/${job + 1}", randomPayload(size = 150))

delay(min = 1, max = 100)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package de.smartsquare.kortance.scenarios.spike

import de.smartsquare.kortance.ClientFactory
import de.smartsquare.kortance.CredentialOptions
import de.smartsquare.kortance.mqtt.ClientFactory
import de.smartsquare.kortance.mqtt.SupportedMqttVersion
import de.smartsquare.kortance.randomPayload
import picocli.CommandLine
import java.util.concurrent.Callable
Expand All @@ -24,14 +25,17 @@ class SpikeCommand : Callable<Int> {
@CommandLine.Option(names = ["-m", "--messages"])
private var messageCount: Int = 1000

@CommandLine.Option(names = ["-v", "--version"])
private var mqttVersion: SupportedMqttVersion = SupportedMqttVersion.V3

override fun call(): Int {
with(ClientFactory.createClient(host, port, credentialOptions, ssl)) {
with(ClientFactory.create(host, port, credentialOptions, ssl, mqttVersion)) {
print("Publishing $messageCount messages... ")

connect()

repeat(messageCount) {
publishWith().topic("internal/kortance").payload(randomPayload(size = 150)).send()
publish("internal/kortance", randomPayload(size = 150))
}

disconnect()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package de.smartsquare.kortance.scenarios.stress

import de.smartsquare.kortance.ClientFactory
import de.smartsquare.kortance.CredentialOptions
import de.smartsquare.kortance.WaveOptions
import de.smartsquare.kortance.delay
import de.smartsquare.kortance.mqtt.ClientFactory
import de.smartsquare.kortance.mqtt.SupportedMqttVersion
import de.smartsquare.kortance.randomPayload
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -34,21 +35,21 @@ class StressCommand : Callable<Int> {
@CommandLine.Option(names = ["-m", "--messages"])
private var messageCount: Int = 1000

@CommandLine.Option(names = ["-v", "--version"])
private var mqttVersion: SupportedMqttVersion = SupportedMqttVersion.V3

override fun call(): Int {
repeat(waveOptions.waves) { wave ->
println("Launching wave ${wave + 1} / ${waveOptions.waves}")

repeat(jobs) { job ->
val client = ClientFactory.createClient(host, port, credentialOptions, ssl)
val client = ClientFactory.create(host, port, credentialOptions, ssl, mqttVersion)

GlobalScope.launch {
client.connect()

repeat(messageCount) {
client.publishWith()
.topic("internal/kortance/${wave + 1}/${job + 1}")
.payload(randomPayload(size = 150))
.send()
client.publish("internal/kortance/${wave + 1}/${job + 1}", randomPayload(size = 150))

delay(min = 1, max = 100)
}
Expand Down

0 comments on commit 5719d65

Please sign in to comment.