Skip to content

Commit

Permalink
Explode array read on PLC4X driver. Reconnect to redundant device.
Browse files Browse the repository at this point in the history
  • Loading branch information
riclolsen committed Jul 9, 2024
1 parent 3ce9b96 commit 89d5720
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 53 deletions.
1 change: 1 addition & 0 deletions src/plc4x-client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type protocolConnection struct {
ReadRequest model.PlcReadRequest
AddrSeparator string
AutoKeyId int
ReconnectCount int
}

// check error, terminate app if error
Expand Down
184 changes: 131 additions & 53 deletions src/plc4x-client/plc4x-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -220,8 +221,10 @@ func main() {
}

// log connection info
connUrl := protocolConn.EndpointURLs[protocolConn.ReconnectCount%len(protocolConn.EndpointURLs)]
protocolConn.ReconnectCount++
log.Printf("Instance: %d Connection: %d %s", protocolConn.ProtocolDriverInstanceNumber, protocolConn.ProtocolConnectionNumber, protocolConn.Name)
log.Printf("%s: Server endpoint URL: %s", protocolConn.Name, protocolConn.EndpointURLs[0])
log.Printf("%s: Server endpoint URL: %s", protocolConn.Name, connUrl)
protocolId := strings.Split(protocolConn.EndpointURLs[0], ":")[0]
addrSep := ":"
addrParts := 1
Expand Down Expand Up @@ -251,13 +254,13 @@ func main() {
addrParts = 3
case "simulated":
default:
log.Fatal(protocolConn.Name + ": Unsupported protocol - " + protocolConn.EndpointURLs[0])
log.Fatal(protocolConn.Name + ": Unsupported protocol - " + protocolId)
continue
}
protocolConn.AddrSeparator, _ = addrSep, addrParts

