Skip to content

Commit

Permalink
fixup! feat: new OT-sim intercom module
Browse files Browse the repository at this point in the history
  • Loading branch information
activeshadow committed Jun 24, 2024
1 parent be5d962 commit 35c20bf
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 24 deletions.
55 changes: 51 additions & 4 deletions src/go/intercom/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"

"github.com/patsec/ot-sim/msgbus"

Expand All @@ -27,11 +28,24 @@ type IntercomBroker struct {
name string
endpoint string

pubStatus bool
pubUpdate bool
subStatus bool
subUpdate bool

server *mochi.Server
}

func New(name string) *IntercomBroker {
return &IntercomBroker{name: name}
return &IntercomBroker{
name: name,

// default to pub/sub all messages
pubStatus: true,
pubUpdate: true,
subStatus: true,
subUpdate: true,
}
}

func (this IntercomBroker) Name() string {
Expand All @@ -47,6 +61,28 @@ func (this *IntercomBroker) Configure(e *etree.Element) error {
this.pullEndpoint = child.Text()
case "endpoint":
this.endpoint = child.Text()
case "publish":
for _, child := range child.ChildElements() {
val, _ := strconv.ParseBool(child.Text())

switch child.Tag {
case "status":
this.pubStatus = val
case "update":
this.pubUpdate = val
}
}
case "subscribe":
for _, child := range child.ChildElements() {
val, _ := strconv.ParseBool(child.Text())

switch child.Tag {
case "status":
this.subStatus = val
case "update":
this.subUpdate = val
}
}
}
}

Expand All @@ -69,12 +105,15 @@ func (this *IntercomBroker) Run(ctx context.Context, pubEndpoint, pullEndpoint s
}

msgBusHook := &PublishToMsgBus{
name: this.name,
pusher: msgbus.MustNewPusher(pullEndpoint),
log: this.log,
name: this.name,
pusher: msgbus.MustNewPusher(pullEndpoint),
log: this.log,
subStatus: this.subStatus,
subUpdate: this.subUpdate,
}

subscriber := msgbus.MustNewSubscriber(pubEndpoint)
subscriber.AddStatusHandler(this.handleMsgBusEnvelope)
subscriber.AddUpdateHandler(this.handleMsgBusEnvelope)
subscriber.Start("RUNTIME")

