Skip to content

Commit

Permalink
Merge pull request #45 from alshabib/ipv4-e2e
Browse files Browse the repository at this point in the history
Adding an IP e2e test.
  • Loading branch information
akrentsel authored Dec 19, 2023
2 parents 67ef6cb + c7c2d51 commit 3df080f
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 67 deletions.
11 changes: 9 additions & 2 deletions cmd/magna/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"github.com/go-ping/ping"
"github.com/open-traffic-generator/snappi/gosnappi/otg"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/magna/flows/ip"
"github.com/openconfig/magna/flows/mpls"
"github.com/openconfig/magna/lwotg"
"github.com/openconfig/magna/lwotgtelem"
Expand All @@ -30,8 +32,6 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
"k8s.io/klog/v2"

gpb "github.com/openconfig/gnmi/proto/gnmi"
)

func main() {
Expand Down Expand Up @@ -66,8 +66,15 @@ func main() {
klog.Exitf("cannot initialise MPLS flow handler, %v", err)
}

ipFH, ipTask, err := ip.New()
if err != nil {
klog.Exitf("cannot initialise IP flow handler, %v", err)
}

otgSrv.AddFlowHandlers(fh)
otgSrv.AddFlowHandlers(ipFH)
telemSrv.AddTask(task)
telemSrv.AddTask(ipTask)

hintCh := make(chan lwotg.Hint, 100)
otgSrv.SetHintChannel(hintCh)
Expand Down
112 changes: 111 additions & 1 deletion e2e/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

mpb "github.com/openconfig/magna/proto/mirror"
"github.com/openconfig/ondatra"
"github.com/openconfig/ondatra/gnmi"
)
Expand Down Expand Up @@ -83,7 +84,7 @@ func TestScaleMPLS(t *testing.T) {
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startTwoPortMirror(t, client)
startTwoPortMirror(t, client, mpb.StartRequest_TT_MPLS)
time.Sleep(1 * time.Second)
defer func() { stopTwoPortMirror(t, client) }()

Expand Down Expand Up @@ -117,3 +118,112 @@ func TestScaleMPLS(t *testing.T) {
})
}
}