// try to connect to plc
connectionRequestChanel := driverManager.GetConnection(protocolConn.EndpointURLs[0])
connectionRequestChanel := driverManager.GetConnection(connUrl)
connectionResult := <-connectionRequestChanel
if connectionResult.GetErr() != nil {
log.Printf("%s: Error connecting to PLC: %s", protocolConn.Name, connectionResult.GetErr().Error())
Expand Down Expand Up @@ -323,17 +326,47 @@ func main() {
reqBld.AddTagAddress(plc4xTagName, plc4xAddress)

if protocolConn.AutoCreateTags {
rtd := NewRtDataTag()
rtd.Tag = jsTagName
rtd.ProtocolSourceConnectionNumber = float64(protocolConn.ProtocolConnectionNumber)
rtd.ProtocolSourceObjectAddress = plc4xAddress
rtd.Group1 = DriverName
rtd.Group2 = protocolConn.Name
rtd.Group3 = plc4xAddress
rtd.Type = typeJsTag
protocolConn.AutoCreateTag(&rtd, collectionRtData)
if logLevel >= logLevelBasic {
log.Printf(protocolConn.Name+": tagName: %s address: %s", rtd.Tag, plc4xAddress)
// extract number between [ ] from plc4xAddress
sNum := regexp.MustCompile(`\[.*?\]`).FindString(plc4xAddress)
numElemArr := 0
if sNum != "" {
sNum = sNum[1 : len(sNum)-1]
var err error
numElemArr, err = strconv.Atoi(sNum)
if err != nil {
log.Println(protocolConn.Name + ": error parsing array number from address: " + plc4xAddress)
continue
}
}

if numElemArr > 1 {
for i := 0; i < numElemArr; i++ {
rtd := NewRtDataTag()
rtd.Tag = jsTagName + "[" + fmt.Sprint(i) + "]"
rtd.ProtocolSourceConnectionNumber = float64(protocolConn.ProtocolConnectionNumber)
rtd.ProtocolSourceObjectAddress = plc4xAddress + "[" + fmt.Sprint(i) + "]"
rtd.Group1 = DriverName
rtd.Group2 = protocolConn.Name
rtd.Group3 = plc4xAddress
rtd.Type = typeJsTag
protocolConn.AutoCreateTag(&rtd, collectionRtData)
if logLevel >= logLevelBasic {
log.Printf(protocolConn.Name+": tagName: %s address: %s", rtd.Tag, plc4xAddress+"["+fmt.Sprint(i)+"]")
}
}
} else {
rtd := NewRtDataTag()
rtd.Tag = jsTagName
rtd.ProtocolSourceConnectionNumber = float64(protocolConn.ProtocolConnectionNumber)
rtd.ProtocolSourceObjectAddress = plc4xAddress
rtd.Group1 = DriverName
rtd.Group2 = protocolConn.Name
rtd.Group3 = plc4xAddress
rtd.Type = typeJsTag
protocolConn.AutoCreateTag(&rtd, collectionRtData)
if logLevel >= logLevelBasic {
log.Printf(protocolConn.Name+": tagName: %s address: %s", rtd.Tag, plc4xAddress)
}
}
} else {
if logLevel >= logLevelBasic {
Expand Down Expand Up @@ -366,39 +399,75 @@ func main() {
// Do something with the response
var updOpers []mongo.WriteModel
for _, plc4xTagName := range readRequestResult.GetResponse().GetTagNames() {
updOper := mongo.NewUpdateOneModel() // update one
updOper.SetFilter(bson.D{
{Key: "protocolSourceConnectionNumber", Value: protocolConn.ProtocolConnectionNumber},
{Key: "protocolSourceObjectAddress", Value: plc4xTagName},
})
v := readRequestResult.GetResponse().GetValue(plc4xTagName)

valDbl, valStr, valJson := extractValue(v, protocolConn, plc4xTagName, logLevel)
var valBson bson.D
bson.Unmarshal([]byte(valJson), &valBson)
updOper.SetUpdate(bson.D{
{Key: "$set", Value: bson.D{
{Key: "sourceDataUpdate", Value: bson.D{
{Key: "valueAtSource", Value: float64(valDbl)},
{Key: "valueStringAtSource", Value: valStr},
{Key: "valueJsonAtSource", Value: valJson},
{Key: "valueBsonAtSource", Value: valBson},
{Key: "invalidAtSource", Value: false},
{Key: "notTopicalAtSource", Value: false},
{Key: "substitutedAtSource", Value: false},
{Key: "blockedAtSource", Value: false},
{Key: "overflowAtSource", Value: false},
{Key: "transientAtSource", Value: false},
{Key: "carryAtSource", Value: false},
{Key: "asduAtSource", Value: v.GetPlcValueType().String()},
{Key: "causeOfTransmissionAtSource", Value: 20},
{Key: "timeTag", Value: time.Now()},
//{Key: "timeTagAtSource", Value: time.Now()},
//{Key: "timeTagAtSourceOk", Value: false},
valDbl, valStr, valJson, valArrDbl := extractValue(v, protocolConn, plc4xTagName, logLevel)
if len(valArrDbl) > 1 {
for i := 0; i < len(valArrDbl); i++ {
valStr = fmt.Sprintf("%f", valArrDbl[i])
valJson = valStr
//var valBson bson.D
//bson.Unmarshal([]byte(valJson), &valBson)
updOper := mongo.NewUpdateOneModel() // update one
updOper.SetFilter(bson.D{
{Key: "protocolSourceConnectionNumber", Value: protocolConn.ProtocolConnectionNumber},
{Key: "protocolSourceObjectAddress", Value: plc4xTagName + "[" + fmt.Sprint(i) + "]"},
})
updOper.SetUpdate(bson.D{
{Key: "$set", Value: bson.D{
{Key: "sourceDataUpdate", Value: bson.D{
{Key: "valueAtSource", Value: valArrDbl[i]},
{Key: "valueStringAtSource", Value: valStr},
{Key: "valueJsonAtSource", Value: valJson},
//{Key: "valueBsonAtSource", Value: valBson},
{Key: "invalidAtSource", Value: false},
{Key: "notTopicalAtSource", Value: false},
{Key: "substitutedAtSource", Value: false},
{Key: "blockedAtSource", Value: false},
{Key: "overflowAtSource", Value: false},
{Key: "transientAtSource", Value: false},
{Key: "carryAtSource", Value: false},
{Key: "asduAtSource", Value: v.GetPlcValueType().String()},
{Key: "causeOfTransmissionAtSource", Value: 20},
{Key: "timeTag", Value: time.Now()},
//{Key: "timeTagAtSource", Value: time.Now()},
//{Key: "timeTagAtSourceOk", Value: false},
}},
}},
})
updOpers = append(updOpers, updOper)
}
} else {
var valBson bson.D
bson.Unmarshal([]byte(valJson), &valBson)
updOper := mongo.NewUpdateOneModel() // update one
updOper.SetFilter(bson.D{
{Key: "protocolSourceConnectionNumber", Value: protocolConn.ProtocolConnectionNumber},
{Key: "protocolSourceObjectAddress", Value: plc4xTagName},
})
updOper.SetUpdate(bson.D{
{Key: "$set", Value: bson.D{
{Key: "sourceDataUpdate", Value: bson.D{
{Key: "valueAtSource", Value: float64(valDbl)},
{Key: "valueStringAtSource", Value: valStr},
{Key: "valueJsonAtSource", Value: valJson},
{Key: "valueBsonAtSource", Value: valBson},
{Key: "invalidAtSource", Value: false},
{Key: "notTopicalAtSource", Value: false},
{Key: "substitutedAtSource", Value: false},
{Key: "blockedAtSource", Value: false},
{Key: "overflowAtSource", Value: false},
{Key: "transientAtSource", Value: false},
{Key: "carryAtSource", Value: false},
{Key: "asduAtSource", Value: v.GetPlcValueType().String()},
{Key: "causeOfTransmissionAtSource", Value: 20},
{Key: "timeTag", Value: time.Now()},
//{Key: "timeTagAtSource", Value: time.Now()},
//{Key: "timeTagAtSourceOk", Value: false},
}},
}},
}},
})
updOpers = append(updOpers, updOper)
})
updOpers = append(updOpers, updOper)
}
}
if len(updOpers) > 0 {
select {
Expand Down Expand Up @@ -435,7 +504,8 @@ func main() {
}
}

func extractValue(v values.PlcValue, protocolConn *protocolConnection, plc4xTagName string, logLevel int) (valDbl float64, valStr string, valJson string) {
func extractValue(v values.PlcValue, protocolConn *protocolConnection, plc4xTagName string, logLevel int) (valDbl float64, valStr string, valJson string, valArrDbl []float64) {
valArrDbl = []float64{}
valJson = "{}"
switch v.GetPlcValueType().String() {
case "Unknown", "NULL":
Expand Down Expand Up @@ -522,10 +592,14 @@ func extractValue(v values.PlcValue, protocolConn *protocolConnection, plc4xTagN
log.Printf(protocolConn.Name+": Read struct result '%s': %s\n", plc4xTagName, valStr)
}
case "List":
if len(v.GetList()) > 0 {
valDbl, _, _ = extractValue(v.GetList()[0], protocolConn, plc4xTagName, logLevel)
for i := 0; i < len(v.GetList()); i++ {
vd, _, _, _ := extractValue(v.GetList()[i], protocolConn, plc4xTagName, logLevel)
valArrDbl = append(valArrDbl, float64(vd))
if i == 0 {
valDbl = vd
}
}
if ba, err := json.Marshal(v.GetList()); err != nil {
if ba, err := json.Marshal(valArrDbl); err != nil {
log.Printf(protocolConn.Name+": error marshalling list: %s", err.Error())
} else {
valJson = string(ba)
Expand All @@ -535,10 +609,14 @@ func extractValue(v values.PlcValue, protocolConn *protocolConnection, plc4xTagN
log.Printf(protocolConn.Name+": Read list result '%s': %s\n", plc4xTagName, valStr)
}
case "RAW_BYTE_ARRAY":
if len(v.GetRaw()) > 0 {
valDbl = float64(v.GetRaw()[0])
vdArr := []float64{}
for i := 0; i < len(v.GetRaw()); i++ {
valArrDbl = append(vdArr, float64(v.GetRaw()[i]))
if i == 0 {
valDbl = float64(v.GetRaw()[i])
}
}
if ba, err := json.Marshal(v.GetRaw()); err != nil {
if ba, err := json.Marshal(valArrDbl); err != nil {
log.Printf(protocolConn.Name+": error marshalling raw array: %s", err.Error())
} else {
valJson = string(ba)
Expand Down Expand Up @@ -581,7 +659,7 @@ func mongoWriter(cfg configData, instanceNumber int, chMongoBw chan []mongo.Writ
break
}
if logLevel >= logLevelDetailed {
log.Printf("Mongodb - Matched count: %d, Updated Count: %d", res.MatchedCount, res.ModifiedCount)
log.Printf("Mongodb - Opers: %d, Matched count: %d, Updated Count: %d", len(updOpers), res.MatchedCount, res.ModifiedCount)
}
}
}
Expand Down

0 comments on commit 89d5720

Please sign in to comment.