From 5fcebbb66ccaf4efc2cd75ca625adfd96a0dcc94 Mon Sep 17 00:00:00 2001 From: "Bryan T. Richardson" Date: Mon, 29 Jul 2024 14:38:24 -0600 Subject: [PATCH] handle OT-sim updates in the SunSpec client (need to test) --- src/go/sunspec/README.md | 2 +- src/go/sunspec/client/client.go | 74 +++++++++++++++++++++++++++---- src/go/sunspec/client/util.go | 4 ++ src/go/sunspec/common/register.go | 3 +- 4 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/go/sunspec/README.md b/src/go/sunspec/README.md index c2d060c..100d9a7 100644 --- a/src/go/sunspec/README.md +++ b/src/go/sunspec/README.md @@ -52,7 +52,7 @@ attribute to denote static values? I vote first option... less typing in config. * [x] Figure out how to handle scaling config * [ ] Support mapping OT-sim tag names client-side * [ ] Support different scan rates for different models client-side -* [ ] Support writes client-side (subscribe to updates) +* [-] Support writes client-side (subscribe to updates) Server-side is "pretty much" done. Client side needs work 1) continuing to read available models, and 2) mapping model points to OT-sim tags. The client doesn't diff --git a/src/go/sunspec/client/client.go b/src/go/sunspec/client/client.go index ab6deb3..a0c67b8 100644 --- a/src/go/sunspec/client/client.go +++ b/src/go/sunspec/client/client.go @@ -2,6 +2,7 @@ package client import ( "context" + "errors" "fmt" "strings" "time" @@ -33,18 +34,16 @@ type SunSpecClient struct { pusher *msgbus.Pusher client modbus.Client - models *common.Models - registers map[int]*common.Register - points map[string]*common.Register + models *common.Models + points map[string]*common.Register } func New(name string) *SunSpecClient { return &SunSpecClient{ - name: name, - period: 5 * time.Second, - models: &common.Models{Settings: make(map[int]common.ModelSettings)}, - registers: make(map[int]*common.Register), - points: make(map[string]*common.Register), + name: name, + period: 5 * time.Second, + models: &common.Models{Settings: make(map[int]common.ModelSettings)}, + points: make(map[string]*common.Register), } } @@ -86,6 +85,10 @@ func (this *SunSpecClient) Run(ctx context.Context, pubEndpoint, pullEndpoint st } this.pusher = msgbus.MustNewPusher(pullEndpoint) + subscriber := msgbus.MustNewSubscriber(pubEndpoint) + + subscriber.AddUpdateHandler(this.handleMsgBusUpdate) + subscriber.Start("RUNTIME") var handler modbus.ClientHandler @@ -209,6 +212,11 @@ func (this *SunSpecClient) Run(ctx context.Context, pubEndpoint, pullEndpoint st } } + go func() { + <-ctx.Done() + subscriber.Stop() + }() + return nil } @@ -264,6 +272,56 @@ func (this SunSpecClient) process(data map[string]*common.Register) error { return nil } +func (this *SunSpecClient) handleMsgBusUpdate(env msgbus.Envelope) { + if env.Sender() == this.name { + return + } + + update, err := env.Update() + if err != nil { + if !errors.Is(err, msgbus.ErrKindNotUpdate) { + this.log("[ERROR] getting update message from envelope: %v", err) + } + + return + } + + for _, point := range update.Updates { + if reg, ok := this.points[point.Tag]; ok { + if reg.Addr == 0 { + // Not a R/W point in SunSpec - this only gets initialized to a non-zero + // address in the `modelData` function if the SunSpec point is marked + // with R/W access. + continue + } + + scaling := 0.0 + + if reg.ScaleRegister != "" { + p, ok := this.points[reg.ScaleRegister] + if !ok { + this.log("[ERROR] scaling factor %s does not exist", reg.ScaleRegister) + continue + } + + scaling = p.InternalValue + } + + data, err := reg.Bytes(point.Value, scaling) + if err != nil { + this.log("[ERROR] converting register value to bytes: %v", err) + continue + } + + if _, err := this.client.WriteMultipleRegisters(uint16(reg.Addr), uint16(reg.Count), data); err != nil { + this.log("[ERROR] writing SunSpec point %s (addr: %d) to %f at %s: %v", reg.Name, uint16(reg.Addr), point.Value, this.endpoint, err) + } + + this.log("writing SunSpec point %s (addr: %d) at %s --> %f", reg.Name, uint16(reg.Addr), this.endpoint, point.Value) + } + } +} + func (this SunSpecClient) log(format string, a ...any) { fmt.Printf("[%s] %s\n", this.name, fmt.Sprintf(format, a...)) } diff --git a/src/go/sunspec/client/util.go b/src/go/sunspec/client/util.go index 110700a..884ec05 100644 --- a/src/go/sunspec/client/util.go +++ b/src/go/sunspec/client/util.go @@ -85,6 +85,10 @@ func modelData(c modbus.Client, m, a, l int) (map[string]*common.Register, error Model: m, } + if p.Access == common.PointAccessRW { + r.Addr = a + pos + } + switch sf := p.Sf.(type) { case nil: // noop diff --git a/src/go/sunspec/common/register.go b/src/go/sunspec/common/register.go index dd91096..1019dce 100644 --- a/src/go/sunspec/common/register.go +++ b/src/go/sunspec/common/register.go @@ -21,7 +21,8 @@ type Register struct { InternalValue float64 InternalString string - Raw []byte // used for storing raw bytes until SF is known + Raw []byte // used for storing raw bytes until SF is known + Addr int // used for writing data } func (this *Register) Init() error {