Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3] Add shadow state handler. Send null values for missing objects w… #6

Merged
9 changes: 7 additions & 2 deletions cmd/aws-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/eclipse-kanto/aws-connector/flags"
"github.com/eclipse-kanto/aws-connector/routing/message/handlers"
"github.com/eclipse-kanto/aws-connector/routing/message/handlers/passthrough"
"github.com/eclipse-kanto/aws-connector/routing/message/handlers/state"

"github.com/eclipse-kanto/suite-connector/config"
suiteFlags "github.com/eclipse-kanto/suite-connector/flags"
Expand Down Expand Up @@ -80,10 +81,14 @@ func main() {
logger.Infof("Starting aws connector %s", version)
suiteFlags.ConfigCheck(logger, *fConfigFile)

shadowStateHandler := state.CreateDefaultShadowStateHandler()
cloudHandlers := []handlers.MessageHandler{
shadowStateHandler,
}

deviceHandlers := []handlers.MessageHandler{
passthrough.CreateDefaultDeviceHandler(),
passthrough.CreateDefaultDeviceHandler(shadowStateHandler.(passthrough.ShadowStateHolder)),
}
cloudHandlers := []handlers.MessageHandler{}

if err := app.MainLoop(settings, logger, deviceHandlers, cloudHandlers); err != nil {
logger.Error("Init failure", err, nil)
Expand Down
130 changes: 114 additions & 16 deletions routing/message/handlers/passthrough/device_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,24 @@ const (
topicRootShadow = "$aws/things/%s/shadow/%s"
// Used to update feature of root thing or attributes of child thing.
topicNamedShadow = "$aws/things/%s/shadow/name/%s/%s"
// Used to update feature of child thing.
topicComplexNamedShadow = "$aws/things/%s/shadow/name/%s:%s/%s"

topicUpdate = "update"
topicDelete = "delete"
)

type deviceHandler struct {
tenantID string
deviceID string
payloadFilters []*regexp.Regexp
topicFilter *regexp.Regexp
logger watermill.LoggerAdapter
defaultHandler message.HandlerFunc
tenantID string
deviceID string
payloadFilters []*regexp.Regexp
topicFilter *regexp.Regexp
logger watermill.LoggerAdapter
defaultHandler message.HandlerFunc
shadowStateHolder ShadowStateHolder
}

// CreateDefaultDeviceHandler instantiates a new passthrough handler that forwards messages received from local message broker on event and telemetry topics as device-to-cloud messages.
func CreateDefaultDeviceHandler() handlers.MessageHandler {
return &deviceHandler{}
func CreateDefaultDeviceHandler(shadowStateHolder ShadowStateHolder) handlers.MessageHandler {
return &deviceHandler{shadowStateHolder: shadowStateHolder}
}

// Init gets the device ID that is needed for the message forwarding towards AWS IoT Hub.
Expand Down Expand Up @@ -109,7 +108,7 @@ func (h *deviceHandler) HandleMessage(msg *message.Message) ([]*message.Message,
}

// toShadowTopic convert Ditto topic to its corresponding device shadow topic and if its an update message.
func (h *deviceHandler) toShadowTopic(topic *protocol.Topic, featureName string, value interface{}) (res string, update bool) {
func (h *deviceHandler) toShadowTopic(topic *protocol.Topic, featureName string, value interface{}) (res string, update bool, shadowID string) {
target := topicUpdate
if topic.Action == protocol.ActionDelete && h.isEntireShadow(value) {
target = topicDelete
Expand All @@ -122,17 +121,19 @@ func (h *deviceHandler) toShadowTopic(topic *protocol.Topic, featureName string,
if len(h.deviceID) == len(topicID) {
if featureName == "" {
// Update root thing attributes.
return fmt.Sprintf(topicRootShadow, h.deviceID, target), update
return fmt.Sprintf(topicRootShadow, h.deviceID, target), update, h.deviceID
}
// Update root thing feature.
return fmt.Sprintf(topicNamedShadow, h.deviceID, featureName, target), update
return fmt.Sprintf(topicNamedShadow, h.deviceID, featureName, target), update, featureName
}
if featureName == "" {
// Update child thing attributes.
return fmt.Sprintf(topicNamedShadow, h.deviceID, topicID[len(h.deviceID)+1:], target), update
shadowID = topicID[len(h.deviceID)+1:]
return fmt.Sprintf(topicNamedShadow, h.deviceID, shadowID, target), update, shadowID
k-gostev marked this conversation as resolved.
Show resolved Hide resolved
}
// Update child thing feature.
return fmt.Sprintf(topicComplexNamedShadow, h.deviceID, topicID[len(h.deviceID)+1:], featureName, target), update
shadowID = fmt.Sprintf("%s:%s", topicID[len(h.deviceID)+1:], featureName)
return fmt.Sprintf(topicNamedShadow, h.deviceID, shadowID, target), update, shadowID
}

// isDittoRequest returns true if provided message is Ditto request to the connected device.
Expand Down Expand Up @@ -187,10 +188,11 @@ func integrate(path []string, value interface{}) interface{} {

// toShadowMessage convert Ditto data to device shadow message.
func (h *deviceHandler) toShadowMessage(env *protocol.Envelope, featureName string, value interface{}) *message.Message {
topic, update := h.toShadowTopic(env.Topic, featureName, value)
topic, update, shadowID := h.toShadowTopic(env.Topic, featureName, value)

var payload message.Payload
if update {
value = h.mergeWithCurrentShadowState(shadowID, value, env)
// Adds shadow prefix infront: ["state", "reported"]
value = integrate([]string{valueStateTag, valueReportedTag}, value)
payload, _ = json.Marshal(value)
Expand All @@ -203,6 +205,102 @@ func (h *deviceHandler) toShadowMessage(env *protocol.Envelope, featureName stri
return message
}

func (h deviceHandler) mergeWithCurrentShadowState(featureName string, newState interface{}, envelope *protocol.Envelope) interface{} {
if envelope.Topic.Action != protocol.ActionModify {
return newState
}

if h.shadowStateHolder == nil {
return newState
}

currentState := h.shadowStateHolder.GetCurrentShadowState(featureName)
if currentState == nil {
return newState
}

if subpath, ok := isSinglePropertyOrAttributeUpdate(envelope.Path); ok {
return mergeSubpaths(currentState, newState, strings.Split(subpath, "/"))
}

return merge(currentState, newState)
}

func isSinglePropertyOrAttributeUpdate(path string) (string, bool) {
if propertiesIndex := strings.Index(path, valuePropertiesTag); propertiesIndex >= 0 {
return path[propertiesIndex+len(valuePropertiesTag):], true
}

if attributesIndex := strings.Index(path, valueAttributesTag); attributesIndex >= 0 {
return path[attributesIndex+len(valueAttributesTag):], true
}

return "", false
}

func mergeSubpaths(currentState interface{}, newState interface{}, propertyPath []string) interface{} {
currentStateMap, isCurrentStateMap := currentState.(map[string]interface{})
newStateMap, isNewStateMap := newState.(map[string]interface{})

if !isCurrentStateMap || !isNewStateMap {
return newState
}

if len(propertyPath) == 0 {
return merge(currentState, newState)
}

propertyName := propertyPath[0]

if len(propertyName) == 0 {
return mergeSubpaths(currentState, newState, propertyPath[1:])
}

newStateMap[propertyName] = mergeSubpaths(currentStateMap[propertyName], newStateMap[propertyName], propertyPath[1:])

return newStateMap

}

func merge(currentState interface{}, newState interface{}) interface{} {
newState = mergeAsMaps(currentState, newState)
newState = mergeAsArrays(currentState, newState)

return newState
}

func mergeAsMaps(currentState interface{}, newState interface{}) interface{} {
currentStateMap, isCurrentStateMap := currentState.(map[string]interface{})
newStateMap, isNewStateMap := newState.(map[string]interface{})

if !isCurrentStateMap || !isNewStateMap {
return newState
}
for key, currentValue := range currentStateMap {
if newValue, exists := newStateMap[key]; !exists {
newStateMap[key] = nil
} else {
newStateMap[key] = merge(currentValue, newValue)
}
}

return newStateMap
}

func mergeAsArrays(currentState interface{}, newState interface{}) interface{} {
currentStateArray, isCurrentStateArray := currentState.([]interface{})
newStateArray, isNewStateArray := newState.([]interface{})

if !isCurrentStateArray || !isNewStateArray {
return newState
}

for i := 0; i < len(newStateArray) && i < len(currentStateArray); i++ {
newStateArray[i] = merge(currentStateArray[i], newStateArray[i])
}
return newStateArray
}

// getFeatureProperties return all feature properties, including the definition.
func (h *deviceHandler) getFeatureProperties(obj interface{}) (interface{}, bool) {
res := map[string]interface{}{}
Expand Down
Loading
Loading