Skip to content

Commit

Permalink
PacketIO server feature flags
Browse files Browse the repository at this point in the history
  • Loading branch information
DanG100 committed Mar 5, 2024
1 parent b42d084 commit 3ae85a6
Show file tree
Hide file tree
Showing 11 changed files with 686 additions and 337 deletions.
3 changes: 3 additions & 0 deletions dataplane/dplaneopts/dplaneopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Options struct {
// EthDevAsLane treats ethX and hardware lane X.
// If a port is created with multiple lanes only the first is used.
EthDevAsLane bool
// RemoteCPUPort enables sending all packets for the CPU over gRPC.
// TODO: In the future, only support this option.
RemoteCPUPort bool
}

// Option exposes additional configuration for the dataplane.
Expand Down
2 changes: 2 additions & 0 deletions dataplane/forwarding/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
deps = [
"//dataplane/forwarding/fwdaction",
"//dataplane/forwarding/fwdaction/actions",
"//dataplane/forwarding/fwdconfig",
"//dataplane/forwarding/fwdport",
"//dataplane/forwarding/fwdport/ports",
"//dataplane/forwarding/fwdtable",
Expand All @@ -36,5 +37,6 @@ go_library(
"//dataplane/forwarding/protocol/udp",
"//proto/forwarding",
"@com_github_golang_glog//:glog",
"@org_golang_google_grpc//status",
],
)
210 changes: 148 additions & 62 deletions dataplane/forwarding/fwd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ import (
"sync"

log "github.com/golang/glog"
"google.golang.org/grpc/status"

"github.com/openconfig/lemming/dataplane/forwarding/fwdaction"
"github.com/openconfig/lemming/dataplane/forwarding/fwdconfig"
"github.com/openconfig/lemming/dataplane/forwarding/fwdport"
"github.com/openconfig/lemming/dataplane/forwarding/fwdport/ports"
"github.com/openconfig/lemming/dataplane/forwarding/fwdtable"
"github.com/openconfig/lemming/dataplane/forwarding/infra/deadlock"
"github.com/openconfig/lemming/dataplane/forwarding/infra/fwdattribute"
Expand All @@ -39,7 +42,6 @@ import (
// essential, and there can be more to come, importing here is more
// beneficial.
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdaction/actions"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdport/ports"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/action"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/bridge"
_ "github.com/openconfig/lemming/dataplane/forwarding/fwdtable/exact"
Expand Down Expand Up @@ -722,78 +724,87 @@ func (e *Server) FlowCounterQuery(_ context.Context, request *fwdpb.FlowCounterQ
return reply, nil
}

// PacketInject injects a packet in the specified forwarding context and port.
func (e *Server) PacketInject(srv fwdpb.Forwarding_PacketInjectServer) error {
for {
request, err := srv.Recv()
if err != nil {
return err
}
timer := deadlock.NewTimer(deadlock.Timeout, fmt.Sprintf("Processing %+v", request))
defer timer.Stop()
func (e *Server) injectPacket(contextID *fwdpb.ContextId, id *fwdpb.PortId, hid fwdpb.PacketHeaderId, frame []byte, preActions []*fwdpb.ActionDesc, debug bool, dir fwdpb.PortAction) error {
timer := deadlock.NewTimer(deadlock.Timeout, fmt.Sprintf("Processing packet"))
defer timer.Stop()

ctx, err := e.FindContext(request.GetContextId())
if err != nil {
return fmt.Errorf("fwd: PacketInject failed, err %v", err)
}
ctx, err := e.FindContext(contextID)
if err != nil {
return fmt.Errorf("fwd: PacketInject failed, err %v", err)
}

// In a goroutine, acquire a RWLock on the context, create the preprocessing
// actions and process the packet. The RPC does not wait for the complete
// packet processing. It returns once it has validated the input parameters.
status := make(chan error)
// In a goroutine, acquire a RWLock on the context, create the preprocessing
// actions and process the packet. The RPC does not wait for the complete
// packet processing. It returns once it has validated the input parameters.
status := make(chan error)

go func() {
ctx.RLock()
defer ctx.RUnlock()
go func() {
ctx.RLock()
defer ctx.RUnlock()

port, err := fwdport.Find(request.GetPortId(), ctx)
if err != nil {
status <- fmt.Errorf("fwd: PortInject failed, err %v", err)
return
}
port, err := fwdport.Find(id, ctx)
if err != nil {
status <- fmt.Errorf("fwd: PortInject failed, err %v", err)
return
}

packet, err := fwdpacket.New(request.GetStartHeader(), request.GetBytes())
if err != nil {
status <- fmt.Errorf("fwd: PortInject failed, err %v", err)
return
}
packet, err := fwdpacket.New(hid, frame)
if err != nil {
status <- fmt.Errorf("fwd: PortInject failed, err %v", err)
return
}

pre, err := fwdaction.NewActions(request.GetPreprocesses(), ctx)
if err != nil {
status <- fmt.Errorf("fwd: PortInject failed to create preprocessing actions %v, err %v", request.GetPreprocesses(), err)
return
}
pre, err := fwdaction.NewActions(preActions, ctx)
if err != nil {
status <- fmt.Errorf("fwd: PortInject failed to create preprocessing actions %v, err %v", preActions, err)
return
}

// Apply the preprocessing actions on the packet and inject it into the
// port while holding the context's RLock. After packet processing,
// cleanup the port and actions. Publish a "no error" on the status
// channel so that the RPC can return. Note that this also serializes
// the packets arriving from the CPU.
status <- nil

func() {
defer func() {
if pre != nil {
pre.Cleanup()
}
}()

packet.Debug(request.GetDebug())
if len(pre) != 0 {
packet.Log().WithValues("context", ctx.ID, "port", port.ID())
state, err := fwdaction.ProcessPacket(packet, pre, port)
if state != fwdaction.CONTINUE || err != nil {
log.Errorf("%v: preprocessing failed, state %v, err %v", port.ID(), state, err)
return
}
packet.Log().V(1).Info("injecting packet", "frame", fwdpacket.IncludeFrameInLog)
// Apply the preprocessing actions on the packet and inject it into the
// port while holding the context's RLock. After packet processing,
// cleanup the port and actions. Publish a "no error" on the status
// channel so that the RPC can return. Note that this also serializes
// the packets arriving from the CPU.
status <- nil

func() {
defer func() {
if pre != nil {
pre.Cleanup()
}
fwdport.Process(port, packet, request.GetAction(), ctx, "Control")
}()

packet.Debug(debug)
if len(pre) != 0 {
packet.Log().WithValues("context", ctx.ID, "port", port.ID())
state, err := fwdaction.ProcessPacket(packet, pre, port)
if state != fwdaction.CONTINUE || err != nil {
log.Errorf("%v: preprocessing failed, state %v, err %v", port.ID(), state, err)
return
}
packet.Log().V(1).Info("injecting packet", "frame", fwdpacket.IncludeFrameInLog)
}
fwdport.Process(port, packet, dir, ctx, "Control")
}()
}()

if err = <-status; err != nil {
return fmt.Errorf("fwd: PacketInject failed, err %v", err)
if err = <-status; err != nil {
return fmt.Errorf("fwd: PacketInject failed, err %v", err)
}
return nil
}

// PacketInject injects a packet in the specified forwarding context and port.
func (e *Server) PacketInject(srv fwdpb.Forwarding_PacketInjectServer) error {
for {
request, err := srv.Recv()
if err != nil {
return err
}
err = e.injectPacket(request.GetContextId(), request.GetPortId(), request.GetStartHeader(),
request.GetBytes(), request.GetPreprocesses(), request.GetDebug(), request.GetAction())
if err != nil {
return err
}
}
}
Expand Down Expand Up @@ -908,3 +919,78 @@ func (e *Server) ObjectNID(_ context.Context, request *fwdpb.ObjectNIDRequest) (
}
return &fwdpb.ObjectNIDReply{Nid: uint64(obj.NID())}, nil
}

func (e *Server) CPUPacketStream(srv fwdpb.Forwarding_CPUPacketStreamServer) error {
init, err := srv.Recv()
if err != nil {
return err
}
ctx, err := e.FindContext(init.GetContextId())
if err != nil {
return fmt.Errorf("failed to get context, err %v", err)
}

cpuPortID := ""
for _, id := range ctx.Objects.IDs() {
obj, err := ctx.Objects.FindID(&fwdpb.ObjectId{Id: string(id)})
if err != nil {
return err
}
if _, ok := obj.(*ports.CPUPort); ok {
cpuPortID = string(obj.ID())
}
}
if cpuPortID == "" {
return fmt.Errorf("couldn't find cpu port")
}

ctx.SetCPUPortSink(func(po *fwdpb.PacketOut) error {
return srv.Send(po)
})

for {
pkt, err := srv.Recv()
if err != nil {
continue
}
acts := []*fwdpb.ActionDesc{fwdconfig.Action(fwdconfig.UpdateAction(fwdpb.UpdateType_UPDATE_TYPE_SET, fwdpb.PacketFieldNum_PACKET_FIELD_NUM_HOST_PORT_ID).
WithUint64Value(pkt.GetPacket().GetHostPort())).Build()}

err = e.injectPacket(init.GetContextId(), &fwdpb.PortId{ObjectId: &fwdpb.ObjectId{Id: cpuPortID}}, fwdpb.PacketHeaderId_PACKET_HEADER_ID_ETHERNET,
pkt.GetPacket().GetFrame(), acts, true, fwdpb.PortAction_PORT_ACTION_INPUT)
if err != nil {
continue
}
}
}

func (e *Server) HostPortControl(srv fwdpb.Forwarding_HostPortControlServer) error {
init, err := srv.Recv()
if err != nil {
return err
}
ctx, err := e.FindContext(init.GetContextId())
if err != nil {
return fmt.Errorf("failed to get context, err %v", err)
}
reqCh := make(chan *fwdpb.HostPortControlMessage)
respCh := make(chan *fwdpb.HostPortControlRequest)
defer close(respCh)

ctx.SetPortControl(func(msg *fwdpb.HostPortControlMessage) error {
reqCh <- msg
resp := <-respCh
return status.FromProto(resp.GetStatus()).Err()
})
for {
req := <-reqCh
if err := srv.Send(req); err != nil {
return err
}
resp, err := srv.Recv()
if err != nil {
return err
}
respCh <- resp
}
}
Loading

0 comments on commit 3ae85a6

Please sign in to comment.