Skip to content

Commit

Permalink
Merge pull request #291 from openconfig/autopop-target
Browse files Browse the repository at this point in the history
Don't require Target when doing gNMI requests
  • Loading branch information
wenovus authored Oct 9, 2023
2 parents 588ec61 + a6ba0b7 commit 5523fd3
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 129 deletions.
1 change: 1 addition & 0 deletions gnmi/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"@org_golang_google_grpc//peer",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//encoding/prototext",
"@org_golang_google_protobuf//proto",
"@org_golang_x_exp//slices",
],
)
Expand Down
25 changes: 22 additions & 3 deletions gnmi/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/openconfig/gnmi/cache"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/subscribe"
"google.golang.org/protobuf/proto"
)

var (
Expand Down Expand Up @@ -104,12 +105,11 @@ func (c *Collector) Stop() {
// handleUpdate handles an input gNMI SubscribeResponse that is received by
// the target.
func (c *Collector) handleUpdate(resp *gpb.SubscribeResponse) error {
t := c.cache.GetTarget(c.name)
switch v := resp.Response.(type) {
case *gpb.SubscribeResponse_Update:
return t.GnmiUpdate(v.Update)
return c.GnmiUpdate(v.Update)
case *gpb.SubscribeResponse_SyncResponse:
t.Sync()
c.cache.GetTarget(c.name).Sync()
case *gpb.SubscribeResponse_Error:
return fmt.Errorf("error in response: %s", v)
default:
Expand All @@ -118,6 +118,25 @@ func (c *Collector) handleUpdate(resp *gpb.SubscribeResponse) error {
return nil
}

// GnmiUpdate sends a pb.Notification into the target cache.
//
// It simply forwards it to the gNMI cache implementation, populating the
// target (without copying the message) if empty.
func (c *Collector) GnmiUpdate(n *gpb.Notification) error {
t := c.cache.GetTarget(c.name)
// If target is not specified, then set it to the initialized
// value.
if n.GetPrefix().GetTarget() == "" {
if n.Prefix == nil {
n.Prefix = &gpb.Path{}
} else {
n.Prefix = proto.Clone(n.Prefix).(*gpb.Path)
}
n.Prefix.Target = c.name
}
return t.GnmiUpdate(n)
}

// periodic runs the function fn every period.
func periodic(period time.Duration, fn func()) {
if period == 0 {
Expand Down
108 changes: 91 additions & 17 deletions gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

log "github.com/golang/glog"
"github.com/openconfig/gnmi/cache"
"github.com/openconfig/gnmi/subscribe"
"github.com/openconfig/ygot/util"
"github.com/openconfig/ygot/ygot"
Expand All @@ -37,6 +36,7 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"github.com/openconfig/lemming/gnmi/oc"
"github.com/openconfig/lemming/gnmi/reconciler"
Expand Down Expand Up @@ -127,7 +127,7 @@ func newServer(ctx context.Context, targetName string, enableSet bool, recs ...r
if err := ygot.PruneConfigFalse(configSchema.RootSchema(), configSchema.Root); err != nil {
return nil, fmt.Errorf("gnmi: %v", err)
}
if err := updateCache(c.cache, configSchema.Root, emptySchema.Root, targetName, OpenConfigOrigin, true, 0, "", nil); err != nil {
if err := updateCache(c, configSchema.Root, emptySchema.Root, OpenConfigOrigin, true, 0, "", nil); err != nil {
return nil, fmt.Errorf("gnmi newServer: %v", err)
}
}
Expand All @@ -140,7 +140,7 @@ func newServer(ctx context.Context, targetName string, enableSet bool, recs ...r
if err := setupSchema(stateSchema, false); err != nil {
return nil, err
}
if err := updateCache(c.cache, stateSchema.Root, emptySchema.Root, targetName, OpenConfigOrigin, true, 0, "", nil); err != nil {
if err := updateCache(c, stateSchema.Root, emptySchema.Root, OpenConfigOrigin, true, 0, "", nil); err != nil {
return nil, fmt.Errorf("gnmi newServer: %v", err)
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func setupSchema(schema *ytypes.Schema, config bool) error {
// If root is nil, then it is assumed the cache is empty, and the entirety of
// the dirtyRoot is put into the cache.
// - auth adds authorization to before writing vals to the cache, if set to nil, not authorization is checked.
func updateCache(cache *cache.Cache, dirtyRoot, root ygot.GoStruct, target, origin string, preferShadowPath bool, timestamp int64, user string, auth PathAuth) error {
func updateCache(collector *Collector, dirtyRoot, root ygot.GoStruct, origin string, preferShadowPath bool, timestamp int64, user string, auth PathAuth) error {
var nos []*gpb.Notification
if root == nil {
if timestamp == 0 {
Expand Down Expand Up @@ -227,7 +227,7 @@ func updateCache(cache *cache.Cache, dirtyRoot, root ygot.GoStruct, target, orig
}
}

return updateCacheNotifs(cache, nos, target, origin)
return updateCacheNotifs(collector, nos, origin)
}

func checkWritePermission(auth PathAuth, user string, nos ...*gpb.Notification) (bool, error) {
Expand Down Expand Up @@ -256,15 +256,13 @@ func checkWritePermission(auth PathAuth, user string, nos ...*gpb.Notification)
return true, nil
}

// updateCacheNotifs updates the target cache with the given notifications.
func updateCacheNotifs(ca *cache.Cache, nos []*gpb.Notification, target, origin string) error {
cacheTarget := ca.GetTarget(target)
// updateCacheNotifs updates the cache with the given notifications.
func updateCacheNotifs(c *Collector, nos []*gpb.Notification, origin string) error {
for _, n := range nos {
if n.Prefix == nil {
n.Prefix = &gpb.Path{}
}
n.Prefix.Origin = origin
n.Prefix.Target = target
if n.Prefix.Origin == "" {
n.Prefix.Origin = OpenConfigOrigin
}
Expand All @@ -284,7 +282,7 @@ func updateCacheNotifs(ca *cache.Cache, nos []*gpb.Notification, target, origin
log.V(1).Infof("datastore: deleting the following paths: %+v", pathsForDelete)
}
log.V(1).Infof("datastore: calling GnmiUpdate with the following notification:\n%s", prototext.Format(n))
if err := cacheTarget.GnmiUpdate(n); err != nil {
if err := c.GnmiUpdate(n); err != nil {
return fmt.Errorf("%w: notification:\n%s\n%s", err, prototext.Format(n), string(debug.Stack()))
}
if enableDebugLog && (len(n.Delete) != 0 || len(n.Update) != 0) {
Expand Down Expand Up @@ -346,7 +344,7 @@ func unmarshalSetRequest(schema *ytypes.Schema, req *gpb.SetRequest, preferShado
// - timestamp specifies the timestamp of the values that are to be updated in
// the gNMI cache. If zero, then time.Now().UnixNano() is used.
// - auth adds authorization to before writing vals to the cache, if set to nil, not authorization is checked.
func set(schema *ytypes.Schema, cache *cache.Cache, target string, req *gpb.SetRequest, preferShadowPath bool, validators []func(*oc.Root) error, timestamp int64, user string, auth PathAuth) error {
func set(schema *ytypes.Schema, c *Collector, req *gpb.SetRequest, preferShadowPath bool, validators []func(*oc.Root) error, timestamp int64, user string, auth PathAuth) error {
// skip diffing and deepcopy for performance when handling state update paths.
// Currently this is not possible for replace/delete paths, since
// without doing a diff, it is not possible to compute what was
Expand All @@ -370,7 +368,7 @@ func set(schema *ytypes.Schema, cache *cache.Cache, target string, req *gpb.SetR
return err
}

if err := updateCacheNotifs(cache, notifs, target, req.Prefix.Origin); err != nil {
if err := updateCacheNotifs(c, notifs, req.Prefix.Origin); err != nil {
return err
}

Expand Down Expand Up @@ -411,7 +409,7 @@ func set(schema *ytypes.Schema, cache *cache.Cache, target string, req *gpb.SetR
}
}

if err := updateCache(cache, schema.Root, prevRoot, target, req.Prefix.Origin, preferShadowPath, timestamp, user, auth); err != nil {
if err := updateCache(c, schema.Root, prevRoot, req.Prefix.Origin, preferShadowPath, timestamp, user, auth); err != nil {
return status.Error(codes.Internal, err.Error())
}
success = true
Expand All @@ -426,6 +424,9 @@ const (

// handleInternalOrigin handles SetRequests whose path has schemaless values.
func (s *Server) handleInternalOrigin(req *gpb.SetRequest) (bool, error) {
if req.Prefix == nil {
req.Prefix = &gpb.Path{}
}
notif := &gpb.Notification{
Prefix: &gpb.Path{
Origin: InternalOrigin,
Expand All @@ -443,7 +444,7 @@ func (s *Server) handleInternalOrigin(req *gpb.SetRequest) (bool, error) {
}
}
if hasInternal {
if err := s.c.cache.GnmiUpdate(notif); err != nil {
if err := s.c.GnmiUpdate(notif); err != nil {
return true, err
}
}
Expand All @@ -465,7 +466,7 @@ func (s *Server) handleInternalOrigin(req *gpb.SetRequest) (bool, error) {
}
log.V(2).Infof("internal origin notification: %v", notif)
if hasInternal {
return true, s.c.cache.GnmiUpdate(notif)
return true, s.c.GnmiUpdate(notif)
}
return false, nil
}
Expand Down Expand Up @@ -537,7 +538,7 @@ func (s *Server) Set(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse

// TODO(wenbli): Reject paths that try to modify read-only values.
// TODO(wenbli): Question: what to do if there are operational-state values in a container that is specified to be replaced or deleted?
err := set(s.configSchema, s.c.cache, s.c.name, req, true, s.validators, timestamp, user, s.pathAuth)
err := set(s.configSchema, s.c, req, true, s.validators, timestamp, user, s.pathAuth)

// TODO(wenbli): Currently the SetResponse is not filled.
return &gpb.SetResponse{
Expand All @@ -553,7 +554,7 @@ func (s *Server) Set(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse
}
// TODO(wenbli): Reject values that modify config values. We only allow modifying state in this mode.
// Don't authorize setting state since only internal reconcilers do that.
if err := set(s.stateSchema, s.c.cache, s.c.name, req, false, nil, timestamp, user, nil); err != nil {
if err := set(s.stateSchema, s.c, req, false, nil, timestamp, user, nil); err != nil {
return &gpb.SetResponse{}, err
}

Expand Down Expand Up @@ -592,6 +593,79 @@ type PathAuth interface {
IsInitialized() bool
}

// subscribeTargetUpdateStream wraps around the embedded grpc.ServerStream, and
// intercepts the RecvMsg and SendMsg method call to populate gNMI Subscribe's
// target if it's not populated in the SubscribeRequest.
type subscribeTargetUpdateStream struct {
grpc.ServerStream
// target is the target of the gNMI collector used by the lemming
// instance.
target string
// subscribeTarget stores the target field value of the last
// SubscribeRequest in the stream.
subscribeTarget string
}

func (w *subscribeTargetUpdateStream) RecvMsg(m any) error {
if err := w.ServerStream.RecvMsg(m); err != nil {
return err
}
if req, ok := m.(*gpb.SubscribeRequest); ok {
sub := req.GetSubscribe()
if sub != nil {
w.subscribeTarget = sub.GetPrefix().GetTarget()
if sub.GetPrefix().GetTarget() == "" {
if sub.Prefix == nil {
sub.Prefix = &gpb.Path{}
}
sub.Prefix.Target = w.target
}
}
}
return nil
}

func (w *subscribeTargetUpdateStream) SendMsg(m any) error {
// Clear target if it's not specified in the original SubscribeRequest:
// https://www.openconfig.net/docs/gnmi/gnmi-specification/#2221-path-target
if resp, ok := m.(*gpb.SubscribeResponse); w.subscribeTarget == "" && ok {
if notif := resp.GetUpdate(); notif != nil && notif.GetPrefix().GetTarget() != "" {
// A clone of the entire notification is
// required; otherwise the notification in the
// collector cache is also altered.
//
// TODO(wenbli): We should use a shallow-copy of the
// proto here for better performance:
// https://github.com/golang/protobuf/issues/1155
resp.Response = &gpb.SubscribeResponse_Update{Update: proto.Clone(notif).(*gpb.Notification)}
resp.GetUpdate().Prefix.Target = ""
}
}
return w.ServerStream.SendMsg(m)
}

func newSubscribeTargetUpdateStream(s grpc.ServerStream, target string) grpc.ServerStream {
return &subscribeTargetUpdateStream{
ServerStream: s,
target: target,
}
}

// NewSubscribeTargetUpdateInterceptor returns a stream interceptor to populate
// gNMI Subscribe's target if it's not populated in the SubscribeRequest
// message in order to retrieve the device's data.
//
// ref: https://www.openconfig.net/docs/gnmi/gnmi-specification/#2221-path-target
func NewSubscribeTargetUpdateInterceptor(target string) func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
err := handler(srv, newSubscribeTargetUpdateStream(ss, target))
if err != nil {
fmt.Printf("ERROR in interceptor stream: %v\n", err)
}
return err
}
}

type subscribeWithAuth struct {
gpb.GNMI_SubscribeServer
auth PathAuth
Expand Down
Loading

0 comments on commit 5523fd3

Please sign in to comment.