func TestScaleIPv4(t *testing.T) {

tests := []struct {
desc string
inFlowCount int
inPPS uint64
inLossTolerance uint64
inNumPacketsPerFlow uint32
}{{
desc: "10 flows, 1 pps",
inFlowCount: 10,
inPPS: 1,
inLossTolerance: 1,
}, {
desc: "10 flows, 1 pps, 5 packets",
inFlowCount: 10,
inPPS: 1,
inNumPacketsPerFlow: 5,
}, {
desc: "1 flow, 10 pps",
inFlowCount: 1,
inPPS: 10,
inLossTolerance: 1,
}, {
desc: "1 flow, 100 pps",
inFlowCount: 1,
inPPS: 100,
inNumPacketsPerFlow: 800, // send for 8 seconds.
}, {
desc: "10 flows, 10 pps",
inFlowCount: 10,
inPPS: 10,
inNumPacketsPerFlow: 50, // send for 5 seconds
}, {
desc: "100 flows, 10 pps",
inFlowCount: 100,
inPPS: 10,
inNumPacketsPerFlow: 20, // send for 2 seconds
}, {
desc: "1 flow, 600 pps",
inFlowCount: 1,
inPPS: 600,
inNumPacketsPerFlow: 1800, // run for 3 seconds
}, {
desc: "1 flow, 1000 pps",
inFlowCount: 1,
inPPS: 1000,
inNumPacketsPerFlow: 5 * 1000, // run for 5 seconds
}, {
desc: "100 flows",
inFlowCount: 100,
inPPS: 1,
inNumPacketsPerFlow: 6, // send for 6 seconds
}, {
desc: "254 flows, 1 pps",
inFlowCount: 254,
inPPS: 1,
inNumPacketsPerFlow: 5, // send for 5 seconds
/*
Currently beyond the scale limit.
}, {
desc: "254 flows, 10 pps",
inFlowCount: 254,
inPPS: 10,
inNumPacketsPerFlow: 20, // send for 2 seconds
*/
}}

for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
// Start a mirroring session to copy packets.
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startTwoPortMirror(t, client, mpb.StartRequest_TT_IP)
time.Sleep(1 * time.Second)
defer func() { stopTwoPortMirror(t, client) }()

otgCfg := pushBaseConfigs(t, ondatra.ATE(t, "ate"))

otg := ondatra.ATE(t, "ate").OTG()
otgCfg.Flows().Clear().Items()
for i := 0; i < tt.inFlowCount; i++ {
addIPv4Flow(t, otgCfg, fmt.Sprintf("flow%d", i), ateSrc.Name, ateDst.Name, fmt.Sprintf("100.64.%d.1", i), fmt.Sprintf("100.64.%d.2", i), tt.inPPS, tt.inNumPacketsPerFlow)
}

otg.PushConfig(t, otgCfg)

t.Logf("Starting IP traffic...")
otg.StartTraffic(t)
t.Logf("Sleeping for %s...", *sleepTime)
time.Sleep(*sleepTime)
t.Logf("Stopping IP traffic...")
otg.StopTraffic(t)

// Avoid a race with telemetry updates.
time.Sleep(2 * time.Second)
for i := 0; i < tt.inFlowCount; i++ {
metrics := gnmi.Get(t, otg, gnmi.OTG().Flow(fmt.Sprintf("flow%d", i)).State())
got, want := metrics.GetCounters().GetInPkts(), metrics.GetCounters().GetOutPkts()
t.Logf("flow %d: sent: %d, recv: %d", i, want, got)
if lossPackets := want - got; lossPackets > tt.inLossTolerance {
t.Errorf("flow %d: did not get expected number of packets, got: %d, want: %d", i, got, want)
}
}
})
}
}
50 changes: 41 additions & 9 deletions e2e/simple_ondatra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ import (

"github.com/open-traffic-generator/snappi/gosnappi"
"github.com/openconfig/featureprofiles/topologies/binding"
tpb "github.com/openconfig/kne/proto/topo"
mpb "github.com/openconfig/magna/proto/mirror"
"github.com/openconfig/ondatra"
"github.com/openconfig/ondatra/gnmi"
"github.com/openconfig/ondatra/knebind/solver"
"github.com/openconfig/ondatra/otg"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

tpb "github.com/openconfig/kne/proto/topo"
mpb "github.com/openconfig/magna/proto/mirror"
)

