Skip to content

Commit

Permalink
Add gNMI Extension field parsing support
Browse files Browse the repository at this point in the history
gNMI allows for the use of an `extension` field in each top-level message of the gNMI RPCs: https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-extensions.md
Given this is an arbitrary Protobyte payload, the default gNMI protobufs can't decode the payload contained within the field.
This PR adds the necessary configuration options to load in an arbitrary Protobuf file per `extension` identifier, with a message-name to lookup the message type.

In this change:
1. A new function `DecodeExtension` is added. This function uses `protoreflect` to dynamically marshal arbitrary protoBytes into JSON. The loaded JSON is then put back in the Extension message as bytes (mainting type)
2. The `Target` type has an `ExtensionProtoMap` added, allowing for the lookup of Extension IDs to loaded-in protobufs
3. The required changes to the `TargetConfig` type to support loading in the new configuration
4. Modified `collector.go` to output the gNMI message _after_ inlining the decoded protoBytes
5. Loading in the protobufs was added to `app/target.go`: `parseExtensionProtos`. This uses the `Parser` provided by `protoreflect/desc/protoparse`
6. Added functionality to `event.go` to insert the K/Vs provided by the Extension as Tags. Given we come from JSON, all numbers are float64, so the only 2 types supported currently are `string` and `float64`
7. Minor helper function to turn the arbitrary JSON into an arbitrary map.

This has been tested with a device emiting an `extension` field:
```
[gnmic] target "edge01_test01": gNMI Subscribe Response: &{
SubscriptionName:port_stats
SubscriptionConfig:{"name":"port_stats","paths":["/interfaces/interface/"],"mode":"STREAM","stream-mode":"SAMPLE","encoding":"JSON","sample-interval":15000000000,"heartbeat-interval":15000000000,"outputs":["prom-scrape-output"]}
Response:
update:{timestamp:1723653363502452302  prefix:{elem:{name:"interfaces"}  elem:{name:"interface"  key:{key:"name"  value:"et-1/0/3"}}}
update:{path:{elem:{name:"state"}  elem:{name:"hardware-port"}}  val:{json_val:"\"FPC1:PIC0:PORT3\""}}
update:{path:{elem:{name:"state"}  elem:{name:"transceiver"}}  val:{json_val:"\"FPC1:PIC0:PORT3:Xcvr0\""}}}
extension:{registered_ext:{id:1
    msg:"{\"systemId\":\"edge01_test01\",\"componentId\":65535,\"sensorName\":\"sensor_1005_2_1\",\"subscribedPath\":\"/interfaces/interface/\",\"streamedPath\":\"/interfaces/interface/\",\"component\":\"chassisd\",\"sequenceNumber\":\"770002\",\"payloadGetTimestamp\":\"1723653363502\",\"streamCreationTimestamp\":\"1723653361858\",\"exportTimestamp\":\"1723653363504\",\"streamId\":\"PERIODIC\"}"}}}
```
Which is then properly rendered to a Prometheus metric:
```
gnmi_interfaces_interface_state_hardware_port{component="chassisd",componentId="65535",hardware_port="FPC1:PIC0:PORT3",interface_name="et-1/0/3",metric_source="edge01_test01",subscription_name="port_stats",systemId="edge01_test01"} 1
```
Note that some label-drop rules have been added to remove the spurious labels to avoid a cardinality explosion.
  • Loading branch information
Ichabond committed Aug 14, 2024
1 parent fb7904e commit a5e0eaa
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 5 deletions.
19 changes: 19 additions & 0 deletions pkg/api/target/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,25 @@ func (t *Target) DecodeProtoBytes(resp *gnmi.SubscribeResponse) error {
return nil
}

func (t *Target) DecodeExtension(resp *gnmi.SubscribeResponse) error {
if t.ExtensionProtoMap == nil {
return nil
}
for _, extension := range resp.Extension {
m := dynamic.NewMessage(t.ExtensionProtoMap[int(extension.GetRegisteredExt().GetId().Number())])
err := m.Unmarshal(extension.GetRegisteredExt().GetMsg())
if err != nil {
return err
}
jsondata, err := m.MarshalJSON()
if err != nil {
return err
}
extension.GetRegisteredExt().Msg = jsondata
}
return nil
}

