Skip to content

Commit

Permalink
Simplify remote port mgmt
Browse files Browse the repository at this point in the history
  • Loading branch information
DanG100 committed Apr 11, 2024
1 parent b678d6b commit c6fbe9f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 35 deletions.
61 changes: 37 additions & 24 deletions dataplane/saiserver/hostif.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ type hostif struct {
}

func (hostif *hostif) Reset() {
hostif.trapIDToHostifID = map[uint64]uint64{}
hostif.groupIDToQueue = map[uint64]uint32{}
hostif.remoteHostifs = map[uint64]*pktiopb.HostPortControlMessage{}
log.Info("reseting hostif")
for _, closeFn := range hostif.remoteClosers {
closeFn()
}
hostif.remoteClosers = nil
hostif.trapIDToHostifID = map[uint64]uint64{}
hostif.groupIDToQueue = map[uint64]uint32{}
hostif.remoteHostifs = map[uint64]*pktiopb.HostPortControlMessage{}
hostif.remotePortReq = nil
}

const switchID = 1
Expand Down Expand Up @@ -590,19 +592,32 @@ func (hostif *hostif) CPUPacketStream(srv pktiopb.PacketIO_CPUPacketStreamServer
}

func (hostif *hostif) HostPortControl(srv pktiopb.PacketIO_HostPortControlServer) error {
log.Info("started host port control channel")
_, err := srv.Recv()
if err != nil {
return err
}
log.Info("received init port control channel")

hostif.remoteMu.Lock()
ctx, cancelFn := context.WithCancel(srv.Context())
hostif.remoteClosers = append(hostif.remoteClosers, cancelFn)
reqCh := make(chan *pktiopb.HostPortControlMessage)
respCh := make(chan *pktiopb.HostPortControlRequest)
hostif.remoteClosers = append(hostif.remoteClosers, func() {
log.Info("canceling host port control")
cancelFn()
})

errCh := make(chan error)

hostif.remotePortReq = func(msg *pktiopb.HostPortControlMessage) error {
reqCh <- msg
resp := <-respCh
if err := srv.Send(msg); err != nil {
errCh <- err
return err
}
resp, err := srv.Recv()
if err != nil {
errCh <- err
return err
}
return status.FromProto(resp.GetStatus()).Err()
}

Expand All @@ -616,21 +631,19 @@ func (hostif *hostif) HostPortControl(srv pktiopb.PacketIO_HostPortControlServer
hostif.remoteMu.Unlock()

log.Info("initialized host port control channel")
for {
select {
case <-ctx.Done():
return nil
case req := <-reqCh:
if err := srv.Send(req); err != nil {
return err
}
log.Info("sent message to client: %+v", req)
resp, err := srv.Recv()
if err != nil {
return err
}
respCh <- resp
log.Info("received message from client: %+v", req)
}

// The HostPortControls exits in two cases: context cancels or RPC errors.
err = nil
select {
case <-ctx.Done():
log.Info("host port control done")
case err = <-errCh:
log.Info("host port control err: %v", err)
}

hostif.remoteMu.Lock()
hostif.remotePortReq = nil
hostif.remoteMu.Unlock()
log.Info("cleared host port control channel")
return err
}
1 change: 1 addition & 0 deletions dataplane/saiserver/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (port *port) RemovePort(ctx context.Context, req *saipb.RemovePortRequest)
}

func (port *port) Reset() {
log.Info("reseting port")
port.portToEth = make(map[uint64]string)
port.nextEth = 1
}
Expand Down
1 change: 1 addition & 0 deletions dataplane/saiserver/saiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func (s *Server) ObjectTypeQuery(_ context.Context, req *saipb.ObjectTypeQueryRe

func (s *Server) Initialize(ctx context.Context, _ *saipb.InitializeRequest) (*saipb.InitializeResponse, error) {
if s.initialized {
log.Info("dataplane already intialized, reseting")
s.mgr.Reset()
s.saiSwitch.Reset()
if err := s.Reset(ctx); err != nil {
Expand Down
1 change: 1 addition & 0 deletions dataplane/standalone/pkthandler/lucius-pkthandler.service
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ After=network-online.target
Type=exec
ExecStart=/usr/bin/pkthandler --alsologtostderr
Restart=always
RestartSec=1

[Install]
WantedBy=multi-user.target
33 changes: 22 additions & 11 deletions dataplane/standalone/pkthandler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"

Expand All @@ -39,11 +39,10 @@ const (
var portFile = flag.String("port_file", "/etc/sonic/pktio_ports.json", "File at which to include hostif info, for debugging only")

func main() {
flag.Parse()

ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute)
defer cancelFn()

log.Info("dialing packetio server")
conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
if err != nil {
log.Exit(err)
Expand All @@ -60,10 +59,12 @@ func main() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
sig := <-sigCh
log.Infof("received signal %q exiting", sig)
cancel()
}()

log.Info("starting packetio RPCs")
portCtl, err := pktio.HostPortControl(ctx)
if err != nil {
log.Exit(err)
Expand All @@ -73,16 +74,26 @@ func main() {
log.Exit(err)
}

var wg sync.WaitGroup
wg.Add(2)
errCh := make(chan error)
go func() {
h.ManagePorts(portCtl)
wg.Done()
if err := h.ManagePorts(portCtl); err != nil {
errCh <- fmt.Errorf("HostPortControl rpc exited with err: %v", err)
return
}
errCh <- fmt.Errorf("HostPortControl rpc exited without error")
}()
go func() {
h.StreamPackets(packet)
wg.Done()
if err := h.StreamPackets(packet); err != nil {
errCh <- fmt.Errorf("StreamPackets rpc exited with err: %v", err)
return
}
errCh <- fmt.Errorf("StreamPackets rpc exited without error")
}()

wg.Wait()
err = <-errCh
log.Infof("stopped packetio RPCs: %v", err)
}

func init() {
flag.Parse()
}

0 comments on commit c6fbe9f

Please sign in to comment.