From c6fbe9fa996de14089ec1eacd48873840d45423d Mon Sep 17 00:00:00 2001 From: Daniel Grau Date: Thu, 11 Apr 2024 18:08:02 +0000 Subject: [PATCH] Simplify remote port mgmt --- dataplane/saiserver/hostif.go | 61 +++++++++++-------- dataplane/saiserver/ports.go | 1 + dataplane/saiserver/saiserver.go | 1 + .../pkthandler/lucius-pkthandler.service | 1 + dataplane/standalone/pkthandler/main.go | 33 ++++++---- 5 files changed, 62 insertions(+), 35 deletions(-) diff --git a/dataplane/saiserver/hostif.go b/dataplane/saiserver/hostif.go index d84daae8..4e104039 100644 --- a/dataplane/saiserver/hostif.go +++ b/dataplane/saiserver/hostif.go @@ -66,13 +66,15 @@ type hostif struct { } func (hostif *hostif) Reset() { - hostif.trapIDToHostifID = map[uint64]uint64{} - hostif.groupIDToQueue = map[uint64]uint32{} - hostif.remoteHostifs = map[uint64]*pktiopb.HostPortControlMessage{} + log.Info("reseting hostif") for _, closeFn := range hostif.remoteClosers { closeFn() } hostif.remoteClosers = nil + hostif.trapIDToHostifID = map[uint64]uint64{} + hostif.groupIDToQueue = map[uint64]uint32{} + hostif.remoteHostifs = map[uint64]*pktiopb.HostPortControlMessage{} + hostif.remotePortReq = nil } const switchID = 1 @@ -590,19 +592,32 @@ func (hostif *hostif) CPUPacketStream(srv pktiopb.PacketIO_CPUPacketStreamServer } func (hostif *hostif) HostPortControl(srv pktiopb.PacketIO_HostPortControlServer) error { + log.Info("started host port control channel") _, err := srv.Recv() if err != nil { return err } + log.Info("received init port control channel") hostif.remoteMu.Lock() ctx, cancelFn := context.WithCancel(srv.Context()) - hostif.remoteClosers = append(hostif.remoteClosers, cancelFn) - reqCh := make(chan *pktiopb.HostPortControlMessage) - respCh := make(chan *pktiopb.HostPortControlRequest) + hostif.remoteClosers = append(hostif.remoteClosers, func() { + log.Info("canceling host port control") + cancelFn() + }) + + errCh := make(chan error) + hostif.remotePortReq = func(msg *pktiopb.HostPortControlMessage) error { - reqCh <- msg - resp := <-respCh + if err := srv.Send(msg); err != nil { + errCh <- err + return err + } + resp, err := srv.Recv() + if err != nil { + errCh <- err + return err + } return status.FromProto(resp.GetStatus()).Err() } @@ -616,21 +631,19 @@ func (hostif *hostif) HostPortControl(srv pktiopb.PacketIO_HostPortControlServer hostif.remoteMu.Unlock() log.Info("initialized host port control channel") - for { - select { - case <-ctx.Done(): - return nil - case req := <-reqCh: - if err := srv.Send(req); err != nil { - return err - } - log.Info("sent message to client: %+v", req) - resp, err := srv.Recv() - if err != nil { - return err - } - respCh <- resp - log.Info("received message from client: %+v", req) - } + + // The HostPortControls exits in two cases: context cancels or RPC errors. + err = nil + select { + case <-ctx.Done(): + log.Info("host port control done") + case err = <-errCh: + log.Info("host port control err: %v", err) } + + hostif.remoteMu.Lock() + hostif.remotePortReq = nil + hostif.remoteMu.Unlock() + log.Info("cleared host port control channel") + return err } diff --git a/dataplane/saiserver/ports.go b/dataplane/saiserver/ports.go index 4be9d5db..18c3a775 100644 --- a/dataplane/saiserver/ports.go +++ b/dataplane/saiserver/ports.go @@ -524,6 +524,7 @@ func (port *port) RemovePort(ctx context.Context, req *saipb.RemovePortRequest) } func (port *port) Reset() { + log.Info("reseting port") port.portToEth = make(map[uint64]string) port.nextEth = 1 } diff --git a/dataplane/saiserver/saiserver.go b/dataplane/saiserver/saiserver.go index c7c4f4f6..b6b501d2 100644 --- a/dataplane/saiserver/saiserver.go +++ b/dataplane/saiserver/saiserver.go @@ -212,6 +212,7 @@ func (s *Server) ObjectTypeQuery(_ context.Context, req *saipb.ObjectTypeQueryRe func (s *Server) Initialize(ctx context.Context, _ *saipb.InitializeRequest) (*saipb.InitializeResponse, error) { if s.initialized { + log.Info("dataplane already intialized, reseting") s.mgr.Reset() s.saiSwitch.Reset() if err := s.Reset(ctx); err != nil { diff --git a/dataplane/standalone/pkthandler/lucius-pkthandler.service b/dataplane/standalone/pkthandler/lucius-pkthandler.service index b0655a71..24f42c56 100644 --- a/dataplane/standalone/pkthandler/lucius-pkthandler.service +++ b/dataplane/standalone/pkthandler/lucius-pkthandler.service @@ -7,6 +7,7 @@ After=network-online.target Type=exec ExecStart=/usr/bin/pkthandler --alsologtostderr Restart=always +RestartSec=1 [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/dataplane/standalone/pkthandler/main.go b/dataplane/standalone/pkthandler/main.go index 2c8f4cc2..608a5c7f 100644 --- a/dataplane/standalone/pkthandler/main.go +++ b/dataplane/standalone/pkthandler/main.go @@ -17,9 +17,9 @@ package main import ( "context" "flag" + "fmt" "os" "os/signal" - "sync" "syscall" "time" @@ -39,11 +39,10 @@ const ( var portFile = flag.String("port_file", "/etc/sonic/pktio_ports.json", "File at which to include hostif info, for debugging only") func main() { - flag.Parse() - ctx, cancelFn := context.WithTimeout(context.Background(), time.Minute) defer cancelFn() + log.Info("dialing packetio server") conn, err := grpc.DialContext(ctx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) if err != nil { log.Exit(err) @@ -60,10 +59,12 @@ func main() { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) go func() { - <-sigCh + sig := <-sigCh + log.Infof("received signal %q exiting", sig) cancel() }() + log.Info("starting packetio RPCs") portCtl, err := pktio.HostPortControl(ctx) if err != nil { log.Exit(err) @@ -73,16 +74,26 @@ func main() { log.Exit(err) } - var wg sync.WaitGroup - wg.Add(2) + errCh := make(chan error) go func() { - h.ManagePorts(portCtl) - wg.Done() + if err := h.ManagePorts(portCtl); err != nil { + errCh <- fmt.Errorf("HostPortControl rpc exited with err: %v", err) + return + } + errCh <- fmt.Errorf("HostPortControl rpc exited without error") }() go func() { - h.StreamPackets(packet) - wg.Done() + if err := h.StreamPackets(packet); err != nil { + errCh <- fmt.Errorf("StreamPackets rpc exited with err: %v", err) + return + } + errCh <- fmt.Errorf("StreamPackets rpc exited without error") }() - wg.Wait() + err = <-errCh + log.Infof("stopped packetio RPCs: %v", err) +} + +func init() { + flag.Parse() }