-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
137 lines (113 loc) · 3.93 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package main
import (
"context"
"flag"
"fmt"
"math/rand"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
MQTTClient "github.com/pablitovicente/mqtt-load-generator/pkg/MQTTClient"
)
// main runs an MQTT-to-NATS bridge: it parses the command-line flags, connects
// to the MQTT broker and to the NATS server, creates or updates the target
// JetStream stream, subscribes to the MQTT topic and republishes every received
// payload to the stream subject. It then blocks forever.
//
// Invalid flag values and connection/subscription failures panic, matching the
// fail-fast style already used for flag validation in this function.
func main() {
	// Argument parsing
	targetTopic := flag.String("t", "/load", "Target MQTT topic to publish messages to")
	username := flag.String("u", "", "MQTT username")
	password := flag.String("P", "", "MQTT password")
	host := flag.String("h", "localhost", "MQTT host")
	port := flag.Int("p", 1883, "MQTT port")
	qos := flag.Int("q", 1, "MQTT QoS used by all clients")
	cert := flag.String("cert", "", "Path to TLS certificate file")
	ca := flag.String("ca", "", "Path to TLS CA file")
	key := flag.String("key", "", "Path to TLS key file")
	insecure := flag.Bool("insecure", false, "Set to true to allow self signed certificates")
	mqtts := flag.Bool("mqtts", false, "Set to true to use MQTTS")
	cleanSession := flag.Bool("cleanSession", true, "Set to true for clean MQTT sessions or false to keep session")
	clientID := flag.String("clientID", "mqtt-to-nats-bridge", "Custom MQTT clientID")
	keepAliveTimeout := flag.Int64("keepAliveTimeout", 5000, "Set the amount of time (in seconds) that the client should wait before sending a PING request to the broker")
	natsURL := flag.String("N", "nats://localhost:4222", "NATS Stream server url for example nats://localhost:4222")
	natsStreamName := flag.String("SN", "collector", "NATS Stream name used to store MQTT forwarded messages")
	natsStreamReplicas := flag.Int("R", 1, "Number of NATS Stream replicas")
	natsStreamStorage := flag.String("S", "file", "The storage used for the stream it can be either 'memory' or 'file'")
	maxInflightMessages := flag.Int("bufferSize", 1024, "The size of the buffer the NATS client will use before blocking")

	flag.Parse()

	// Map the textual storage flag onto the JetStream storage type.
	var storageType jetstream.StorageType
	switch *natsStreamStorage {
	case "file":
		storageType = jetstream.FileStorage
	case "memory":
		storageType = jetstream.MemoryStorage
	default:
		panic("'S' parameter needs to be either 'file' or 'memory'")
	}

	if *qos < 0 || *qos > 2 {
		panic("QoS should be any of [0, 1, 2]")
	}

	// General Client Config
	mqttClientConfig := MQTTClient.Config{
		TargetTopic:      targetTopic,
		Username:         username,
		Password:         password,
		Host:             host,
		Port:             port,
		QoS:              qos,
		CleanSession:     cleanSession,
		ClientID:         clientID,
		KeepAliveTimeout: keepAliveTimeout,
		Insecure:         insecure,
		MQTTS:            mqtts,
	}

	// TLS material is only handed to the client when all three of -cert, -ca
	// and -key were given on the command line.
	if TLSOptionsSet() {
		mqttClientConfig.TLSConfigured = true
		mqttClientConfig.CA = ca
		mqttClientConfig.Cert = cert
		mqttClientConfig.Key = key
	}

	// NOTE(review): rand.Seed is deprecated since Go 1.20; it is kept because
	// code outside this function may still draw from the global source.
	rand.Seed(time.Now().UnixNano())

	// NOTE(review): nothing in this function reads from updates; presumably
	// the client only needs the field populated. Verify against MQTTClient.
	updates := make(chan int)

	mqttClient := MQTTClient.Client{
		Config:  mqttClientConfig,
		Updates: updates,
	}
	mqttClient.Connect()

	nc, err := nats.Connect(*natsURL)
	if err != nil {
		panic(fmt.Sprintf("Not able to connect to NATS: %v", err))
	}

	// The error handler surfaces asynchronous publish failures, which would
	// otherwise go unnoticed because the PubAckFuture returned by PublishAsync
	// is not awaited below.
	js, err := jetstream.New(
		nc,
		jetstream.WithPublishAsyncMaxPending(*maxInflightMessages),
		jetstream.WithPublishAsyncErrHandler(func(_ jetstream.JetStream, _ *nats.Msg, err error) {
			fmt.Println("Nats async publish error:", err)
		}),
	)
	if err != nil {
		// Option validation failures (for example an invalid -bufferSize)
		// are reported here instead of being silently discarded.
		panic(fmt.Sprintf("Not able to create JetStream context: %v", err))
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The stream name doubles as its only subject. A failure here is reported
	// but left non-fatal, as in the original code.
	_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:     *natsStreamName,
		Subjects: []string{*natsStreamName},
		Replicas: *natsStreamReplicas,
		Storage:  storageType,
	})
	if err != nil {
		fmt.Println("Error creating NATS Stream:", err)
	}

	// Forward every MQTT payload to the stream subject.
	token := mqttClient.Connection.Subscribe(*targetTopic, byte(*qos), func(c mqtt.Client, m mqtt.Message) {
		_, err := js.PublishAsync(*natsStreamName, m.Payload())
		if err != nil {
			fmt.Println("Nats publish error:", err)
		}
	})

	// Without this check a rejected subscription would leave the bridge
	// running forever while forwarding nothing.
	token.Wait()
	if err := token.Error(); err != nil {
		panic(fmt.Sprintf("Not able to subscribe to MQTT topic %q: %v", *targetTopic, err))
	}

	// Block forever; all further work happens in the subscription callback.
	select {}
}
// TLSOptionsSet reports whether the "cert", "ca" and "key" command-line flags
// were all explicitly set. It walks the flags that have been set on the default
// flag set via flag.Visit, so flags left at their default do not count.
func TLSOptionsSet() bool {
	// Names of the TLS flags seen so far among the flags that were set.
	seen := make(map[string]bool, 3)

	flag.Visit(func(f *flag.Flag) {
		switch f.Name {
		case "cert", "ca", "key":
			seen[f.Name] = true
		}
	})

	return seen["ca"] && seen["cert"] && seen["key"]
}