Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dataplane reset #283

Merged
merged 4 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ http_archive(

http_archive(
name = "com_github_opencomputeproject_sai",
patch_args = ["-p1"],
patches = ["//patches:sai.patch"],
build_file_content = """
cc_library(
name = "sai",
Expand All @@ -141,6 +139,8 @@ cc_library(
visibility = ["//visibility:public"],
)
""",
patch_args = ["-p1"],
patches = ["//patches:sai.patch"],
sha256 = "240d0211bbea2758faabfdbfa5e5488d837a47d42839bfe99b4bfbff52ab6c11",
strip_prefix = "SAI-1.11.0",
urls = ["https://github.com/opencomputeproject/SAI/archive/refs/tags/v1.11.0.tar.gz"],
Expand Down
38 changes: 22 additions & 16 deletions dataplane/forwarding/fwdport/ports/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package ports
import (
"fmt"
"os"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/afpacket"
"github.com/google/gopacket/layers"
"github.com/vishvananda/netlink"

"github.com/openconfig/lemming/dataplane/forwarding/fwdaction"
Expand Down Expand Up @@ -50,7 +50,7 @@ type kernelPort struct {
output fwdaction.Actions
ctx *fwdcontext.Context // Forwarding context containing the port
handle packetHandle
linkDoneCh chan struct{}
doneCh chan struct{}
linkUpdateCh chan netlink.LinkUpdate
ifaceMgr kernel.Interfaces
}
Expand All @@ -72,8 +72,7 @@ func (p *kernelPort) String() string {
func (p *kernelPort) Cleanup() {
p.input.Cleanup()
p.output.Cleanup()
p.handle.Close()
close(p.linkDoneCh)
close(p.doneCh)
p.input = nil
p.output = nil
}
Expand Down Expand Up @@ -102,21 +101,28 @@ func (p *kernelPort) Update(upd *fwdpb.PortUpdateDesc) error {
}

func (p *kernelPort) process() {
startStateWatch(p.linkUpdateCh, p.devName, p, p.ctx)
src := gopacket.NewPacketSource(p.handle, layers.LinkTypeEthernet)
startStateWatch(p.linkUpdateCh, p.doneCh, p.devName, p, p.ctx)
go func() {
for {
select {
case pkt, ok := <-src.Packets():
if !ok {
log.Warning("src chan closed")
return
case <-p.doneCh:
log.Warningf("src chan closed: %v", p.devName)
p.handle.Close()
return
default:
d, _, err := p.handle.ReadPacketData()
if err == afpacket.ErrTimeout { // Don't log this error as it is very spammy.
continue
}
if err != nil {
log.Warningf("err reading packet data for %v: %v", p.devName, err)
continue
}
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, pkt.Data())
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, d)
if err != nil {
log.Warningf("failed to create new packet: %v", err)
log.V(1).Info(pkt.Dump())
fwdport.Increment(p, len(pkt.Data()), fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
log.V(1).Info(d)
fwdport.Increment(p, len(d), fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
continue
}
fwdPkt.Debug(debug.ExternalPortPacketTrace)
Expand Down Expand Up @@ -177,22 +183,22 @@ func (kernelBuilder) Build(portDesc *fwdpb.PortDesc, ctx *fwdcontext.Context) (f
}

// TODO: configure MTU
handle, err := afpacket.NewTPacket(afpacket.OptInterface(kp.Kernel.GetDeviceName()), afpacket.OptPollTimeout(-1))
handle, err := afpacket.NewTPacket(afpacket.OptInterface(kp.Kernel.GetDeviceName()), afpacket.OptPollTimeout(time.Second))
if err != nil {
return nil, fmt.Errorf("failed to create afpacket: %v", err)
}
p := &kernelPort{
ctx: ctx,
handle: handle,
devName: kp.Kernel.DeviceName,
linkDoneCh: make(chan struct{}),
doneCh: make(chan struct{}),
linkUpdateCh: make(chan netlink.LinkUpdate),
}
list := append(fwdport.CounterList, fwdaction.CounterList...)
if err := p.InitCounters("", list...); err != nil {
return nil, err
}
if err := p.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.linkDoneCh); err != nil {
if err := p.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.doneCh); err != nil {
return nil, err
}

Expand Down
95 changes: 57 additions & 38 deletions dataplane/forwarding/fwdport/ports/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package ports

import (
"errors"
"fmt"
"net"
"os"
"time"

"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
Expand Down Expand Up @@ -51,7 +53,7 @@ type tapPort struct {
ctx *fwdcontext.Context // Forwarding context containing the port
fd int
devName string
linkDoneCh chan struct{}
doneCh chan struct{}
linkUpdateCh chan netlink.LinkUpdate
file *os.File
ifaceMgr kernel.Interfaces
Expand All @@ -68,8 +70,7 @@ func (p *tapPort) String() string {
func (p *tapPort) Cleanup() {
p.input.Cleanup()
p.output.Cleanup()
p.file.Close()
close(p.linkDoneCh)
close(p.doneCh)
p.input = nil
p.output = nil
}
Expand Down Expand Up @@ -99,23 +100,35 @@ func (p *tapPort) Update(upd *fwdpb.PortUpdateDesc) error {
}

func (p *tapPort) process() {
startStateWatch(p.linkUpdateCh, p.devName, p, p.ctx)
startStateWatch(p.linkUpdateCh, p.doneCh, p.devName, p, p.ctx)
go func() {
buf := make([]byte, 1500) // TODO: MTU
for {
n, err := p.file.Read(buf)
if err != nil {
log.Warningf("failed to read packet: %v", err)
}
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, buf[0:n])
if err != nil {
log.Warningf("failed to create new packet: %v", err)
fwdport.Increment(p, n, fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
continue
select {
case <-p.doneCh:
log.Warningf("stopping readding tap packet to read packet: %v", p.devName)
p.file.Close()
return
default:
p.file.SetReadDeadline(time.Now().Add(time.Second))
n, err := p.file.Read(buf)
if errors.Is(err, os.ErrDeadlineExceeded) { // Don't log this error as it is very spammy.
continue
}
if err != nil {
log.Warningf("failed to read packet: %v", err)
continue
}
fwdPkt, err := fwdpacket.New(fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET, buf[0:n])
if err != nil {
log.Warningf("failed to create new packet: %v", err)
fwdport.Increment(p, n, fwdpb.CounterId_COUNTER_ID_RX_BAD_PACKETS, fwdpb.CounterId_COUNTER_ID_RX_BAD_OCTETS)
continue
}
fwdPkt.Debug(debug.TAPPortPacketTrace)
fwdPkt.Log().V(2).Info("input packet", "device", p.devName, "port", p.ID(), "frame", fwdpacket.IncludeFrameInLog)
fwdport.Process(p, fwdPkt, fwdpb.PortAction_PORT_ACTION_INPUT, p.ctx, "TAP")
}
fwdPkt.Debug(debug.TAPPortPacketTrace)
fwdPkt.Log().V(2).Info("input packet", "device", p.devName, "port", p.ID(), "frame", fwdpacket.IncludeFrameInLog)
fwdport.Process(p, fwdPkt, fwdpb.PortAction_PORT_ACTION_INPUT, p.ctx, "TAP")
}
}()
}
Expand Down Expand Up @@ -197,31 +210,37 @@ func getAndSetState(name string, ifMgr *kernel.Interfaces, pi *fwdpb.PortInfo) (
return reply, err
}

func startStateWatch(updCh chan netlink.LinkUpdate, devName string, port fwdport.Port, ctx *fwdcontext.Context) {
func startStateWatch(updCh chan netlink.LinkUpdate, doneCh chan struct{}, devName string, port fwdport.Port, ctx *fwdcontext.Context) {
go func() {
for {
upd, ok := <-updCh
if !ok {
select {
case <-doneCh:
log.Warningf("state watch stopping for dev: %v", devName)
return
}
if upd.Attrs().Name != devName {
continue
}
admin, oper := stateFromAttrs(upd.Attrs())
log.V(1).Infof("dataplane receive link update: port %q, admin %v, oper %v", devName, admin.String(), oper.String())
ctx.Notify(&fwdpb.EventDesc{
Event: fwdpb.Event_EVENT_PORT,
Desc: &fwdpb.EventDesc_Port{
Port: &fwdpb.PortEventDesc{
Context: &fwdpb.ContextId{Id: ctx.ID},
PortId: &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: string(port.ID())}},
PortInfo: &fwdpb.PortInfo{
AdminStatus: admin,
OperStatus: oper,
case upd, ok := <-updCh:
if !ok {
log.Info("update chan close for dev: %v", devName)
return
}
if upd.Attrs() == nil || upd.Attrs().Name != devName {
continue
}
admin, oper := stateFromAttrs(upd.Attrs())
log.V(1).Infof("dataplane receive link update: port %q, admin %v, oper %v", devName, admin.String(), oper.String())
ctx.Notify(&fwdpb.EventDesc{
Event: fwdpb.Event_EVENT_PORT,
Desc: &fwdpb.EventDesc_Port{
Port: &fwdpb.PortEventDesc{
Context: &fwdpb.ContextId{Id: ctx.ID},
PortId: &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: string(port.ID())}},
PortInfo: &fwdpb.PortInfo{
AdminStatus: admin,
OperStatus: oper,
},
},
},
},
})
})
}
}
}()
}
Expand Down Expand Up @@ -254,14 +273,14 @@ func (tp tapBuilder) Build(portDesc *fwdpb.PortDesc, ctx *fwdcontext.Context) (f
file: file,
fd: fd,
devName: kp.Tap.GetDeviceName(),
linkDoneCh: make(chan struct{}),
doneCh: make(chan struct{}),
linkUpdateCh: make(chan netlink.LinkUpdate),
}
list := append(fwdport.CounterList, fwdaction.CounterList...)
if err := p.InitCounters("", list...); err != nil {
return nil, err
}
if err := tp.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.linkDoneCh); err != nil {
if err := tp.ifaceMgr.LinkSubscribe(p.linkUpdateCh, p.doneCh); err != nil {
return nil, err
}

Expand Down
62 changes: 48 additions & 14 deletions dataplane/internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Engine struct {
internalToExternalIDMu sync.Mutex
// internalToExternalID is a map from the internal port id to it's corresponding external port.
internalToExternalID map[string]string
cancelFn func()
}

// New creates a new engine and sets up the forwarding tables.
Expand All @@ -90,15 +91,51 @@ func New(ctx context.Context) (*Engine, error) {
internalToExternalID: map[string]string{},
}

e.handleIPUpdates()
ctx, e.cancelFn = context.WithCancel(ctx)
e.handleIPUpdates(ctx)

_, err := e.Server.ContextCreate(context.Background(), &fwdpb.ContextCreateRequest{
_, err := e.Server.ContextCreate(ctx, &fwdpb.ContextCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
})
if err != nil {
return nil, err
}
e.setupTables(ctx)

return e, nil
}

func (e *Engine) Reset(ctx context.Context) error {
e.cancelFn()
_, err := e.Server.ContextDelete(ctx, &fwdpb.ContextDeleteRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
})
if err != nil {
return err
}

e.idToNID = map[string]uint64{}
e.nextHopGroups = map[uint64]*dpb.NextHopIDList{}
e.ifaceToPort = map[string]string{}
e.ipToDevName = map[string]string{}
e.devNameToPortID = map[string]string{}
e.internalToExternalID = map[string]string{}
e.nextNHGID.Store(0)
e.nextNHID.Store(0)

_, err = e.Server.ContextCreate(ctx, &fwdpb.ContextCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
})
if err != nil {
return err
}

ctx, e.cancelFn = context.WithCancel(ctx)
e.setupTables(ctx)
return nil
}

func (e *Engine) setupTables(ctx context.Context) error {
v4FIB := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Desc: &fwdpb.TableDesc{
Expand All @@ -121,7 +158,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, v4FIB); err != nil {
return nil, err
return err
}
v6FIB := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -145,7 +182,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, v6FIB); err != nil {
return nil, err
return err
}
portMAC := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -165,7 +202,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, portMAC); err != nil {
return nil, err
return err
}
neighbor := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -189,7 +226,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, neighbor); err != nil {
return nil, err
return err
}
nh := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -209,7 +246,7 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, nh); err != nil {
return nil, err
return err
}
nhg := &fwdpb.TableCreateRequest{
ContextId: &fwdpb.ContextId{Id: e.id},
Expand All @@ -229,18 +266,15 @@ func New(ctx context.Context) (*Engine, error) {
},
}
if _, err := e.Server.TableCreate(ctx, nhg); err != nil {
return nil, err
return err
}
if err := createFIBSelector(ctx, e.id, e.Server); err != nil {
return nil, err
return err
}
if err := createLayer2PuntTable(ctx, e.id, e.Server); err != nil {
return nil, err
}
if err := createLayer3PuntTable(ctx, e.id, e.Server); err != nil {
return nil, err
return err
}
return e, nil
return createLayer3PuntTable(ctx, e.id, e.Server)
}

// ID returns the engine's forwarding context id.
Expand Down
Loading