diff --git a/gnmi/BUILD b/gnmi/BUILD index 6c3e619c..9323ba09 100644 --- a/gnmi/BUILD +++ b/gnmi/BUILD @@ -27,6 +27,7 @@ go_library( "@org_golang_google_grpc//peer", "@org_golang_google_grpc//status", "@org_golang_google_protobuf//encoding/prototext", + "@org_golang_google_protobuf//proto", "@org_golang_x_exp//slices", ], ) diff --git a/gnmi/collector.go b/gnmi/collector.go index 2c9ac2d3..8fbf8c2d 100644 --- a/gnmi/collector.go +++ b/gnmi/collector.go @@ -22,6 +22,7 @@ import ( "github.com/openconfig/gnmi/cache" gpb "github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/gnmi/subscribe" + "google.golang.org/protobuf/proto" ) var ( @@ -104,12 +105,11 @@ func (c *Collector) Stop() { // handleUpdate handles an input gNMI SubscribeResponse that is received by // the target. func (c *Collector) handleUpdate(resp *gpb.SubscribeResponse) error { - t := c.cache.GetTarget(c.name) switch v := resp.Response.(type) { case *gpb.SubscribeResponse_Update: - return t.GnmiUpdate(v.Update) + return c.GnmiUpdate(v.Update) case *gpb.SubscribeResponse_SyncResponse: - t.Sync() + c.cache.GetTarget(c.name).Sync() case *gpb.SubscribeResponse_Error: return fmt.Errorf("error in response: %s", v) default: @@ -118,6 +118,25 @@ func (c *Collector) handleUpdate(resp *gpb.SubscribeResponse) error { return nil } +// GnmiUpdate sends a pb.Notification into the target cache. +// +// It simply forwards it to the gNMI cache implementation, populating the +// target (without copying the message) if empty. +func (c *Collector) GnmiUpdate(n *gpb.Notification) error { + t := c.cache.GetTarget(c.name) + // If target is not specified, then set it to the initialized + // value. + if n.GetPrefix().GetTarget() == "" { + if n.Prefix == nil { + n.Prefix = &gpb.Path{} + } else { + n.Prefix = proto.Clone(n.Prefix).(*gpb.Path) + } + n.Prefix.Target = c.name + } + return t.GnmiUpdate(n) +} + // periodic runs the function fn every period. func periodic(period time.Duration, fn func()) { if period == 0 { diff --git a/gnmi/gnmi.go b/gnmi/gnmi.go index 76a062df..319fc50c 100644 --- a/gnmi/gnmi.go +++ b/gnmi/gnmi.go @@ -25,7 +25,6 @@ import ( "time" log "github.com/golang/glog" - "github.com/openconfig/gnmi/cache" "github.com/openconfig/gnmi/subscribe" "github.com/openconfig/ygot/util" "github.com/openconfig/ygot/ygot" @@ -37,6 +36,7 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/status" "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" "github.com/openconfig/lemming/gnmi/oc" "github.com/openconfig/lemming/gnmi/reconciler" @@ -127,7 +127,7 @@ func newServer(ctx context.Context, targetName string, enableSet bool, recs ...r if err := ygot.PruneConfigFalse(configSchema.RootSchema(), configSchema.Root); err != nil { return nil, fmt.Errorf("gnmi: %v", err) } - if err := updateCache(c.cache, configSchema.Root, emptySchema.Root, targetName, OpenConfigOrigin, true, 0, "", nil); err != nil { + if err := updateCache(c, configSchema.Root, emptySchema.Root, OpenConfigOrigin, true, 0, "", nil); err != nil { return nil, fmt.Errorf("gnmi newServer: %v", err) } } @@ -140,7 +140,7 @@ func newServer(ctx context.Context, targetName string, enableSet bool, recs ...r if err := setupSchema(stateSchema, false); err != nil { return nil, err } - if err := updateCache(c.cache, stateSchema.Root, emptySchema.Root, targetName, OpenConfigOrigin, true, 0, "", nil); err != nil { + if err := updateCache(c, stateSchema.Root, emptySchema.Root, OpenConfigOrigin, true, 0, "", nil); err != nil { return nil, fmt.Errorf("gnmi newServer: %v", err) } } @@ -190,7 +190,7 @@ func setupSchema(schema *ytypes.Schema, config bool) error { // If root is nil, then it is assumed the cache is empty, and the entirety of // the dirtyRoot is put into the cache. // - auth adds authorization to before writing vals to the cache, if set to nil, not authorization is checked. -func updateCache(cache *cache.Cache, dirtyRoot, root ygot.GoStruct, target, origin string, preferShadowPath bool, timestamp int64, user string, auth PathAuth) error { +func updateCache(collector *Collector, dirtyRoot, root ygot.GoStruct, origin string, preferShadowPath bool, timestamp int64, user string, auth PathAuth) error { var nos []*gpb.Notification if root == nil { if timestamp == 0 { @@ -227,7 +227,7 @@ func updateCache(cache *cache.Cache, dirtyRoot, root ygot.GoStruct, target, orig } } - return updateCacheNotifs(cache, nos, target, origin) + return updateCacheNotifs(collector, nos, origin) } func checkWritePermission(auth PathAuth, user string, nos ...*gpb.Notification) (bool, error) { @@ -256,15 +256,13 @@ func checkWritePermission(auth PathAuth, user string, nos ...*gpb.Notification) return true, nil } -// updateCacheNotifs updates the target cache with the given notifications. -func updateCacheNotifs(ca *cache.Cache, nos []*gpb.Notification, target, origin string) error { - cacheTarget := ca.GetTarget(target) +// updateCacheNotifs updates the cache with the given notifications. +func updateCacheNotifs(c *Collector, nos []*gpb.Notification, origin string) error { for _, n := range nos { if n.Prefix == nil { n.Prefix = &gpb.Path{} } n.Prefix.Origin = origin - n.Prefix.Target = target if n.Prefix.Origin == "" { n.Prefix.Origin = OpenConfigOrigin } @@ -284,7 +282,7 @@ func updateCacheNotifs(ca *cache.Cache, nos []*gpb.Notification, target, origin log.V(1).Infof("datastore: deleting the following paths: %+v", pathsForDelete) } log.V(1).Infof("datastore: calling GnmiUpdate with the following notification:\n%s", prototext.Format(n)) - if err := cacheTarget.GnmiUpdate(n); err != nil { + if err := c.GnmiUpdate(n); err != nil { return fmt.Errorf("%w: notification:\n%s\n%s", err, prototext.Format(n), string(debug.Stack())) } if enableDebugLog && (len(n.Delete) != 0 || len(n.Update) != 0) { @@ -346,7 +344,7 @@ func unmarshalSetRequest(schema *ytypes.Schema, req *gpb.SetRequest, preferShado // - timestamp specifies the timestamp of the values that are to be updated in // the gNMI cache. If zero, then time.Now().UnixNano() is used. // - auth adds authorization to before writing vals to the cache, if set to nil, not authorization is checked. -func set(schema *ytypes.Schema, cache *cache.Cache, target string, req *gpb.SetRequest, preferShadowPath bool, validators []func(*oc.Root) error, timestamp int64, user string, auth PathAuth) error { +func set(schema *ytypes.Schema, c *Collector, req *gpb.SetRequest, preferShadowPath bool, validators []func(*oc.Root) error, timestamp int64, user string, auth PathAuth) error { // skip diffing and deepcopy for performance when handling state update paths. // Currently this is not possible for replace/delete paths, since // without doing a diff, it is not possible to compute what was @@ -370,7 +368,7 @@ func set(schema *ytypes.Schema, cache *cache.Cache, target string, req *gpb.SetR return err } - if err := updateCacheNotifs(cache, notifs, target, req.Prefix.Origin); err != nil { + if err := updateCacheNotifs(c, notifs, req.Prefix.Origin); err != nil { return err } @@ -411,7 +409,7 @@ func set(schema *ytypes.Schema, cache *cache.Cache, target string, req *gpb.SetR } } - if err := updateCache(cache, schema.Root, prevRoot, target, req.Prefix.Origin, preferShadowPath, timestamp, user, auth); err != nil { + if err := updateCache(c, schema.Root, prevRoot, req.Prefix.Origin, preferShadowPath, timestamp, user, auth); err != nil { return status.Error(codes.Internal, err.Error()) } success = true @@ -426,6 +424,9 @@ const ( // handleInternalOrigin handles SetRequests whose path has schemaless values. func (s *Server) handleInternalOrigin(req *gpb.SetRequest) (bool, error) { + if req.Prefix == nil { + req.Prefix = &gpb.Path{} + } notif := &gpb.Notification{ Prefix: &gpb.Path{ Origin: InternalOrigin, @@ -443,7 +444,7 @@ func (s *Server) handleInternalOrigin(req *gpb.SetRequest) (bool, error) { } } if hasInternal { - if err := s.c.cache.GnmiUpdate(notif); err != nil { + if err := s.c.GnmiUpdate(notif); err != nil { return true, err } } @@ -465,7 +466,7 @@ func (s *Server) handleInternalOrigin(req *gpb.SetRequest) (bool, error) { } log.V(2).Infof("internal origin notification: %v", notif) if hasInternal { - return true, s.c.cache.GnmiUpdate(notif) + return true, s.c.GnmiUpdate(notif) } return false, nil } @@ -537,7 +538,7 @@ func (s *Server) Set(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse // TODO(wenbli): Reject paths that try to modify read-only values. // TODO(wenbli): Question: what to do if there are operational-state values in a container that is specified to be replaced or deleted? - err := set(s.configSchema, s.c.cache, s.c.name, req, true, s.validators, timestamp, user, s.pathAuth) + err := set(s.configSchema, s.c, req, true, s.validators, timestamp, user, s.pathAuth) // TODO(wenbli): Currently the SetResponse is not filled. return &gpb.SetResponse{ @@ -553,7 +554,7 @@ func (s *Server) Set(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse } // TODO(wenbli): Reject values that modify config values. We only allow modifying state in this mode. // Don't authorize setting state since only internal reconcilers do that. - if err := set(s.stateSchema, s.c.cache, s.c.name, req, false, nil, timestamp, user, nil); err != nil { + if err := set(s.stateSchema, s.c, req, false, nil, timestamp, user, nil); err != nil { return &gpb.SetResponse{}, err } @@ -592,6 +593,79 @@ type PathAuth interface { IsInitialized() bool } +// subscribeTargetUpdateStream wraps around the embedded grpc.ServerStream, and +// intercepts the RecvMsg and SendMsg method call to populate gNMI Subscribe's +// target if it's not populated in the SubscribeRequest. +type subscribeTargetUpdateStream struct { + grpc.ServerStream + // target is the target of the gNMI collector used by the lemming + // instance. + target string + // subscribeTarget stores the target field value of the last + // SubscribeRequest in the stream. + subscribeTarget string +} + +func (w *subscribeTargetUpdateStream) RecvMsg(m any) error { + if err := w.ServerStream.RecvMsg(m); err != nil { + return err + } + if req, ok := m.(*gpb.SubscribeRequest); ok { + sub := req.GetSubscribe() + if sub != nil { + w.subscribeTarget = sub.GetPrefix().GetTarget() + if sub.GetPrefix().GetTarget() == "" { + if sub.Prefix == nil { + sub.Prefix = &gpb.Path{} + } + sub.Prefix.Target = w.target + } + } + } + return nil +} + +func (w *subscribeTargetUpdateStream) SendMsg(m any) error { + // Clear target if it's not specified in the original SubscribeRequest: + // https://www.openconfig.net/docs/gnmi/gnmi-specification/#2221-path-target + if resp, ok := m.(*gpb.SubscribeResponse); w.subscribeTarget == "" && ok { + if notif := resp.GetUpdate(); notif != nil && notif.GetPrefix().GetTarget() != "" { + // A clone of the entire notification is + // required; otherwise the notification in the + // collector cache is also altered. + // + // TODO(wenbli): We should use a shallow-copy of the + // proto here for better performance: + // https://github.com/golang/protobuf/issues/1155 + resp.Response = &gpb.SubscribeResponse_Update{Update: proto.Clone(notif).(*gpb.Notification)} + resp.GetUpdate().Prefix.Target = "" + } + } + return w.ServerStream.SendMsg(m) +} + +func newSubscribeTargetUpdateStream(s grpc.ServerStream, target string) grpc.ServerStream { + return &subscribeTargetUpdateStream{ + ServerStream: s, + target: target, + } +} + +// NewSubscribeTargetUpdateInterceptor returns a stream interceptor to populate +// gNMI Subscribe's target if it's not populated in the SubscribeRequest +// message in order to retrieve the device's data. +// +// ref: https://www.openconfig.net/docs/gnmi/gnmi-specification/#2221-path-target +func NewSubscribeTargetUpdateInterceptor(target string) func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + err := handler(srv, newSubscribeTargetUpdateStream(ss, target)) + if err != nil { + fmt.Printf("ERROR in interceptor stream: %v\n", err) + } + return err + } +} + type subscribeWithAuth struct { gpb.GNMI_SubscribeServer auth PathAuth diff --git a/gnmi/gnmi_test.go b/gnmi/gnmi_test.go index 7f43af5d..bfdffc37 100644 --- a/gnmi/gnmi_test.go +++ b/gnmi/gnmi_test.go @@ -43,6 +43,10 @@ import ( gpb "github.com/openconfig/gnmi/proto/gnmi" ) +const ( + targetName = "local" +) + func mustPath(s string) *gpb.Path { p, err := ygot.StringToStructuredPath(s) if err != nil { @@ -99,10 +103,11 @@ const ( ) type upd struct { - T updateType - TS int64 - Path string - Val interface{} + T updateType + TS int64 + Target string + Path string + Val interface{} } func (u *upd) String() string { @@ -110,7 +115,7 @@ func (u *upd) String() string { b.WriteString("<") switch u.T { case VAL: - b.WriteString(fmt.Sprintf("value, @%d %s=%v", u.TS, u.Path, u.Val)) + b.WriteString(fmt.Sprintf("value, @%d (%s) %s=%v", u.TS, u.Target, u.Path, u.Val)) case METACONNECTED: b.WriteString("meta/connected=true") case METASYNC: @@ -142,10 +147,11 @@ func toUpd(r *gpb.SubscribeResponse) []*upd { }) default: ret = append(ret, &upd{ - T: VAL, - TS: v.Update.GetTimestamp(), - Path: mustPathToString(u.Path), - Val: mustToScalar(u.Val), + T: VAL, + TS: v.Update.GetTimestamp(), + Target: v.Update.GetPrefix().GetTarget(), + Path: mustPathToString(u.Path), + Val: mustToScalar(u.Val), }) } } @@ -161,7 +167,7 @@ func toUpd(r *gpb.SubscribeResponse) []*upd { // errors encounted whilst setting it up. func startServer(s *Server) (string, error) { // Start gNMI server. - srv := grpc.NewServer() + srv := grpc.NewServer(grpc.StreamInterceptor(NewSubscribeTargetUpdateInterceptor(targetName))) gpb.RegisterGNMIServer(srv, s) // Forward streaming updates to clients. // Register listening port and start serving. @@ -181,35 +187,70 @@ func startServer(s *Server) (string, error) { return lis.Addr().String(), nil } -// TestONCE tests the subscribe mode of gnmit. -func TestONCE(t *testing.T) { - testGNMI(t, "/hello", false, false, false) -} +func TestSetAndSubscribeOnce(t *testing.T) { + tests := []struct { + desc string + pathStr string + useSet bool + config bool + internal bool + }{{ + desc: "subscribe-once", + pathStr: "/hello", + useSet: false, + config: false, + internal: false, + }, { + desc: "set-config", + pathStr: "/interfaces/interface[name=foo]/config/description", + useSet: true, + config: true, + internal: false, + }, { + desc: "set-state", + pathStr: "/interfaces/interface[name=foo]/state/description", + useSet: true, + config: false, + internal: false, + }, { + desc: "set-internal", + pathStr: "/test/foo", + useSet: true, + config: true, + internal: true, + }} -// TestSetConfig tests gnmi.Set on a config value. -// -// It purposely avoids using ygnmi in order to test lower-level details -// (e.g. timestamp metadata) -func TestSetConfig(t *testing.T) { - testGNMI(t, "/interfaces/interface[name=foo]/config/description", true, true, false) + for _, tt := range tests { + path := mustPath(tt.pathStr) + if tt.internal { + path.Origin = InternalOrigin + } + client, cleanup := testSetSubSetup(t, mustTargetPath(targetName, "", tt.useSet && !tt.internal), path, tt.useSet, tt.config) + defer cleanup() + t.Run(tt.desc+"-with-target", func(t *testing.T) { + testSetSub(t, client, path, tt.config, targetName, tt.useSet, tt.internal) + }) + t.Run(tt.desc+"-no-target", func(t *testing.T) { + testSetSub(t, client, path, tt.config, "", tt.useSet, tt.internal) + }) + // Run this again for repeatability (e.g. make sure Target in + // the notification didn't get overwritten). + t.Run(tt.desc+"-with-target-2", func(t *testing.T) { + testSetSub(t, client, path, tt.config, targetName, tt.useSet, tt.internal) + }) + t.Run(tt.desc+"-no-target-2", func(t *testing.T) { + testSetSub(t, client, path, tt.config, "", tt.useSet, tt.internal) + }) + } } -// TestSetState tests gnmi.Set on a state value. +// testSetSubSetup tests gnmi.Set and/or gnmi.Subscribe/ONCE on a config or state value. // // It purposely avoids using ygnmi in order to test lower-level details // (e.g. timestamp metadata) -func TestSetState(t *testing.T) { - testGNMI(t, "/interfaces/interface[name=foo]/state/description", true, false, false) -} - -// TestSetInternal tests that the server is able to handle schemaless queries. -func TestSetInternal(t *testing.T) { - testGNMI(t, "/test/foo", true, true, true) -} - -func testGNMI(t *testing.T, pathStr string, useSet, config, internal bool) { +func testSetSubSetup(t *testing.T, prefix, path *gpb.Path, useSet, config bool) (gpb.GNMIClient, func()) { ctx := context.Background() - gnmiServer, err := newServer(ctx, "local", useSet) + gnmiServer, err := newServer(ctx, targetName, useSet) if err != nil { t.Fatalf("cannot create server, got err: %v", err) } @@ -218,13 +259,27 @@ func testGNMI(t *testing.T, pathStr string, useSet, config, internal bool) { t.Fatalf("cannot start server, got err: %v", err) } - path := mustPath(pathStr) - if internal { - path.Origin = InternalOrigin + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(local.NewCredentials())) + if err != nil { + t.Fatalf("cannot dial gNMI server, %v", err) } - prefix := mustTargetPath("local", "", useSet && !internal) - if !useSet { + client := gpb.NewGNMIClient(conn) + + if useSet { + if !config { + ctx = metadata.AppendToOutgoingContext(ctx, GNMIModeMetadataKey, string(StateMode)) + } + if _, err := client.Set(metadata.AppendToOutgoingContext(ctx, TimestampMetadataKey, strconv.FormatInt(42, 10)), &gpb.SetRequest{ + Prefix: prefix, + Replace: []*gpb.Update{{ + Path: path, + Val: mustTypedValue("world"), + }}, + }); err != nil { + t.Fatalf("set request failed: %v", err) + } + } else { gnmiServer.c.TargetUpdate(&gpb.SubscribeResponse{ Response: &gpb.SubscribeResponse_Update{ Update: &gpb.Notification{ @@ -244,35 +299,24 @@ func testGNMI(t *testing.T, pathStr string, useSet, config, internal bool) { }) } + return client, func() { + gnmiServer.c.Stop() + } +} + +// testSetSub tests gnmi.Set and/or gnmi.Subscribe/ONCE on a config or state value. +// +// It purposely avoids using ygnmi in order to test lower-level details +// (e.g. timestamp metadata) +func testSetSub(t *testing.T, client gpb.GNMIClient, path *gpb.Path, config bool, wantTarget string, useSet, internal bool) { + prefix := mustTargetPath(wantTarget, "", useSet && !internal) + pathStr := mustPathToString(path) + got := []*upd{} clientCtx, cancel := context.WithCancel(context.Background()) var sendErr, recvErr error go func(ctx context.Context) { defer cancel() - conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(local.NewCredentials())) - if err != nil { - sendErr = fmt.Errorf("cannot dial gNMI server, %v", err) - return - } - - client := gpb.NewGNMIClient(conn) - - if useSet { - if !config { - ctx = metadata.AppendToOutgoingContext(ctx, GNMIModeMetadataKey, string(StateMode)) - } - if _, err := client.Set(metadata.AppendToOutgoingContext(ctx, TimestampMetadataKey, strconv.FormatInt(42, 10)), &gpb.SetRequest{ - Prefix: prefix, - Replace: []*gpb.Update{{ - Path: path, - Val: mustTypedValue("world"), - }}, - }); err != nil { - sendErr = fmt.Errorf("set request failed: %v", err) - return - } - } - subc, err := client.Subscribe(ctx) if err != nil { sendErr = err @@ -310,8 +354,6 @@ func testGNMI(t *testing.T, pathStr string, useSet, config, internal bool) { <-clientCtx.Done() - gnmiServer.c.Stop() - if sendErr != nil { t.Errorf("got unexpected send error, %v", sendErr) } @@ -326,10 +368,11 @@ func testGNMI(t *testing.T, pathStr string, useSet, config, internal bool) { } if diff := cmp.Diff(got, []*upd{{ - T: VAL, - TS: 42, - Path: pathStr, - Val: "world", + T: VAL, + TS: 42, + Target: wantTarget, + Path: pathStr, + Val: "world", }, { T: SYNC, }}, cmpOptions...); diff != "" { @@ -339,10 +382,11 @@ func testGNMI(t *testing.T, pathStr string, useSet, config, internal bool) { if config { // Test that timestamp is not 42: we don't want the timestamp metadata to affect config values. if cmp.Equal(got, []*upd{{ - T: VAL, - TS: 42, - Path: pathStr, - Val: "world", + T: VAL, + TS: 42, + Target: wantTarget, + Path: pathStr, + Val: "world", }, { T: SYNC, }}) { @@ -646,7 +690,7 @@ func TestSetYGNMI(t *testing.T) { wantErr: `"24" does not match regular expression pattern`, }} - gnmiServer, err := newServer(context.Background(), "local", true) + gnmiServer, err := newServer(context.Background(), targetName, true) if err != nil { t.Fatalf("cannot create server, got err: %v", err) } @@ -660,11 +704,11 @@ func TestSetYGNMI(t *testing.T) { if err != nil { t.Fatalf("cannot dial gNMI server, %v", err) } - configClient, err := ygnmi.NewClient(gpb.NewGNMIClient(conn), ygnmi.WithTarget("local")) + configClient, err := ygnmi.NewClient(gpb.NewGNMIClient(conn), ygnmi.WithTarget(targetName)) if err != nil { t.Fatalf("failed to create client: %v", err) } - stateClient, err := ygnmi.NewClient(gnmiServer.LocalClient(), ygnmi.WithTarget("local")) + stateClient, err := ygnmi.NewClient(gnmiServer.LocalClient(), ygnmi.WithTarget(targetName)) if err != nil { t.Fatalf("failed to create client: %v", err) } @@ -735,7 +779,7 @@ func TestSetWithAuth(t *testing.T) { }} for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - gnmiServer, err := newServer(context.Background(), "local", true) + gnmiServer, err := newServer(context.Background(), targetName, true) if err != nil { t.Fatalf("cannot create server, got err: %v", err) } @@ -749,7 +793,7 @@ func TestSetWithAuth(t *testing.T) { if err != nil { t.Fatalf("cannot dial gNMI server, %v", err) } - c, err := ygnmi.NewClient(gpb.NewGNMIClient(conn), ygnmi.WithTarget("local")) + c, err := ygnmi.NewClient(gpb.NewGNMIClient(conn), ygnmi.WithTarget(targetName)) if err != nil { t.Fatalf("failed to create client: %v", err) } @@ -768,7 +812,7 @@ func TestSetWithAuth(t *testing.T) { // TestSTREAM tests the STREAM mode of gnmit. func TestSTREAM(t *testing.T) { ctx := context.Background() - gnmiServer, err := newServer(ctx, "local", false) + gnmiServer, err := newServer(ctx, targetName, false) if err != nil { t.Fatalf("cannot create server, got err: %v", err) } @@ -780,7 +824,7 @@ func TestSTREAM(t *testing.T) { gnmiServer.c.TargetUpdate(&gpb.SubscribeResponse{ Response: &gpb.SubscribeResponse_Update{ Update: &gpb.Notification{ - Prefix: mustTargetPath("local", "", false), + Prefix: mustTargetPath(targetName, "", false), Timestamp: 42, Update: []*gpb.Update{{ Path: mustPath("/hello"), @@ -827,7 +871,7 @@ func TestSTREAM(t *testing.T) { sr := &gpb.SubscribeRequest{ Request: &gpb.SubscribeRequest_Subscribe{ Subscribe: &gpb.SubscriptionList{ - Prefix: mustTargetPath("local", "", false), + Prefix: mustTargetPath(targetName, "", false), Mode: gpb.SubscriptionList_STREAM, Subscription: []*gpb.Subscription{{ Path: mustPath("/"), @@ -871,7 +915,7 @@ func TestSTREAM(t *testing.T) { gnmiServer.c.TargetUpdate(&gpb.SubscribeResponse{ Response: &gpb.SubscribeResponse_Update{ Update: &gpb.Notification{ - Prefix: mustTargetPath("local", "", false), + Prefix: mustTargetPath(targetName, "", false), Timestamp: int64(42 + 1 + i), Update: []*gpb.Update{{ Path: mustPath("/hello"), @@ -921,32 +965,37 @@ func TestSTREAM(t *testing.T) { }, { T: METASYNC, }, { - T: VAL, - TS: 42, - Path: "/hello", - Val: "world", + T: VAL, + TS: 42, + Target: targetName, + Path: "/hello", + Val: "world", }, { T: SYNC, }, { - T: VAL, - TS: 43, - Path: "/hello", - Val: "mercury", + T: VAL, + TS: 43, + Target: targetName, + Path: "/hello", + Val: "mercury", }, { - T: VAL, - TS: 44, - Path: "/hello", - Val: "venus", + T: VAL, + TS: 44, + Target: targetName, + Path: "/hello", + Val: "venus", }, { - T: VAL, - TS: 45, - Path: "/hello", - Val: "earth", + T: VAL, + TS: 45, + Target: targetName, + Path: "/hello", + Val: "earth", }, { - T: VAL, - TS: 46, - Path: "/hello", - Val: "mars", + T: VAL, + TS: 46, + Target: targetName, + Path: "/hello", + Val: "mars", }}, cmpopts.SortSlices(func(a, b *upd) bool { if a.T != b.T { return a.T < b.T @@ -1002,7 +1051,7 @@ func TestSubscribeWithAuth(t *testing.T) { }} for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - gnmiServer, err := newServer(context.Background(), "local", false) + gnmiServer, err := newServer(context.Background(), targetName, false) if err != nil { t.Fatalf("cannot create server, got err: %v", err) } @@ -1014,7 +1063,7 @@ func TestSubscribeWithAuth(t *testing.T) { gnmiServer.c.TargetUpdate(&gpb.SubscribeResponse{ Response: &gpb.SubscribeResponse_Update{ Update: &gpb.Notification{ - Prefix: mustTargetPath("local", "", true), + Prefix: mustTargetPath(targetName, "", true), Timestamp: 1, Update: []*gpb.Update{{ Path: mustPath("/interfaces/interface[name=eth0]/state/oper-status"), @@ -1028,7 +1077,7 @@ func TestSubscribeWithAuth(t *testing.T) { if err != nil { t.Fatalf("cannot dial gNMI server, %v", err) } - c, err := ygnmi.NewClient(gpb.NewGNMIClient(conn), ygnmi.WithTarget("local")) + c, err := ygnmi.NewClient(gpb.NewGNMIClient(conn), ygnmi.WithTarget(targetName)) if err != nil { t.Fatalf("failed to create client: %v", err) } diff --git a/lemming.go b/lemming.go index 32e16408..54b93f53 100644 --- a/lemming.go +++ b/lemming.go @@ -188,6 +188,7 @@ func New(targetName, zapiURL string, opts ...Option) (*Device, error) { if creds != nil { grpcOpts = append(grpcOpts, grpc.Creds(creds)) } + grpcOpts = append(grpcOpts, grpc.StreamInterceptor(fgnmi.NewSubscribeTargetUpdateInterceptor(targetName))) s := grpc.NewServer(grpcOpts...) diff --git a/repositories.bzl b/repositories.bzl index 8ed66da9..4d0b01ba 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -2420,8 +2420,8 @@ def go_repositories(): go_repository( name = "com_google_cloud_go_gaming", importpath = "cloud.google.com/go/gaming", - sum = "h1:5qZmZEWzMf8GEFgm9NeC3bjFRpt7x4S6U7oLbxaf7N8=", - version = "v1.10.1", + sum = "h1:7vEhFnZmd931Mo7sZ6pJy7uQPDxF7m7v8xtBheG08tc=", + version = "v1.9.0", ) go_repository( name = "com_google_cloud_go_gkebackup", @@ -2450,8 +2450,8 @@ def go_repositories(): go_repository( name = "com_google_cloud_go_grafeas", importpath = "cloud.google.com/go/grafeas", - sum = "h1:oyTL/KjiUeBs9eYLw/40cpSZglUC+0F7X4iu/8t7NWs=", - version = "v0.3.0", + sum = "h1:CYjC+xzdPvbV65gi6Dr4YowKcmLo045pm18L0DhdELM=", + version = "v0.2.0", ) go_repository( name = "com_google_cloud_go_gsuiteaddons", @@ -3348,8 +3348,8 @@ def go_repositories(): go_repository( name = "org_golang_x_tools", importpath = "golang.org/x/tools", - sum = "h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=", - version = "v0.9.1", + sum = "h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=", + version = "v0.7.0", ) go_repository( name = "org_golang_x_xerrors",