diff --git a/go.mod b/go.mod index 8bdd2e89..9c385c3f 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/openconfig/gribigo v0.0.0-20230902004455-c7aff9365bec github.com/openconfig/kne v0.1.14 github.com/openconfig/ondatra v0.2.7 - github.com/openconfig/ygnmi v0.8.7 + github.com/openconfig/ygnmi v0.8.11 github.com/openconfig/ygot v0.29.11 github.com/p4lang/p4runtime v1.4.0-rc.5.0.20220728214547-13f0d02a521e github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index faf6196e..de9eb3dc 100644 --- a/go.sum +++ b/go.sum @@ -965,8 +965,8 @@ github.com/openconfig/lemming/operator v0.2.0/go.mod h1:LKgEXSR5VK2CAeh2uKijKAXF github.com/openconfig/ondatra v0.2.7 h1:abEI0qO4Q/BrXV/qi2unXn8dpkZFp9DF8rw6kZod5/E= github.com/openconfig/ondatra v0.2.7/go.mod h1:Vfwg/PvsupSJOxTxpQvF078MyHxJGvff3jIr4H0vgzs= github.com/openconfig/testt v0.0.0-20220311054427-efbb1a32ec07 h1:X631iD/B0ximGFb5P9LY5wHju4SiedxUhc5UZEo7VSw= -github.com/openconfig/ygnmi v0.8.7 h1:8K87+VztXhHqsU6/OYRnY/l/bGqFk+qU61mhhdxMCYo= -github.com/openconfig/ygnmi v0.8.7/go.mod h1:7up6qc9l9G4+Cfo37gzO0D7+2g4yqyW+xzh4vYsYTEE= +github.com/openconfig/ygnmi v0.8.11 h1:CCx2xl1jTwXeMh9G6B59k0BlRrKoz+K6eLTnowKQ6Vw= +github.com/openconfig/ygnmi v0.8.11/go.mod h1:WXnzNls/5ea6P9wx883ShLi5T/xa7sQjxn6SOH4kVwA= github.com/openconfig/ygot v0.6.0/go.mod h1:o30svNf7O0xK+R35tlx95odkDmZWS9JyWWQSmIhqwAs= github.com/openconfig/ygot v0.10.4/go.mod h1:oCQNdXnv7dWc8scTDgoFkauv1wwplJn5HspHcjlxSAQ= github.com/openconfig/ygot v0.13.2/go.mod h1:kJN0yCXIH07dOXvNBEFm3XxXdnDD5NI6K99tnD5x49c= diff --git a/integration_tests/twodut_oneotg_tests/bgp_triggered_gue/bgp_triggered_gue_test.go b/integration_tests/twodut_oneotg_tests/bgp_triggered_gue/bgp_triggered_gue_test.go index 8ae02d09..fa6733d2 100644 --- a/integration_tests/twodut_oneotg_tests/bgp_triggered_gue/bgp_triggered_gue_test.go +++ b/integration_tests/twodut_oneotg_tests/bgp_triggered_gue/bgp_triggered_gue_test.go @@ -519,8 +519,8 @@ func testTrafficAndEncap(t *testing.T, otg *otg.OTG, startingIP string, v6Traffi } var expectedPacketCounter int - const wantPacketN = 20 - for i := 0; i != wantPacketN; i++ { + const packetN = 20 + for i := 0; i != packetN; i++ { pkt, err := ps.NextPacket() if err != nil { t.Fatalf("error reading next packet: %v", err) @@ -587,7 +587,7 @@ func testTrafficAndEncap(t *testing.T, otg *otg.OTG, startingIP string, v6Traffi expectedPacketCounter++ } - if expectedPacketCounter < wantPacketN { + if wantPacketN := packetN - 1; expectedPacketCounter < wantPacketN { t.Errorf("Got less than %d expected packets: %v", wantPacketN, expectedPacketCounter) } else { t.Logf("Got %d expected packets.", expectedPacketCounter) diff --git a/lemming.go b/lemming.go index 54b93f53..850e780d 100644 --- a/lemming.go +++ b/lemming.go @@ -213,7 +213,7 @@ func New(targetName, zapiURL string, opts ...Option) (*Device, error) { if err != nil { return nil, err } - if err := sysribServer.Start(cacheClient, targetName, zapiURL); err != nil { + if err := sysribServer.Start(context.Background(), cacheClient, targetName, zapiURL); err != nil { return nil, fmt.Errorf("sysribServer failed to start: %v", err) } diff --git a/repositories.bzl b/repositories.bzl index 14fa25ba..a432fcdd 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -1611,8 +1611,8 @@ def go_repositories(): go_repository( name = "com_github_openconfig_ygnmi", importpath = "github.com/openconfig/ygnmi", - sum = "h1:8K87+VztXhHqsU6/OYRnY/l/bGqFk+qU61mhhdxMCYo=", - version = "v0.8.7", + sum = "h1:CCx2xl1jTwXeMh9G6B59k0BlRrKoz+K6eLTnowKQ6Vw=", + version = "v0.8.11", ) go_repository( name = "com_github_openconfig_ygot", diff --git a/sysrib/server.go b/sysrib/server.go index 5361e010..7f455686 100644 --- a/sysrib/server.go +++ b/sysrib/server.go @@ -112,24 +112,24 @@ type dplane struct { } // programRoute programs the route in the dataplane, returning an error on failure. -func (d *dplane) programRoute(r *ResolvedRoute) error { +func (d *dplane) programRoute(ctx context.Context, r *ResolvedRoute) error { log.V(1).Infof("sysrib: programming resolved route: %+v", r) rr, err := resolvedRouteToRouteRequest(r) if err != nil { return err } - _, err = ygnmi.Replace(context.TODO(), d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()), rr, ygnmi.WithSetFallbackEncoding()) + _, err = ygnmi.Replace(ctx, d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()), rr, ygnmi.WithSetFallbackEncoding()) return err } // deprogramRoute de-programs the route in the dataplane, returning an error on failure. -func (d *dplane) deprogramRoute(r *ResolvedRoute) error { +func (d *dplane) deprogramRoute(ctx context.Context, r *ResolvedRoute) error { log.V(1).Infof("sysrib: deprogramming newly unresolved route: %+v", r) rr, err := resolvedRouteToRouteRequest(r) if err != nil { return err } - _, err = ygnmi.Delete(context.TODO(), d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr())) + _, err = ygnmi.Delete(ctx, d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr())) return err } @@ -157,7 +157,7 @@ func New(cfg *oc.Root) (*Server, error) { // sysrib during their initialization. // // - If zapiURL is not specified, then the ZAPI server will not be started. -func (s *Server) Start(gClient gpb.GNMIClient, target, zapiURL string) error { +func (s *Server) Start(ctx context.Context, gClient gpb.GNMIClient, target, zapiURL string) error { if s == nil { return errors.New("cannot start nil sysrib server") } @@ -168,15 +168,15 @@ func (s *Server) Start(gClient gpb.GNMIClient, target, zapiURL string) error { } s.dataplane = dplane{Client: yclient} - if err := s.monitorConnectedIntfs(yclient); err != nil { + if err := s.monitorConnectedIntfs(ctx, yclient); err != nil { return err } - if err := s.monitorBGPGUEPolicies(yclient); err != nil { + if err := s.monitorBGPGUEPolicies(ctx, yclient); err != nil { return err } - if err := s.monitorStaticRoutes(yclient); err != nil { + if err := s.monitorStaticRoutes(ctx, yclient); err != nil { return err } @@ -195,7 +195,7 @@ func (s *Server) Start(gClient gpb.GNMIClient, target, zapiURL string) error { // BEGIN Start ZAPI server. if zapiURL != "" { - if s.zServer, err = StartZServer(zapiURL, 0, s); err != nil { + if s.zServer, err = StartZServer(ctx, zapiURL, 0, s); err != nil { return err } } @@ -215,7 +215,7 @@ func (s *Server) Stop() { // // - gnmiServerAddr is the address of the central gNMI datastore. // - target is the name of the gNMI target. -func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error { +func (s *Server) monitorConnectedIntfs(ctx context.Context, yclient *ygnmi.Client) error { b := &ocpath.Batch{} b.AddPaths( ocpath.Root().InterfaceAny().Enabled().State().PathStruct(), @@ -227,7 +227,7 @@ func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error { ) interfaceWatcher := ygnmi.Watch( - context.Background(), + ctx, yclient, b.State(), func(root *ygnmi.Value[*oc.Root]) error { @@ -239,19 +239,19 @@ func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error { if intf.Enabled != nil { if intf.Ifindex != nil { ifindex := intf.GetIfindex() - s.setInterface(name, int32(ifindex), intf.GetEnabled()) + s.setInterface(ctx, name, int32(ifindex), intf.GetEnabled()) // TODO(wenbli): Support other VRFs. if subintf := intf.GetSubinterface(0); subintf != nil { for _, addr := range subintf.GetOrCreateIpv4().Address { if addr.Ip != nil && addr.PrefixLength != nil { - if err := s.addInterfacePrefix(name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil { + if err := s.addInterfacePrefix(ctx, name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil { log.Warningf("adding interface prefix failed: %v", err) } } } for _, addr := range subintf.GetOrCreateIpv6().Address { if addr.Ip != nil && addr.PrefixLength != nil { - if err := s.addInterfacePrefix(name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil { + if err := s.addInterfacePrefix(ctx, name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil { log.Warningf("adding interface prefix failed: %v", err) } } @@ -276,7 +276,7 @@ func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error { // monitorBGPGUEPolicies starts a gothread to check for BGP GUE policies being // added or deleted from the config, and informs the sysrib server accordingly // to update programmed routes. -func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error { +func (s *Server) monitorBGPGUEPolicies(ctx context.Context, yclient *ygnmi.Client) error { b := &ocpath.Batch{} b.AddPaths( ocpath.Root().BgpGueIpv4GlobalPolicyAny().Prefix().Config().PathStruct(), @@ -289,7 +289,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error { ) bgpGUEPolicyWatcher := ygnmi.Watch( - context.Background(), + ctx, yclient, b.Config(), func(root *ygnmi.Value[*oc.Root]) error { @@ -303,7 +303,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error { policiesFound[prefix] = true if existingPolicy := s.bgpGUEPolicies[prefix]; policy != existingPolicy { log.V(1).Infof("Adding new/updated BGP GUE policy: %s: %v", prefix, policy) - if err := s.setGUEPolicy(prefix, policy); err != nil { + if err := s.setGUEPolicy(ctx, prefix, policy); err != nil { log.Errorf("Failed while setting BGP GUE Policy: %v", err) } else { s.bgpGUEPolicies[prefix] = policy @@ -360,7 +360,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error { for prefix := range s.bgpGUEPolicies { if _, ok := policiesFound[prefix]; !ok { log.Infof("Deleting incomplete/non-existent policy: %s", prefix) - if err := s.deleteGUEPolicy(prefix); err != nil { + if err := s.deleteGUEPolicy(ctx, prefix); err != nil { log.Errorf("Failed while deleting BGP GUE Policy: %v", err) } else { delete(s.bgpGUEPolicies, prefix) @@ -545,7 +545,7 @@ func resolvedRouteToRouteRequest(r *ResolvedRoute) (*dpb.Route, error) { // ResolveAndProgramDiff walks through each prefix in the RIB, resolving it and // programs the forwarding plane. -func (s *Server) ResolveAndProgramDiff() error { +func (s *Server) ResolveAndProgramDiff(ctx context.Context) error { log.Info("Recalculating resolved RIB") if debugRIB { defer s.rib.PrintRIB() @@ -557,10 +557,10 @@ func (s *Server) ResolveAndProgramDiff() error { newResolvedRoutes := map[RouteKey]*Route{} for niName, ni := range s.rib.NI { for it := ni.IPV4.Iterate(); it.Next(); { - s.resolveAndProgramDiffAux(niName, ni, it.Address().String(), newResolvedRoutes) + s.resolveAndProgramDiffAux(ctx, niName, ni, it.Address().String(), newResolvedRoutes) } for it := ni.IPV6.Iterate(); it.Next(); { - s.resolveAndProgramDiffAux(niName, ni, it.Address().String(), newResolvedRoutes) + s.resolveAndProgramDiffAux(ctx, niName, ni, it.Address().String(), newResolvedRoutes) } } @@ -568,7 +568,7 @@ func (s *Server) ResolveAndProgramDiff() error { for routeKey, rr := range s.ProgrammedRoutes() { if _, ok := newResolvedRoutes[routeKey]; !ok { log.V(1).Infof("Deleting route %s", rr.RouteKey) - if err := s.dataplane.deprogramRoute(rr); err != nil { + if err := s.dataplane.deprogramRoute(ctx, rr); err != nil { log.Warningf("failed to deprogram route %+v: %v", rr, err) continue } @@ -593,7 +593,7 @@ func (s *Server) ResolveAndProgramDiff() error { // It carries out the following functionalities: // - Resolve a single route specified by prefix and program if it's different. // - Populate the resolved route into newResolvedRoutes. -func (s *Server) resolveAndProgramDiffAux(niName string, ni *NIRIB, prefix string, newResolvedRoutes map[RouteKey]*Route) { +func (s *Server) resolveAndProgramDiffAux(ctx context.Context, niName string, ni *NIRIB, prefix string, newResolvedRoutes map[RouteKey]*Route) { log.V(1).Infof("Iterating at prefix %v (v4 has %d tags) (v6 has %d tags)", prefix, ni.IPV4.CountTags(), ni.IPV6.CountTags()) _, pfx, err := net.ParseCIDR(prefix) if err != nil { @@ -631,7 +631,7 @@ func (s *Server) resolveAndProgramDiffAux(niName string, ni *NIRIB, prefix strin switch { case routeIsResolved && !reflect.DeepEqual(currentRoute, rr): log.V(1).Infof("(-currentRoute, +resolvedRoute):\n%s", cmp.Diff(currentRoute, rr)) - if err := s.dataplane.programRoute(rr); err != nil { + if err := s.dataplane.programRoute(ctx, rr); err != nil { log.Warningf("failed to program route %+v: %v", rr, err) return } @@ -663,7 +663,7 @@ func (s *Server) ProgrammedRoutes() map[RouteKey]*ResolvedRoute { } // SetRoute implements ROUTE_ADD and ROUTE_DELETE -func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sysribpb.SetRouteResponse, error) { +func (s *Server) SetRoute(ctx context.Context, req *sysribpb.SetRouteRequest) (*sysribpb.SetRouteResponse, error) { pfx, err := prefixString(req.Prefix) if err != nil { return nil, err @@ -684,7 +684,7 @@ func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sy // TODO(wenbli): Check if recursive resolution is an infinite recursion. This happens if there is a cycle. niName := vrfIDToNiName(req.GetVrfId()) - if err := s.setRoute(niName, &Route{ + if err := s.setRoute(ctx, niName, &Route{ // TODO(wenbli): check if pfx has to be canonical or does it tolerate it: i.e. 1.1.1.0/24 instead of 1.1.1.1/24 Prefix: pfx, NextHops: nexthops, @@ -709,21 +709,21 @@ func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sy } // setRoute adds/deletes a route from the RIB manager. -func (s *Server) setRoute(niName string, route *Route, isDelete bool) error { +func (s *Server) setRoute(ctx context.Context, niName string, route *Route, isDelete bool) error { if err := s.rib.setRoute(niName, route, isDelete); err != nil { return fmt.Errorf("error while adding route to sysrib: %v", err) } - if err := s.ResolveAndProgramDiff(); err != nil { + if err := s.ResolveAndProgramDiff(ctx); err != nil { return fmt.Errorf("error while resolving sysrib: %v", err) } return nil } // addInterfacePrefix adds a prefix to the sysrib as a connected route. -func (s *Server) addInterfacePrefix(name string, ifindex int32, prefix string, niName string) error { +func (s *Server) addInterfacePrefix(ctx context.Context, name string, ifindex int32, prefix string, niName string) error { log.V(1).Infof("Adding interface prefix: intf %s, idx %d, prefix %s, ni %s", name, ifindex, prefix, niName) - return s.setRoute(niName, &Route{ + return s.setRoute(ctx, niName, &Route{ Prefix: prefix, Connected: &Interface{ Name: name, @@ -737,7 +737,7 @@ func (s *Server) addInterfacePrefix(name string, ifindex int32, prefix string, n } // setInterface responds to INTERFACE_UP/INTERFACE_DOWN messages from the dataplane. -func (s *Server) setInterface(name string, ifindex int32, enabled bool) error { +func (s *Server) setInterface(ctx context.Context, name string, ifindex int32, enabled bool) error { log.V(1).Infof("Setting interface %q(%d) to enabled=%v", name, ifindex, enabled) s.interfacesMu.Lock() s.interfaces[Interface{ @@ -746,7 +746,7 @@ func (s *Server) setInterface(name string, ifindex int32, enabled bool) error { }] = enabled s.interfacesMu.Unlock() - return s.ResolveAndProgramDiff() + return s.ResolveAndProgramDiff(ctx) } // TODO(wenbli): Do we need to handle interface deletion? @@ -754,12 +754,12 @@ func (s *Server) setInterface(name string, ifindex int32, enabled bool) error { // setGUEPolicy adds a new GUE policy and triggers resolved route // computation and programming. -func (s *Server) setGUEPolicy(prefix string, policy GUEPolicy) error { +func (s *Server) setGUEPolicy(ctx context.Context, prefix string, policy GUEPolicy) error { if err := s.rib.SetGUEPolicy(prefix, policy); err != nil { return fmt.Errorf("error while adding route to sysrib: %v", err) } - if err := s.ResolveAndProgramDiff(); err != nil { + if err := s.ResolveAndProgramDiff(ctx); err != nil { return fmt.Errorf("error while resolving sysrib: %v", err) } return nil @@ -767,12 +767,12 @@ func (s *Server) setGUEPolicy(prefix string, policy GUEPolicy) error { // deleteGUEPolicy adds a new GUE policy and triggers resolved route // computation and programming. -func (s *Server) deleteGUEPolicy(prefix string) error { +func (s *Server) deleteGUEPolicy(ctx context.Context, prefix string) error { if _, err := s.rib.DeleteGUEPolicy(prefix); err != nil { return fmt.Errorf("error while adding route to sysrib: %v", err) } - if err := s.ResolveAndProgramDiff(); err != nil { + if err := s.ResolveAndProgramDiff(ctx); err != nil { return fmt.Errorf("error while resolving sysrib: %v", err) } return nil diff --git a/sysrib/server_test.go b/sysrib/server_test.go index 6e8134d2..486d66b1 100644 --- a/sysrib/server_test.go +++ b/sysrib/server_test.go @@ -201,7 +201,7 @@ type SetAndProgramPair struct { ResolvedRoutes []*ResolvedRoute } -func getConnectedIntfSetupVars() ([]*AddIntfAction, []*dpb.Route) { +func getConnectedIntfSetupVarsV4() ([]*AddIntfAction, []*dpb.Route) { return []*AddIntfAction{{ name: "eth0", ifindex: 0, @@ -326,16 +326,23 @@ func getConnectedIntfSetupVars() ([]*AddIntfAction, []*dpb.Route) { }} } -func TestServer(t *testing.T) { - routesQuery := programmedRoutesQuery(t) - inInterfaces, wantConnectedRoutes := getConnectedIntfSetupVars() - inInterface6s, wantConnectedRoute6s := getConnectedIntfSetupVars() +func getConnectedIntfSetupVars(t *testing.T) ([]*AddIntfAction, []*dpb.Route) { + inInterfaces, wantConnectedRoutes := getConnectedIntfSetupVarsV4() + inInterface6s, wantConnectedRoute6s := getConnectedIntfSetupVarsV4() for _, intf := range inInterface6s { intf.prefix = mapAddressTo6(t, intf.prefix) } for _, route := range wantConnectedRoute6s { mapResolvedRouteTo6(t, route) } + inInterfaces = append(inInterfaces, inInterface6s...) + wantConnectedRoutes = append(wantConnectedRoutes, wantConnectedRoute6s...) + return inInterfaces, wantConnectedRoutes +} + +func TestServer(t *testing.T) { + routesQuery := programmedRoutesQuery(t) + inInterfaces, wantConnectedRoutes := getConnectedIntfSetupVars(t) tests := []struct { desc string @@ -994,10 +1001,59 @@ func TestServer(t *testing.T) { }}, }} + grpcServer := grpc.NewServer() + gnmiServer, err := gnmi.New(grpcServer, "local", nil) + if err != nil { + t.Fatal(err) + } + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to start listener: %v", err) + } + go func() { + grpcServer.Serve(lis) + }() + + s, err := New(nil) + if err != nil { + t.Fatal(err) + } + + // Update the interface configuration on the gNMI server. + client := gnmiServer.LocalClient() + if err := s.Start(context.Background(), client, "local", ""); err != nil { + t.Fatalf("cannot start sysrib server, %v", err) + } + defer s.Stop() + + c, err := ygnmi.NewClient(client, ygnmi.WithTarget("local")) + if err != nil { + t.Fatalf("cannot create ygnmi client: %v", err) + } + + for _, intf := range inInterfaces { + configureInterface(t, intf, c) + } + + // Wait for Sysrib to pick up the connected prefixes. + for i := 0; i != maxGNMIWaitQuanta; i++ { + var routes []*dpb.Route + routes, err = ygnmi.GetAll(context.Background(), c, routesQuery) + if err == nil { + if diff := cmp.Diff(wantConnectedRoutes, routes, protocmp.Transform(), protocmp.SortRepeatedFields(new(dpb.NextHopList), "hops")); diff != "" { + err = fmt.Errorf("routes not equal to wantConnectedRoutes (-want, +got):\n%s", diff) + } else { + break + } + } + time.Sleep(100 * time.Millisecond) + } + if err != nil { + t.Fatalf("After initial interface operations: %v", err) + } + for _, tt := range tests { t.Run(tt.desc, func(t *testing.T) { - inInterfaces := inInterfaces - wantConnectedRoutes := wantConnectedRoutes for _, v4 := range []bool{true, false} { desc := "v4" if !v4 { @@ -1006,8 +1062,6 @@ func TestServer(t *testing.T) { } desc = "v6" // Convert all v4 addresses to v6. - inInterfaces = inInterface6s - wantConnectedRoutes = wantConnectedRoute6s for _, req := range tt.inSetRouteRequests { mapPrefixTo6(t, req.RouteReq.Prefix) for _, nh := range req.RouteReq.Nexthops { @@ -1023,58 +1077,6 @@ func TestServer(t *testing.T) { } t.Run(desc, func(t *testing.T) { - // TODO(wenbli): Don't re-create gNMI server, simply erase it and then reconnect to it afterwards. - grpcServer := grpc.NewServer() - gnmiServer, err := gnmi.New(grpcServer, "local", nil) - if err != nil { - t.Fatal(err) - } - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to start listener: %v", err) - } - go func() { - grpcServer.Serve(lis) - }() - - s, err := New(nil) - if err != nil { - t.Fatal(err) - } - - // Update the interface configuration on the gNMI server. - client := gnmiServer.LocalClient() - if err := s.Start(client, "local", ""); err != nil { - t.Fatalf("cannot start sysrib server, %v", err) - } - defer s.Stop() - - c, err := ygnmi.NewClient(client, ygnmi.WithTarget("local")) - if err != nil { - t.Fatalf("cannot create ygnmi client: %v", err) - } - - for _, intf := range inInterfaces { - configureInterface(t, intf, c) - } - - // Wait for Sysrib to pick up the connected prefixes. - for i := 0; i != maxGNMIWaitQuanta; i++ { - var routes []*dpb.Route - routes, err = ygnmi.GetAll(context.Background(), c, routesQuery) - if err == nil { - if diff := cmp.Diff(wantConnectedRoutes, routes, protocmp.Transform(), protocmp.SortRepeatedFields(new(dpb.NextHopList), "hops")); diff != "" { - err = fmt.Errorf("routes not equal to wantConnectedRoutes (-want, +got):\n%s", diff) - } else { - break - } - } - time.Sleep(100 * time.Millisecond) - } - if err != nil { - t.Fatalf("After initial interface operations: %v", err) - } - for _, req := range tt.inSetRouteRequests { // TODO(wenbli): Test SetRouteResponse _, err := s.SetRoute(context.Background(), req.RouteReq) @@ -1093,6 +1095,25 @@ func TestServer(t *testing.T) { })); diff != "" { t.Errorf("routes not equal to wantRoutes (-want, +got):\n%s", diff) } + + // Clean-up + for _, req := range tt.inSetRouteRequests { + isDelete := req.RouteReq.Delete + req.RouteReq.Delete = true + _, err := s.SetRoute(context.Background(), req.RouteReq) + req.RouteReq.Delete = isDelete + hasErr := err != nil + if hasErr != tt.wantErr { + t.Fatalf("%s: got error during call to SetRoute: %v, wantErr: %v", req.Desc, err, tt.wantErr) + } + } + + if routes, err = ygnmi.GetAll(context.Background(), c, routesQuery); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(wantConnectedRoutes, routes, protocmp.Transform(), protocmp.SortRepeatedFields(new(dpb.NextHopList), "hops")); diff != "" { + t.Errorf("routes not equal to wantConnectedRoutes (-want, +got):\n%s", diff) + } }) } }) @@ -1109,14 +1130,7 @@ func gueHeader(t *testing.T, layers ...gopacket.SerializableLayer) []byte { func TestBGPGUEPolicy(t *testing.T) { routesQuery := programmedRoutesQuery(t) - inInterfaces, wantConnectedRoutes := getConnectedIntfSetupVars() - inInterface6s, wantConnectedRoute6s := getConnectedIntfSetupVars() - for _, intf := range inInterface6s { - intf.prefix = mapAddressTo6(t, intf.prefix) - } - for _, route := range wantConnectedRoute6s { - mapResolvedRouteTo6(t, route) - } + inInterfaces, wantConnectedRoutes := getConnectedIntfSetupVars(t) // Note: This is a sequential test -- each test case depends on the previous one. tests := []struct { @@ -2426,68 +2440,63 @@ func TestBGPGUEPolicy(t *testing.T) { }} for _, v4 := range []bool{true, false} { - inInterfaces := inInterfaces - wantConnectedRoutes := wantConnectedRoutes desc := "v4" if !v4 { desc = "v6" - // Convert all v4 addresses to v6. - inInterfaces = inInterface6s - wantConnectedRoutes = wantConnectedRoute6s } - t.Run(desc, func(t *testing.T) { - grpcServer := grpc.NewServer() - gnmiServer, err := gnmi.New(grpcServer, "local", nil) - if err != nil { - t.Fatal(err) - } - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Failed to start listener: %v", err) - } - go func() { - grpcServer.Serve(lis) - }() + grpcServer := grpc.NewServer() + gnmiServer, err := gnmi.New(grpcServer, "local", nil) + if err != nil { + t.Fatal(err) + } + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to start listener: %v", err) + } + go func() { + grpcServer.Serve(lis) + }() - s, err := New(nil) - if err != nil { - t.Fatal(err) - } + s, err := New(nil) + if err != nil { + t.Fatal(err) + } - // Update the interface configuration on the gNMI server. - client := gnmiServer.LocalClient() - if err := s.Start(client, "local", ""); err != nil { - t.Fatalf("cannot start sysrib server, %v", err) - } - defer s.Stop() + // Update the interface configuration on the gNMI server. + client := gnmiServer.LocalClient() + if err := s.Start(context.Background(), client, "local", ""); err != nil { + t.Fatalf("cannot start sysrib server, %v", err) + } + defer s.Stop() - c, err := ygnmi.NewClient(client, ygnmi.WithTarget("local")) - if err != nil { - t.Fatalf("cannot create ygnmi client: %v", err) - } + c, err := ygnmi.NewClient(client, ygnmi.WithTarget("local")) + if err != nil { + t.Fatalf("cannot create ygnmi client: %v", err) + } - for _, intf := range inInterfaces { - configureInterface(t, intf, c) - } + for _, intf := range inInterfaces { + configureInterface(t, intf, c) + } - // Wait for Sysrib to pick up the connected prefixes. - for i := 0; i != maxGNMIWaitQuanta; i++ { - var routes []*dpb.Route - routes, err = ygnmi.GetAll(context.Background(), c, routesQuery) - if err == nil { - if diff := cmp.Diff(wantConnectedRoutes, routes, protocmp.Transform(), protocmp.SortRepeatedFields(new(dpb.NextHopList), "hops")); diff != "" { - err = fmt.Errorf("routes not equal to wantConnectedRoutes (-want, +got):\n%s", diff) - } else { - break - } + // Wait for Sysrib to pick up the connected prefixes. + for i := 0; i != maxGNMIWaitQuanta; i++ { + var routes []*dpb.Route + routes, err = ygnmi.GetAll(context.Background(), c, routesQuery) + if err == nil { + if diff := cmp.Diff(wantConnectedRoutes, routes, protocmp.Transform(), protocmp.SortRepeatedFields(new(dpb.NextHopList), "hops")); diff != "" { + err = fmt.Errorf("routes not equal to wantConnectedRoutes (-want, +got):\n%s", diff) + } else { + break } - time.Sleep(100 * time.Millisecond) - } - if err != nil { - t.Fatalf("After initial interface operations: %v", err) } + time.Sleep(100 * time.Millisecond) + } + if err != nil { + t.Fatalf("After initial interface operations: %v", err) + } + t.Run(desc, func(t *testing.T) { for _, tt := range tests { if v4 && tt.skipV4 { continue @@ -2529,10 +2538,10 @@ func TestBGPGUEPolicy(t *testing.T) { } } for prefix, policy := range tt.inAddPolicies { - s.setGUEPolicy(prefix, policy) + s.setGUEPolicy(context.Background(), prefix, policy) } for _, prefix := range tt.inDeletePolicies { - s.deleteGUEPolicy(prefix) + s.deleteGUEPolicy(context.Background(), prefix) } routes, err := ygnmi.GetAll(context.Background(), c, routesQuery) diff --git a/sysrib/server_zapi.go b/sysrib/server_zapi.go index 38204823..98b43c95 100644 --- a/sysrib/server_zapi.go +++ b/sysrib/server_zapi.go @@ -15,6 +15,7 @@ package sysrib import ( + "context" "fmt" "net" @@ -83,13 +84,13 @@ func convertToZAPIRoute(routeKey RouteKey, route *Route, rr *ResolvedRoute) (*ze } // setZebraRoute calls setRoute after reformatting a zebra-formatted input route. -func (s *Server) setZebraRoute(niName string, zroute *zebra.IPRouteBody) error { +func (s *Server) setZebraRoute(ctx context.Context, niName string, zroute *zebra.IPRouteBody) error { if s == nil { return fmt.Errorf("cannot add route to nil sysrib server") } log.V(1).Infof("setZebraRoute: %+v", *zroute) route := convertZebraRoute(niName, zroute) - return s.setRoute(niName, route, false) + return s.setRoute(ctx, niName, route, false) } // convertZebraRoute converts a zebra route to a Sysrib route. diff --git a/sysrib/static.go b/sysrib/static.go index e65e2ad8..fe98d502 100644 --- a/sysrib/static.go +++ b/sysrib/static.go @@ -44,9 +44,6 @@ func convertStaticRoute(sroute *oc.NetworkInstance_Protocol_Static) *Route { log.Warningf("sysrib: Unhandled static route nexthop type (%T): %v", nh, nh) } } - if len(nexthops) == 0 { - return nil - } return &Route{ Prefix: *sroute.Prefix, NextHops: nexthops, @@ -59,10 +56,12 @@ func convertStaticRoute(sroute *oc.NetworkInstance_Protocol_Static) *Route { // monitorStaticRoutes starts a gothread to check for static route // configuration changes. // It returns an error if there is an error before monitoring can begin. -func (s *Server) monitorStaticRoutes(yclient *ygnmi.Client) error { - b := &ocpath.Batch{} +func (s *Server) monitorStaticRoutes(ctx context.Context, yclient *ygnmi.Client) error { staticroot := ocpath.Root().NetworkInstance(fakedevice.DefaultNetworkInstance).Protocol(oc.PolicyTypes_INSTALL_PROTOCOL_TYPE_STATIC, fakedevice.StaticRoutingProtocol) staticpath := staticroot.StaticAny() + staticpathMap := staticroot.StaticMap() + + b := ygnmi.NewBatch[map[string]*oc.NetworkInstance_Protocol_Static](staticpathMap.Config()) b.AddPaths( staticpath.NextHopAny().NextHop().Config().PathStruct(), // TODO(wenbli): Handle these paths. @@ -73,24 +72,23 @@ func (s *Server) monitorStaticRoutes(yclient *ygnmi.Client) error { ) staticRouteWatcher := ygnmi.Watch( - context.Background(), + ctx, yclient, - b.Config(), - func(root *ygnmi.Value[*oc.Root]) error { - rootVal, ok := root.Val() + b.Query(), + func(static *ygnmi.Value[map[string]*oc.NetworkInstance_Protocol_Static]) error { + staticMap, ok := static.Val() if !ok { return ygnmi.Continue } - staticp := rootVal.GetOrCreateNetworkInstance(fakedevice.DefaultNetworkInstance).GetOrCreateProtocol(oc.PolicyTypes_INSTALL_PROTOCOL_TYPE_STATIC, fakedevice.StaticRoutingProtocol) - for _, sroute := range staticp.Static { + for _, sroute := range staticMap { if sroute == nil || sroute.Prefix == nil { continue } if route := convertStaticRoute(sroute); route != nil { - if err := s.setRoute(fakedevice.DefaultNetworkInstance, route, false); err != nil { + if err := s.setRoute(ctx, fakedevice.DefaultNetworkInstance, route, false); err != nil { log.Warningf("Failed to add static route: %v", err) } else { - gnmiclient.Replace(context.Background(), yclient, staticroot.Static(sroute.GetPrefix()).State(), sroute) + gnmiclient.Replace(ctx, yclient, staticroot.Static(sroute.GetPrefix()).State(), sroute) } } } diff --git a/sysrib/zapi.go b/sysrib/zapi.go index a3858c72..c3e953a7 100644 --- a/sysrib/zapi.go +++ b/sysrib/zapi.go @@ -29,6 +29,7 @@ package sysrib import ( + "context" "fmt" "net" "os" @@ -75,7 +76,7 @@ type ZServer struct { // redistributed routes. // // TODO: vrfID is not well-integrated with the sysrib. -func StartZServer(address string, vrfID uint32, sysrib *Server) (*ZServer, error) { +func StartZServer(ctx context.Context, address string, vrfID uint32, sysrib *Server) (*ZServer, error) { l := strings.SplitN(address, ":", 2) if len(l) != 2 { return nil, fmt.Errorf("unsupported ZAPI url, has to be \"protocol:address\", got: %s", address) @@ -128,7 +129,7 @@ func StartZServer(address string, vrfID uint32, sysrib *Server) (*ZServer, error client.zServer = zServer // Handle connections in a new go routine. - go client.HandleRequest(conn, vrfID) + go client.HandleRequest(ctx, conn, vrfID) } }() @@ -209,7 +210,7 @@ func (c *Client) RedistributeResolvedRoutes(conn net.Conn) { } // HandleRequest handles an incoming ZAPI client connection. -func (c *Client) HandleRequest(conn net.Conn, vrfID uint32) { +func (c *Client) HandleRequest(ctx context.Context, conn net.Conn, vrfID uint32) { version := zebra.MaxZapiVer software := zebra.MaxSoftware defer func() { @@ -260,7 +261,7 @@ func (c *Client) HandleRequest(conn net.Conn, vrfID uint32) { "Topic": "Sysrib", "Message": m, }) - if err := c.zServer.sysrib.setZebraRoute(vrfIDToNiName(vrfID), m.Body.(*zebra.IPRouteBody)); err != nil { + if err := c.zServer.sysrib.setZebraRoute(ctx, vrfIDToNiName(vrfID), m.Body.(*zebra.IPRouteBody)); err != nil { topicLogger.Warn(fmt.Sprintf("Could not add route to sysrib: %v", err), bgplog.Fields{ "Topic": "Sysrib", diff --git a/sysrib/zapi_test.go b/sysrib/zapi_test.go index 1968c91d..6063af76 100644 --- a/sysrib/zapi_test.go +++ b/sysrib/zapi_test.go @@ -77,7 +77,7 @@ func ZAPIServerStart(t *testing.T) *ZServer { t.Fatal(err) } - s, err := StartZServer("unix:/tmp/zserv.api", 0, sysribServer) + s, err := StartZServer(context.Background(), "unix:/tmp/zserv.api", 0, sysribServer) if err != nil { t.Fatal(err) } @@ -247,7 +247,7 @@ func testRouteRedistribution(t *testing.T, routeReadyBeforeDial bool) { t.Fatal(err) } client := gnmiServer.LocalClient() - if err := s.Start(client, "local", "unix:/tmp/zserv.api"); err != nil { + if err := s.Start(context.Background(), client, "local", "unix:/tmp/zserv.api"); err != nil { t.Fatalf("cannot start sysrib server, %v", err) } defer s.Stop()