Skip to content

Commit

Permalink
Solve all golangci-lint issues
Browse files Browse the repository at this point in the history
Signed-off-by: Dario Freddi <[email protected]>
  • Loading branch information
drf committed Feb 11, 2022
1 parent b4a0eb7 commit 12e841f
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 179 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ linters:
- errcheck
- dupl
- gosec
- funlen

# Exclude lll issues for long lines with go:generate
- linters:
Expand Down
8 changes: 6 additions & 2 deletions device/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func (d *Device) getTLSConfig() (*tls.Config, error) {

func (d *Device) getCryptoDir() string {
cryptoDir := filepath.Join(d.persistencyDir, "crypto")
os.MkdirAll(cryptoDir, 0700)
if err := os.MkdirAll(cryptoDir, 0700); err != nil {
fmt.Println("WARNING - could not access crypto dir!")
}
return cryptoDir
}

Expand Down Expand Up @@ -183,7 +185,9 @@ func (d *Device) ensureKeyPair() error {

// We need to generate the key
// First of all, clear the crypto dir, just to be sure.
d.ClearCrypto()
if err := d.ClearCrypto(); err != nil {
return err
}

reader := rand.Reader
// Certificates are short-lived, 2048 is fine.
Expand Down
6 changes: 3 additions & 3 deletions device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"io/ioutil"
"path/filepath"

mqtt "github.com/ispirata/paho.mqtt.golang"
"github.com/astarte-platform/astarte-go/client"
"github.com/astarte-platform/astarte-go/interfaces"
"github.com/astarte-platform/astarte-go/misc"
backoff "github.com/cenkalti/backoff/v4"
mqtt "github.com/ispirata/paho.mqtt.golang"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -115,14 +115,14 @@ func newDevice(deviceID, realm, credentialsSecret string, pairingBaseURL string,
}

if err := d.migrateDb(); err != nil {
errors.New("Database migration failed")
return nil, err
}

return d, nil
}

// Connect connects the device through a goroutine
//nolint
func (d *Device) Connect(result chan<- error) {
go func(result chan<- error) {
// Are we connected already?
Expand Down Expand Up @@ -177,7 +177,7 @@ func (d *Device) Connect(result chan<- error) {
}

// initialize the client
if err := d.initializeMQTTClient(); err != nil {
if err = d.initializeMQTTClient(); err != nil {
if result != nil {
result <- err
}
Expand Down
274 changes: 107 additions & 167 deletions device/protocol_mqtt_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *Device) initializeMQTTClient() error {
opts.SetTLSConfig(tlsConfig)

opts.SetOnConnectHandler(func(client mqtt.Client, sessionPresent bool) {
astarteOnConnectHandler(d, client, sessionPresent)
astarteOnConnectHandler(d, sessionPresent)
})

opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
Expand All @@ -64,120 +64,127 @@ func (d *Device) initializeMQTTClient() error {
})

// This is our message handler
opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
if !strings.HasPrefix(msg.Topic(), d.getBaseTopic()) {
if d.OnErrors != nil {
d.OnErrors(d, fmt.Errorf("Received message on unexpected topic %s. This is an internal error", msg.Topic()))
}
return
opts.SetDefaultPublishHandler(d.astarteGoSDKDefaultPublishHandler)

d.m = mqtt.NewClient(opts)

return nil
}

func (d *Device) astarteGoSDKDefaultPublishHandler(client mqtt.Client, msg mqtt.Message) {
if !strings.HasPrefix(msg.Topic(), d.getBaseTopic()) {
if d.OnErrors != nil {
d.OnErrors(d, fmt.Errorf("Received message on unexpected topic %s. This is an internal error", msg.Topic()))
}
return
}

// We split up to 4 since this will give us the path in the correct format.
tokens := strings.SplitN(msg.Topic(), "/", 4)
if len(tokens) > 2 {
// Is it a control message?
if tokens[2] == "control" {
err := d.handleControlMessages(strings.Join(tokens[3:], "/"), msg.Payload())
if err != nil {
d.OnErrors(d, err)
}
return
// We split up to 4 since this will give us the path in the correct format.
tokens := strings.SplitN(msg.Topic(), "/", 4)
if len(tokens) > 2 {
// Is it a control message?
if tokens[2] == "control" {
err := d.handleControlMessages(strings.Join(tokens[3:], "/"), msg.Payload())
if err != nil {
d.OnErrors(d, err)
}
return
}

// It's a data message. Grab the interface name.
interfaceName := tokens[2]
// Parse the payload
parsed, err := parseBSONPayload(msg.Payload())
if err != nil {
if d.OnErrors != nil {
d.OnErrors(d, err)
}
return
// It's a data message. Grab the interface name.
interfaceName := tokens[2]
// Parse the payload
parsed, err := parseBSONPayload(msg.Payload())
if err != nil {
if d.OnErrors != nil {
d.OnErrors(d, err)
}
timestamp := time.Time{}
if t, ok := parsed["t"]; ok {
// We have a timestamp
if pT, ok := t.(primitive.DateTime); ok {
timestamp = pT.Time()
}
return
}
timestamp := time.Time{}
if t, ok := parsed["t"]; ok {
// We have a timestamp
if pT, ok := t.(primitive.DateTime); ok {
timestamp = pT.Time()
}
}

if iface, ok := d.interfaces[interfaceName]; ok {
// Is it individual?
switch {
case len(tokens) != 4:
if d.OnErrors != nil {
d.OnErrors(d, fmt.Errorf("could not parse incoming message on topic structure %s", tokens))
}
return
case iface.Aggregation == interfaces.IndividualAggregation:
interfacePath := "/" + tokens[3]
if iface, ok := d.interfaces[interfaceName]; ok {
d.processIncomingMessage(iface, tokens, msg.Payload(), parsed, timestamp)
} else if d.OnErrors != nil {
// Something is off.
d.OnErrors(d, fmt.Errorf("Received message for unregistered interface %s", interfaceName))
}
}
}

if iface.Type == interfaces.PropertiesType {
d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, msg.Payload())
}
func (d *Device) processIncomingMessage(iface interfaces.AstarteInterface, tokens []string, payload []byte, parsed map[string]interface{}, timestamp time.Time) {
// Is it individual?
switch {
case len(tokens) != 4:
if d.OnErrors != nil {
d.OnErrors(d, fmt.Errorf("could not parse incoming message on topic structure %s", tokens))
}
return
case iface.Aggregation == interfaces.IndividualAggregation:
interfacePath := "/" + tokens[3]

// Create the message
m := IndividualMessage{
Interface: iface,
Path: interfacePath,
Value: parsed["v"],
Timestamp: timestamp,
}
if d.OnIndividualMessageReceived != nil {
d.OnIndividualMessageReceived(d, m)
}
case iface.Aggregation == interfaces.ObjectAggregation:
interfacePath := "/" + tokens[3]

if val, ok := parsed["v"].(map[string]interface{}); !ok {
d.OnErrors(d, fmt.Errorf("could not parse aggregate message payload"))
} else {
// We have to check whether we have some nested arrays or not in here.
for k, v := range val {
if bsonArray, ok := v.(primitive.A); ok {
// That is, in fact, the case. Convert to a generic Go slice first.
bsonArraySlice := []interface{}(bsonArray)
// Now reflect the heck out of it and specialize the slice
specializedSlice := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(bsonArraySlice[0])), len(bsonArraySlice), cap(bsonArraySlice))
for i := 0; i < specializedSlice.Len(); i++ {
specializedSlice.Index(i).Set(reflect.ValueOf(bsonArraySlice[i]))
}
val[k] = specializedSlice.Interface()
}
}

// N.B.: properties with object aggregation are not yet supported by Astarte
if iface.Type == interfaces.PropertiesType {
d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, msg.Payload())
}

// Create the message
m := AggregateMessage{
Interface: iface,
Path: interfacePath,
Values: val,
Timestamp: timestamp,
}

if d.OnAggregateMessageReceived != nil {
d.OnAggregateMessageReceived(d, m)
}
if iface.Type == interfaces.PropertiesType {
d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, payload)
}

// Create the message
m := IndividualMessage{
Interface: iface,
Path: interfacePath,
Value: parsed["v"],
Timestamp: timestamp,
}
if d.OnIndividualMessageReceived != nil {
d.OnIndividualMessageReceived(d, m)
}
case iface.Aggregation == interfaces.ObjectAggregation:
interfacePath := "/" + tokens[3]

if val, ok := parsed["v"].(map[string]interface{}); !ok {
d.OnErrors(d, fmt.Errorf("could not parse aggregate message payload"))
} else {
// We have to check whether we have some nested arrays or not in here.
for k, v := range val {
if bsonArray, ok := v.(primitive.A); ok {
// That is, in fact, the case. Convert to a generic Go slice first.
bsonArraySlice := []interface{}(bsonArray)
// Now reflect the heck out of it and specialize the slice
specializedSlice := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(bsonArraySlice[0])), len(bsonArraySlice), cap(bsonArraySlice))
for i := 0; i < specializedSlice.Len(); i++ {
specializedSlice.Index(i).Set(reflect.ValueOf(bsonArraySlice[i]))
}
val[k] = specializedSlice.Interface()
}
} else if d.OnErrors != nil {
// Something is off.
d.OnErrors(d, fmt.Errorf("Received message for unregistered interface %s", interfaceName))
}
}
})

d.m = mqtt.NewClient(opts)
// N.B.: properties with object aggregation are not yet supported by Astarte
if iface.Type == interfaces.PropertiesType {
d.storeProperty(iface.Name, interfacePath, iface.MajorVersion, payload)
}

return nil
// Create the message
m := AggregateMessage{
Interface: iface,
Path: interfacePath,
Values: val,
Timestamp: timestamp,
}

if d.OnAggregateMessageReceived != nil {
d.OnAggregateMessageReceived(d, m)
}
}
}
}

func (d *Device) handleControlMessages(message string, payload []byte) error {
//nolint
switch message {
case "consumer/properties":
return d.handlePurgeProperties(payload)
Expand All @@ -187,7 +194,7 @@ func (d *Device) handleControlMessages(message string, payload []byte) error {
return nil
}

func astarteOnConnectHandler(d *Device, client mqtt.Client, sessionPresent bool) {
func astarteOnConnectHandler(d *Device, sessionPresent bool) {
// Generate Introspection first
introspection := d.generateDeviceIntrospection()

Expand Down Expand Up @@ -467,7 +474,7 @@ func getIndividualMappingFromAggregate(astarteInterface interfaces.AstarteInterf
return astarteInterface.Mappings[0], nil
}

func (d *Device) publishMessage(message astarteMessageInfo) error {
func (d *Device) publishMessage(message astarteMessageInfo) {
topic := fmt.Sprintf("%s/%s%s", d.getBaseTopic(), message.InterfaceName, message.Path)

// MQTT client returns `true` to IsConnected()
Expand Down Expand Up @@ -499,8 +506,6 @@ func (d *Device) publishMessage(message astarteMessageInfo) error {
}
}
}

return nil
}

func (d *Device) storeMessage(message astarteMessageInfo) {
Expand Down Expand Up @@ -617,68 +622,3 @@ func parseBSONPayload(payload []byte) (map[string]interface{}, error) {
err := bson.Unmarshal(payload, parsed)
return parsed, err
}

func bsonRawValueToInterface(v bson.RawValue, valueType string) (interface{}, error) {
switch valueType {
case "string":
var val string
err := v.Unmarshal(val)
return val, err
case "double":
var val float64
err := v.Unmarshal(val)
return val, err
case "integer":
var val int32
err := v.Unmarshal(val)
return val, err
case "boolean":
var val bool
err := v.Unmarshal(val)
return val, err
case "longinteger":
var val int64
err := v.Unmarshal(val)
return val, err
case "binaryblob":
var val []byte
err := v.Unmarshal(val)
return val, err
case "datetime":
// TODO: verify this is true.
var val time.Time
err := v.Unmarshal(val)
return val, err
case "stringarray":
var val []string
err := v.Unmarshal(val)
return val, err
case "doublearray":
var val []float64
err := v.Unmarshal(val)
return val, err
case "integerarray":
var val []int32
err := v.Unmarshal(val)
return val, err
case "booleanarray":
var val []bool
err := v.Unmarshal(val)
return val, err
case "longintegerarray":
var val []int64
err := v.Unmarshal(val)
return val, err
case "binaryblobarray":
var val [][]byte
err := v.Unmarshal(val)
return val, err
case "datetimearray":
// TODO: verify this is true.
var val []time.Time
err := v.Unmarshal(val)
return val, err
}

return nil, fmt.Errorf("Could not decode for type %s", valueType)
}
Loading

0 comments on commit 12e841f

Please sign in to comment.