Skip to content

Commit

Permalink
improvements to server and client
Browse files Browse the repository at this point in the history
  • Loading branch information
activeshadow committed Jul 29, 2024
1 parent 1bdb3ba commit bea26a5
Show file tree
Hide file tree
Showing 6 changed files with 537 additions and 131 deletions.
18 changes: 16 additions & 2 deletions src/go/sunspec/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,19 @@ attribute to denote static values? I vote first option... less typing in config.
## TODO

* [x] Build out initial Model 1 (static data)
* [ ] Add OT-sim msg bus status and update handlers
* [ ] Figure out how to handle scaling config
* [x] Add OT-sim msg bus status handler
* do we need an update handler?
* [x] Figure out how to handle scaling config
* [ ] Support mapping OT-sim tag names client-side
* [ ] Support different scan rates for different models client-side
* [ ] Support writes client-side (subscribe to updates)

Server-side is "pretty much" done. Client side needs work 1) continuing to read
available models, and 2) mapping model points to OT-sim tags. The client doesn't
really need to know what models the server side is providing ahead of time since
it can query the server for that, but configuration-wise users will need to know
so they can assign tags to points. Alternatively, we could default to a
well-defined method of automatically mapping points to tags. This could be as
easy as the SunSpec point's name. If we find that point names are not unique
across all SunSpec models, we could prefix the name with the model number. We
can also do things like skip publishing scaling factors within OT-sim.
252 changes: 165 additions & 87 deletions src/go/sunspec/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package client
import (
"context"
"fmt"
"strconv"
"strings"
"time"

otsim "github.com/patsec/ot-sim"
"github.com/patsec/ot-sim/msgbus"
"github.com/patsec/ot-sim/sunspec/common"

"actshad.dev/modbus"
Expand All @@ -26,15 +28,23 @@ type SunSpecClient struct {
name string
id int
endpoint string
period time.Duration

models []int
registers map[int]common.Register
pusher *msgbus.Pusher
client modbus.Client

models *common.Models
registers map[int]*common.Register
points map[string]*common.Register
}

func New(name string) *SunSpecClient {
return &SunSpecClient{
name: name,
registers: make(map[int]common.Register),
period: 5 * time.Second,
models: &common.Models{Settings: make(map[int]common.ModelSettings)},
registers: make(map[int]*common.Register),
points: make(map[string]*common.Register),
}
}

Expand All @@ -51,136 +61,204 @@ func (this *SunSpecClient) Configure(e *etree.Element) error {
this.pubEndpoint = child.Text()
case "endpoint":
this.endpoint = child.Text()
case "model":
attr := child.SelectAttr("id")
if attr == nil {
return fmt.Errorf("missing 'id' attribute for SunSpec model")
}
case "period":
var err error

id, err := strconv.Atoi(attr.Value)
this.period, err = time.ParseDuration(child.Text())
if err != nil {
return fmt.Errorf("parsing model ID %s: %w", attr.Value, err)
return fmt.Errorf("invalid period '%s' provided for %s", child.Text(), this.name)
}

this.models = append(this.models, id)
}
}

return nil
}

func (this *SunSpecClient) Run(ctx context.Context, pubEndpoint, pullEndpoint string) error {
// Use ZeroMQ PUB endpoint specified in `sunspec` config block if provided.
if this.pubEndpoint != "" {
pubEndpoint = this.pubEndpoint
}

// Use ZeroMQ PULL endpoint specified in `sunspec` config block if provided.
if this.pullEndpoint != "" {
pullEndpoint = this.pullEndpoint
}

this.pusher = msgbus.MustNewPusher(pullEndpoint)

var handler modbus.ClientHandler

handler = modbus.NewTCPClientHandler(this.endpoint)
handler.(*modbus.TCPClientHandler).SlaveId = byte(this.id)

client := modbus.NewClient(handler)

r := common.IdentifierRegister
this.client = modbus.NewClient(handler)

data, err := client.ReadHoldingRegisters(40000, uint16(r.Count))
if err != nil {
return fmt.Errorf("reading identifier from SunSpec device %s: %w", this.endpoint, err)
if err := confirmIdentifier(this.client); err != nil {
return fmt.Errorf("confirming SunSpec identifier from SunSpec device %s: %w", this.endpoint, err)
}

value, err := r.Value(data)
if err != nil {
return fmt.Errorf("parsing identifier from SunSpec device %s: %w", this.endpoint, err)
}
var (
// start addr at 40002 after well known identifier
addr = 40002
model1 bool
)

if value != common.SunSpecIdentifier {
return fmt.Errorf("invalid identifier provided by remote device")
}
for {
id, length, err := nextModel(this.client, addr)
if err != nil {
return fmt.Errorf("getting next from SunSpec device %s: %w", this.endpoint, err)
}

// start addr at 40002 after well known identifier
addr := 40002
if !model1 {
if id == 1 {
model1 = true
} else {
return fmt.Errorf("remote SunSpec device missing required Model 1")
}
}

r = common.Register{DataType: "uint16"}
if id == int(common.EndRegister.InternalValue) {
break
}

if err := r.Init(); err != nil {
return fmt.Errorf("initializing generic model register %d: %w", addr, err)
}
// model id and length are 2 words long
addr += 2

data, err = client.ReadHoldingRegisters(uint16(addr), uint16(r.Count))
if err != nil {
return fmt.Errorf("reading model ID from SunSpec device %s: %w", this.endpoint, err)
}
// TODO: don't add model 1 once we're done testing
this.models.Order = append(this.models.Order, id)
this.models.Settings[id] = common.ModelSettings{StartAddr: addr, Length: length}

value, err = r.Value(data)
if err != nil {
return fmt.Errorf("parsing model ID from SunSpec device %s: %w", this.endpoint, err)
}
data, err := modelData(this.client, id, addr, length)
if err != nil {
return fmt.Errorf("reading Model %d data from SunSpec device %s: %w", id, this.endpoint, err)
}

if err := this.process(data); err != nil {
return fmt.Errorf("processing Model %d data: %w", id, err)
}

if value != 1 {
return fmt.Errorf("remote SunSpec device missing required Model 1")
addr += length
}

addr += r.Count
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(this.period):
for _, id := range this.models.Order {
var (
addr = this.models.Settings[id].StartAddr
length = this.models.Settings[id].Length
)

data, err := modelData(this.client, id, addr, length)
if err != nil {
this.log("reading Model %d data from SunSpec device %s: %v", id, this.endpoint, err)
}

if err := this.process(data); err != nil {
this.log("[ERROR] processing Model %d data: %v", id, err)
}
}
}
}
}()

