Skip to content

Commit

Permalink
feat(mqtt): support subscribing to MQTT broker publications
Browse files Browse the repository at this point in the history
  • Loading branch information
activeshadow committed Jun 23, 2024
1 parent 5787355 commit 3153516
Showing 1 changed file with 85 additions and 9 deletions.
94 changes: 85 additions & 9 deletions src/go/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -61,20 +62,23 @@ func (Factory) NewModule(e *etree.Element) (otsim.Module, error) {
type MQTTClient struct {
sync.RWMutex

pubEndpoint string
pullEndpoint string
pubEndpoint string

endpoints []endpoint
period time.Duration

name string
id string

topics map[string]string
values map[string]float64
pubTopics map[string]string
subTopics map[string]string
values map[string]float64

// index of endpoint currently in use
endpoint int
client MQTT.Client
pusher *msgbus.Pusher

payloadTmpl *template.Template
timestampTmpl string
Expand All @@ -83,7 +87,8 @@ type MQTTClient struct {
func New(name string) *MQTTClient {
return &MQTTClient{
name: name,
topics: make(map[string]string),
pubTopics: make(map[string]string),
subTopics: make(map[string]string),
values: make(map[string]float64),
payloadTmpl: template.Must(template.New("payload").Parse(`{{ .Value }}`)),
timestampTmpl: time.RFC3339,
Expand All @@ -97,6 +102,8 @@ func (this MQTTClient) Name() string {
func (this *MQTTClient) Configure(e *etree.Element) error {
for _, child := range e.ChildElements() {
switch child.Tag {
case "pull-endpoint":
this.pullEndpoint = child.Text()
case "pub-endpoint":
this.pubEndpoint = child.Text()
case "endpoint":
Expand Down Expand Up @@ -138,10 +145,18 @@ func (this *MQTTClient) Configure(e *etree.Element) error {
case "tag":
var (
tag = child.Text()
mode = child.SelectAttrValue("mode", "publish")
topic = child.SelectAttrValue("topic", strings.ReplaceAll(tag, ".", "/"))
)

this.topics[tag] = topic
if strings.EqualFold(mode, "publish") {
this.pubTopics[tag] = topic
} else if strings.EqualFold(mode, "subscribe") {
this.subTopics[topic] = tag
} else {
return fmt.Errorf("tag mode must be 'publish' or 'subscribe'")
}

this.values[tag] = 0.0
case "payload-template":
var err error
Expand Down Expand Up @@ -174,17 +189,24 @@ func (this *MQTTClient) Configure(e *etree.Element) error {
return nil
}

func (this *MQTTClient) Run(ctx context.Context, pubEndpoint, _ string) error {
// Use ZeroMQ PUB endpoint specified in `mqtt` config block if provided.
func (this *MQTTClient) Run(ctx context.Context, pubEndpoint, pullEndpoint string) error {
// Use ZeroMQ PULL and PUB endpoints specified in `mqtt` config block if
// provided.
if this.pubEndpoint != "" {
pubEndpoint = this.pubEndpoint
}

if this.pullEndpoint != "" {
pullEndpoint = this.pullEndpoint
}

subscriber := msgbus.MustNewSubscriber(pubEndpoint)
this.pusher = msgbus.MustNewPusher(pullEndpoint)

if len(this.endpoints) == 0 {
return fmt.Errorf("no MQTT broker endpoints provided")
}

subscriber := msgbus.MustNewSubscriber(pubEndpoint)
subscriber.AddStatusHandler(this.handleMsgBusStatus)
subscriber.Start("RUNTIME")

Expand Down Expand Up @@ -233,6 +255,17 @@ func (this *MQTTClient) connect(ctx context.Context, cancel context.CancelFunc)
})
}

opts.OnConnect = func(c MQTT.Client) {
for topic := range this.subTopics {
this.log("[DEBUG] subscribing to broker for topic %s", topic)

token := c.Subscribe(topic, 0, this.newMessageReceivedHandler(this.pusher))
if token.Wait() && token.Error() != nil {
panic(fmt.Sprintf("unable to subscribe to broker for topic %s: %v", topic, token.Error()))
}
}
}

this.client = MQTT.NewClient(opts)

if token := this.client.Connect(); token.Wait() && token.Error() != nil {
Expand Down Expand Up @@ -275,11 +308,15 @@ func (this *MQTTClient) run(ctx context.Context) {
func (this *MQTTClient) publish(tag string, value float64) {
var (
tstamp = time.Now().UTC()
topic = this.topics[tag]
topic = this.pubTopics[tag]

buf bytes.Buffer
)

if topic == "" {
return
}

pdata := data{
Epoch: tstamp.Unix(),
Timestamp: tstamp.Format(this.timestampTmpl),
Expand Down Expand Up @@ -342,6 +379,45 @@ func (this *MQTTClient) lostConnectionHandler(ctx context.Context, cancel contex
}
}

func (this *MQTTClient) newMessageReceivedHandler(pusher *msgbus.Pusher) MQTT.MessageHandler {
return func(_ MQTT.Client, msg MQTT.Message) {
topic := msg.Topic()

tag := this.subTopics[topic]

if tag == "" { // shouldn't ever happen...
return
}

var (
buf = bytes.NewReader(msg.Payload())
value float64
)

if err := binary.Read(buf, binary.BigEndian, &value); err != nil {
this.log("[ERROR] parsing payload for topic %s: %v", topic, err)
return
}

this.Lock()
defer this.Unlock()

this.values[tag] = value

updates := []msgbus.Point{{Tag: tag, Value: value}}

env, err := msgbus.NewEnvelope(this.name, msgbus.Update{Updates: updates})
if err != nil {
this.log("[ERROR] creating new update message: %v\n", err)
return
}

if err := pusher.Push("RUNTIME", env); err != nil {
this.log("[ERROR] sending update message: %v", err)
}
}
}

func (this MQTTClient) log(format string, a ...any) {
fmt.Printf("[%s] %s\n", this.name, fmt.Sprintf(format, a...))
}

0 comments on commit 3153516

Please sign in to comment.