Skip to content

Commit

Permalink
Implement dataplane reset (#283)
Browse files Browse the repository at this point in the history
* Implement dataplane reset

* build

* lint

* entry
  • Loading branch information
DanG100 authored Oct 3, 2023
1 parent 07a2d40 commit 48c0f9c
Show file tree
Hide file tree
Showing 17 changed files with 12,333 additions and 11,900 deletions.
4 changes: 2 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ http_archive(

http_archive(
name = "com_github_opencomputeproject_sai",
patch_args = ["-p1"],
patches = ["//patches:sai.patch"],
build_file_content = """
cc_library(
name = "sai",
Expand All @@ -141,6 +139,8 @@ cc_library(
visibility = ["//visibility:public"],
)
""",
patch_args = ["-p1"],
patches = ["//patches:sai.patch"],
sha256 = "240d0211bbea2758faabfdbfa5e5488d837a47d42839bfe99b4bfbff52ab6c11",
strip_prefix = "SAI-1.11.0",
urls = ["https://github.com/opencomputeproject/SAI/archive/refs/tags/v1.11.0.tar.gz"],
Expand Down
38 changes: 22 additions & 16 deletions dataplane/forwarding/fwdport/ports/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package ports
import (
"fmt"
"os"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/afpacket"
"github.com/google/gopacket/layers"
"github.com/vishvananda/netlink"

"github.com/openconfig/lemming/dataplane/forwarding/fwdaction"
Expand Down Expand Up @@ -50,7 +50,7 @@ type kernelPort struct {
output fwdaction.Actions
ctx *fwdcontext.Context // Forwarding context containing the port
handle packetHandle
linkDoneCh chan struct{}
doneCh chan struct{}
linkUpdateCh chan netlink.LinkUpdate
ifaceMgr kernel.Interfaces
}
Expand All @@ -72,8 +72,7 @@ func (p *kernelPort) String() string {
func (p *kernelPort) Cleanup() {
p.input.Cleanup()
p.output.Cleanup()
p.handle.Close()
close(p.linkDoneCh)
close(p.doneCh)
p.input = nil
p.output = nil
}
Expand Down Expand Up @@ -102,21 +101,28 @@ func (p *kernelPort) Update(upd *fwdpb.PortUpdateDesc) error {
}

func (p *kernelPort) process() {
startStateWatch(p.linkUpdateCh, p.devName, p, p.ctx)
src := gopacket.NewPacketSource(p.handle, layers.LinkTypeEthernet)
startStateWatch(p.linkUpdateCh, p.doneCh, p.devName, p, p.ctx)
go func() {
for {
select {
case pkt, ok := <-src.Packets():
if !ok {
log.Warning("src chan closed")
return
case <-p.doneCh:
log.Warningf("src chan closed: %v", p.devName)
p.handle.Close()
return
default:
d, _, err := p.handle.ReadPacketData()
if err == afpacket.ErrTimeout { // Don't log this error as it is very spammy.
continue
}
if err != nil {
log.Warningf("err reading packet data for %v: %v", p.devName, err)
continue
}
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, pkt.Data())
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, d)
if err != nil {
log.Warningf("failed to create new packet: %v", err)
log.V(1).Info(pkt.Dump())
fwdport.Increment(p, len(pkt.Data()), fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
log.V(1).Info(d)
fwdport.Increment(p, len(d), fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
continue
}
fwdPkt.Debug(debug.ExternalPortPacketTrace)
Expand Down Expand Up @@ -177,22 +183,22 @@ func (kernelBuilder) Build(portDesc *fwdpb.PortDesc, ctx *fwdcontext.Context) (f
}

// TODO: configure MTU
handle, err := afpacket.NewTPacket(afpacket.OptInterface(kp.Kernel.GetDeviceName()), afpacket.OptPollTimeout(-1))
handle, err := afpacket.NewTPacket(afpacket.OptInterface(kp.Kernel.GetDeviceName()), afpacket.OptPollTimeout(time.Second))
if err != nil {
return nil, fmt.Errorf("failed to create afpacket: %v", err)
}
p := &kernelPort{
ctx: ctx,
handle: handle,
devName: kp.Kernel.DeviceName,
linkDoneCh: make(chan struct{}),
doneCh: make(chan struct{}),
linkUpdateCh: make(chan netlink.LinkUpdate),
}
list := append(fwdport.CounterList, fwdaction.CounterList...)
if err := p.InitCounters("", list...); err != nil {
return nil, err
}
if err := p.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.linkDoneCh); err != nil {
if err := p.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.doneCh); err != nil {
return nil, err
}

Expand Down
95 changes: 57 additions & 38 deletions dataplane/forwarding/fwdport/ports/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package ports

import (
"errors"
"fmt"
"net"
"os"
"time"

"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -51,7 +53,7 @@ type tapPort struct {
ctx *fwdcontext.Context // Forwarding context containing the port
fd int
devName string
linkDoneCh chan struct{}
doneCh chan struct{}
linkUpdateCh chan netlink.LinkUpdate
file *os.File
ifaceMgr kernel.Interfaces
Expand All @@ -68,8 +70,7 @@ func (p *tapPort) String() string {
func (p *tapPort) Cleanup() {
p.input.Cleanup()
p.output.Cleanup()
p.file.Close()
close(p.linkDoneCh)
close(p.doneCh)
p.input = nil
p.output = nil
}
Expand Down Expand Up @@ -99,23 +100,35 @@ func (p *tapPort) Update(upd *fwdpb.PortUpdateDesc) error {
}

func (p *tapPort) process() {
startStateWatch(p.linkUpdateCh, p.devName, p, p.ctx)
startStateWatch(p.linkUpdateCh, p.doneCh, p.devName, p, p.ctx)
go func() {
buf := make([]byte, 1500) // TODO: MTU
for {
n, err := p.file.Read(buf)
if err != nil {
log.Warningf("failed to read packet: %v", err)
}
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, buf[0:n])
if err != nil {
log.Warningf("failed to create new packet: %v", err)
fwdport.Increment(p, n, fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
continue
select {
case <-p.doneCh:
log.Warningf("stopping readding tap packet to read packet: %v", p.devName)
p.file.Close()
return
default:
p.file.SetReadDeadline(time.Now().Add(time.Second))
n, err := p.file.Read(buf)
if errors.Is(err, os.ErrDeadlineExceeded) { // Don't log this error as it is very spammy.
continue
}
if err != nil {
log.Warningf("failed to read packet: %v", err)
continue
}
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, buf[0:n])
if err != nil {
log.Warningf("failed to create new packet: %v", err)
fwdport.Increment(p, n, fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
continue
}
fwdPkt.Debug(debug.TAPPortPacketTrace)
fwdPkt.Log().V(2).Info("input packet", "device", p.devName, "port", p.ID(), "frame", fwdpacket.IncludeFrameInLog)
fwdport.Process(p, fwdPkt, fwdpb.PortAction_PORT_ACTION_INPUT, p.ctx, "TAP")
}
fwdPkt.Debug(debug.TAPPortPacketTrace)
fwdPkt.Log().V(2).Info("input packet", "device", p.devName, "port", p.ID(), "frame", fwdpacket.IncludeFrameInLog)
fwdport.Process(p, fwdPkt, fwdpb.PortAction_PORT_ACTION_INPUT, p.ctx, "TAP")
}
}()
}
Expand Down Expand Up @@ -197,31 +210,37 @@ func getAndSetState(name string, ifMgr *kernel.Interfaces, pi *fwdpb.PortInfo) (
return reply, err
}

func startStateWatch(updCh chan netlink.LinkUpdate, devName string, port fwdport.Port, ctx *fwdcontext.Context) {
func startStateWatch(updCh chan netlink.LinkUpdate, doneCh chan struct{}, devName string, port fwdport.Port, ctx *fwdcontext.Context) {
go func() {
for {
upd, ok := <-updCh
if !ok {
select {
case <-doneCh:
log.Warningf("state watch stopping for dev: %v", devName)
return
}
if upd.Attrs().Name != devName {
continue
}
admin, oper := stateFromAttrs(upd.Attrs())
log.V(1).Infof("dataplane receive link update: port %q, admin %v, oper %v", devName, admin.String(), oper.String())
ctx.Notify(&fwdpb.EventDesc{
Event: fwdpb.Event_EVENT_PORT,
Desc: &fwdpb.EventDesc_Port{
Port: &fwdpb.PortEventDesc{
Context: &fwdpb.ContextId{Id: ctx.ID},
PortId: &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: string(port.ID())}},
PortInfo: &fwdpb.PortInfo{
AdminStatus: admin,
OperStatus: oper,
case upd, ok := <-updCh:
if !ok {
log.Info("update chan close for dev: %v", devName)
return
}
if upd.Attrs() == nil || upd.Attrs().Name != devName {
continue
}
admin, oper := stateFromAttrs(upd.Attrs())
log.V(1).Infof("dataplane receive link update: port %q, admin %v, oper %v", devName, admin.String(), oper.String())
ctx.Notify(&fwdpb.EventDesc{
Event: fwdpb.Event_EVENT_PORT,
Desc: &fwdpb.EventDesc_Port{
Port: &fwdpb.PortEventDesc{
Context: &fwdpb.ContextId{Id: ctx.ID},
PortId: &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: string(port.ID())}},
PortInfo: &fwdpb.PortInfo{
AdminStatus: admin,
OperStatus: oper,
},
},
},
},
})
})
}
}
}()
}
Expand Down Expand Up @@ -254,14 +273,14 @@ func (tp tapBuilder) Build(portDesc *fwdpb.PortDesc, ctx *fwdcontext.Context) (f
file: file,
fd: fd,
devName: kp.Tap.GetDeviceName(),
linkDoneCh: make(chan struct{}),
doneCh: make(chan struct{}),
linkUpdateCh: make(chan netlink.LinkUpdate),
}
list := append(fwdport.CounterList, fwdaction.CounterList...)
if err := p.InitCounters("", list...); err != nil {
return nil, err
}
if err := tp.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.linkDoneCh); err != nil {
if err := tp.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.doneCh); err != nil {
return nil, err
}

Expand Down
62 changes: 48 additions & 14 deletions dataplane/internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Engine struct {
internalToExternalIDMu sync.Mutex
// internalToExternalID is a map from the internal port id to it's corresponding external port.
internalToExternalID map[string]string
cancelFn func()
}

// New creates a new engine and sets up the forwarding tables.
Expand All @@ -90,15 +91,51 @@ func New(ctx context.Context) (*Engine, error) {
internalToExternalID: map[string]string{},
}

e.handleIPUpdates()
ctx, e.cancelFn = context.WithCancel(ctx)
e.handleIPUpdates(ctx)

_, err := e.Server.ContextCreate(context.Background(), &fwdpb.ContextCreateRequest{
_, err := e.Server.ContextCreate(ctx, &fwdpb.ContextCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
})
if err != nil {
return nil, err
}
e.setupTables(ctx)

return e, nil
}

func (e *Engine) Reset(ctx context.Context) error {
e.cancelFn()
_, err := e.Server.ContextDelete(ctx, &fwdpb.ContextDeleteRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
})
if err != nil {
return err
}

e.idToNID = map[string]uint64{}
e.nextHopGroups = map[uint64]*dpb.NextHopIDList{}
e.ifaceToPort = map[string]string{}
e.ipToDevName = map[string]string{}
e.devNameToPortID = map[string]string{}
e.internalToExternalID = map[string]string{}
e.nextNHGID.Store(0)
e.nextNHID.Store(0)

_, err = e.Server.ContextCreate(ctx, &fwdpb.ContextCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
})
if err != nil {
return err
}

ctx, e.cancelFn = context.WithCancel(ctx)
e.setupTables(ctx)
return nil
}

func (e *Engine) setupTables(ctx context.Context) error {
v4FIB := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Desc: &fwdpb.TableDesc{
Expand All @@ -121,7 +158,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, v4FIB); err != nil {
return nil, err
return err
}
v6FIB := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -145,7 +182,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, v6FIB); err != nil {
return nil, err
return err
}
portMAC := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -165,7 +202,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, portMAC); err != nil {
return nil, err
return err
}
neighbor := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -189,7 +226,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, neighbor); err != nil {
return nil, err
return err
}
nh := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -209,7 +246,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, nh); err != nil {
return nil, err
return err
}
nhg := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -229,18 +266,15 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, nhg); err != nil {
return nil, err
return err
}
if err := createFIBSelector(ctx, e.id, e.Server); err != nil {
return nil, err
return err
}
if err := createLayer2PuntTable(ctx, e.id, e.Server); err != nil {
return nil, err
}
if err := createLayer3PuntTable(ctx, e.id, e.Server); err != nil {
return nil, err
return err
}
return e, nil
return createLayer3PuntTable(ctx, e.id, e.Server)
}

// ID returns the engine's forwarding context id.
Expand Down
Loading

0 comments on commit 48c0f9c

Please sign in to comment.