Expand Down Expand Up @@ -109,6 +148,10 @@ func (this *IntercomBroker) handleMsgBusEnvelope(env msgbus.Envelope) {

switch env.Kind {
case msgbus.ENVELOPE_STATUS:
if !this.pubStatus {
return
}

status, err := env.Status()
if err != nil {
if !errors.Is(err, msgbus.ErrKindNotStatus) {
Expand All @@ -130,6 +173,10 @@ func (this *IntercomBroker) handleMsgBusEnvelope(env msgbus.Envelope) {

this.server.Publish(topic, payload, false, 0)
case msgbus.ENVELOPE_UPDATE:
if !this.pubUpdate {
return
}

update, err := env.Update()
if err != nil {
if !errors.Is(err, msgbus.ErrKindNotUpdate) {
Expand Down
53 changes: 41 additions & 12 deletions src/go/intercom/broker/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package broker

import (
"bytes"
"strconv"
"encoding/json"
"strings"

mochi "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/packets"
Expand All @@ -12,12 +13,12 @@ import (
type PublishToMsgBus struct {
mochi.HookBase

name string

name string
pusher *msgbus.Pusher
topics map[string]string
log func(string, ...any)

log func(string, ...any)
subStatus bool
subUpdate bool
}

func (this *PublishToMsgBus) ID() string {
Expand All @@ -35,19 +36,20 @@ func (this *PublishToMsgBus) OnPublished(c *mochi.Client, p packets.Packet) {
return
}

this.log("[DEBUG] topic: %s -- payload: %s", p.TopicName, string(p.Payload))
if strings.HasSuffix(p.TopicName, "/status") {
if !this.subStatus {
return
}

this.log("[DEBUG] topic: %s -- payload: %s", p.TopicName, string(p.Payload))

if tag, ok := this.topics[p.TopicName]; ok {
var points []msgbus.Point

value, err := strconv.ParseFloat(string(p.Payload), 64)
if err != nil {
this.log("[ERROR] parsing payload for topic %s to float64: %v", p.TopicName, err)
if err := json.Unmarshal(p.Payload, &points); err != nil {
this.log("[ERROR] unmarshaling status points: %v", err)
return
}

points = append(points, msgbus.Point{Tag: tag, Value: value})

env, err := msgbus.NewEnvelope(this.name, msgbus.Status{Measurements: points})
if err != nil {
this.log("[ERROR] creating status message: %v", err)
Expand All @@ -56,6 +58,33 @@ func (this *PublishToMsgBus) OnPublished(c *mochi.Client, p packets.Packet) {

if err := this.pusher.Push("RUNTIME", env); err != nil {
this.log("[ERROR] sending status message: %v", err)
return
}
}

if strings.HasSuffix(p.TopicName, "/update") {
if !this.subUpdate {
return
}

this.log("[DEBUG] topic: %s -- payload: %s", p.TopicName, string(p.Payload))

var points []msgbus.Point

if err := json.Unmarshal(p.Payload, &points); err != nil {
this.log("[ERROR] unmarshaling update points: %v", err)
return
}

env, err := msgbus.NewEnvelope(this.name, msgbus.Update{Updates: points})
if err != nil {
this.log("[ERROR] creating update message: %v", err)
return
}

if err := this.pusher.Push("RUNTIME", env); err != nil {
this.log("[ERROR] sending update message: %v", err)
return
}
}
}
70 changes: 62 additions & 8 deletions src/go/intercom/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"

"github.com/patsec/ot-sim/msgbus"
Expand All @@ -19,6 +20,14 @@ import (
<intercom mode="client">
<endpoint>broker.example.com:1883</endpoint>
<client-id>ot-sim-jitp-test</client-id>
<publish>
<status>true</status>
<update>true</update>
</publish>
<subscribe>
<status>true</status>
<update>true</update>
</subscribe>
</mqtt>
*/

Expand All @@ -30,6 +39,11 @@ type IntercomClient struct {
name string
endpoint string

pubStatus bool
pubUpdate bool
subStatus bool
subUpdate bool

client *autopaho.ConnectionManager
pusher *msgbus.Pusher
}
Expand All @@ -38,6 +52,12 @@ func New(name string) *IntercomClient {
return &IntercomClient{
id: name,
name: name,

// default to pub/sub all messages
pubStatus: true,
pubUpdate: true,
subStatus: true,
subUpdate: true,
}
}

Expand All @@ -54,6 +74,28 @@ func (this *IntercomClient) Configure(e *etree.Element) error {
this.endpoint = child.Text()
case "client-id":
this.id = child.Text()
case "publish":
for _, child := range child.ChildElements() {
val, _ := strconv.ParseBool(child.Text())

switch child.Tag {
case "status":
this.pubStatus = val
case "update":
this.pubUpdate = val
}
}
case "subscribe":
for _, child := range child.ChildElements() {
val, _ := strconv.ParseBool(child.Text())

switch child.Tag {
case "status":
this.subStatus = val
case "update":
this.subUpdate = val
}
}
}
}

Expand All @@ -79,6 +121,7 @@ func (this *IntercomClient) Run(ctx context.Context, pubEndpoint, pullEndpoint s

subscriber := msgbus.MustNewSubscriber(pubEndpoint)
subscriber.AddStatusHandler(this.handleMsgBusEnvelope(ctx))
subscriber.AddUpdateHandler(this.handleMsgBusEnvelope(ctx))
subscriber.Start("RUNTIME")

endpoint, err := url.Parse(this.endpoint)
Expand All @@ -94,14 +137,17 @@ func (this *IntercomClient) Run(ctx context.Context, pubEndpoint, pullEndpoint s
OnConnectionUp: func(mgr *autopaho.ConnectionManager, _ *paho.Connack) {
this.log("[DEBUG] Intercom client connection up")

_, err := mgr.Subscribe(ctx, &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: "devices/#/status", QoS: 0, NoLocal: true},
{Topic: "devices/#/update", QoS: 0, NoLocal: true},
},
})
var subs []paho.SubscribeOptions

if err != nil {
if this.subStatus {
subs = append(subs, paho.SubscribeOptions{Topic: "devices/+/status", QoS: 0, NoLocal: true})
}

if this.subUpdate {
subs = append(subs, paho.SubscribeOptions{Topic: "devices/+/update", QoS: 0, NoLocal: true})
}

if _, err := mgr.Subscribe(ctx, &paho.Subscribe{Subscriptions: subs}); err != nil {
this.log("[ERROR] unable to subscribe to Intercom status and update topics: %v", err)
}

Expand Down Expand Up @@ -174,14 +220,18 @@ func (this *IntercomClient) handleIntercom(pub paho.PublishReceived) (bool, erro
return true, nil
}

func (this *IntercomClient) handleMsgBusEnvelope(ctx context.Context) msgbus.StatusHandler {
func (this *IntercomClient) handleMsgBusEnvelope(ctx context.Context) func(msgbus.Envelope) {
return func(env msgbus.Envelope) {
if env.Sender() == this.name {
return
}

switch env.Kind {
case msgbus.ENVELOPE_STATUS:
if !this.pubStatus {
return
}

status, err := env.Status()
if err != nil {
if !errors.Is(err, msgbus.ErrKindNotStatus) {
Expand All @@ -204,6 +254,10 @@ func (this *IntercomClient) handleMsgBusEnvelope(ctx context.Context) msgbus.Sta
queue := &autopaho.QueuePublish{Publish: &paho.Publish{Topic: topic, QoS: 0, Payload: payload}}
this.client.PublishViaQueue(ctx, queue)
case msgbus.ENVELOPE_UPDATE:
if !this.pubUpdate {
return
}

update, err := env.Update()
if err != nil {
if !errors.Is(err, msgbus.ErrKindNotUpdate) {
Expand Down

0 comments on commit 35c20bf

Please sign in to comment.