From 12e841f2014116cc179e7bc3168df0afd8e3f233 Mon Sep 17 00:00:00 2001 From: Dario Freddi Date: Thu, 10 Feb 2022 16:53:40 +0100 Subject: [PATCH] Solve all golangci-lint issues Signed-off-by: Dario Freddi --- .golangci.yml | 1 + device/crypto.go | 8 +- device/device.go | 6 +- device/protocol_mqtt_v1.go | 274 +++++++++++++++---------------------- device/store.go | 21 ++- e2e_tests/e2e_test.go | 11 +- 6 files changed, 142 insertions(+), 179 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 3fa0879..2d78d61 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -154,6 +154,7 @@ linters: - errcheck - dupl - gosec + - funlen # Exclude lll issues for long lines with go:generate - linters: diff --git a/device/crypto.go b/device/crypto.go index 698847c..ea6ec62 100644 --- a/device/crypto.go +++ b/device/crypto.go @@ -97,7 +97,9 @@ func (d *Device) getTLSConfig() (*tls.Config, error) { func (d *Device) getCryptoDir() string { cryptoDir := filepath.Join(d.persistencyDir, "crypto") - os.MkdirAll(cryptoDir, 0700) + if err := os.MkdirAll(cryptoDir, 0700); err != nil { + fmt.Println("WARNING - could not access crypto dir!") + } return cryptoDir } @@ -183,7 +185,9 @@ func (d *Device) ensureKeyPair() error { // We need to generate the key // First of all, clear the crypto dir, just to be sure. - d.ClearCrypto() + if err := d.ClearCrypto(); err != nil { + return err + } reader := rand.Reader // Certificates are short-lived, 2048 is fine. diff --git a/device/device.go b/device/device.go index d045ba2..774204d 100644 --- a/device/device.go +++ b/device/device.go @@ -21,11 +21,11 @@ import ( "io/ioutil" "path/filepath" - mqtt "github.com/ispirata/paho.mqtt.golang" "github.com/astarte-platform/astarte-go/client" "github.com/astarte-platform/astarte-go/interfaces" "github.com/astarte-platform/astarte-go/misc" backoff "github.com/cenkalti/backoff/v4" + mqtt "github.com/ispirata/paho.mqtt.golang" "gorm.io/driver/sqlite" "gorm.io/gorm" ) @@ -115,7 +115,6 @@ func newDevice(deviceID, realm, credentialsSecret string, pairingBaseURL string, } if err := d.migrateDb(); err != nil { - errors.New("Database migration failed") return nil, err } @@ -123,6 +122,7 @@ func newDevice(deviceID, realm, credentialsSecret string, pairingBaseURL string, } // Connect connects the device through a goroutine +//nolint func (d *Device) Connect(result chan<- error) { go func(result chan<- error) { // Are we connected already? @@ -177,7 +177,7 @@ func (d *Device) Connect(result chan<- error) { } // initialize the client - if err := d.initializeMQTTClient(); err != nil { + if err = d.initializeMQTTClient(); err != nil { if result != nil { result <- err } diff --git a/device/protocol_mqtt_v1.go b/device/protocol_mqtt_v1.go index c7a995c..0b9c637 100644 --- a/device/protocol_mqtt_v1.go +++ b/device/protocol_mqtt_v1.go @@ -50,7 +50,7 @@ func (d *Device) initializeMQTTClient() error { opts.SetTLSConfig(tlsConfig) opts.SetOnConnectHandler(func(client mqtt.Client, sessionPresent bool) { - astarteOnConnectHandler(d, client, sessionPresent) + astarteOnConnectHandler(d, sessionPresent) }) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { @@ -64,120 +64,127 @@ func (d *Device) initializeMQTTClient() error { }) // This is our message handler - opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) { - if !strings.HasPrefix(msg.Topic(), d.getBaseTopic()) { - if d.OnErrors != nil { - d.OnErrors(d, fmt.Errorf("Received message on unexpected topic %s. This is an internal error", msg.Topic())) - } - return + opts.SetDefaultPublishHandler(d.astarteGoSDKDefaultPublishHandler) + + d.m = mqtt.NewClient(opts) + + return nil +} + +func (d *Device) astarteGoSDKDefaultPublishHandler(client mqtt.Client, msg mqtt.Message) { + if !strings.HasPrefix(msg.Topic(), d.getBaseTopic()) { + if d.OnErrors != nil { + d.OnErrors(d, fmt.Errorf("Received message on unexpected topic %s. This is an internal error", msg.Topic())) } + return + } - // We split up to 4 since this will give us the path in the correct format. - tokens := strings.SplitN(msg.Topic(), "/", 4) - if len(tokens) > 2 { - // Is it a control message? - if tokens[2] == "control" { - err := d.handleControlMessages(strings.Join(tokens[3:], "/"), msg.Payload()) - if err != nil { - d.OnErrors(d, err) - } - return + // We split up to 4 since this will give us the path in the correct format. + tokens := strings.SplitN(msg.Topic(), "/", 4) + if len(tokens) > 2 { + // Is it a control message? + if tokens[2] == "control" { + err := d.handleControlMessages(strings.Join(tokens[3:], "/"), msg.Payload()) + if err != nil { + d.OnErrors(d, err) } + return + } - // It's a data message. Grab the interface name. - interfaceName := tokens[2] - // Parse the payload - parsed, err := parseBSONPayload(msg.Payload()) - if err != nil { - if d.OnErrors != nil { - d.OnErrors(d, err) - } - return + // It's a data message. Grab the interface name. + interfaceName := tokens[2] + // Parse the payload + parsed, err := parseBSONPayload(msg.Payload()) + if err != nil { + if d.OnErrors != nil { + d.OnErrors(d, err) } - timestamp := time.Time{} - if t, ok := parsed["t"]; ok { - // We have a timestamp - if pT, ok := t.(primitive.DateTime); ok { - timestamp = pT.Time() - } + return + } + timestamp := time.Time{} + if t, ok := parsed["t"]; ok { + // We have a timestamp + if pT, ok := t.(primitive.DateTime); ok { + timestamp = pT.Time() } + } - if iface, ok := d.interfaces[interfaceName]; ok { - // Is it individual? - switch { - case len(tokens) != 4: - if d.OnErrors != nil { - d.OnErrors(d, fmt.Errorf("could not parse incoming message on topic structure %s", tokens)) - } - return - case iface.Aggregation == interfaces.IndividualAggregation: - interfacePath := "/" + tokens[3] + if iface, ok := d.interfaces[interfaceName]; ok { + d.processIncomingMessage(iface, tokens, msg.Payload(), parsed, timestamp) + } else if d.OnErrors != nil { + // Something is off. + d.OnErrors(d, fmt.Errorf("Received message for unregistered interface %s", interfaceName)) + } + } +} - if iface.Type == interfaces.PropertiesType { - d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, msg.Payload()) - } +func (d *Device) processIncomingMessage(iface interfaces.AstarteInterface, tokens []string, payload []byte, parsed map[string]interface{}, timestamp time.Time) { + // Is it individual? + switch { + case len(tokens) != 4: + if d.OnErrors != nil { + d.OnErrors(d, fmt.Errorf("could not parse incoming message on topic structure %s", tokens)) + } + return + case iface.Aggregation == interfaces.IndividualAggregation: + interfacePath := "/" + tokens[3] - // Create the message - m := IndividualMessage{ - Interface: iface, - Path: interfacePath, - Value: parsed["v"], - Timestamp: timestamp, - } - if d.OnIndividualMessageReceived != nil { - d.OnIndividualMessageReceived(d, m) - } - case iface.Aggregation == interfaces.ObjectAggregation: - interfacePath := "/" + tokens[3] - - if val, ok := parsed["v"].(map[string]interface{}); !ok { - d.OnErrors(d, fmt.Errorf("could not parse aggregate message payload")) - } else { - // We have to check whether we have some nested arrays or not in here. - for k, v := range val { - if bsonArray, ok := v.(primitive.A); ok { - // That is, in fact, the case. Convert to a generic Go slice first. - bsonArraySlice := []interface{}(bsonArray) - // Now reflect the heck out of it and specialize the slice - specializedSlice := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(bsonArraySlice[0])), len(bsonArraySlice), cap(bsonArraySlice)) - for i := 0; i < specializedSlice.Len(); i++ { - specializedSlice.Index(i).Set(reflect.ValueOf(bsonArraySlice[i])) - } - val[k] = specializedSlice.Interface() - } - } - - // N.B.: properties with object aggregation are not yet supported by Astarte - if iface.Type == interfaces.PropertiesType { - d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, msg.Payload()) - } - - // Create the message - m := AggregateMessage{ - Interface: iface, - Path: interfacePath, - Values: val, - Timestamp: timestamp, - } - - if d.OnAggregateMessageReceived != nil { - d.OnAggregateMessageReceived(d, m) - } + if iface.Type == interfaces.PropertiesType { + d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, payload) + } + + // Create the message + m := IndividualMessage{ + Interface: iface, + Path: interfacePath, + Value: parsed["v"], + Timestamp: timestamp, + } + if d.OnIndividualMessageReceived != nil { + d.OnIndividualMessageReceived(d, m) + } + case iface.Aggregation == interfaces.ObjectAggregation: + interfacePath := "/" + tokens[3] + + if val, ok := parsed["v"].(map[string]interface{}); !ok { + d.OnErrors(d, fmt.Errorf("could not parse aggregate message payload")) + } else { + // We have to check whether we have some nested arrays or not in here. + for k, v := range val { + if bsonArray, ok := v.(primitive.A); ok { + // That is, in fact, the case. Convert to a generic Go slice first. + bsonArraySlice := []interface{}(bsonArray) + // Now reflect the heck out of it and specialize the slice + specializedSlice := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(bsonArraySlice[0])), len(bsonArraySlice), cap(bsonArraySlice)) + for i := 0; i < specializedSlice.Len(); i++ { + specializedSlice.Index(i).Set(reflect.ValueOf(bsonArraySlice[i])) } + val[k] = specializedSlice.Interface() } - } else if d.OnErrors != nil { - // Something is off. - d.OnErrors(d, fmt.Errorf("Received message for unregistered interface %s", interfaceName)) } - } - }) - d.m = mqtt.NewClient(opts) + // N.B.: properties with object aggregation are not yet supported by Astarte + if iface.Type == interfaces.PropertiesType { + d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, payload) + } - return nil + // Create the message + m := AggregateMessage{ + Interface: iface, + Path: interfacePath, + Values: val, + Timestamp: timestamp, + } + + if d.OnAggregateMessageReceived != nil { + d.OnAggregateMessageReceived(d, m) + } + } + } } func (d *Device) handleControlMessages(message string, payload []byte) error { + //nolint switch message { case "consumer/properties": return d.handlePurgeProperties(payload) @@ -187,7 +194,7 @@ func (d *Device) handleControlMessages(message string, payload []byte) error { return nil } -func astarteOnConnectHandler(d *Device, client mqtt.Client, sessionPresent bool) { +func astarteOnConnectHandler(d *Device, sessionPresent bool) { // Generate Introspection first introspection := d.generateDeviceIntrospection() @@ -467,7 +474,7 @@ func getIndividualMappingFromAggregate(astarteInterface interfaces.AstarteInterf return astarteInterface.Mappings[0], nil } -func (d *Device) publishMessage(message astarteMessageInfo) error { +func (d *Device) publishMessage(message astarteMessageInfo) { topic := fmt.Sprintf("%s/%s%s", d.getBaseTopic(), message.InterfaceName, message.Path) // MQTT client returns `true` to IsConnected() @@ -499,8 +506,6 @@ func (d *Device) publishMessage(message astarteMessageInfo) error { } } } - - return nil } func (d *Device) storeMessage(message astarteMessageInfo) { @@ -617,68 +622,3 @@ func parseBSONPayload(payload []byte) (map[string]interface{}, error) { err := bson.Unmarshal(payload, parsed) return parsed, err } - -func bsonRawValueToInterface(v bson.RawValue, valueType string) (interface{}, error) { - switch valueType { - case "string": - var val string - err := v.Unmarshal(val) - return val, err - case "double": - var val float64 - err := v.Unmarshal(val) - return val, err - case "integer": - var val int32 - err := v.Unmarshal(val) - return val, err - case "boolean": - var val bool - err := v.Unmarshal(val) - return val, err - case "longinteger": - var val int64 - err := v.Unmarshal(val) - return val, err - case "binaryblob": - var val []byte - err := v.Unmarshal(val) - return val, err - case "datetime": - // TODO: verify this is true. - var val time.Time - err := v.Unmarshal(val) - return val, err - case "stringarray": - var val []string - err := v.Unmarshal(val) - return val, err - case "doublearray": - var val []float64 - err := v.Unmarshal(val) - return val, err - case "integerarray": - var val []int32 - err := v.Unmarshal(val) - return val, err - case "booleanarray": - var val []bool - err := v.Unmarshal(val) - return val, err - case "longintegerarray": - var val []int64 - err := v.Unmarshal(val) - return val, err - case "binaryblobarray": - var val [][]byte - err := v.Unmarshal(val) - return val, err - case "datetimearray": - // TODO: verify this is true. - var val []time.Time - err := v.Unmarshal(val) - return val, err - } - - return nil, fmt.Errorf("Could not decode for type %s", valueType) -} diff --git a/device/store.go b/device/store.go index 4f1c76e..bae8df0 100644 --- a/device/store.go +++ b/device/store.go @@ -54,7 +54,9 @@ type property struct { func (d *Device) getDbDir() string { dbDir := filepath.Join(d.persistencyDir, "db") - os.MkdirAll(dbDir, 0700) + if err := os.MkdirAll(dbDir, 0700); err != nil { + fmt.Println("WARNING - could not access store dir!") + } return dbDir } @@ -195,10 +197,21 @@ func (d *Device) handlePurgeProperties(payload []byte) error { } buf := new(bytes.Buffer) - _, err = io.Copy(buf, flateReader) - if err != nil { - return err + // G110: Copy in chunks + var totalRead, n int64 + for { + n, err = io.CopyN(buf, flateReader, 1024) + totalRead += n + if err != nil { + if err == io.EOF { + // We're done + break + } + // Actual error + return err + } } + if e := flateReader.Close(); e != nil { return e } diff --git a/e2e_tests/e2e_test.go b/e2e_tests/e2e_test.go index 2ac4a4a..f9998b0 100644 --- a/e2e_tests/e2e_test.go +++ b/e2e_tests/e2e_test.go @@ -27,10 +27,12 @@ import ( "testing" "time" - "github.com/astarte-platform/astarte-device-sdk-go/device" + "github.com/stretchr/testify/suite" + "github.com/astarte-platform/astarte-go/client" "github.com/astarte-platform/astarte-go/interfaces" - "github.com/stretchr/testify/suite" + + "github.com/astarte-platform/astarte-device-sdk-go/device" ) var ( @@ -87,7 +89,9 @@ func (suite *EndToEndSuite) TearDownSuite() { func (suite *EndToEndSuite) TestDatastreamIndividualDevice() { // send everything for k, v := range expectedDatastreamIndividual { - suite.d.SendIndividualMessageWithTimestamp("org.astarte-platform.device.individual.datastream.Everything", k, v, time.Now()) + if err := suite.d.SendIndividualMessageWithTimestamp("org.astarte-platform.device.individual.datastream.Everything", k, v, time.Now()); err != nil { + suite.Fail("Error sending message", err) + } fmt.Printf("Sent %v on %s\n", v, k) time.Sleep(1 * time.Second) } @@ -181,6 +185,7 @@ func (suite *EndToEndSuite) setupDevice() { suite.d = d } +//nolint func individualValueToAstarteType(value interface{}, astarteType string) interface{} { // cast like there's no tomorrow yolo switch astarteType {