Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle pktiohandler restarts and ports info to file #392

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ load-debug:
## Run integration tests
.PHONY: itest
itest:
bazel test --test_output=errors --cache_test_results=no $(shell bazel query 'tests("//...") except (attr(size, small, tests("//...")) + attr(size, medium, tests("//..."))) ')
bazel test --flaky_test_attempts=3 --test_output=errors --cache_test_results=no $(shell bazel query 'tests("//...") except (attr(size, small, tests("//...")) + attr(size, medium, tests("//..."))) ')

.PHONY: test
test:
Expand Down
34 changes: 34 additions & 0 deletions dataplane/forwarding/fwdconfig/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,40 @@ func (b TableEntryAddRequestBuilder) Build() *fwdpb.TableEntryAddRequest {
return req
}

// TableEntryAddRequest builds TableEntryAddRequests.
type TableEntryRemoveRequestBuilder struct {
contextID string
tableID string
entries []*EntryDescBuilder
}

// TableEntryRemoveRequest creates a new TableEntryRemoveRequestBuilder.
func TableEntryRemoveRequest(ctxID, tableID string) *TableEntryRemoveRequestBuilder {
return &TableEntryRemoveRequestBuilder{
contextID: ctxID,
tableID: tableID,
}
}

// AppendEntry adds an entry to the requests.
func (b *TableEntryRemoveRequestBuilder) AppendEntry(entry *EntryDescBuilder) *TableEntryRemoveRequestBuilder {
b.entries = append(b.entries, entry)
return b
}

// Build returns a new TableEntryAddRequest.
func (b TableEntryRemoveRequestBuilder) Build() *fwdpb.TableEntryRemoveRequest {
req := &fwdpb.TableEntryRemoveRequest{
ContextId: &fwdpb.ContextId{Id: b.contextID},
TableId: &fwdpb.TableId{ObjectId: &fwdpb.ObjectId{Id: b.tableID}},
Entries: []*fwdpb.EntryDesc{},
}
for _, entry := range b.entries {
req.Entries = append(req.Entries, entry.Build())
}
return req
}