data, err = client.ReadHoldingRegisters(uint16(addr), uint16(r.Count))
if err != nil {
return fmt.Errorf("reading model 1 length from SunSpec device %s: %w", this.endpoint, err)
}
for _, id := range this.models.Order {
model, err := common.GetModelSchema(id)
if err != nil {
return fmt.Errorf("getting schema for model %d: %w", id, err)
}

value, err = r.Value(data)
if err != nil {
return fmt.Errorf("parsing model 1 length from SunSpec device %s: %w", this.endpoint, err)
}
for idx, point := range model.Group.Points {
if idx < 2 {
continue
}

addr += r.Count
r := this.points[point.Name]

data, err = client.ReadHoldingRegisters(uint16(addr), uint16(value))
if err != nil {
return fmt.Errorf("reading rest of model 1 data from SunSpec device %s: %w", this.endpoint, err)
}
if strings.HasPrefix(r.DataType, "string") {
value, err := r.String(r.Raw)
if err != nil {
return fmt.Errorf("parsing string value for SunSpec point: %w", err)
}

model, err := common.GetModelSchema(1)
if err != nil {
return fmt.Errorf("getting model schema: %w", err)
}
fmt.Printf("%s - %s\n", r.Name, value)
} else {
scaling := 0.0

// track position of current model data array
var pos int
if r.ScaleRegister != "" {
p, ok := this.points[r.ScaleRegister]
if !ok {
this.log("[ERROR] scaling factor %s does not exist", r.ScaleRegister)
continue
}

for idx, point := range model.Group.Points {
if idx < 2 {
continue
}
scaling = p.InternalValue
}

dt := string(point.Type)
if dt == string(common.PointTypeString) {
dt = fmt.Sprintf("string%d", point.Size)
}
value, err := r.Value(r.Raw, scaling)
if err != nil {
return fmt.Errorf("parsing value for SunSpec point: %w", err)
}

r := common.Register{
DataType: dt,
Tag: point.Name,
fmt.Printf("%s - %f\n", r.Name, value)
}
}
}

if err := r.Init(); err != nil {
return fmt.Errorf("initializing register %d: %w", addr, err)
}
return nil
}

bytes := data[pos : pos+(point.Size*2)]
func (this SunSpecClient) process(data map[string]*common.Register) error {
var points []msgbus.Point

if point.Type == common.PointTypeString {
value, err := r.String(bytes)
if err != nil {
return fmt.Errorf("parsing string value for SunSpec point: %w", err)
for name, reg := range data {
this.points[name] = reg

if !strings.HasPrefix(reg.DataType, "string") {
if reg.DataType == "pad" || reg.DataType == "sunssf" {
continue
}

scaling := 0.0

if reg.ScaleRegister != "" {
p, ok := this.points[reg.ScaleRegister]
if !ok {
this.log("[ERROR] scaling factor %s does not exist", reg.ScaleRegister)
continue
}

scaling = p.InternalValue
}

fmt.Printf("%s - %s\n", point.Name, value)
} else {
value, err := r.Value(bytes)
value, err := reg.Value(reg.Raw, scaling)
if err != nil {
return fmt.Errorf("parsing value for SunSpec point: %w", err)
this.log("[ERROR] parsing value for SunSpec point: %v", err)
continue
}

fmt.Printf("%s - %f\n", point.Name, value)
points = append(points, msgbus.Point{Tag: name, Value: value})
}
}

if len(points) > 0 {
points = append(points, msgbus.Point{Tag: fmt.Sprintf("%s.connected", this.name), Value: 1.0})
} else {
points = append(points, msgbus.Point{Tag: fmt.Sprintf("%s.connected", this.name), Value: 0.0})
this.log("[ERROR] no measurements read from %s", this.endpoint)
}

env, err := msgbus.NewEnvelope(this.name, msgbus.Status{Measurements: points})
if err != nil {
return fmt.Errorf("creating status message: %w", err)
}

pos += point.Size * 2
if err := this.pusher.Push("RUNTIME", env); err != nil {
return fmt.Errorf("sending status message: %w", err)
}

return nil
Expand Down
Loading

0 comments on commit bea26a5

Please sign in to comment.