Skip to content

Commit

Permalink
Merge pull request #301 from openconfig/context-and-other-fixes
Browse files Browse the repository at this point in the history
Add context when programming routes and use batch on list node
  • Loading branch information
wenovus authored Oct 17, 2023
2 parents 1af3b0a + 5bc0200 commit b93be19
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 186 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/openconfig/gribigo v0.0.0-20230902004455-c7aff9365bec
github.com/openconfig/kne v0.1.14
github.com/openconfig/ondatra v0.2.7
github.com/openconfig/ygnmi v0.8.7
github.com/openconfig/ygnmi v0.8.11
github.com/openconfig/ygot v0.29.11
github.com/p4lang/p4runtime v1.4.0-rc.5.0.20220728214547-13f0d02a521e
github.com/sirupsen/logrus v1.9.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -965,8 +965,8 @@ github.com/openconfig/lemming/operator v0.2.0/go.mod h1:LKgEXSR5VK2CAeh2uKijKAXF
github.com/openconfig/ondatra v0.2.7 h1:abEI0qO4Q/BrXV/qi2unXn8dpkZFp9DF8rw6kZod5/E=
github.com/openconfig/ondatra v0.2.7/go.mod h1:Vfwg/PvsupSJOxTxpQvF078MyHxJGvff3jIr4H0vgzs=
github.com/openconfig/testt v0.0.0-20220311054427-efbb1a32ec07 h1:X631iD/B0ximGFb5P9LY5wHju4SiedxUhc5UZEo7VSw=
github.com/openconfig/ygnmi v0.8.7 h1:8K87+VztXhHqsU6/OYRnY/l/bGqFk+qU61mhhdxMCYo=
github.com/openconfig/ygnmi v0.8.7/go.mod h1:7up6qc9l9G4+Cfo37gzO0D7+2g4yqyW+xzh4vYsYTEE=
github.com/openconfig/ygnmi v0.8.11 h1:CCx2xl1jTwXeMh9G6B59k0BlRrKoz+K6eLTnowKQ6Vw=
github.com/openconfig/ygnmi v0.8.11/go.mod h1:WXnzNls/5ea6P9wx883ShLi5T/xa7sQjxn6SOH4kVwA=
github.com/openconfig/ygot v0.6.0/go.mod h1:o30svNf7O0xK+R35tlx95odkDmZWS9JyWWQSmIhqwAs=
github.com/openconfig/ygot v0.10.4/go.mod h1:oCQNdXnv7dWc8scTDgoFkauv1wwplJn5HspHcjlxSAQ=
github.com/openconfig/ygot v0.13.2/go.mod h1:kJN0yCXIH07dOXvNBEFm3XxXdnDD5NI6K99tnD5x49c=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ func testTrafficAndEncap(t *testing.T, otg *otg.OTG, startingIP string, v6Traffi
}

