From 510ab4f616f615682a87dcbf4f4c56f03a2506d9 Mon Sep 17 00:00:00 2001 From: Ricardo Olsen Date: Sun, 30 Jun 2024 11:33:55 -0300 Subject: [PATCH] Prepare for bulk write on Plc4x-client. --- src/plc4x-client/autotag.go | 6 +- src/plc4x-client/config.go | 11 +- src/plc4x-client/plc4x-client.go | 262 +++++++++++++++++++++++-------- 3 files changed, 202 insertions(+), 77 deletions(-) diff --git a/src/plc4x-client/autotag.go b/src/plc4x-client/autotag.go index 2ac247c7..062c9a88 100644 --- a/src/plc4x-client/autotag.go +++ b/src/plc4x-client/autotag.go @@ -174,14 +174,12 @@ func (pc *protocolConnection) AutoCreateTag(rtData *RtDataTag, rtDataCollection rtData.Id = float64(pc.AutoKeyId) rtData.Description = rtData.Tag rtData.UngroupedDescription = rtData.ProtocolSourceObjectAddress - rtData.Type = "analog" // default type rtData.EventTextFalse = "OFF" rtData.EventTextTrue = "ON" rtData.StateTextFalse = "OFF" rtData.StateTextTrue = "ON" rtData.Origin = "supervised" - rtData.Unit = "" // default unit + rtData.Unit = "" rtDataCollection.InsertOne(context.TODO(), rtData) - _ListCreatedTags[rtData.Tag] = "analog" - return + _ListCreatedTags[rtData.Tag] = rtData.Type } diff --git a/src/plc4x-client/config.go b/src/plc4x-client/config.go index 19ba56b1..ee83bd90 100644 --- a/src/plc4x-client/config.go +++ b/src/plc4x-client/config.go @@ -36,8 +36,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -var logLevel = 1 - const ( logLevelMin = 0 logLevelBasic = 1 @@ -48,6 +46,8 @@ const ( //const udpChannelSize = 1000 //const udpReadBufferPackets = 100 +var logLevel = logLevelBasic + type configData struct { NodeName string `json:"nodeName"` MongoConnectionString string `json:"mongoConnectionString"` @@ -121,9 +121,10 @@ func checkFatalError(err error) { } } -func readConfigFile() (cfg configData, instanceNumber int, logLevel int) { +func readConfigFile() (cfg configData, instanceNumber int, instLogLevel int) { var err error instanceNumber = 1 + instLogLevel = logLevelBasic if os.Getenv("JS_"+DriverName+"_INSTANCE") != "" { i, err := strconv.Atoi(os.Getenv("JS_PLC4X_INSTANCE")) if err != nil { @@ -147,10 +148,10 @@ func readConfigFile() (cfg configData, instanceNumber int, logLevel int) { log.Println("JS_" + DriverName + "_LOGLEVEL environment variable should be a number!") os.Exit(2) } - logLevel = i + instLogLevel = i } if len(os.Args) > 2 { - logLevel, err = strconv.Atoi(os.Args[2]) + instLogLevel, err = strconv.Atoi(os.Args[2]) if err != nil { log.Println("Log Level parameter should be a number!") os.Exit(2) diff --git a/src/plc4x-client/plc4x-client.go b/src/plc4x-client/plc4x-client.go index d1297b7d..ebf58e69 100644 --- a/src/plc4x-client/plc4x-client.go +++ b/src/plc4x-client/plc4x-client.go @@ -23,8 +23,10 @@ import ( "context" "encoding/binary" "encoding/json" + "fmt" "log" "os" + "strconv" "strings" "sync" "time" @@ -33,10 +35,12 @@ import ( "github.com/apache/plc4x/plc4go/pkg/api/config" "github.com/apache/plc4x/plc4go/pkg/api/drivers" "github.com/apache/plc4x/plc4go/pkg/api/transports" + "github.com/apache/plc4x/plc4go/pkg/api/values" "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" ) var ( @@ -204,9 +208,6 @@ func main() { log.Println(SoftwareVersion) log.Println("Usage plc4x-client [instance number] [log level] [config file name]") - zerolog.TimeFieldFormat = zerolog.TimeFormatUnix - zerolog.SetGlobalLevel(zerolog.WarnLevel) - // Create a new instance of the PlcDriverManager driverManager := plc4go.NewPlcDriverManager(config.WithCustomLogger(zerolog.Logger{})) // Register the Transports @@ -234,7 +235,7 @@ func main() { someConnectionHasCommandsEnabled := false cfg, instanceNumber, ll := readConfigFile() - logLevel = ll + logLevel := ll log.Print("Mongodb - Try to connect server...") client, collectionRtData, collectionInstances, collectionConnections, collectionCommands, err = mongoConnect(cfg) @@ -242,6 +243,16 @@ func main() { defer client.Disconnect(context.TODO()) protocolConns, csCommands := configInstance(client, collectionInstances, collectionConnections, collectionCommands, instanceNumber) + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + if logLevel >= logLevelDebug { + zerolog.SetGlobalLevel(zerolog.DebugLevel) + } else if logLevel >= logLevelDetailed { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + } else { + zerolog.SetGlobalLevel(zerolog.WarnLevel) + } + log.Println("Log level set to:", logLevel) + var waitGroup sync.WaitGroup if someConnectionHasCommandsEnabled { waitGroup.Add(1) @@ -273,7 +284,7 @@ func main() { } // log connection info - log.Printf("Instance:%d Connection:%d %s", protocolConn.ProtocolDriverInstanceNumber, protocolConn.ProtocolConnectionNumber, protocolConn.Name) + log.Printf("Instance: %d Connection: %d %s", protocolConn.ProtocolDriverInstanceNumber, protocolConn.ProtocolConnectionNumber, protocolConn.Name) log.Printf("%s: Server endpoint URL: %s", protocolConn.Name, protocolConn.EndpointURLs[0]) protocolId := strings.Split(protocolConn.EndpointURLs[0], ":")[0] addrSep := ":" @@ -363,6 +374,16 @@ func main() { plc4xAddress = topic } plc4xTagName = plc4xAddress + typeJsTag := "analog" + addr := strings.ToUpper(plc4xAddress) + switch { + case strings.HasSuffix(addr, "BOOL"): + typeJsTag = "digital" + case strings.HasSuffix(addr, "STRING"), strings.HasSuffix(addr, "CHAR"): + typeJsTag = "string" + case strings.HasSuffix(addr, "Struct"), strings.HasSuffix(addr, "LIST"), strings.HasSuffix(addr, "RAW_BYTE_ARRAY"): + typeJsTag = "json" + } /* switch len(spl) { case 1: @@ -396,10 +417,15 @@ func main() { rtd.Group1 = DriverName rtd.Group2 = protocolConn.Name rtd.Group3 = topic + rtd.Type = typeJsTag protocolConn.AutoCreateTag(&rtd, collectionRtData) - log.Printf(protocolConn.Name+": tagName: %s address: %s", rtd.Tag, plc4xAddress) + if logLevel >= logLevelBasic { + log.Printf(protocolConn.Name+": tagName: %s address: %s", rtd.Tag, plc4xAddress) + } } else { - log.Printf(protocolConn.Name+": address: %s", plc4xAddress) + if logLevel >= logLevelBasic { + log.Printf(protocolConn.Name+": address: %s", plc4xAddress) + } } } protocolConn.ReadRequest, err = reqBld.Build() @@ -409,7 +435,9 @@ func main() { } execRead := func() error { - log.Println(protocolConn.Name + ": integrity read...") + if logLevel >= logLevelBasic { + log.Println(protocolConn.Name + ": integrity read...") + } // Execute a read-request readResponseChanel := protocolConn.ReadRequest.Execute() @@ -422,65 +450,52 @@ func main() { } // Do something with the response + var updOpers []mongo.WriteModel for _, plc4xTagName := range readRequestResult.GetResponse().GetTagNames() { + updOper := mongo.NewUpdateOneModel() // update one + updOper.SetFilter(bson.D{ + {Key: "protocolSourceConnectionNumber", Value: protocolConn.ProtocolConnectionNumber}, + {Key: "protocolSourceObjectAddress", Value: plc4xTagName}, + }) v := readRequestResult.GetResponse().GetValue(plc4xTagName) - var valDbl float64 = 0 - var valStr string = "" - switch v.GetPlcValueType().String() { - case "Unknown": - case "NULL": - case "BOOL": - case "BYTE": - case "WORD": - case "DWORD": - case "LWORD": - case "USINT": - case "UINT": - case "UDINT": - case "ULINT": - case "SINT": - case "INT": - log.Printf(protocolConn.Name+": Read result '%s': %04xh %d\n", plc4xTagName, v.GetInt16(), v.GetInt16()) - valDbl = float64(v.GetFloat64()) - /* - wReqBld := protocolConn.PlcConn.WriteRequestBuilder() - wReqBld.AddTagAddress(plc4xTagName, plc4xTagName, v.GetInt16()+1) - if wReq, err := wReqBld.Build(); err != nil { - log.Printf(protocolConn.Name+": error preparing write-request: %s", err.Error()) - } else { - // Execute a write-request - resChan := wReq.Execute() - // Wait for the response to finish - wResponse := <-resChan - log.Println(wResponse.GetResponse().GetResponseCode(plc4xTagName)) - log.Println(wResponse.String()) - } - */ - - case "DINT": - case "LINT": - case "REAL": - case "LREAL": - case "CHAR": - case "WCHAR": - case "STRING": - valStr = v.GetString() - log.Printf(protocolConn.Name+": Read result '%s': %s\n", plc4xTagName, valStr) - case "WSTRING": - case "TIME": - case "LTIME": - case "DATE": - case "LDATE": - case "TIME_OF_DAY": - case "LTIME_OF_DAY": - case "DATE_AND_TIME": - case "DATE_AND_LTIME": - case "LDATE_AND_TIME": - case "Struct": - case "List": - case "RAW_BYTE_ARRAY": + + valDbl, valStr, valJson := extractValue(v, protocolConn, plc4xTagName, logLevel) + + updOper.SetUpdate(bson.D{ + {Key: "$set", Value: bson.D{ + {Key: "sourceDataUpdate", Value: bson.D{}}, + {Key: "valueAtSource", Value: float64(valDbl)}, + {Key: "valueStringAtSource", Value: valStr}, + {Key: "valueJsonAtSource", Value: valJson}, + {Key: "invalidAtSource", Value: false}, + {Key: "notTopicalAtSource", Value: false}, + {Key: "substitutedAtSource", Value: false}, + {Key: "blockedAtSource", Value: false}, + {Key: "overflowAtSource", Value: false}, + {Key: "transientAtSource", Value: false}, + {Key: "carryAtSource", Value: false}, + {Key: "asduAtSource", Value: v.GetPlcValueType().String()}, + {Key: "causeOfTransmissionAtSource", Value: 20}, + {Key: "timeTag", Value: time.Now()}, + //{Key: "timeTagAtSource", Value: time.Now()}, + //{Key: "timeTagAtSourceOk", Value: false}, + }}, + }) + updOpers = append(updOpers, updOper) + } + if len(updOpers) > 0 { + res, err := collectionRtData.BulkWrite( + context.Background(), + updOpers, + options.BulkWrite().SetOrdered(false), + ) + if res == nil || err != nil { + log.Println("Mongodb - bulk error!") + log.Println(err) + } + if logLevel >= logLevelDetailed { + log.Printf("Mongodb - Matched count: %d, Updated Count: %d", res.MatchedCount, res.ModifiedCount) } - _ = valDbl } return nil } @@ -506,11 +521,122 @@ func main() { } }() } - time.Sleep(10 * time.Second) } +} - // Make sure the connection is closed at the end - defer protocolConns[0].PlcConn.Close() - +func extractValue(v values.PlcValue, protocolConn *protocolConnection, plc4xTagName string, logLevel int) (valDbl float64, valStr string, valJson string) { + valJson = "{}" + switch v.GetPlcValueType().String() { + case "Unknown", "NULL": + valStr = v.GetPlcValueType().String() + if ba, err := json.Marshal(v); err == nil { + valJson = string(ba) + } + if logLevel >= logLevelDebug { + log.Printf(protocolConn.Name+": Read result '%s': %s\n", plc4xTagName, valStr) + } + case "BOOL": + if v.GetBool() { + valDbl = 1 + valStr = "true" + } else { + valDbl = 0 + valStr = "false" + } + if ba, err := json.Marshal(v.GetBool()); err == nil { + valJson = string(ba) + } + if logLevel >= logLevelDebug { + log.Printf(protocolConn.Name+": Read result '%s': %s\n", plc4xTagName, valStr) + } + case "BYTE", + "WORD", + "DWORD", + "LWORD", + "USINT", + "UINT", + "UDINT", + "ULINT", + "SINT", + "INT", + "DINT", + "LINT", + "REAL", + "LREAL": + valDbl = float64(v.GetFloat64()) + valStr = fmt.Sprintf("%f", valDbl) + if ba, err := json.Marshal(valDbl); err == nil { + valJson = string(ba) + } + if logLevel >= logLevelDetailed { + log.Printf(protocolConn.Name+": Read result '%s': %08xh %18.6f\n", plc4xTagName, uint64(valDbl), valDbl) + } + case "TIME", + "LTIME", + "DATE", + "LDATE", + "TIME_OF_DAY", + "LTIME_OF_DAY", + "DATE_AND_TIME", + "DATE_AND_LTIME", + "LDATE_AND_TIME": + valDbl := float64(v.GetDateTime().UnixMilli()) + valStr = fmt.Sprintf("%f", valDbl) + if ba, err := json.Marshal(v.GetDateTime()); err == nil { + valJson = string(ba) + } + if logLevel >= logLevelDetailed { + log.Printf(protocolConn.Name+": Read result '%s': %f %s\n", plc4xTagName, valDbl, v.GetDateTime()) + } + case "CHAR": + case "WCHAR": + case "STRING": + case "WSTRING": + valStr := v.GetString() + valDbl, _ = strconv.ParseFloat(valStr, 64) + if ba, err := json.Marshal(valStr); err == nil { + valJson = string(ba) + } + if logLevel >= logLevelDetailed { + log.Printf(protocolConn.Name+": Read result '%s': %s\n", plc4xTagName, valStr) + } + case "Struct": + if ba, err := json.Marshal(v.GetStruct()); err != nil { + log.Printf(protocolConn.Name+": error marshalling struct: %s", err.Error()) + } else { + valJson = string(ba) + valStr = valJson + } + if logLevel >= logLevelDetailed { + log.Printf(protocolConn.Name+": Read struct result '%s': %s\n", plc4xTagName, valStr) + } + case "List": + if len(v.GetList()) > 0 { + valDbl, _, _ = extractValue(v.GetList()[0], protocolConn, plc4xTagName, logLevel) + } + if ba, err := json.Marshal(v.GetList()); err != nil { + log.Printf(protocolConn.Name+": error marshalling list: %s", err.Error()) + } else { + valJson = string(ba) + valStr = valJson + } + if logLevel >= logLevelDetailed { + log.Printf(protocolConn.Name+": Read list result '%s': %s\n", plc4xTagName, valStr) + } + case "RAW_BYTE_ARRAY": + if len(v.GetRaw()) > 0 { + valDbl = float64(v.GetRaw()[0]) + } + if ba, err := json.Marshal(v.GetRaw()); err != nil { + log.Printf(protocolConn.Name+": error marshalling raw array: %s", err.Error()) + } else { + valJson = string(ba) + valStr = valJson + } + if logLevel >= logLevelDetailed { + log.Printf(protocolConn.Name+": Read raw array result '%s': %s\n", plc4xTagName, valStr) + } + } + return }