const (
Expand Down Expand Up @@ -127,12 +126,13 @@ func mirrorClient(t *testing.T, addr string) (mpb.MirrorClient, func() error) {

// startTwoPortMirror begins traffic mirroring between port1 and port2 on the mirror
// container in the topology.
func startTwoPortMirror(t *testing.T, client mpb.MirrorClient) {
func startTwoPortMirror(t *testing.T, client mpb.MirrorClient, kind mpb.StartRequest_TrafficType) {
t.Helper()
mirror := ondatra.DUT(t, "mirror")
startMirrors(t, client, &mpb.StartRequest{
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
From: mirror.Port(t, "port1").Name(),
To: mirror.Port(t, "port2").Name(),
TrafficType: kind,
})
}

Expand Down Expand Up @@ -180,7 +180,7 @@ func TestMirror(t *testing.T) {
addr := mirrorAddr(t)
client, stop := mirrorClient(t, addr)
defer stop()
startTwoPortMirror(t, client)
startTwoPortMirror(t, client, mpb.StartRequest_TT_MPLS)
time.Sleep(1 * time.Second)
stopTwoPortMirror(t, client)
}
Expand Down Expand Up @@ -226,6 +226,38 @@ func addMPLSFlow(t *testing.T, otgCfg gosnappi.Config, name, srcName, dstName, s

}

// addIPv4Flow adds a new IPv4 flow to the specified otgCfg. The flow is named
// according to the name argument, and runs between a srcName and dstName port,
// with the srcv4 IPv4 source address, and dstv4 destination address. The flow
// runs at pps packets per second, until totPackets packets have been sent. If
// totPackets is 0, packets are sent at pps until the flow is terminated
// through OTG.
func addIPv4Flow(t *testing.T, otgCfg gosnappi.Config, name, srcName, dstName, srcv4, dstv4 string, pps uint64, totPackets uint32) {
ipFLow := otgCfg.Flows().Add().SetName(name)
ipFLow.Metrics().SetEnable(true)
ipFLow.TxRx().Port().SetTxName(srcName).SetRxNames([]string{dstName})

ipFLow.Rate().SetChoice("pps").SetPps(pps)
if totPackets != 0 {
ipFLow.Duration().SetChoice("fixed_packets").FixedPackets().SetPackets(totPackets)
}

// OTG specifies that the order of the <flow>.Packet().Add() calls determines
// the stack of headers that are to be used, starting at the outer-most and
// ending with the inner-most.

// Set up ethernet layer.
eth := ipFLow.Packet().Add().Ethernet()
eth.Src().SetChoice("value").SetValue(ateSrc.MAC)
eth.Dst().SetChoice("value").SetValue(ateDst.MAC)

ip4 := ipFLow.Packet().Add().Ipv4()
ip4.Src().SetChoice("value").SetValue(srcv4)
ip4.Dst().SetChoice("value").SetValue(dstv4)
ip4.Version().SetChoice("value").SetValue(4)

}

const (
// lossTolerance indicates the number of packets we are prepared to lose during
// a test. If the packets per second generation rate is low then the flow can be
Expand All @@ -241,7 +273,7 @@ func TestBasicMPLS(t *testing.T) {
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startTwoPortMirror(t, client)
startTwoPortMirror(t, client, mpb.StartRequest_TT_MPLS)
time.Sleep(1 * time.Second)
defer func() { stopTwoPortMirror(t, client) }()

Expand Down Expand Up @@ -342,7 +374,7 @@ func TestMPLSFlowsTwoPorts(t *testing.T) {
maddr := mirrorAddr(t)
client, stop := mirrorClient(t, maddr)
defer stop()
startTwoPortMirror(t, client)
startTwoPortMirror(t, client, mpb.StartRequest_TT_MPLS)
time.Sleep(1 * time.Second)
defer func() { stopTwoPortMirror(t, client) }()

Expand Down
2 changes: 1 addition & 1 deletion lwotg/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *Server) handleFlows(flows []*otg.Flow) ([]*TXRXWrapper, error) {
for _, fn := range s.flowHandlers {
txrx, ok, err := fn(flow, intfs)
switch {
case err != nil:
case ok && err != nil:
// The flow could be handled, but an error occurred.
return nil, status.Errorf(codes.Internal, "error generating flows, %v", err)
case !ok:
Expand Down
25 changes: 21 additions & 4 deletions mirrorsrv/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"

mpb "github.com/openconfig/magna/proto/mirror"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

mpb "github.com/openconfig/magna/proto/mirror"
)

type Server struct {
Expand Down Expand Up @@ -49,6 +47,18 @@ const (
packetSize = 9000
)

func ipFilter(p gopacket.Packet) bool {
e, ok := p.Layer(layers.LayerTypeEthernet).(*layers.Ethernet)
if !ok {
return false
}
if e.EthernetType != layers.EthernetTypeIPv4 {
return false
}
klog.Infof("copying IP packet, %v", p)
return true
}

// mplsFilter returns true if a packet is an MPLS unicast packet.
func mplsFilter(p gopacket.Packet) bool {
e, ok := p.Layer(layers.LayerTypeEthernet).(*layers.Ethernet)
Expand All @@ -58,7 +68,7 @@ func mplsFilter(p gopacket.Packet) bool {
if e.EthernetType != layers.EthernetTypeMPLSUnicast {
return false
}
klog.Infof("copying packet, %v", p)
klog.Infof("copying MPLS packet, %v", p)
return true
}

Expand Down Expand Up @@ -154,6 +164,13 @@ func (s *Server) Start(ctx context.Context, req *mpb.StartRequest) (*mpb.StartRe
return nil, status.Errorf(codes.AlreadyExists, "session between %s and %s already exists", req.From, req.To)
}

switch req.TrafficType {
case mpb.StartRequest_TT_IP:
filterFunc = ipFilter
case mpb.StartRequest_TT_MPLS:
filterFunc = mplsFilter
}

fn := copyFunc(req.From, req.To, filterFunc)
stop := make(chan struct{})
s.addSession(req.From, req.To, stop)
Expand Down
Loading

0 comments on commit 3df080f

Please sign in to comment.