type entryDescBuilder interface {
set(*fwdpb.EntryDesc)
}
Expand Down
3 changes: 3 additions & 0 deletions dataplane/saiserver/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ go_test(
"//dataplane/dplaneopts",
"//dataplane/forwarding/infra/fwdcontext",
"//dataplane/forwarding/infra/fwdobject",
"//dataplane/proto/packetio",
"//dataplane/proto/sai",
"//dataplane/saiserver/attrmgr",
"//proto/forwarding",
"@com_github_google_go_cmp//cmp",
"@com_github_openconfig_gnmi//errdiff",
"@org_golang_google_genproto_googleapis_rpc//status",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//testing/protocmp",
Expand Down
94 changes: 85 additions & 9 deletions dataplane/saiserver/hostif.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package saiserver
import (
"context"
"fmt"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -42,6 +43,7 @@ func newHostif(mgr *attrmgr.AttrMgr, dataplane switchDataplaneAPI, s *grpc.Serve
dataplane: dataplane,
trapIDToHostifID: map[uint64]uint64{},
groupIDToQueue: map[uint64]uint32{},
remoteHostifs: map[uint64]*pktiopb.HostPortControlMessage{},
opts: opts,
}

Expand All @@ -57,7 +59,20 @@ type hostif struct {
trapIDToHostifID map[uint64]uint64
groupIDToQueue map[uint64]uint32
opts *dplaneopts.Options
sendPortReq func(msg *pktiopb.HostPortControlMessage) error
remoteMu sync.Mutex
remoteHostifs map[uint64]*pktiopb.HostPortControlMessage
remoteClosers []func()
remotePortReq func(msg *pktiopb.HostPortControlMessage) error
}

func (hostif *hostif) Reset() {
hostif.trapIDToHostifID = map[uint64]uint64{}
hostif.groupIDToQueue = map[uint64]uint32{}
hostif.remoteHostifs = map[uint64]*pktiopb.HostPortControlMessage{}
for _, closeFn := range hostif.remoteClosers {
DanG100 marked this conversation as resolved.
Show resolved Hide resolved
closeFn()
}
hostif.remoteClosers = nil
}

const switchID = 1
Expand Down Expand Up @@ -294,18 +309,68 @@ func (hostif *hostif) createRemoteHostif(ctx context.Context, req *saipb.CreateH
return nil, status.Errorf(codes.InvalidArgument, "unknown type %v", req.GetType())
}

hostif.remoteMu.Lock()
defer hostif.remoteMu.Unlock()

if hostif.remotePortReq == nil {
return nil, status.Error(codes.FailedPrecondition, "remote port control not configured")
}
if err := hostif.remotePortReq(ctlReq); err != nil {
return nil, err
}

attr := &saipb.HostifAttribute{
OperStatus: proto.Bool(true),
}
hostif.mgr.StoreAttributes(id, attr)
hostif.remoteHostifs[id] = ctlReq

if hostif.sendPortReq != nil {
if err := hostif.sendPortReq(ctlReq); err != nil {
return nil, err
}
return &saipb.CreateHostifResponse{Oid: id}, nil
}

func (hostif *hostif) RemoveHostif(ctx context.Context, req *saipb.RemoveHostifRequest) (*saipb.RemoveHostifResponse, error) {
if !hostif.opts.RemoteCPUPort {
return nil, status.Error(codes.FailedPrecondition, "only remote cpu port is supported")
}
hostif.remoteMu.Lock()
defer hostif.remoteMu.Unlock()

return &saipb.CreateHostifResponse{Oid: id}, nil
nid, err := hostif.dataplane.ObjectNID(ctx, &fwdpb.ObjectNIDRequest{
ContextId: &fwdpb.ContextId{Id: hostif.dataplane.ID()},
ObjectId: &fwdpb.ObjectId{Id: fmt.Sprint(hostif.remoteHostifs[req.GetOid()])},
})
if err != nil {
return nil, err
}

delReq := fwdconfig.TableEntryRemoveRequest(hostif.dataplane.ID(), hostifToPortTable).AppendEntry(
fwdconfig.EntryDesc(fwdconfig.ExactEntry(fwdconfig.PacketFieldBytes(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID).WithUint64(req.GetOid()))),
).Build()
if _, err := hostif.dataplane.TableEntryRemove(ctx, delReq); err != nil {
return nil, err
}

delReq = fwdconfig.TableEntryRemoveRequest(hostif.dataplane.ID(), portToHostifTable).AppendEntry(
fwdconfig.EntryDesc(fwdconfig.ExactEntry(fwdconfig.PacketFieldBytes(fwdpb.PacketFieldNum_PACKET_FIELD_NUM_PACKET_PORT_INPUT).WithUint64(nid.GetNid()))),
).Build()
if _, err := hostif.dataplane.TableEntryRemove(ctx, delReq); err != nil {
return nil, err
}

ctlReq := &pktiopb.HostPortControlMessage{
Create: false,
PortId: req.Oid,
}

if hostif.remotePortReq == nil {
return nil, status.Error(codes.FailedPrecondition, "remote port control not configured")
}
if err := hostif.remotePortReq(ctlReq); err != nil {
return nil, err
}
delete(hostif.remoteHostifs, req.Oid)

return &saipb.RemoveHostifResponse{}, nil
}

// SetHostifAttribute sets the attributes in the request.
Expand Down Expand Up @@ -529,20 +594,31 @@ func (hostif *hostif) HostPortControl(srv pktiopb.PacketIO_HostPortControlServer
if err != nil {
return err
}
// TODO: Whenever this RPC is connected, should resend all existing hostifs to tolerate the client restarting.

hostif.remoteMu.Lock()
ctx, cancelFn := context.WithCancel(srv.Context())
hostif.remoteClosers = append(hostif.remoteClosers, cancelFn)
reqCh := make(chan *pktiopb.HostPortControlMessage)
respCh := make(chan *pktiopb.HostPortControlRequest)
hostif.sendPortReq = func(msg *pktiopb.HostPortControlMessage) error {
hostif.remotePortReq = func(msg *pktiopb.HostPortControlMessage) error {
reqCh <- msg
resp := <-respCh
return status.FromProto(resp.GetStatus()).Err()
}

// Send all existing hostif.
for _, msg := range hostif.remoteHostifs {
if err := hostif.remotePortReq(msg); err != nil {
hostif.remoteMu.Unlock()
return err
}
}
hostif.remoteMu.Unlock()

log.Info("initialized host port control channel")
for {
select {
case <-srv.Context().Done():
case <-ctx.Done():
return nil
case req := <-reqCh:
if err := srv.Send(req); err != nil {
Expand Down
91 changes: 86 additions & 5 deletions dataplane/saiserver/hostif_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ import (
"context"
"net"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/openconfig/gnmi/errdiff"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"

"github.com/openconfig/lemming/dataplane/dplaneopts"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdcontext"
pktiopb "github.com/openconfig/lemming/dataplane/proto/packetio"
saipb "github.com/openconfig/lemming/dataplane/proto/sai"
"github.com/openconfig/lemming/dataplane/saiserver/attrmgr"
fwdpb "github.com/openconfig/lemming/proto/forwarding"
Expand Down Expand Up @@ -81,7 +85,7 @@ func TestCreateHostif(t *testing.T) {
ctx: fwdcontext.New("foo", "foo"),
}
dplane.ctx.SetPacketSink(func(*fwdpb.PacketSinkResponse) error { return nil })
c, mgr, stopFn := newTestHostif(t, dplane)
c, mgr, stopFn := newTestHostif(t, dplane, false)
// Create switch and ports
mgr.StoreAttributes(mgr.NextID(), &saipb.SwitchAttribute{
CpuPort: proto.Uint64(10),
Expand Down Expand Up @@ -115,6 +119,72 @@ func TestCreateHostif(t *testing.T) {
}
}

func TestRemoveHostif(t *testing.T) {
tests := []struct {
desc string
req *saipb.RemoveHostifRequest
want *pktiopb.HostPortControlMessage
wantErr string
}{{
desc: "sucess",
req: &saipb.RemoveHostifRequest{
Oid: 1,
},
want: &pktiopb.HostPortControlMessage{
PortId: 1,
},
}}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
dplane := &fakeSwitchDataplane{
portIDToNID: map[string]uint64{
"1": 10,
},
}
c, mgr, stopFn := newTestHostif(t, dplane, true)
defer stopFn()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mgr.StoreAttributes(1, &saipb.CreateHostifRequest{Name: []byte("eth1")})

msgCh := make(chan *pktiopb.HostPortControlMessage, 1)
pc, err := c.HostPortControl(ctx)
if err != nil {
t.Fatal(err)
}
if err := pc.Send(&pktiopb.HostPortControlRequest{Msg: &pktiopb.HostPortControlRequest_Init{}}); err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond)
go func() {
msg, _ := pc.Recv()
msgCh <- msg
pc.Send(&pktiopb.HostPortControlRequest{
Msg: &pktiopb.HostPortControlRequest_Status{
Status: &status.Status{
Code: int32(codes.OK),
Message: "",
},
},
})
}()

_, gotErr := c.RemoveHostif(context.TODO(), tt.req)
if diff := errdiff.Check(gotErr, tt.wantErr); diff != "" {
t.Fatalf("RemoveHostif() unexpected err: %s", diff)
}
if gotErr != nil {
return
}
got := <-msgCh
if d := cmp.Diff(got, tt.want, protocmp.Transform()); d != "" {
t.Errorf("RemoveHostif() failed: diff(-got,+want)\n:%s", d)
}
})
}
}

func TestSetHostifAttribute(t *testing.T) {
tests := []struct {
desc string
Expand Down Expand Up @@ -144,7 +214,7 @@ func TestSetHostifAttribute(t *testing.T) {
return nil, tt.getInterfaceErr
}
dplane := &fakeSwitchDataplane{}
c, mgr, stopFn := newTestHostif(t, dplane)
c, mgr, stopFn := newTestHostif(t, dplane, false)
defer stopFn()
_, gotErr := c.SetHostifAttribute(context.TODO(), tt.req)
if diff := errdiff.Check(gotErr, tt.wantErr); diff != "" {
Expand All @@ -167,9 +237,20 @@ func TestSetHostifAttribute(t *testing.T) {
}
}

func newTestHostif(t testing.TB, api switchDataplaneAPI) (saipb.HostifClient, *attrmgr.AttrMgr, func()) {
type hostifClient struct {
saipb.HostifClient
pktiopb.PacketIOClient
}

func newTestHostif(t testing.TB, api switchDataplaneAPI, remotePort bool) (*hostifClient, *attrmgr.AttrMgr, func()) {
conn, mgr, stopFn := newTestServer(t, func(mgr *attrmgr.AttrMgr, srv *grpc.Server) {
newHostif(mgr, api, srv, &dplaneopts.Options{HostifNetDevType: fwdpb.PortType_PORT_TYPE_KERNEL})
newHostif(mgr, api, srv, &dplaneopts.Options{
HostifNetDevType: fwdpb.PortType_PORT_TYPE_KERNEL,
RemoteCPUPort: remotePort,
})
})
return saipb.NewHostifClient(conn), mgr, stopFn
return &hostifClient{
HostifClient: saipb.NewHostifClient(conn),
PacketIOClient: pktiopb.NewPacketIOClient(conn),
}, mgr, stopFn
}
1 change: 1 addition & 0 deletions dataplane/saiserver/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ func (sw *saiSwitch) PortStateChangeNotification(_ *saipb.PortStateChangeNotific

func (sw saiSwitch) Reset() {
sw.port.Reset()
sw.hostif.Reset()
}

// createFIBSelector creates a table that controls which forwarding table is used.
Expand Down
2 changes: 1 addition & 1 deletion dataplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (d *Dataplane) Start(ctx context.Context, c gpb.GNMIClient, target string)
return err
}

h, err := pktiohandler.New()
h, err := pktiohandler.New("")
if err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion dataplane/standalone/pkthandler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main

import (
"context"
"flag"
"os"
"os/signal"
"sync"
Expand All @@ -35,7 +36,11 @@ const (
addr = "10.0.2.2:50000"
)

var portFile = flag.String("port_file", "/etc/sonic/pktio_ports.json", "File at which to include hostif info, for debugging only")

func main() {
flag.Parse()

ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute)
defer cancelFn()

Expand All @@ -46,7 +51,7 @@ func main() {

pktio := pktiopb.NewPacketIOClient(conn)

h, err := pktiohandler.New()
h, err := pktiohandler.New(*portFile)
if err != nil {
log.Exit(err)
}
Expand Down
Loading
Loading