From 4cbda5532c48e1cf83c953cedc95d90c544e5b90 Mon Sep 17 00:00:00 2001 From: "Bryan T. Richardson" Date: Thu, 7 Dec 2023 11:51:34 -0700 Subject: [PATCH] feat(mqtt): add support for MQTT broker functionality --- src/go/go.mod | 14 +- src/go/go.sum | 22 ++- src/go/mqtt/broker/broker.go | 213 ++++++++++++++++++++++ src/go/mqtt/broker/hooks.go | 61 +++++++ src/go/mqtt/client/client.go | 329 +++++++++++++++++++++++++++++++++ src/go/mqtt/mqtt.go | 340 ++--------------------------------- src/go/mqtt/types.go | 83 --------- src/go/mqtt/types/data.go | 25 +++ src/go/mqtt/types/enpoint.go | 101 +++++++++++ 9 files changed, 768 insertions(+), 420 deletions(-) create mode 100644 src/go/mqtt/broker/broker.go create mode 100644 src/go/mqtt/broker/hooks.go create mode 100644 src/go/mqtt/client/client.go delete mode 100644 src/go/mqtt/types.go create mode 100644 src/go/mqtt/types/data.go create mode 100644 src/go/mqtt/types/enpoint.go diff --git a/src/go/go.mod b/src/go/go.mod index 5dda32a..c07a86f 100644 --- a/src/go/go.mod +++ b/src/go/go.mod @@ -2,6 +2,8 @@ module github.com/patsec/ot-sim go 1.21 +toolchain go1.21.1 + require ( actshad.dev/mbserver v0.3.1 actshad.dev/modbus v0.2.1 @@ -13,10 +15,11 @@ require ( github.com/gofrs/uuid v4.4.0+incompatible github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.5.0 + github.com/mochi-mqtt/server/v2 v2.4.2 github.com/pebbe/zmq4 v1.2.7 github.com/prometheus/client_golang v1.12.2 github.com/reiver/go-telnet v0.0.0-20180421082511-9ff0b2ab096e - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 + golang.org/x/crypto v0.14.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -24,14 +27,15 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/go-cmp v0.5.8 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/reiver/go-oi v1.0.0 // indirect - golang.org/x/net v0.0.0-20210525063256-abc453219eb5 // indirect + github.com/rs/xid v1.4.0 // indirect + golang.org/x/net v0.17.0 // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect - golang.org/x/sys v0.12.0 // indirect - google.golang.org/protobuf v1.26.0 // indirect + golang.org/x/sys v0.13.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/src/go/go.sum b/src/go/go.sum index cc79965..c661d7b 100644 --- a/src/go/go.sum +++ b/src/go/go.sum @@ -148,6 +148,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= +github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -161,11 +163,15 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mochi-mqtt/server/v2 v2.4.2 h1:x7xC41Qn/ek1hOWNcZraRm+Cmqc2yrfhD5VA1NFnXhc= +github.com/mochi-mqtt/server/v2 v2.4.2/go.mod h1:M1lZnLbyowXUyQBIlHYlX1wasxXqv/qFWwQxAzfphwA= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -207,6 +213,8 @@ github.com/reiver/go-oi v1.0.0/go.mod h1:RrDBct90BAhoDTxB1fenZwfykqeGvhI6LsNfStJ github.com/reiver/go-telnet v0.0.0-20180421082511-9ff0b2ab096e h1:quuzZLi72kkJjl+f5AQ93FMcadG19WkS7MO6TXFOSas= github.com/reiver/go-telnet v0.0.0-20180421082511-9ff0b2ab096e/go.mod h1:+5vNVvEWwEIx86DB9Ke/+a5wBI464eDRo3eF0LcfpWg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= +github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -232,8 +240,9 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -293,8 +302,9 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -348,8 +358,8 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -478,11 +488,13 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= diff --git a/src/go/mqtt/broker/broker.go b/src/go/mqtt/broker/broker.go new file mode 100644 index 0000000..22dddcc --- /dev/null +++ b/src/go/mqtt/broker/broker.go @@ -0,0 +1,213 @@ +package broker + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/binary" + "errors" + "fmt" + "strings" + + "github.com/patsec/ot-sim/mqtt/types" + "github.com/patsec/ot-sim/msgbus" + + "github.com/beevik/etree" + mochi "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/hooks/auth" + "github.com/mochi-mqtt/server/v2/listeners" +) + +/* + + + +
127.0.0.1:1883
+
+ +
10.11.12.13:8883
+ + /etc/ot-sim/root.pem + /etc/ot-sim/broker.key + /etc/ot-sim/broker.crt + +
+ 127.0.0.1:1883 + foo/bar +
+*/ + +type MQTTBroker struct { + pubEndpoint string + pullEndpoint string + + endpoints []types.Endpoint + + name string + topicToTag map[string]string + tagToTopic map[string]string + + server *mochi.Server +} + +func New(name string) *MQTTBroker { + return &MQTTBroker{ + name: name, + topicToTag: make(map[string]string), + tagToTopic: make(map[string]string), + } +} + +func (this MQTTBroker) Name() string { + return this.name +} + +func (this *MQTTBroker) Configure(e *etree.Element) error { + for _, child := range e.ChildElements() { + switch child.Tag { + case "pull-endpoint": + this.pullEndpoint = child.Text() + case "endpoint": + var endpoint types.Endpoint + + if len(child.ChildElements()) == 0 { + endpoint.Address = child.Text() + } else { + for _, child := range child.ChildElements() { + switch child.Tag { + case "address": + endpoint.Address = child.Text() + case "tls": + for _, child := range child.ChildElements() { + switch child.Tag { + case "ca": + endpoint.CAPath = child.Text() + case "key": + endpoint.KeyPath = child.Text() + case "certificate": + endpoint.CertPath = child.Text() + } + } + } + } + } + + this.endpoints = append(this.endpoints, endpoint) + case "topic": + var ( + topic = child.Text() + tag = child.SelectAttrValue("tag", strings.ReplaceAll(topic, "/", ".")) + ) + + this.topicToTag[topic] = tag + this.tagToTopic[tag] = topic + } + } + + for idx, endpoint := range this.endpoints { + if err := endpoint.Validate(); err != nil { + return fmt.Errorf("validating endpoint: %w", err) + } + + this.endpoints[idx] = endpoint + } + + return nil +} + +func (this *MQTTBroker) Run(ctx context.Context, pubEndpoint, pullEndpoint string) error { + // Use ZeroMQ PUB endpoint specified in `mqtt` config block if provided. + if this.pubEndpoint != "" { + pubEndpoint = this.pubEndpoint + } + + // Use ZeroMQ PULL endpoint specified in `mqtt` config block if provided. + if this.pullEndpoint != "" { + pullEndpoint = this.pullEndpoint + } + + if len(this.endpoints) == 0 { + return fmt.Errorf("no MQTT broker listener endpoints provided") + } + + msgBusHook := &PublishToMsgBus{ + name: this.name, + pusher: msgbus.MustNewPusher(pullEndpoint), + topics: this.topicToTag, + log: this.log, + } + + subscriber := msgbus.MustNewSubscriber(pubEndpoint) + subscriber.AddUpdateHandler(this.handleMsgBusUpdate) + subscriber.Start("RUNTIME") + + this.server = mochi.New(&mochi.Options{InlineClient: true}) + this.server.AddHook(new(auth.AllowHook), nil) + this.server.AddHook(msgBusHook, nil) + + for i, endpoint := range this.endpoints { + var config *listeners.Config + + if !endpoint.Insecure { + config = &listeners.Config{ + TLSConfig: &tls.Config{ + RootCAs: endpoint.Roots, + Certificates: []tls.Certificate{endpoint.Cert}, + }, + } + } + + l := listeners.NewTCP(fmt.Sprintf("t%d", i), endpoint.Address, config) + + if err := this.server.AddListener(l); err != nil { + return fmt.Errorf("adding TCP listener to MQTT broker: %w", err) + } + } + + go func() { + if err := this.server.Serve(); err != nil { + this.log("[ERROR] serving MQTT broker: %v", err) + } + }() + + go func() { + <-ctx.Done() + this.server.Close() + }() + + return nil +} + +func (this *MQTTBroker) handleMsgBusUpdate(env msgbus.Envelope) { + if env.Sender() == this.name { + return + } + + update, err := env.Update() + if err != nil { + if !errors.Is(err, msgbus.ErrKindNotUpdate) { + this.log("[ERROR] getting Update message from envelope: %v", err) + } + + return + } + + for _, point := range update.Updates { + this.log("[DEBUG] received update for tag %s (value: %f)", point.Tag, point.Value) + + if topic, ok := this.tagToTopic[point.Tag]; ok { + var buf bytes.Buffer + if err := binary.Write(&buf, binary.BigEndian, point.Value); err != nil { + this.log("[ERROR] converting value %f for tag %s to bytes: %v", point.Value, point.Tag, err) + } + + this.log("[DEBUG] publishing value %f to topic %s", point.Value, topic) + + this.server.Publish(topic, buf.Bytes(), false, 0) + } + } +} + +func (this MQTTBroker) log(format string, a ...any) { + fmt.Printf("[%s] %s\n", this.name, fmt.Sprintf(format, a...)) +} diff --git a/src/go/mqtt/broker/hooks.go b/src/go/mqtt/broker/hooks.go new file mode 100644 index 0000000..8d7f591 --- /dev/null +++ b/src/go/mqtt/broker/hooks.go @@ -0,0 +1,61 @@ +package broker + +import ( + "bytes" + "strconv" + + mochi "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/packets" + "github.com/patsec/ot-sim/msgbus" +) + +type PublishToMsgBus struct { + mochi.HookBase + + name string + + pusher *msgbus.Pusher + topics map[string]string + + log func(string, ...any) +} + +func (this *PublishToMsgBus) ID() string { + return "publish-to-ot-sim-msg-bus" +} + +func (this *PublishToMsgBus) Provides(b byte) bool { + return bytes.Contains([]byte{ + mochi.OnPublished, + }, []byte{b}) +} + +func (this *PublishToMsgBus) OnPublished(c *mochi.Client, p packets.Packet) { + if c.ID == "inline" { + return + } + + this.log("[DEBUG] topic: %s -- payload: %s", p.TopicName, string(p.Payload)) + + if tag, ok := this.topics[p.TopicName]; ok { + var points []msgbus.Point + + value, err := strconv.ParseFloat(string(p.Payload), 64) + if err != nil { + this.log("[ERROR] parsing payload for topic %s to float64: %v", p.TopicName, err) + return + } + + points = append(points, msgbus.Point{Tag: tag, Value: value}) + + env, err := msgbus.NewEnvelope(this.name, msgbus.Status{Measurements: points}) + if err != nil { + this.log("[ERROR] creating status message: %v", err) + return + } + + if err := this.pusher.Push("RUNTIME", env); err != nil { + this.log("[ERROR] sending status message: %v", err) + } + } +} diff --git a/src/go/mqtt/client/client.go b/src/go/mqtt/client/client.go new file mode 100644 index 0000000..99498c9 --- /dev/null +++ b/src/go/mqtt/client/client.go @@ -0,0 +1,329 @@ +package client + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "strconv" + "strings" + "sync" + "text/template" + "time" + + "github.com/patsec/ot-sim/mqtt/types" + "github.com/patsec/ot-sim/msgbus" + + "github.com/beevik/etree" + "github.com/cenkalti/backoff" + MQTT "github.com/eclipse/paho.mqtt.golang" +) + +/* + + + + ssl://broker-1.example.com:8883 + + /etc/ot-sim/root.pem + /etc/ot-sim/broker-1-client.key + /etc/ot-sim/broker-1-client.crt + + + + ssl://broker-2.example.com:8883 + + /etc/ot-sim/root.pem + /etc/ot-sim/broker-2-client.key + /etc/ot-sim/broker-2-client.crt + + + tcp://broker.example.com:1883 + ot-sim-jitp-test + 5s + bus-692.voltage + +*/ + +type MQTTClient struct { + sync.RWMutex + + pubEndpoint string + + endpoints []types.Endpoint + period time.Duration + + name string + id string + + topics map[string]string + values map[string]float64 + + // index of endpoint currently in use + endpoint int + client MQTT.Client + + payloadTmpl *template.Template + timestampTmpl string +} + +func New(name string) *MQTTClient { + return &MQTTClient{ + name: name, + topics: make(map[string]string), + values: make(map[string]float64), + payloadTmpl: template.Must(template.New("payload").Parse(`{{ .Value }}`)), + timestampTmpl: time.RFC3339, + } +} + +func (this MQTTClient) Name() string { + return this.name +} + +func (this *MQTTClient) Configure(e *etree.Element) error { + for _, child := range e.ChildElements() { + switch child.Tag { + case "pub-endpoint": + this.pubEndpoint = child.Text() + case "endpoint": + var endpoint types.Endpoint + + if len(child.ChildElements()) == 0 { + endpoint.URL = child.Text() + } else { + for _, child := range child.ChildElements() { + switch child.Tag { + case "url": + endpoint.URL = child.Text() + case "tls": + insecure := child.SelectAttrValue("insecure", "false") + endpoint.Insecure, _ = strconv.ParseBool(insecure) + + for _, child := range child.ChildElements() { + switch child.Tag { + case "ca": + endpoint.CAPath = child.Text() + case "key": + endpoint.KeyPath = child.Text() + case "certificate": + endpoint.CertPath = child.Text() + } + } + } + } + } + + this.endpoints = append(this.endpoints, endpoint) + case "client-id": + this.id = child.Text() + case "period": + var err error + if this.period, err = time.ParseDuration(child.Text()); err != nil { + return fmt.Errorf("invalid period duration '%s': %w", child.Text(), err) + } + case "tag": + var ( + tag = child.Text() + topic = child.SelectAttrValue("topic", strings.ReplaceAll(tag, ".", "/")) + ) + + this.topics[tag] = topic + this.values[tag] = 0.0 + case "payload-template": + var err error + + this.payloadTmpl, err = template.New("payload").Parse(strings.TrimSpace(child.Text())) + if err != nil { + return fmt.Errorf("parsing payload template: %w", err) + } + + this.timestampTmpl = child.SelectAttrValue("timestamp", this.timestampTmpl) + } + } + + if this.id == "" { + return fmt.Errorf("must provide 'client-id' for MQTT module config") + } + + for idx, endpoint := range this.endpoints { + if err := endpoint.Validate(); err != nil { + return fmt.Errorf("validating endpoint: %w", err) + } + + this.endpoints[idx] = endpoint + } + + return nil +} + +func (this *MQTTClient) Run(ctx context.Context, pubEndpoint, _ string) error { + // Use ZeroMQ PUB endpoint specified in `mqtt` config block if provided. + if this.pubEndpoint != "" { + pubEndpoint = this.pubEndpoint + } + + if len(this.endpoints) == 0 { + return fmt.Errorf("no MQTT broker endpoints provided") + } + + subscriber := msgbus.MustNewSubscriber(pubEndpoint) + subscriber.AddStatusHandler(this.handleMsgBusStatus) + subscriber.Start("RUNTIME") + + this.connectAndRun(ctx) + + return nil +} + +func (this *MQTTClient) connectAndRun(ctx context.Context) { + cctx, cancel := context.WithCancel(ctx) + backoff := backoff.NewExponentialBackOff() + + for { + if err := this.connect(ctx, cancel); err == nil { + break + } + + time.Sleep(backoff.NextBackOff()) + } + + go this.run(cctx) +} + +func (this *MQTTClient) connect(ctx context.Context, cancel context.CancelFunc) error { + // circle back around to beginning of endpoint list + if this.endpoint == len(this.endpoints) { + this.endpoint = 0 + } + + endpoint := this.endpoints[this.endpoint] + this.endpoint++ + + opts := MQTT.NewClientOptions() + + opts.AddBroker(endpoint.URL).SetClientID(this.id).SetCleanSession(true) + opts.SetKeepAlive(5 * time.Second).SetAutoReconnect(false).SetConnectRetry(false).SetConnectTimeout(10 * time.Second) + + opts.SetConnectionLostHandler(this.lostConnectionHandler(ctx, cancel)) + + if endpoint.URI.Scheme == "ssl" || endpoint.URI.Scheme == "tls" { + opts.SetTLSConfig(&tls.Config{ + ServerName: endpoint.URI.Hostname(), + RootCAs: endpoint.Roots, + Certificates: []tls.Certificate{endpoint.Cert}, + InsecureSkipVerify: endpoint.Insecure, + }) + } + + this.client = MQTT.NewClient(opts) + + if token := this.client.Connect(); token.Wait() && token.Error() != nil { + this.log("[ERROR] connecting to MQTT broker at %s: %v", endpoint.URL, token.Error()) + return fmt.Errorf("connecting to MQTT broker: %w", token.Error()) + } + + this.log("[DEBUG] connected to MQTT broker at %s", endpoint.URL) + + return nil +} + +func (this *MQTTClient) run(ctx context.Context) { + if this.period == 0 { + return + } + + ticker := time.NewTicker(this.period) + + for { + select { + case <-ctx.Done(): + ticker.Stop() + + this.log("[ERROR] stopping publish loop: %v", ctx.Err()) + + return + case <-ticker.C: + this.RLock() + + for tag, value := range this.values { + go this.publish(tag, value) + } + + this.RUnlock() + } + } +} + +func (this *MQTTClient) publish(tag string, value float64) { + var ( + tstamp = time.Now().UTC() + topic = this.topics[tag] + ) + + data := types.Data{ + Epoch: tstamp.Unix(), + Timestamp: tstamp.Format(this.timestampTmpl), + Client: this.id, + Topic: topic, + Value: value, + } + + payload, err := data.Execute(this.payloadTmpl) + if err != nil { + this.log("[ERROR] executing payload template: %v", err) + return + } + + token := this.client.Publish(topic, 0, false, payload) + + this.log("[DEBUG] publishing %s --> %s to MQTT broker", topic, payload) + + if token.Wait() && token.Error() != nil { + this.log("[ERROR] publishing topic %s to MQTT broker: %v", topic, token.Error()) + } else { + this.log("[DEBUG] published %s --> %f to MQTT broker", topic, value) + } +} + +func (this *MQTTClient) handleMsgBusStatus(env msgbus.Envelope) { + if env.Sender() == this.name { + return + } + + status, err := env.Status() + if err != nil { + if !errors.Is(err, msgbus.ErrKindNotStatus) { + this.log("[ERROR] getting Status message from envelope: %v", err) + } + + return + } + + this.Lock() + defer this.Unlock() + + for _, point := range status.Measurements { + if _, ok := this.values[point.Tag]; ok { + this.values[point.Tag] = point.Value + + if this.period == 0 { + go this.publish(point.Tag, point.Value) + } + } + } +} + +func (this *MQTTClient) lostConnectionHandler(ctx context.Context, cancel context.CancelFunc) MQTT.ConnectionLostHandler { + return func(client MQTT.Client, err error) { + this.log("[ERROR] connection to MQTT broker lost: %v", err) + + cancel() + + this.connectAndRun(ctx) + } +} + +func (this MQTTClient) log(format string, a ...any) { + fmt.Printf("[%s] %s\n", this.name, fmt.Sprintf(format, a...)) +} diff --git a/src/go/mqtt/mqtt.go b/src/go/mqtt/mqtt.go index 87c1ec6..a425497 100644 --- a/src/go/mqtt/mqtt.go +++ b/src/go/mqtt/mqtt.go @@ -1,347 +1,33 @@ -// Package mqtt implements an MQTTClient client as a module. package mqtt import ( - "bytes" - "context" - "crypto/tls" - "errors" "fmt" - "strconv" "strings" - "sync" - "text/template" - "time" otsim "github.com/patsec/ot-sim" - "github.com/patsec/ot-sim/msgbus" + "github.com/patsec/ot-sim/mqtt/broker" + "github.com/patsec/ot-sim/mqtt/client" "github.com/beevik/etree" - "github.com/cenkalti/backoff" - MQTT "github.com/eclipse/paho.mqtt.golang" ) -/* - - - - ssl://broker-1.example.com:8883 - - /etc/ot-sim/root.pem - /etc/ot-sim/broker-1-client.key - /etc/ot-sim/broker-1-client.crt - - - - ssl://broker-2.example.com:8883 - - /etc/ot-sim/root.pem - /etc/ot-sim/broker-2-client.key - /etc/ot-sim/broker-2-client.crt - - - tcp://broker.example.com:1883 - ot-sim-jitp-test - 5s - bus-692.voltage - -*/ - -func init() { - otsim.AddModuleFactory("mqtt", new(Factory)) -} - type Factory struct{} func (Factory) NewModule(e *etree.Element) (otsim.Module, error) { - name := e.SelectAttrValue("name", "mqtt") - return New(name), nil -} - -type MQTTClient struct { - sync.RWMutex - - pubEndpoint string - - endpoints []endpoint - period time.Duration - - name string - id string - - topics map[string]string - values map[string]float64 - - // index of endpoint currently in use - endpoint int - client MQTT.Client - - payloadTmpl *template.Template - timestampTmpl string -} - -func New(name string) *MQTTClient { - return &MQTTClient{ - name: name, - topics: make(map[string]string), - values: make(map[string]float64), - payloadTmpl: template.Must(template.New("payload").Parse(`{{ .Value }}`)), - timestampTmpl: time.RFC3339, - } -} - -func (this MQTTClient) Name() string { - return this.name -} - -func (this *MQTTClient) Configure(e *etree.Element) error { - for _, child := range e.ChildElements() { - switch child.Tag { - case "pub-endpoint": - this.pubEndpoint = child.Text() - case "endpoint": - var endpoint endpoint - - if len(child.ChildElements()) == 0 { - endpoint.url = child.Text() - } else { - for _, child := range child.ChildElements() { - switch child.Tag { - case "url": - endpoint.url = child.Text() - case "tls": - insecure := child.SelectAttrValue("insecure", "false") - endpoint.insecure, _ = strconv.ParseBool(insecure) - - for _, child := range child.ChildElements() { - switch child.Tag { - case "ca": - endpoint.caPath = child.Text() - case "key": - endpoint.keyPath = child.Text() - case "certificate": - endpoint.certPath = child.Text() - } - } - } - } - } - - this.endpoints = append(this.endpoints, endpoint) - case "client-id": - this.id = child.Text() - case "period": - var err error - if this.period, err = time.ParseDuration(child.Text()); err != nil { - return fmt.Errorf("invalid period duration '%s': %w", child.Text(), err) - } - case "tag": - var ( - tag = child.Text() - topic = child.SelectAttrValue("topic", strings.ReplaceAll(tag, ".", "/")) - ) - - this.topics[tag] = topic - this.values[tag] = 0.0 - case "payload-template": - var err error - - this.payloadTmpl, err = template.New("payload").Parse(strings.TrimSpace(child.Text())) - if err != nil { - return fmt.Errorf("parsing payload template: %w", err) - } - - this.timestampTmpl = child.SelectAttrValue("timestamp", this.timestampTmpl) - } - } - - if this.id == "" { - return fmt.Errorf("must provide 'client-id' for MQTT module config") - } - - for idx, endpoint := range this.endpoints { - this.log("[DEBUG] endpoint pre validation: %+v", endpoint) - - if err := endpoint.validate(); err != nil { - return fmt.Errorf("validating endpoint: %w", err) - } - - this.log("[DEBUG] endpoint post validation: %+v", endpoint) - - this.endpoints[idx] = endpoint - } - - return nil -} - -func (this *MQTTClient) Run(ctx context.Context, pubEndpoint, _ string) error { - // Use ZeroMQ PUB endpoint specified in `mqtt` config block if provided. - if this.pubEndpoint != "" { - pubEndpoint = this.pubEndpoint - } - - if len(this.endpoints) == 0 { - return fmt.Errorf("no MQTT broker endpoints provided") - } - - subscriber := msgbus.MustNewSubscriber(pubEndpoint) - subscriber.AddStatusHandler(this.handleMsgBusStatus) - subscriber.Start("RUNTIME") - - this.connectAndRun(ctx) - - return nil -} - -func (this *MQTTClient) connectAndRun(ctx context.Context) { - cctx, cancel := context.WithCancel(ctx) - backoff := backoff.NewExponentialBackOff() - - for { - if err := this.connect(ctx, cancel); err == nil { - break - } - - time.Sleep(backoff.NextBackOff()) - } - - go this.run(cctx) -} - -func (this *MQTTClient) connect(ctx context.Context, cancel context.CancelFunc) error { - // circle back around to beginning of endpoint list - if this.endpoint == len(this.endpoints) { - this.endpoint = 0 - } - - endpoint := this.endpoints[this.endpoint] - this.endpoint++ - - opts := MQTT.NewClientOptions() - - opts.AddBroker(endpoint.url).SetClientID(this.id).SetCleanSession(true) - opts.SetKeepAlive(5 * time.Second).SetAutoReconnect(false).SetConnectRetry(false).SetConnectTimeout(10 * time.Second) - - opts.SetConnectionLostHandler(this.lostConnectionHandler(ctx, cancel)) + mode := e.SelectAttrValue("mode", "client") - if endpoint.uri.Scheme == "ssl" || endpoint.uri.Scheme == "tls" { - opts.SetTLSConfig(&tls.Config{ - ServerName: endpoint.uri.Hostname(), - RootCAs: endpoint.roots, - Certificates: []tls.Certificate{endpoint.cert}, - InsecureSkipVerify: endpoint.insecure, - }) + switch strings.ToLower(mode) { + case "broker": + name := e.SelectAttrValue("name", "mqtt-broker") + return broker.New(name), nil + case "client": + name := e.SelectAttrValue("name", "mqtt-client") + return client.New(name), nil } - this.client = MQTT.NewClient(opts) - - if token := this.client.Connect(); token.Wait() && token.Error() != nil { - this.log("[ERROR] connecting to MQTT broker at %s: %v", endpoint.url, token.Error()) - return fmt.Errorf("connecting to MQTT broker: %w", token.Error()) - } - - this.log("[DEBUG] connected to MQTT broker at %s", endpoint.url) - - return nil + return nil, fmt.Errorf("unknown mode '%s' provided for MQTT module", mode) } -func (this *MQTTClient) run(ctx context.Context) { - if this.period == 0 { - return - } - - ticker := time.NewTicker(this.period) - - for { - select { - case <-ctx.Done(): - ticker.Stop() - - this.log("[ERROR] stopping publish loop: %v", ctx.Err()) - - return - case <-ticker.C: - this.RLock() - - for tag, value := range this.values { - go this.publish(tag, value) - } - - this.RUnlock() - } - } -} - -func (this *MQTTClient) publish(tag string, value float64) { - var ( - tstamp = time.Now().UTC() - topic = this.topics[tag] - - buf bytes.Buffer - ) - - pdata := data{ - Epoch: tstamp.Unix(), - Timestamp: tstamp.Format(this.timestampTmpl), - Client: this.id, - Topic: topic, - Value: value, - } - - if err := this.payloadTmpl.Execute(&buf, pdata); err != nil { - this.log("[ERROR] executing payload template: %v", err) - return - } - - token := this.client.Publish(topic, 0, false, buf.String()) - - this.log("[DEBUG] publishing %s --> %s to MQTT broker", topic, buf.String()) - - if token.Wait() && token.Error() != nil { - this.log("[ERROR] publishing topic %s to MQTT broker: %v", topic, token.Error()) - } else { - this.log("[DEBUG] published %s --> %f to MQTT broker", topic, value) - } -} - -func (this *MQTTClient) handleMsgBusStatus(env msgbus.Envelope) { - if env.Sender() == this.name { - return - } - - status, err := env.Status() - if err != nil { - if !errors.Is(err, msgbus.ErrKindNotStatus) { - this.log("[ERROR] getting Status message from envelope: %v", err) - } - - return - } - - this.Lock() - defer this.Unlock() - - for _, point := range status.Measurements { - if _, ok := this.values[point.Tag]; ok { - this.values[point.Tag] = point.Value - - if this.period == 0 { - go this.publish(point.Tag, point.Value) - } - } - } -} - -func (this *MQTTClient) lostConnectionHandler(ctx context.Context, cancel context.CancelFunc) MQTT.ConnectionLostHandler { - return func(client MQTT.Client, err error) { - this.log("[ERROR] connection to MQTT broker lost: %v", err) - - cancel() - - this.connectAndRun(ctx) - } -} - -func (this MQTTClient) log(format string, a ...any) { - fmt.Printf("[%s] %s\n", this.name, fmt.Sprintf(format, a...)) +func init() { + otsim.AddModuleFactory("mqtt", new(Factory)) } diff --git a/src/go/mqtt/types.go b/src/go/mqtt/types.go deleted file mode 100644 index 1667927..0000000 --- a/src/go/mqtt/types.go +++ /dev/null @@ -1,83 +0,0 @@ -package mqtt - -import ( - "bytes" - "crypto/tls" - "crypto/x509" - "fmt" - "net/url" - "os" - "text/template" -) - -type endpoint struct { - url string - - caPath string - keyPath string - certPath string - - uri *url.URL - cert tls.Certificate - roots *x509.CertPool - - insecure bool -} - -func (this *endpoint) validate() error { - var err error - - this.uri, err = url.Parse(this.url) - if err != nil { - return fmt.Errorf("parsing endpoint URL %s: %w", this.url, err) - } - - if this.uri.Scheme == "" { - return fmt.Errorf("endpoint URL is missing a scheme (must be tcp, ssl, or tls)") - } - - if this.uri.Scheme == "ssl" || this.uri.Scheme == "tls" { - if this.certPath == "" || this.keyPath == "" { - return fmt.Errorf("must provide 'certificate' and 'key' for MQTT module config when using ssl/tls") - } - - this.cert, err = tls.LoadX509KeyPair(this.certPath, this.keyPath) - if err != nil { - return fmt.Errorf("loading MQTT module certificate and key: %w", err) - } - - if this.caPath != "" { - caCert, err := os.ReadFile(this.caPath) - if err != nil { - return fmt.Errorf("reading MQTT module CA certificate: %w", err) - } - - this.roots = x509.NewCertPool() - - if ok := this.roots.AppendCertsFromPEM(caCert); !ok { - return fmt.Errorf("failed to parse MQTT module CA certificate") - } - } - } - - return nil -} - -// publication payload data -type data struct { - Epoch int64 - Timestamp string - Client string - Topic string - Value any -} - -func (this data) execute(tmpl *template.Template) (string, error) { - var buf bytes.Buffer - - if err := tmpl.Execute(&buf, this); err != nil { - return "", fmt.Errorf("executing template: %w", err) - } - - return buf.String(), nil -} diff --git a/src/go/mqtt/types/data.go b/src/go/mqtt/types/data.go new file mode 100644 index 0000000..22aa69b --- /dev/null +++ b/src/go/mqtt/types/data.go @@ -0,0 +1,25 @@ +package types + +import ( + "bytes" + "fmt" + "text/template" +) + +type Data struct { + Epoch int64 + Timestamp string + Client string + Topic string + Value any +} + +func (this Data) Execute(tmpl *template.Template) (string, error) { + var buf bytes.Buffer + + if err := tmpl.Execute(&buf, this); err != nil { + return "", fmt.Errorf("executing template: %w", err) + } + + return buf.String(), nil +} diff --git a/src/go/mqtt/types/enpoint.go b/src/go/mqtt/types/enpoint.go new file mode 100644 index 0000000..06e3a0e --- /dev/null +++ b/src/go/mqtt/types/enpoint.go @@ -0,0 +1,101 @@ +package types + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "net/url" + "os" +) + +type Endpoint struct { + Address string // broker + URL string // client + + CAPath string + KeyPath string + CertPath string + + URI *url.URL + Cert tls.Certificate + Roots *x509.CertPool + + Insecure bool +} + +func (this *Endpoint) Validate() error { + if this.Address == "" && this.URL == "" { + return fmt.Errorf("must provide either endpoint URL or endpoint address") + } + + if this.Address != "" && this.URL != "" { + return fmt.Errorf("can only provide one of endpoint URL or endpoint address") + } + + var err error + + if this.Address != "" { // broker + if this.CertPath != "" || this.KeyPath != "" { + if this.CertPath == "" || this.KeyPath == "" { + return fmt.Errorf("must provide both 'certificate' and 'key' to enable TLS") + } + + this.Cert, err = tls.LoadX509KeyPair(this.CertPath, this.KeyPath) + if err != nil { + return fmt.Errorf("loading MQTT module certificate and key: %w", err) + } + + if this.CAPath != "" { + caCert, err := os.ReadFile(this.CAPath) + if err != nil { + return fmt.Errorf("reading MQTT module CA certificate: %w", err) + } + + this.Roots = x509.NewCertPool() + + if ok := this.Roots.AppendCertsFromPEM(caCert); !ok { + return fmt.Errorf("failed to parse MQTT module CA certificate") + } + } + } else { + this.Insecure = true + } + } + + if this.URL != "" { // client + this.URI, err = url.Parse(this.URL) + if err != nil { + return fmt.Errorf("parsing endpoint URL %s: %w", this.URL, err) + } + + if this.URI.Scheme == "" { + return fmt.Errorf("endpoint URL is missing a scheme (must be tcp, ssl, or tls)") + } + + if this.URI.Scheme == "ssl" || this.URI.Scheme == "tls" { + if this.CertPath == "" || this.KeyPath == "" { + return fmt.Errorf("must provide 'certificate' and 'key' for MQTT module config when using ssl/tls") + } + + this.Cert, err = tls.LoadX509KeyPair(this.CertPath, this.KeyPath) + if err != nil { + return fmt.Errorf("loading MQTT module certificate and key: %w", err) + } + + if this.CAPath != "" { + caCert, err := os.ReadFile(this.CAPath) + if err != nil { + return fmt.Errorf("reading MQTT module CA certificate: %w", err) + } + + this.Roots = x509.NewCertPool() + + if ok := this.Roots.AppendCertsFromPEM(caCert); !ok { + return fmt.Errorf("failed to parse MQTT module CA certificate") + } + } + } + } + + return nil +}