func (t *Target) DeleteSubscription(name string) {
t.m.Lock()
defer t.m.Unlock()
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/target/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ type Target struct {
subscribeResponses chan *SubscribeResponse
errors chan *TargetError
stopped bool
StopChan chan struct{} `json:"-"`
Cfn context.CancelFunc `json:"-"`
RootDesc desc.Descriptor `json:"-"`
StopChan chan struct{} `json:"-"`
Cfn context.CancelFunc `json:"-"`
RootDesc desc.Descriptor `json:"-"`
ExtensionProtoMap map[int]*desc.MessageDescriptor `json:"-"`
}

// NewTarget //
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/types/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ type TargetConfig struct {
GRPCKeepalive *clientKeepalive `mapstructure:"grpc-keepalive,omitempty" yaml:"grpc-keepalive,omitempty" json:"grpc-keepalive,omitempty"`

tlsConfig *tls.Config

RegisteredExtensions []*RegisteredExtension `mapstructure:"registered-extensions,omitempty" yaml:"registered-extensions,omitempty" json:"registered-extensions,omitempty"`
}

type RegisteredExtension struct {
Id int `mapstructure:"id,omitempty" yaml:"id,omitempty" json:"id,omitempty"`
MessageName string `mapstructure:"message-name,omitempty" yaml:"message-name,omitempty" json:"message-name,omitempty"`
ProtoFile string `mapstructure:"proto-file,omitempty" yaml:"proto-file,omitempty" json:"proto-file,omitempty"`
}

type clientKeepalive struct {
Expand Down
12 changes: 10 additions & 2 deletions pkg/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,22 @@ func (a *App) StartCollector(ctx context.Context) {
select {
case rsp := <-rspChan:
subscribeResponseReceivedCounter.WithLabelValues(t.Config.Name, rsp.SubscriptionConfig.Name).Add(1)
if a.Config.Debug {
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp)
// decode gNMI extensions
if extensions := rsp.Response.Extension; len(extensions) > 0 {
err := t.DecodeExtension(rsp.Response)
if err != nil {
a.Logger.Printf("target %q: failed to decode extension field: %v", t.Config.Name, err)
continue
}
}
err := t.DecodeProtoBytes(rsp.Response)
if err != nil {
a.Logger.Printf("target %q: failed to decode proto bytes: %v", t.Config.Name, err)
continue
}
if a.Config.Debug {
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp)
}
m := outputs.Meta{
"source": t.Config.Name,
"format": a.Config.Format,
Expand Down
35 changes: 35 additions & 0 deletions pkg/app/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (

"github.com/fullstorydev/grpcurl"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"

"github.com/openconfig/gnmic/pkg/api/target"
"github.com/openconfig/gnmic/pkg/api/types"
)
Expand All @@ -39,6 +42,10 @@ func (a *App) initTarget(tc *types.TargetConfig) (*target.Target, error) {
if err != nil {
return nil, err
}
err = a.parseExtensionProtos(t)
if err != nil {
return nil, err
}
a.Targets[t.Config.Name] = t
return t, nil
}
Expand Down Expand Up @@ -155,6 +162,34 @@ func (a *App) parseProtoFiles(t *target.Target) error {
return nil
}

// Dynamically parse (and load) protobuf files defined in config for specific extension IDs
func (a *App) parseExtensionProtos(t *target.Target) error {
parser := protoparse.Parser{}
extensionProtoMap := make(map[int]*desc.MessageDescriptor)
a.Logger.Printf("Target %q loading protofiles for gNMI extensions", t.Config.Name)
if len(t.Config.RegisteredExtensions) == 0 {
return nil
}
for _, extension := range t.Config.RegisteredExtensions {
descSources, err := parser.ParseFiles(extension.ProtoFile)
if err != nil {
a.Logger.Printf("target %q could not load protofile: %s: %v", t.Config.Name, extension.ProtoFile, err)
return err
}
// Only a single file is ever provided to ParseFiles, so we can just grab offset 0 from the returned slice
// Verify if the provided message exists in the proto and assign
if desc := descSources[0].FindMessage(extension.MessageName); desc != nil {
extensionProtoMap[extension.Id] = desc
} else {
a.Logger.Printf("target %q could not find message %q", t.Config.Name, extension.MessageName)
return fmt.Errorf("target %q could not find message %q", t.Config.Name, extension.MessageName)
}
}
t.ExtensionProtoMap = extensionProtoMap
a.Logger.Printf("target %q loaded proto files for gNMI extensions", t.Config.Name)
return nil
}

func (a *App) targetConfigExists(name string) bool {
a.configLock.RLock()
_, ok := a.Config.Targets[name]
Expand Down
35 changes: 35 additions & 0 deletions pkg/formatters/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"

flattener "github.com/karimra/go-map-flattener"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/proto/gnmi_ext"
)

// EventMsg represents a gNMI update message,
Expand All @@ -40,9 +42,28 @@ func ResponseToEventMsgs(name string, rsp *gnmi.SubscribeResponse, meta map[stri
return nil, nil
}
evs := make([]*EventMsg, 0, len(rsp.GetUpdate().GetUpdate())+len(rsp.GetUpdate().GetDelete()))
response := rsp
switch rsp := rsp.Response.(type) {
case *gnmi.SubscribeResponse_Update:
namePrefix, prefixTags := tagsFromGNMIPath(rsp.Update.GetPrefix())
// Extension message to tags
if prefixTags == nil {
prefixTags = make(map[string]string)
}
for _, ext := range response.Extension {
extensionValues, err := extensionToMap(ext)
if err != nil {
return nil, err
}
for k, v := range extensionValues {
switch v := v.(type) {
case string:
prefixTags[k] = v
case float64:
prefixTags[k] = strconv.FormatFloat(v, 'G', -1, 64)
}
}
}
// notification updates
uevs, err := updatesToEvent(name, namePrefix, rsp.Update.GetTimestamp(), rsp.Update.GetUpdate(), prefixTags, meta)
if err != nil {
Expand Down Expand Up @@ -200,6 +221,20 @@ func tagsFromGNMIPath(p *gnmi.Path) (string, map[string]string) {
return sb.String(), tags
}

func extensionToMap(ext *gnmi_ext.Extension) (map[string]interface{}, error) {
jsondata := ext.GetRegisteredExt().GetMsg()

var anyJson map[string]interface{}
if len(jsondata) != 0 {
err := json.Unmarshal(jsondata, &anyJson)
if err != nil {
return nil, err
}
return anyJson, nil
}
return nil, fmt.Errorf("0 length JSON decoded")
}

func getValueFlat(prefix string, updValue *gnmi.TypedValue) (map[string]interface{}, error) {
if updValue == nil {
return nil, nil
Expand Down

0 comments on commit a5e0eaa

Please sign in to comment.