var expectedPacketCounter int
const wantPacketN = 20
for i := 0; i != wantPacketN; i++ {
const packetN = 20
for i := 0; i != packetN; i++ {
pkt, err := ps.NextPacket()
if err != nil {
t.Fatalf("error reading next packet: %v", err)
Expand Down Expand Up @@ -587,7 +587,7 @@ func testTrafficAndEncap(t *testing.T, otg *otg.OTG, startingIP string, v6Traffi
expectedPacketCounter++
}

if expectedPacketCounter < wantPacketN {
if wantPacketN := packetN - 1; expectedPacketCounter < wantPacketN {
t.Errorf("Got less than %d expected packets: %v", wantPacketN, expectedPacketCounter)
} else {
t.Logf("Got %d expected packets.", expectedPacketCounter)
Expand Down
2 changes: 1 addition & 1 deletion lemming.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func New(targetName, zapiURL string, opts ...Option) (*Device, error) {
if err != nil {
return nil, err
}
if err := sysribServer.Start(cacheClient, targetName, zapiURL); err != nil {
if err := sysribServer.Start(context.Background(), cacheClient, targetName, zapiURL); err != nil {
return nil, fmt.Errorf("sysribServer failed to start: %v", err)
}

Expand Down
4 changes: 2 additions & 2 deletions repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1611,8 +1611,8 @@ def go_repositories():
go_repository(
name = "com_github_openconfig_ygnmi",
importpath = "github.com/openconfig/ygnmi",
sum = "h1:8K87+VztXhHqsU6/OYRnY/l/bGqFk+qU61mhhdxMCYo=",
version = "v0.8.7",
sum = "h1:CCx2xl1jTwXeMh9G6B59k0BlRrKoz+K6eLTnowKQ6Vw=",
version = "v0.8.11",
)
go_repository(
name = "com_github_openconfig_ygot",
Expand Down
72 changes: 36 additions & 36 deletions sysrib/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,24 +112,24 @@ type dplane struct {
}

// programRoute programs the route in the dataplane, returning an error on failure.
func (d *dplane) programRoute(r *ResolvedRoute) error {
func (d *dplane) programRoute(ctx context.Context, r *ResolvedRoute) error {
log.V(1).Infof("sysrib: programming resolved route: %+v", r)
rr, err := resolvedRouteToRouteRequest(r)
if err != nil {
return err
}
_, err = ygnmi.Replace(context.TODO(), d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()), rr, ygnmi.WithSetFallbackEncoding())
_, err = ygnmi.Replace(ctx, d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()), rr, ygnmi.WithSetFallbackEncoding())
return err
}

// deprogramRoute de-programs the route in the dataplane, returning an error on failure.
func (d *dplane) deprogramRoute(r *ResolvedRoute) error {
func (d *dplane) deprogramRoute(ctx context.Context, r *ResolvedRoute) error {
log.V(1).Infof("sysrib: deprogramming newly unresolved route: %+v", r)
rr, err := resolvedRouteToRouteRequest(r)
if err != nil {
return err
}
_, err = ygnmi.Delete(context.TODO(), d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()))
_, err = ygnmi.Delete(ctx, d.Client, handlers.RouteQuery(rr.GetPrefix().GetVrfId(), rr.GetPrefix().GetCidr()))
return err
}

Expand Down Expand Up @@ -157,7 +157,7 @@ func New(cfg *oc.Root) (*Server, error) {
// sysrib during their initialization.
//
// - If zapiURL is not specified, then the ZAPI server will not be started.
func (s *Server) Start(gClient gpb.GNMIClient, target, zapiURL string) error {
func (s *Server) Start(ctx context.Context, gClient gpb.GNMIClient, target, zapiURL string) error {
if s == nil {
return errors.New("cannot start nil sysrib server")
}
Expand All @@ -168,15 +168,15 @@ func (s *Server) Start(gClient gpb.GNMIClient, target, zapiURL string) error {
}
s.dataplane = dplane{Client: yclient}

if err := s.monitorConnectedIntfs(yclient); err != nil {
if err := s.monitorConnectedIntfs(ctx, yclient); err != nil {
return err
}

if err := s.monitorBGPGUEPolicies(yclient); err != nil {
if err := s.monitorBGPGUEPolicies(ctx, yclient); err != nil {
return err
}

if err := s.monitorStaticRoutes(yclient); err != nil {
if err := s.monitorStaticRoutes(ctx, yclient); err != nil {
return err
}

Expand All @@ -195,7 +195,7 @@ func (s *Server) Start(gClient gpb.GNMIClient, target, zapiURL string) error {

// BEGIN Start ZAPI server.
if zapiURL != "" {
if s.zServer, err = StartZServer(zapiURL, 0, s); err != nil {
if s.zServer, err = StartZServer(ctx, zapiURL, 0, s); err != nil {
return err
}
}
Expand All @@ -215,7 +215,7 @@ func (s *Server) Stop() {
//
// - gnmiServerAddr is the address of the central gNMI datastore.
// - target is the name of the gNMI target.
func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error {
func (s *Server) monitorConnectedIntfs(ctx context.Context, yclient *ygnmi.Client) error {
b := &ocpath.Batch{}
b.AddPaths(
ocpath.Root().InterfaceAny().Enabled().State().PathStruct(),
Expand All @@ -227,7 +227,7 @@ func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error {
)

interfaceWatcher := ygnmi.Watch(
context.Background(),
ctx,
yclient,
b.State(),
func(root *ygnmi.Value[*oc.Root]) error {
Expand All @@ -239,19 +239,19 @@ func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error {
if intf.Enabled != nil {
if intf.Ifindex != nil {
ifindex := intf.GetIfindex()
s.setInterface(name, int32(ifindex), intf.GetEnabled())
s.setInterface(ctx, name, int32(ifindex), intf.GetEnabled())
// TODO(wenbli): Support other VRFs.
if subintf := intf.GetSubinterface(0); subintf != nil {
for _, addr := range subintf.GetOrCreateIpv4().Address {
if addr.Ip != nil && addr.PrefixLength != nil {
if err := s.addInterfacePrefix(name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
if err := s.addInterfacePrefix(ctx, name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
log.Warningf("adding interface prefix failed: %v", err)
}
}
}
for _, addr := range subintf.GetOrCreateIpv6().Address {
if addr.Ip != nil && addr.PrefixLength != nil {
if err := s.addInterfacePrefix(name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
if err := s.addInterfacePrefix(ctx, name, int32(ifindex), fmt.Sprintf("%s/%d", addr.GetIp(), addr.GetPrefixLength()), fakedevice.DefaultNetworkInstance); err != nil {
log.Warningf("adding interface prefix failed: %v", err)
}
}
Expand All @@ -276,7 +276,7 @@ func (s *Server) monitorConnectedIntfs(yclient *ygnmi.Client) error {
// monitorBGPGUEPolicies starts a gothread to check for BGP GUE policies being
// added or deleted from the config, and informs the sysrib server accordingly
// to update programmed routes.
func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error {
func (s *Server) monitorBGPGUEPolicies(ctx context.Context, yclient *ygnmi.Client) error {
b := &ocpath.Batch{}
b.AddPaths(
ocpath.Root().BgpGueIpv4GlobalPolicyAny().Prefix().Config().PathStruct(),
Expand All @@ -289,7 +289,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error {
)

bgpGUEPolicyWatcher := ygnmi.Watch(
context.Background(),
ctx,
yclient,
b.Config(),
func(root *ygnmi.Value[*oc.Root]) error {
Expand All @@ -303,7 +303,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error {
policiesFound[prefix] = true
if existingPolicy := s.bgpGUEPolicies[prefix]; policy != existingPolicy {
log.V(1).Infof("Adding new/updated BGP GUE policy: %s: %v", prefix, policy)
if err := s.setGUEPolicy(prefix, policy); err != nil {
if err := s.setGUEPolicy(ctx, prefix, policy); err != nil {
log.Errorf("Failed while setting BGP GUE Policy: %v", err)
} else {
s.bgpGUEPolicies[prefix] = policy
Expand Down Expand Up @@ -360,7 +360,7 @@ func (s *Server) monitorBGPGUEPolicies(yclient *ygnmi.Client) error {
for prefix := range s.bgpGUEPolicies {
if _, ok := policiesFound[prefix]; !ok {
log.Infof("Deleting incomplete/non-existent policy: %s", prefix)
if err := s.deleteGUEPolicy(prefix); err != nil {
if err := s.deleteGUEPolicy(ctx, prefix); err != nil {
log.Errorf("Failed while deleting BGP GUE Policy: %v", err)
} else {
delete(s.bgpGUEPolicies, prefix)
Expand Down Expand Up @@ -545,7 +545,7 @@ func resolvedRouteToRouteRequest(r *ResolvedRoute) (*dpb.Route, error) {

// ResolveAndProgramDiff walks through each prefix in the RIB, resolving it and
// programs the forwarding plane.
func (s *Server) ResolveAndProgramDiff() error {
func (s *Server) ResolveAndProgramDiff(ctx context.Context) error {
log.Info("Recalculating resolved RIB")
if debugRIB {
defer s.rib.PrintRIB()
Expand All @@ -557,18 +557,18 @@ func (s *Server) ResolveAndProgramDiff() error {
newResolvedRoutes := map[RouteKey]*Route{}
for niName, ni := range s.rib.NI {
for it := ni.IPV4.Iterate(); it.Next(); {
s.resolveAndProgramDiffAux(niName, ni, it.Address().String(), newResolvedRoutes)
s.resolveAndProgramDiffAux(ctx, niName, ni, it.Address().String(), newResolvedRoutes)
}
for it := ni.IPV6.Iterate(); it.Next(); {
s.resolveAndProgramDiffAux(niName, ni, it.Address().String(), newResolvedRoutes)
s.resolveAndProgramDiffAux(ctx, niName, ni, it.Address().String(), newResolvedRoutes)
}
}

// Deprogram newly unresolved routes.
for routeKey, rr := range s.ProgrammedRoutes() {
if _, ok := newResolvedRoutes[routeKey]; !ok {
log.V(1).Infof("Deleting route %s", rr.RouteKey)
if err := s.dataplane.deprogramRoute(rr); err != nil {
if err := s.dataplane.deprogramRoute(ctx, rr); err != nil {
log.Warningf("failed to deprogram route %+v: %v", rr, err)
continue
}
Expand All @@ -593,7 +593,7 @@ func (s *Server) ResolveAndProgramDiff() error {
// It carries out the following functionalities:
// - Resolve a single route specified by prefix and program if it's different.
// - Populate the resolved route into newResolvedRoutes.
func (s *Server) resolveAndProgramDiffAux(niName string, ni *NIRIB, prefix string, newResolvedRoutes map[RouteKey]*Route) {
func (s *Server) resolveAndProgramDiffAux(ctx context.Context, niName string, ni *NIRIB, prefix string, newResolvedRoutes map[RouteKey]*Route) {
log.V(1).Infof("Iterating at prefix %v (v4 has %d tags) (v6 has %d tags)", prefix, ni.IPV4.CountTags(), ni.IPV6.CountTags())
_, pfx, err := net.ParseCIDR(prefix)
if err != nil {
Expand Down Expand Up @@ -631,7 +631,7 @@ func (s *Server) resolveAndProgramDiffAux(niName string, ni *NIRIB, prefix strin
switch {
case routeIsResolved && !reflect.DeepEqual(currentRoute, rr):
log.V(1).Infof("(-currentRoute, +resolvedRoute):\n%s", cmp.Diff(currentRoute, rr))
if err := s.dataplane.programRoute(rr); err != nil {
if err := s.dataplane.programRoute(ctx, rr); err != nil {
log.Warningf("failed to program route %+v: %v", rr, err)
return
}
Expand Down Expand Up @@ -663,7 +663,7 @@ func (s *Server) ProgrammedRoutes() map[RouteKey]*ResolvedRoute {
}

// SetRoute implements ROUTE_ADD and ROUTE_DELETE
func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sysribpb.SetRouteResponse, error) {
func (s *Server) SetRoute(ctx context.Context, req *sysribpb.SetRouteRequest) (*sysribpb.SetRouteResponse, error) {
pfx, err := prefixString(req.Prefix)
if err != nil {
return nil, err
Expand All @@ -684,7 +684,7 @@ func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sy
// TODO(wenbli): Check if recursive resolution is an infinite recursion. This happens if there is a cycle.

niName := vrfIDToNiName(req.GetVrfId())
if err := s.setRoute(niName, &Route{
if err := s.setRoute(ctx, niName, &Route{
// TODO(wenbli): check if pfx has to be canonical or does it tolerate it: i.e. 1.1.1.0/24 instead of 1.1.1.1/24
Prefix: pfx,
NextHops: nexthops,
Expand All @@ -709,21 +709,21 @@ func (s *Server) SetRoute(_ context.Context, req *sysribpb.SetRouteRequest) (*sy
}

// setRoute adds/deletes a route from the RIB manager.
func (s *Server) setRoute(niName string, route *Route, isDelete bool) error {
func (s *Server) setRoute(ctx context.Context, niName string, route *Route, isDelete bool) error {
if err := s.rib.setRoute(niName, route, isDelete); err != nil {
return fmt.Errorf("error while adding route to sysrib: %v", err)
}

if err := s.ResolveAndProgramDiff(); err != nil {
if err := s.ResolveAndProgramDiff(ctx); err != nil {
return fmt.Errorf("error while resolving sysrib: %v", err)
}
return nil
}

// addInterfacePrefix adds a prefix to the sysrib as a connected route.
func (s *Server) addInterfacePrefix(name string, ifindex int32, prefix string, niName string) error {
func (s *Server) addInterfacePrefix(ctx context.Context, name string, ifindex int32, prefix string, niName string) error {
log.V(1).Infof("Adding interface prefix: intf %s, idx %d, prefix %s, ni %s", name, ifindex, prefix, niName)
return s.setRoute(niName, &Route{
return s.setRoute(ctx, niName, &Route{
Prefix: prefix,
Connected: &Interface{
Name: name,
Expand All @@ -737,7 +737,7 @@ func (s *Server) addInterfacePrefix(name string, ifindex int32, prefix string, n
}

// setInterface responds to INTERFACE_UP/INTERFACE_DOWN messages from the dataplane.
func (s *Server) setInterface(name string, ifindex int32, enabled bool) error {
func (s *Server) setInterface(ctx context.Context, name string, ifindex int32, enabled bool) error {
log.V(1).Infof("Setting interface %q(%d) to enabled=%v", name, ifindex, enabled)
s.interfacesMu.Lock()
s.interfaces[Interface{
Expand All @@ -746,33 +746,33 @@ func (s *Server) setInterface(name string, ifindex int32, enabled bool) error {
}] = enabled
s.interfacesMu.Unlock()

return s.ResolveAndProgramDiff()
return s.ResolveAndProgramDiff(ctx)
}

// TODO(wenbli): Do we need to handle interface deletion?
// This is not required in the MVP since basic tests will just need to enable/disable interfaces.

// setGUEPolicy adds a new GUE policy and triggers resolved route
// computation and programming.
func (s *Server) setGUEPolicy(prefix string, policy GUEPolicy) error {
func (s *Server) setGUEPolicy(ctx context.Context, prefix string, policy GUEPolicy) error {
if err := s.rib.SetGUEPolicy(prefix, policy); err != nil {
return fmt.Errorf("error while adding route to sysrib: %v", err)
}

if err := s.ResolveAndProgramDiff(); err != nil {
if err := s.ResolveAndProgramDiff(ctx); err != nil {
return fmt.Errorf("error while resolving sysrib: %v", err)
}
return nil
}

// deleteGUEPolicy adds a new GUE policy and triggers resolved route
// computation and programming.
func (s *Server) deleteGUEPolicy(prefix string) error {
func (s *Server) deleteGUEPolicy(ctx context.Context, prefix string) error {
if _, err := s.rib.DeleteGUEPolicy(prefix); err != nil {
return fmt.Errorf("error while adding route to sysrib: %v", err)
}

if err := s.ResolveAndProgramDiff(); err != nil {
if err := s.ResolveAndProgramDiff(ctx); err != nil {
return fmt.Errorf("error while resolving sysrib: %v", err)
}
return nil
Expand Down
Loading

0 comments on commit b93be19

Please sign in to comment.